SnowflakeにEmbulkでデータをETL(GCP GCS編)
はいさーい!年末年始は半袖半ズボンで暮らしていたaipaです(ちょっと寒かった)。
前回「Embulkを使ってS3からSnowflakeへデータを転送する。」という記事を書きました。
当時、`embulk-output-snowflake` という名前のpluginが2つあって色々ハマったのですが、記事公開後、なんとアップデートされたようです!!ありがたい!!!
アップデートについてはこちら
ということで、今回はGoogle Cloud Storage(以後 GCS)からSnowflakeへ(今度こそ)データ転送がサクッとできるか試してみました。
はじめに
・Embulkを使って、GCSからSnowflakeへデータを飛ばそう
・Dockerを使ってEmbulkを実行する
この記事を作成するにあたって使用した環境について
$ sw_vers ProductName: Mac OS X ProductVersion: 10.15.7 BuildVersion: 19H114 $ docker --version Docker version 20.10.0, build 7287ab3
認証情報の用意(GCP)
GCSへbuketの用意と、アクセスするためにGCPでサービスアカウントを用意します。
今回の記事では省略します。
認証情報、DB等の用意(Snowflake)
Snowflakeへの登録手順、今回書き込むリソースの用意などはこの記事では省略します。
ユーザー名、パスワード、アカウントURL、作成するDB、WH名などをメモしておいてください。
Embulk実行環境の用意
前回に引き続き、Dockerおじさんの力をお借りします。
今回もDockerfileを修正します。
1.embulk-input-randomjで用意したdummyデータをembulk-output-gcsでGCSへ保存する
2.1で保存したデータをembulk-input-gcsで取得し、embulk-output-snowflakeでSnowflakeへ保存する
FROM openjdk:8-jre-alpine # Embulk 本体をインストールする RUN wget -q https://dl.embulk.org/embulk-latest.jar -O /bin/embulk \ && chmod +x /bin/embulk # 使いたいプラグインを入れる RUN apk add --no-cache libc6-compat \ && embulk gem install embulk-input-randomj embulk-input-gcs \ embulk-output-gcs embulk-output-snowflake WORKDIR /work ENTRYPOINT ["java", "-jar", "/bin/embulk"]
はい。あとはビルドして問題なく動作するか確認できたらおkです。
$ docker build -t embulk . $ docker run --rm -it -v $(pwd):/work embulk --help Embulk v0.9.23 Usage: embulk [-vm-options] <command> [--options] Commands: mkbundle <directory> # create a new plugin bundle environment. bundle [directory] # update a plugin bundle environment. run <config.yml> # run a bulk load transaction. cleanup <config.yml> # cleanup resume state. preview <config.yml> # dry-run the bulk load without output and show preview. guess <partial-config.yml> -o <output.yml> # guess missing parameters to create a complete configuration file. gem <install | list | help> # install a plugin or show installed plugins. new <category> <name> # generates new plugin template migrate <path> # modify plugin code to use the latest Embulk plugin API example [path] # creates an example config file and csv file to try embulk. selfupdate [version] # upgrades embulk to the latest released version or to the specified version. VM options: -E... Run an external script to configure environment variables in JVM (Operations not just setting envs are not recommended nor guaranteed. Expect side effects by running your external script at your own risk.) -J-O Disable JVM optimizations to speed up startup time (enabled by default if command is 'run') -J+O Enable JVM optimizations to speed up throughput -J... Set JVM options (use -J-help to see available options) -R--dev Set JRuby to be in development mode Use `<command> --help` to see description of the commands.
準備おk。
データの用意
GCS → Snowflakeへ転送するdummyデータを用意します。
$ vim dummy_config.yml in: type: randomj rows: 10000 threads: 4 primary_key: id schema: - {name: id, type: long} - {name: name, type: string} - {name: hash, type: string, length: 16} - {name: hobby, type: string, null_rate: 1000} - {name: price, type: long} - {name: day, type: long, min_value: 1, max_value: 31} - {name: average, type: double} - {name: rate, type: double, min_value: -100, max_value: 100} - {name: flag, type: boolean} - {name: time, type: timestamp, format: '%Y-%m-%d %H:%M:%S'} - {name: date, type: timestamp, format: '%Y-%m-%d', start_date: 20180101, end_date: 20201231} out: type: gcs auth_method: json_key json_keyfile: ${service_account_json_path} bucket: ${bucket_name} path_prefix: snowflake_test_ file_ext: .csv formatter: type: csv
※${}の箇所はご自分の環境や準備したリソースにあわせて修正してください。
用意ができましたら、早速実行していきましょう。
$ docker run --rm -it -v $(pwd):/work embulk run dummy_config.yml YYYY-MM-DD 05:24:57.930 +0000: Embulk v0.9.23 YYYY-MM-DD 05:24:58.673 +0000 [WARN] (main): DEPRECATION: JRuby org.jruby.embed.ScriptingContainer is directly injected. YYYY-MM-DD 05:25:00.801 +0000 [INFO] (main): Gem's home and path are set by default: "/root/.embulk/lib/gems" YYYY-MM-DD 05:25:01.498 +0000 [INFO] (main): Started Embulk v0.9.23 YYYY-MM-DD 05:25:01.619 +0000 [INFO] (0001:transaction): Loaded plugin embulk-input-randomj (0.5.1) YYYY-MM-DD 05:25:01.649 +0000 [INFO] (0001:transaction): Loaded plugin embulk-output-gcs (0.4.4) YYYY-MM-DD 05:25:01.679 +0000 [INFO] (0001:transaction): Using local thread executor with max_threads=12 / output tasks 8 = input tasks 4 * 2 YYYY-MM-DD 05:25:01.746 +0000 [INFO] (0001:transaction): {done: 0 / 4, running: 0} YYYY-MM-DD 05:25:06.681 +0000 [INFO] (0016:task-0003): Local Hash(MD5): fDxPeXzJYwPEu7Y9Rqd0rQ== / Remote Hash(MD5): fDxPeXzJYwPEu7Y9Rqd0rQ== YYYY-MM-DD 05:25:06.682 +0000 [INFO] (0016:task-0003): Delete generated file: /tmp/embulk20210104T052501Z5955928662865901338/0016_task-0003_8171522683112773181.tmp > true YYYY-MM-DD 05:25:06.682 +0000 [INFO] (0016:task-0003): Uploaded '${bucket_name}/snowflake_test_.006.01.csv' to 1021159bytes YYYY-MM-DD 05:25:07.360 +0000 [INFO] (0014:task-0001): Local Hash(MD5): xmnx4RPMn7AYwAYgMbfDyw== / Remote Hash(MD5): xmnx4RPMn7AYwAYgMbfDyw== YYYY-MM-DD 05:25:07.361 +0000 [INFO] (0014:task-0001): Delete generated file: /tmp/embulk20210104T052501Z5955928662865901338/0014_task-0001_6835785199099919143.tmp > true YYYY-MM-DD 05:25:07.361 +0000 [INFO] (0014:task-0001): Uploaded '${bucket_name}/snowflake_test_.002.01.csv' to 1021214bytes YYYY-MM-DD 05:25:07.525 +0000 [INFO] (0015:task-0002): Local Hash(MD5): 5G7BikF24cxQ+of4zG0imw== / Remote Hash(MD5): 5G7BikF24cxQ+of4zG0imw== YYYY-MM-DD 05:25:07.526 +0000 [INFO] (0015:task-0002): Delete generated file: /tmp/embulk20210104T052501Z5955928662865901338/0015_task-0002_4339302799679309702.tmp > true YYYY-MM-DD 05:25:07.526 +0000 [INFO] (0015:task-0002): Uploaded '${bucket_name}/snowflake_test_.004.01.csv' to 1021434bytes YYYY-MM-DD 05:25:07.719 +0000 [INFO] (0013:task-0000): Local Hash(MD5): iDpEcfQNjngfzw3maaDCsQ== / Remote Hash(MD5): iDpEcfQNjngfzw3maaDCsQ== YYYY-MM-DD 05:25:07.720 +0000 [INFO] (0013:task-0000): Delete generated file: /tmp/embulk20210104T052501Z5955928662865901338/0013_task-0000_5008137694799641171.tmp > true YYYY-MM-DD 05:25:07.720 +0000 [INFO] (0013:task-0000): Uploaded '${bucket_name}/snowflake_test_.000.01.csv' to 1015035bytes YYYY-MM-DD 05:25:09.203 +0000 [INFO] (0016:task-0003): Local Hash(MD5): ohFqIr9/u9o1mYxjv2RDKA== / Remote Hash(MD5): ohFqIr9/u9o1mYxjv2RDKA== YYYY-MM-DD 05:25:09.204 +0000 [INFO] (0016:task-0003): Delete generated file: /tmp/embulk20210104T052501Z5955928662865901338/0016_task-0003_1196395092125882981.tmp > true YYYY-MM-DD 05:25:09.204 +0000 [INFO] (0016:task-0003): Uploaded '${bucket_name}/snowflake_test_.007.01.csv' to 1006570bytes YYYY-MM-DD 05:25:10.132 +0000 [INFO] (0013:task-0000): Local Hash(MD5): qMe8fFAZZ6KPQIyiLGmu1Q== / Remote Hash(MD5): qMe8fFAZZ6KPQIyiLGmu1Q== YYYY-MM-DD 05:25:10.132 +0000 [INFO] (0013:task-0000): Delete generated file: /tmp/embulk20210104T052501Z5955928662865901338/0013_task-0000_5544480335092926268.tmp > true YYYY-MM-DD 05:25:10.132 +0000 [INFO] (0013:task-0000): Uploaded '${bucket_name}/snowflake_test_.001.01.csv' to 1001992bytes YYYY-MM-DD 05:25:10.134 +0000 [INFO] (0001:transaction): {done: 2 / 4, running: 2} YYYY-MM-DD 05:25:10.383 +0000 [INFO] (0015:task-0002): Local Hash(MD5): 1wfSRUPfCxsubviE/TFCOg== / Remote Hash(MD5): 1wfSRUPfCxsubviE/TFCOg== YYYY-MM-DD 05:25:10.384 +0000 [INFO] (0015:task-0002): Delete generated file: /tmp/embulk20210104T052501Z5955928662865901338/0015_task-0002_3918785590840355354.tmp > true YYYY-MM-DD 05:25:10.384 +0000 [INFO] (0015:task-0002): Uploaded '${bucket_name}/snowflake_test_.005.01.csv' to 1005641bytes YYYY-MM-DD 05:25:14.364 +0000 [INFO] (0014:task-0001): Local Hash(MD5): XWzQtUYu41OqQaH4AfYtAw== / Remote Hash(MD5): XWzQtUYu41OqQaH4AfYtAw== YYYY-MM-DD 05:25:14.365 +0000 [INFO] (0014:task-0001): Delete generated file: /tmp/embulk20210104T052501Z5955928662865901338/0014_task-0001_3018326129276642397.tmp > true YYYY-MM-DD 05:25:14.365 +0000 [INFO] (0014:task-0001): Uploaded '${bucket_name}/snowflake_test_.003.01.csv' to 1006794bytes YYYY-MM-DD 05:25:14.367 +0000 [INFO] (0001:transaction): {done: 4 / 4, running: 0} YYYY-MM-DD 05:25:14.368 +0000 [INFO] (0001:transaction): {done: 4 / 4, running: 0} YYYY-MM-DD 05:25:14.368 +0000 [INFO] (0001:transaction): {done: 4 / 4, running: 0} YYYY-MM-DD 05:25:14.384 +0000 [INFO] (main): Committed. YYYY-MM-DD 05:25:14.384 +0000 [INFO] (main): Next config diff: {"in":{},"out":{}}
ブラウザでGCSのBucketを確認してみましょう。
はい。おkです。データの準備も完了しました。
GCS → SnowflakeでETL
それではSnowflakeへデータをETLしていきたいと思います。
ETLを実行するためのconfig.ymlを用意していきましょう。
項目の入力は面倒なのでguessりましょう。
$ vim seed.yml in: type: gcs bucket: ${bucket_name} path_prefix: snowflake_test_ auth_method: json_key json_keyfile: ${service_account_json_path} out: type: snowflake
guessオプションを実行して、inputの列やファイルフォーマットを自動推測します。
$ docker run --rm -it -v $(pwd):/work embulk guess seed.yml -o config.yml YYYY-MM-DD 05:55:22.087 +0000: Embulk v0.9.23 YYYY-MM-DD 05:55:22.987 +0000 [WARN] (main): DEPRECATION: JRuby org.jruby.embed.ScriptingContainer is directly injected. YYYY-MM-DD 05:55:25.103 +0000 [INFO] (main): Gem's home and path are set by default: "/root/.embulk/lib/gems" YYYY-MM-DD 05:55:25.762 +0000 [INFO] (main): Started Embulk v0.9.23 YYYY-MM-DD 05:55:25.876 +0000 [INFO] (0001:guess): Loaded plugin embulk-input-gcs (0.3.2) YYYY-MM-DD 05:55:28.208 +0000 [INFO] (0001:guess): Try to read 32,768 bytes from input source YYYY-MM-DD 05:55:29.815 +0000 [INFO] (0001:guess): Loaded plugin embulk (0.9.23) YYYY-MM-DD 05:55:29.833 +0000 [INFO] (0001:guess): Loaded plugin embulk (0.9.23) YYYY-MM-DD 05:55:29.862 +0000 [INFO] (0001:guess): Loaded plugin embulk (0.9.23) YYYY-MM-DD 05:55:29.880 +0000 [INFO] (0001:guess): Loaded plugin embulk (0.9.23) in: type: gcs bucket: ${bucket_name} path_prefix: snowflake_test_ auth_method: json_key json_keyfile: ${service_account_json_path} parser: charset: UTF-8 newline: CRLF type: csv delimiter: ',' quote: '"' escape: '"' trim_if_not_quoted: false skip_header_lines: 1 allow_extra_columns: false allow_optional_columns: false columns: - {name: id, type: long} - {name: name, type: string} - {name: hash, type: string} - {name: hobby, type: string} - {name: price, type: long} - {name: day, type: long} - {name: average, type: double} - {name: rate, type: double} - {name: flag, type: boolean} - {name: time, type: timestamp, format: '%Y-%m-%d %H:%M:%S.%N %z'} - {name: date, type: timestamp, format: '%Y-%m-%d %H:%M:%S.%N %z'} out: {type: snowflake}
はいおkです。
次にoutputのsnowflakeの設定を用意します。
項目は前回の記事を参考にしています。
$ vim config.yml in: type: gcs bucket: ${bucket_name} path_prefix: snowflake_test_ auth_method: json_key json_keyfile: ${service_account_json_path} parser: charset: UTF-8 newline: CRLF type: csv delimiter: ',' quote: '"' escape: '"' trim_if_not_quoted: false skip_header_lines: 1 allow_extra_columns: false allow_optional_columns: false columns: - {name: id, type: long} - {name: name, type: string} - {name: hash, type: string} - {name: hobby, type: string} - {name: price, type: long} - {name: day, type: long} - {name: average, type: double} - {name: rate, type: double} - {name: flag, type: boolean} - {name: time, type: timestamp, format: '%Y-%m-%d %H:%M:%S.%N %z'} - {name: date, type: timestamp, format: '%Y-%m-%d %H:%M:%S.%N %z'} out: type: snowflake host: ${host} user: ${user} password: ${password} warehouse: ${warehouse} database: ${database} schema: PUBLIC table: ${table_name} mode: replace
準備おkですね。それではコマンドを実行します。はたして(ちむどんどん)
修正後再度実行!
$ docker run --rm -it -v $(pwd):/work embulk run config.yml YYYY-MM-DD 05:57:08.608 +0000: Embulk v0.9.23 YYYY-MM-DD 05:57:09.472 +0000 [WARN] (main): DEPRECATION: JRuby org.jruby.embed.ScriptingContainer is directly injected. YYYY-MM-DD 05:57:11.518 +0000 [INFO] (main): Gem's home and path are set by default: "/root/.embulk/lib/gems" YYYY-MM-DD 05:57:12.195 +0000 [INFO] (main): Started Embulk v0.9.23 YYYY-MM-DD 05:57:12.345 +0000 [INFO] (0001:transaction): Loaded plugin embulk-input-gcs (0.3.2) YYYY-MM-DD 05:57:12.375 +0000 [INFO] (0001:transaction): Loaded plugin embulk-output-snowflake (0.2.0) YYYY-MM-DD 05:57:14.794 +0000 [INFO] (0001:transaction): Using local thread executor with max_threads=12 / tasks=8 YYYY-MM-DD 05:57:14.813 +0000 [INFO] (0001:transaction): JDBC Driver = /root/.embulk/lib/gems/gems/embulk-output-snowflake-0.2.0/default_jdbc_driver/snowflake-jdbc-3.12.8.jar YYYY-MM-DD 05:57:14.826 +0000 [INFO] (0001:transaction): Connecting to jdbc:snowflake://chura_data.ap-northeast-1.aws.snowflakecomputing.com options {db=${database}, warehouse=${warehouse}, user=${user}, password=***, schema=PUBLIC} # 省略 YYYY-MM-DD 05:57:41.948 +0000 [INFO] (0001:transaction): TransactionIsolation=unknown YYYY-MM-DD 05:57:41.948 +0000 [INFO] (0001:transaction): SQL: DROP TABLE IF EXISTS "${table_name}_00000176cbf8a92b_embulk" YYYY-MM-DD 05:57:42.229 +0000 [INFO] (0001:transaction): > 0.28 seconds YYYY-MM-DD 05:57:42.386 +0000 [INFO] (main): Committed. YYYY-MM-DD 05:57:42.386 +0000 [INFO] (main): Next config diff: {"in":{"last_path":"snowflake_test_.007.01.csv"},"out":{}}
わーい。うまくいったようです!!snowflakeへログインしてデータが入っているか確認します。
何事もなく転送できましたー!
まとめ
今度はすぐできることができました!!!これなら便利ですね〜〜〜。関係者の皆様に感謝ですー。
それではー
Snowflakeに関するお問い合わせはサービス/研修のお問い合わせからご連絡ください。
DATUM STUDIOは、クライアントの事業成長と経営課題解決を最適な形でサポートする、データ・ビジネスパートナーです。
データ分析の分野でお客様に最適なソリューションをご提供します。まずはご相談ください。
Contact
Explore Jobs
関連記事