GCPTech Blogデータ分析 

SnowflakeにEmbulkでデータをETL(GCP GCS編)

はいさーい!年末年始は半袖半ズボンで暮らしていたaipaです(ちょっと寒かった)。
前回「Embulkを使ってS3からSnowflakeへデータを転送する。」という記事を書きました。

当時、`embulk-output-snowflake` という名前のpluginが2つあって色々ハマったのですが、記事公開後、なんとアップデートされたようです!!ありがたい!!!
アップデートについてはこちら
ということで、今回はGoogle Cloud Storage(以後 GCS)からSnowflakeへ(今度こそ)データ転送がサクッとできるか試してみました。

はじめに
・Embulkを使って、GCSからSnowflakeへデータを飛ばそう
・Dockerを使ってEmbulkを実行する

この記事を作成するにあたって使用した環境について

$ sw_vers
ProductName:	Mac OS X
ProductVersion:	10.15.7
BuildVersion:	19H114

$ docker --version
Docker version 20.10.0, build 7287ab3

認証情報の用意(GCP)
GCSへbuketの用意と、アクセスするためにGCPでサービスアカウントを用意します。
今回の記事では省略します。

認証情報、DB等の用意(Snowflake)
Snowflakeへの登録手順、今回書き込むリソースの用意などはこの記事では省略します。
ユーザー名、パスワード、アカウントURL、作成するDB、WH名などをメモしておいてください。

Embulk実行環境の用意

前回に引き続き、Dockerおじさんの力をお借りします。
今回もDockerfileを修正します。

1.embulk-input-randomjで用意したdummyデータをembulk-output-gcsでGCSへ保存する
2.1で保存したデータをembulk-input-gcsで取得し、embulk-output-snowflakeでSnowflakeへ保存する

FROM openjdk:8-jre-alpine
 
# Embulk 本体をインストールする
RUN wget -q https://dl.embulk.org/embulk-latest.jar -O /bin/embulk \
  && chmod +x /bin/embulk
 
# 使いたいプラグインを入れる
RUN apk add --no-cache libc6-compat \
  && embulk gem install embulk-input-randomj embulk-input-gcs \
                        embulk-output-gcs embulk-output-snowflake
 
WORKDIR /work

ENTRYPOINT ["java", "-jar", "/bin/embulk"]

はい。あとはビルドして問題なく動作するか確認できたらおkです。

$ docker build -t embulk .
$ docker run --rm -it -v $(pwd):/work embulk --help
Embulk v0.9.23
Usage: embulk [-vm-options] <command> [--options]
Commands:
   mkbundle   <directory>                             # create a new plugin bundle environment.
   bundle     [directory]                             # update a plugin bundle environment.
   run        <config.yml>                            # run a bulk load transaction.
   cleanup    <config.yml>                            # cleanup resume state.
   preview    <config.yml>                            # dry-run the bulk load without output and show preview.
   guess      <partial-config.yml> -o <output.yml>    # guess missing parameters to create a complete configuration file.
   gem        <install | list | help>                 # install a plugin or show installed plugins.
   new        <category> <name>                       # generates new plugin template
   migrate    <path>                                  # modify plugin code to use the latest Embulk plugin API
   example    [path]                                  # creates an example config file and csv file to try embulk.
   selfupdate [version]                               # upgrades embulk to the latest released version or to the specified version.
 
VM options:
   -E...                            Run an external script to configure environment variables in JVM
                                    (Operations not just setting envs are not recommended nor guaranteed.
                                     Expect side effects by running your external script at your own risk.)
   -J-O                             Disable JVM optimizations to speed up startup time (enabled by default if command is 'run')
   -J+O                             Enable JVM optimizations to speed up throughput
   -J...                            Set JVM options (use -J-help to see available options)
   -R--dev                          Set JRuby to be in development mode
 
Use `<command> --help` to see description of the commands.

準備おk。

データの用意

GCS → Snowflakeへ転送するdummyデータを用意します。

