Google Cloud 

Dataflowの基本的な流れ

ちゅらデータの塚田と申します。よろしくお願いいたします。
文字通り様々なサービス間にデータの流れを作るDataflow、その基本をご紹介します。

Dataflowとは

Dataflowとは、Apache Beamの力で大規模なデータ処理の構造を単純化する分散処理バックエンドです。

Apache Beamはデータの並列処理パイプラインを定義するオープンソースの統合モデルです。
バッチ(テキストファイルを投げつける等)とストリーミング(Pub/Subから不定期に流れてくる等)の両方に対応しています。

Apache Beamモデルで分散処理の「実行内容」をシンプルに記述し、Dataflowで分散処理の「実行方法=細かい設定」(個々のワーカーの調整やデータセットの分割などのタスク)を管理します。

(参考: Apache Beam のプログラミング モデル

なお、DataflowとBeamの関係は深いですが、Beamで記述されたジョブは他の実行エンジンでも実行できます。

Dataflow が Beam のマネージドサービスであるという想定により、Beam は実行エンジンと混同されることがよくあります。
しかし、そうではありません。Dataflow ジョブは Beam で作成され、Dataflow が実行エンジンとして機能します。
Apache Beam のメリットは、オープンソースの開発とポータビリティにあります。
ジョブはさまざまな言語で Beam に書き込むことができ、そのジョブは Dataflow、Apache Flink、Apache Spark、その他の実行エンジンで実行できます。つまり、Google Cloud に縛られることはありません。

(出典: Dataflow の仕組み: Dataflow と他のツールとの比較

DataflowではJava(+Maven),Go,PythonからApache Beam SDKを利用することができます。
なお、本記事ではJavaから利用することを前提としています。

公式ドキュメント

まずは公式ドキュメントをあたるのが定石です。これを全部読んだ頃には本記事に書かれていることは常識になっていることでしょう。
ただ全部読むのは大変なので、まずは軽く本記事を読んでいただけると幸いです。

各言語のDataflowクイックスタート一覧
単語カウントをサンプルとしたJava版チュートリアル
KafkaからBigQueryにデータを書き込むチュートリアル
Beamの基本的な概念について(英語)
日本語のBeam概要
Beamのプログラミングガイド(英語)
単語カウントを例としたBeamのコード付き解説(英語)
単語カウントの簡潔な解説、Java版(英語)
Beam用語集(英語)
Beam GET STARTED
・Beamサンプルソースコード
 [Java]
 [Python]
 [Go]

基本概念の概説

・PCollection
データの塊です。Windowによって適切な粒度に分けます。

・elements
(PCollectionの文脈では)PCollection内の1つ1つのデータを指します。テキストやバイナリそのもののことです。

・transforms
データに対するあらゆる操作・変換です。フィルター、GroupBy、集計、I/O等も含みます。一覧はこちら

・ParDo
element1つ1つに対する操作・変換(element-wise transform)です。

・DoFn
element1つに対する処理が記述された関数です。ParDoの引数となります。

・Pipeline
データの流れを記述したパイプラインです。Javaの場合、下記のように処理をつなげていきます。

PCollection<Hoge> foo = pipeline
.apply("Read from Stream", Fuga.readMessages())
.apply("Convert to Hoge", ParDo.of(new ToHogeFn()));

・Runner
Pipelineを実行するもので、Dataflow Runnerを使います。テスト用にローカルで動かすDirect Runnerもあります。

データの流れ

「読む → 変換 → 書く →(Runnerに実行させる)」という一連の流れをJavaのスニペットと共に解説していきます。

読む(Read, Create)

まずはPipelineを作ります。どんなRunnerで動くかなどの設定を行いますが、ここではいわゆる「おまじない」だと思っていてください。

PipelineOptions options = PipelineOptionsFactory.create();
Pipeline p = Pipeline.create(options);

Google Cloud Storageから読んだり、任意の型のインスタンスや定数から生成したりしてデータの始まりを決めます。

// From Google Cloud Storage.
p.apply(TextIO.read().from("gs://foo/text.txt"));
// From constants.
p.apply(Create.of("Hello", "World!!!"));

変換(transform)

MapElementsで変換します。(MapReduceでいうMapperのような操作をしたりテキストを文字数に変換する例)

PCollection<String> texts = p.apply(Create.of("Hello", "World!!!"));
PCollection<Integer> lengths = texts.apply(MapElements
    .into(TypeDescriptors.integers())
    .via((String line) -> line.length()));

FlatMapElementsで1対多の変換をしたりします。(テキストを「!」で分割する例)

PCollection<String> text = p.apply(Create.of("Hello!World"));
PCollection<String> texts = text.apply(FlatMapElements
    .into(TypeDescriptors.strings())
    .via((String text) -> Arrays.asList(text.split("!"))));

他にも様々な変換があります。(一覧はこちら)

書く(Write)

最後に(途中で行うこともあります)、結果をファイルに書き込んだり、各種サービスに送信したりします。
この工程は省略できますが、結果が残らないのでパイプラインを実行する意味がなくなってしまいます。

// To filesystems.
p.apply(TextIO.write().to("foo"));
// To Pub/Sub.
p.apply(PubsubIO.writeStrings().to("foo/topics/bar"));

実行(Pipeline.run)

Pipelineはあくまでデータの流れを記述したものに過ぎません。Runnerが実行して初めて結果が手に入ります。

p.run();
// For a blocking execution.
p.run().waitUntilFinish();

最後に

さて、Dataflowを学ぶにあたって必要になる概念を駆け足で解説してきました。他にも処理の流れを分岐させる、GroupByするなど様々な処理の記述ができるのですが、今回はその学習の踏み台になるような基礎を紹介いたしました。
クイックスタートも充実しておりますので、早速Googleアカウントを作成して手を動かしてみてはいかがでしょうか!

本記事が少しでもお力になれたなら幸いです。

このページをシェアする:



DATUM STUDIOは、クライアントの事業成長と経営課題解決を最適な形でサポートする、データ・ビジネスパートナーです。
データ分析の分野でお客様に最適なソリューションをご提供します。まずはご相談ください。