$ vim dummy_config.yml

in:
  type: randomj
  rows: 10000
  threads: 4
  primary_key: id
  schema:
  - {name: id, type: long}
  - {name: name, type: string}
  - {name: hash, type: string, length: 16}
  - {name: hobby, type: string, null_rate: 1000}
  - {name: price, type: long}
  - {name: day, type: long, min_value: 1, max_value: 31}
  - {name: average, type: double}
  - {name: rate, type: double, min_value: -100, max_value: 100}
  - {name: flag, type: boolean}
  - {name: time, type: timestamp, format: '%Y-%m-%d %H:%M:%S'}
  - {name: date, type: timestamp, format: '%Y-%m-%d', start_date: 20180101, end_date: 20201231}
 
out:
  type: gcs
  auth_method: json_key
  json_keyfile: ${service_account_json_path}
  bucket: ${bucket_name}
  path_prefix: snowflake_test_
  file_ext: .csv
  formatter:
    type: csv

※${}の箇所はご自分の環境や準備したリソースにあわせて修正してください。
用意ができましたら、早速実行していきましょう。

$ docker run --rm -it -v $(pwd):/work embulk run dummy_config.yml
YYYY-MM-DD 05:24:57.930 +0000: Embulk v0.9.23
YYYY-MM-DD 05:24:58.673 +0000 [WARN] (main): DEPRECATION: JRuby org.jruby.embed.ScriptingContainer is directly injected.
YYYY-MM-DD 05:25:00.801 +0000 [INFO] (main): Gem's home and path are set by default: "/root/.embulk/lib/gems"
YYYY-MM-DD 05:25:01.498 +0000 [INFO] (main): Started Embulk v0.9.23
YYYY-MM-DD 05:25:01.619 +0000 [INFO] (0001:transaction): Loaded plugin embulk-input-randomj (0.5.1)
YYYY-MM-DD 05:25:01.649 +0000 [INFO] (0001:transaction): Loaded plugin embulk-output-gcs (0.4.4)
YYYY-MM-DD 05:25:01.679 +0000 [INFO] (0001:transaction): Using local thread executor with max_threads=12 / output tasks 8 = input tasks 4 * 2
YYYY-MM-DD 05:25:01.746 +0000 [INFO] (0001:transaction): {done:  0 / 4, running: 0}
YYYY-MM-DD 05:25:06.681 +0000 [INFO] (0016:task-0003): Local Hash(MD5): fDxPeXzJYwPEu7Y9Rqd0rQ== / Remote Hash(MD5): fDxPeXzJYwPEu7Y9Rqd0rQ==
YYYY-MM-DD 05:25:06.682 +0000 [INFO] (0016:task-0003): Delete generated file: /tmp/embulk20210104T052501Z5955928662865901338/0016_task-0003_8171522683112773181.tmp > true
YYYY-MM-DD 05:25:06.682 +0000 [INFO] (0016:task-0003): Uploaded '${bucket_name}/snowflake_test_.006.01.csv' to 1021159bytes
YYYY-MM-DD 05:25:07.360 +0000 [INFO] (0014:task-0001): Local Hash(MD5): xmnx4RPMn7AYwAYgMbfDyw== / Remote Hash(MD5): xmnx4RPMn7AYwAYgMbfDyw==
YYYY-MM-DD 05:25:07.361 +0000 [INFO] (0014:task-0001): Delete generated file: /tmp/embulk20210104T052501Z5955928662865901338/0014_task-0001_6835785199099919143.tmp > true
YYYY-MM-DD 05:25:07.361 +0000 [INFO] (0014:task-0001): Uploaded '${bucket_name}/snowflake_test_.002.01.csv' to 1021214bytes
YYYY-MM-DD 05:25:07.525 +0000 [INFO] (0015:task-0002): Local Hash(MD5): 5G7BikF24cxQ+of4zG0imw== / Remote Hash(MD5): 5G7BikF24cxQ+of4zG0imw==
YYYY-MM-DD 05:25:07.526 +0000 [INFO] (0015:task-0002): Delete generated file: /tmp/embulk20210104T052501Z5955928662865901338/0015_task-0002_4339302799679309702.tmp > true
YYYY-MM-DD 05:25:07.526 +0000 [INFO] (0015:task-0002): Uploaded '${bucket_name}/snowflake_test_.004.01.csv' to 1021434bytes
YYYY-MM-DD 05:25:07.719 +0000 [INFO] (0013:task-0000): Local Hash(MD5): iDpEcfQNjngfzw3maaDCsQ== / Remote Hash(MD5): iDpEcfQNjngfzw3maaDCsQ==
YYYY-MM-DD 05:25:07.720 +0000 [INFO] (0013:task-0000): Delete generated file: /tmp/embulk20210104T052501Z5955928662865901338/0013_task-0000_5008137694799641171.tmp > true
YYYY-MM-DD 05:25:07.720 +0000 [INFO] (0013:task-0000): Uploaded '${bucket_name}/snowflake_test_.000.01.csv' to 1015035bytes
YYYY-MM-DD 05:25:09.203 +0000 [INFO] (0016:task-0003): Local Hash(MD5): ohFqIr9/u9o1mYxjv2RDKA== / Remote Hash(MD5): ohFqIr9/u9o1mYxjv2RDKA==
YYYY-MM-DD 05:25:09.204 +0000 [INFO] (0016:task-0003): Delete generated file: /tmp/embulk20210104T052501Z5955928662865901338/0016_task-0003_1196395092125882981.tmp > true
YYYY-MM-DD 05:25:09.204 +0000 [INFO] (0016:task-0003): Uploaded '${bucket_name}/snowflake_test_.007.01.csv' to 1006570bytes
YYYY-MM-DD 05:25:10.132 +0000 [INFO] (0013:task-0000): Local Hash(MD5): qMe8fFAZZ6KPQIyiLGmu1Q== / Remote Hash(MD5): qMe8fFAZZ6KPQIyiLGmu1Q==
YYYY-MM-DD 05:25:10.132 +0000 [INFO] (0013:task-0000): Delete generated file: /tmp/embulk20210104T052501Z5955928662865901338/0013_task-0000_5544480335092926268.tmp > true
YYYY-MM-DD 05:25:10.132 +0000 [INFO] (0013:task-0000): Uploaded '${bucket_name}/snowflake_test_.001.01.csv' to 1001992bytes
YYYY-MM-DD 05:25:10.134 +0000 [INFO] (0001:transaction): {done:  2 / 4, running: 2}
YYYY-MM-DD 05:25:10.383 +0000 [INFO] (0015:task-0002): Local Hash(MD5): 1wfSRUPfCxsubviE/TFCOg== / Remote Hash(MD5): 1wfSRUPfCxsubviE/TFCOg==
YYYY-MM-DD 05:25:10.384 +0000 [INFO] (0015:task-0002): Delete generated file: /tmp/embulk20210104T052501Z5955928662865901338/0015_task-0002_3918785590840355354.tmp > true
YYYY-MM-DD 05:25:10.384 +0000 [INFO] (0015:task-0002): Uploaded '${bucket_name}/snowflake_test_.005.01.csv' to 1005641bytes
YYYY-MM-DD 05:25:14.364 +0000 [INFO] (0014:task-0001): Local Hash(MD5): XWzQtUYu41OqQaH4AfYtAw== / Remote Hash(MD5): XWzQtUYu41OqQaH4AfYtAw==
YYYY-MM-DD 05:25:14.365 +0000 [INFO] (0014:task-0001): Delete generated file: /tmp/embulk20210104T052501Z5955928662865901338/0014_task-0001_3018326129276642397.tmp > true
YYYY-MM-DD 05:25:14.365 +0000 [INFO] (0014:task-0001): Uploaded '${bucket_name}/snowflake_test_.003.01.csv' to 1006794bytes
YYYY-MM-DD 05:25:14.367 +0000 [INFO] (0001:transaction): {done:  4 / 4, running: 0}
YYYY-MM-DD 05:25:14.368 +0000 [INFO] (0001:transaction): {done:  4 / 4, running: 0}
YYYY-MM-DD 05:25:14.368 +0000 [INFO] (0001:transaction): {done:  4 / 4, running: 0}
YYYY-MM-DD 05:25:14.384 +0000 [INFO] (main): Committed.
YYYY-MM-DD 05:25:14.384 +0000 [INFO] (main): Next config diff: {"in":{},"out":{}}

ブラウザでGCSのBucketを確認してみましょう。

GCSのBucket画面

はい。おkです。データの準備も完了しました。

GCS → SnowflakeでETL

それではSnowflakeへデータをETLしていきたいと思います。
ETLを実行するためのconfig.ymlを用意していきましょう。
項目の入力は面倒なのでguessりましょう。

$ vim seed.yml

in:
  type: gcs
  bucket: ${bucket_name}
  path_prefix: snowflake_test_
  auth_method: json_key
  json_keyfile: ${service_account_json_path}
out:
  type: snowflake

guessオプションを実行して、inputの列やファイルフォーマットを自動推測します。

$ docker run --rm -it -v $(pwd):/work embulk guess seed.yml -o config.yml
YYYY-MM-DD 05:55:22.087 +0000: Embulk v0.9.23
YYYY-MM-DD 05:55:22.987 +0000 [WARN] (main): DEPRECATION: JRuby org.jruby.embed.ScriptingContainer is directly injected.
YYYY-MM-DD 05:55:25.103 +0000 [INFO] (main): Gem's home and path are set by default: "/root/.embulk/lib/gems"
YYYY-MM-DD 05:55:25.762 +0000 [INFO] (main): Started Embulk v0.9.23
YYYY-MM-DD 05:55:25.876 +0000 [INFO] (0001:guess): Loaded plugin embulk-input-gcs (0.3.2)
YYYY-MM-DD 05:55:28.208 +0000 [INFO] (0001:guess): Try to read 32,768 bytes from input source
YYYY-MM-DD 05:55:29.815 +0000 [INFO] (0001:guess): Loaded plugin embulk (0.9.23)
YYYY-MM-DD 05:55:29.833 +0000 [INFO] (0001:guess): Loaded plugin embulk (0.9.23)
YYYY-MM-DD 05:55:29.862 +0000 [INFO] (0001:guess): Loaded plugin embulk (0.9.23)
YYYY-MM-DD 05:55:29.880 +0000 [INFO] (0001:guess): Loaded plugin embulk (0.9.23)
in:
  type: gcs
  bucket: ${bucket_name}
  path_prefix: snowflake_test_
  auth_method: json_key
  json_keyfile: ${service_account_json_path}
  parser:
    charset: UTF-8
    newline: CRLF
    type: csv
    delimiter: ','
    quote: '"'
    escape: '"'
    trim_if_not_quoted: false
    skip_header_lines: 1
    allow_extra_columns: false
    allow_optional_columns: false
    columns:
    - {name: id, type: long}
    - {name: name, type: string}
    - {name: hash, type: string}
    - {name: hobby, type: string}
    - {name: price, type: long}
    - {name: day, type: long}
    - {name: average, type: double}
    - {name: rate, type: double}
    - {name: flag, type: boolean}
    - {name: time, type: timestamp, format: '%Y-%m-%d %H:%M:%S.%N %z'}
    - {name: date, type: timestamp, format: '%Y-%m-%d %H:%M:%S.%N %z'}
out: {type: snowflake}

はいおkです。
次にoutputのsnowflakeの設定を用意します。
項目は前回の記事を参考にしています。

$ vim config.yml
in:
  type: gcs
  bucket: ${bucket_name}
  path_prefix: snowflake_test_
  auth_method: json_key
  json_keyfile: ${service_account_json_path}
  parser:
    charset: UTF-8
    newline: CRLF
    type: csv
    delimiter: ','
    quote: '"'
    escape: '"'
    trim_if_not_quoted: false
    skip_header_lines: 1
    allow_extra_columns: false
    allow_optional_columns: false
    columns:
    - {name: id, type: long}
    - {name: name, type: string}
    - {name: hash, type: string}
    - {name: hobby, type: string}
    - {name: price, type: long}
    - {name: day, type: long}
    - {name: average, type: double}
    - {name: rate, type: double}
    - {name: flag, type: boolean}
    - {name: time, type: timestamp, format: '%Y-%m-%d %H:%M:%S.%N %z'}
    - {name: date, type: timestamp, format: '%Y-%m-%d %H:%M:%S.%N %z'}
out:
  type: snowflake
  host: ${host}
  user: ${user}
  password: ${password}
  warehouse: ${warehouse}
  database: ${database}
  schema: PUBLIC
  table: ${table_name}
  mode: replace

準備おkですね。それではコマンドを実行します。はたして(ちむどんどん)
修正後再度実行!

$ docker run --rm -it -v $(pwd):/work embulk run config.yml
YYYY-MM-DD 05:57:08.608 +0000: Embulk v0.9.23
YYYY-MM-DD 05:57:09.472 +0000 [WARN] (main): DEPRECATION: JRuby org.jruby.embed.ScriptingContainer is directly injected.
YYYY-MM-DD 05:57:11.518 +0000 [INFO] (main): Gem's home and path are set by default: "/root/.embulk/lib/gems"
YYYY-MM-DD 05:57:12.195 +0000 [INFO] (main): Started Embulk v0.9.23
YYYY-MM-DD 05:57:12.345 +0000 [INFO] (0001:transaction): Loaded plugin embulk-input-gcs (0.3.2)
YYYY-MM-DD 05:57:12.375 +0000 [INFO] (0001:transaction): Loaded plugin embulk-output-snowflake (0.2.0)
YYYY-MM-DD 05:57:14.794 +0000 [INFO] (0001:transaction): Using local thread executor with max_threads=12 / tasks=8
YYYY-MM-DD 05:57:14.813 +0000 [INFO] (0001:transaction): JDBC Driver = /root/.embulk/lib/gems/gems/embulk-output-snowflake-0.2.0/default_jdbc_driver/snowflake-jdbc-3.12.8.jar
YYYY-MM-DD 05:57:14.826 +0000 [INFO] (0001:transaction): Connecting to jdbc:snowflake://chura_data.ap-northeast-1.aws.snowflakecomputing.com options {db=${database}, warehouse=${warehouse}, user=${user}, password=***, schema=PUBLIC}

# 省略

YYYY-MM-DD 05:57:41.948 +0000 [INFO] (0001:transaction): TransactionIsolation=unknown
YYYY-MM-DD 05:57:41.948 +0000 [INFO] (0001:transaction): SQL: DROP TABLE IF EXISTS "${table_name}_00000176cbf8a92b_embulk"
YYYY-MM-DD 05:57:42.229 +0000 [INFO] (0001:transaction): > 0.28 seconds
YYYY-MM-DD 05:57:42.386 +0000 [INFO] (main): Committed.
YYYY-MM-DD 05:57:42.386 +0000 [INFO] (main): Next config diff: {"in":{"last_path":"snowflake_test_.007.01.csv"},"out":{}}

わーい。うまくいったようです!!snowflakeへログインしてデータが入っているか確認します。

転送確認画面

何事もなく転送できましたー!

まとめ

今度はすぐできることができました!!!これなら便利ですね〜〜〜。関係者の皆様に感謝ですー。
それではー


Snowflakeに関するお問い合わせはサービス/研修のお問い合わせからご連絡ください。

このページをシェアする:



DATUM STUDIOは、クライアントの事業成長と経営課題解決を最適な形でサポートする、データ・ビジネスパートナーです。
データ分析の分野でお客様に最適なソリューションをご提供します。まずはご相談ください。