AWSTech Blogデータ分析基盤 

embulkでRedshiftにデータを取り込む

この記事について

embulkでRedshiftにデータを取り込む方法について解説します。

どうしてembulkを使うのか

まず、Redshiftにデータを取り込む際には、大きく分けて、embulkで取り込む方法と、RedshiftのCOPYコマンドで取り込む方法の2つが考えられます。embulkを使うメリットデメリットは下記のとおりです。

  • メリット
    • データのチェック機能があり、汚いデータをインポートするのに強い。
    • 元のcsvファイルなどから、特定の条件に該当するレコードのみをインポートするなど、条件抽出もできる。
  • デメリット
    • 処理がCOPYコマンドに比べて遅い(内部的にはデータのチェック=>COPYの実行を行っているので)。特に大きなデータを扱う際にその速度差が顕著に出る。

このように、汚いデータ(数値として取り込みたい列に文字列が入っているなど)を取り込む際や、元ファイルから特定の条件に該当するレコードのみインポートしなければいけないなどにembulkは強みを発揮します。 注意: 以下は、「embulkを使って、s3に置いたデータファイルをRedshiftにインポートする」ことを前提に書きます。本来embulkの良いところは同じymlファイルの記法を覚えるだけで多種多様なDBへのデータ転送が行える点にありますが、本記事ではそれらについで触れません。

embulkでデータを取り込むのに何が必要か

embulkの実行には、

  • embulkのインストール
  • 必要なembulkのプラグインのインストール
  • 設定ファイル※

が必要になります。 ※embulkの設定ファイルは.yml(yaml、ヤムル)というファイル形式で記述できます。具体的な方法は後述します。

ymlファイルの書き方

embulkの設定ファイル、yamlファイルの書き方について先に解説します。 先にymlファイルの実例を示した上で、各部の構造について書きます。

ymlファイルのサンプル

in:
 type: s3
 bucket: sample-s3-bucket
 path_prefix: sample-folder/data.csv
 endpoint: s3-myregion.amazonaws.com
 access_key_id: your_aws_access_key
 secret_access_key: your_aws_seacret_access_key
 parser:
 charset: UTF-8
 newline: CRLF
 type: csv
 delimiter: ','
 quote: '"'
 escape: '"'
 trim_if_not_quoted: false
 skip_header_lines: 1
 allow_extra_columns: false
 allow_optional_columns: false
 columns:
 - {name: col1, type: string}
 - {name: col2, type: double}
 - {name: col3, type: long}
out:
  type: redshift
  host: myinstance.myregion.redshift.amazonaws.com
  user: user01
  password: password
  database: my_database
  table: table_name
  aws_access_key_id: your_aws_secret_access_key
  aws_secret_access_key: your_aws_secret_access_key
  iam_user_name: my-s3-read-only
  s3_bucket: my-redshift-transfer-bucket
  s3_key_prefix: temp/redshift
  mode: insert
  column_options:
    col_1: {type: 'VARCHAR(255)'}
    col_2: {type: 'DOUBLE PRECISION'}
    col_3: {type: 'INT NOT NULL'}  
filters:
  - type: row
  condition: AND
  conditions:
  - {column: col1,  operator: "IS NOT NULL"}
  - {column: col2,   operator: ">=", argument: 10}
  - {column: col3,   operator: "<",  argument: 20}

ymlファイルの構造

見て気づいたかもしれませんが、embulkのyamlは大きく分けて、in:, out:, filters:に分けられます(今回のサンプルコードの場合)。in:out:部分は必須で 、必要に応じてfilters:部分を記述します。 それぞれの部分の記述法について個別に解説します。

in:

in:部分では、embulkでインポートするファイルの情報を記載します。 今回の場合は、s3上にあるcsvファイルを指定しているので、type:オプションをs3に指定した上で、bucket:,path_prefix:,endpoint:, access_key_id:, secret_access_key:オプションを設定します。 embulkを実行する前に、embulk-input-s3 プラグインがインストールされていることを確認しましょう(インストール方法は後述)。 各オプションの設定方法は下記の通りです。

本来はそれ以下のオプションも当然指定しなければならないのですが、guessコマンドを使って手順を簡略化できます。guessコマンドについてguessコマンドは、取り込むファイル(の先頭の一部分)を取得し、yml ファイルのin:部分の適切な設定を書いてくれます。 例えば、ファイル形式や文字コード、区切り文字、ヘッダの有無、それぞれの列のデータ型なども自動で判定してくれます。 下の実行コマンドサンプルでは、before.ymlというファイルにguessコマンドを適用し、after.ymlというファイルに結果を出力しています。bundle exec embulk guess before.yml -o after.ymlguessコマンドを使ったあとのチェックポイント本来は、guessコマンドによって追記された部分全てをチェックするべきですが、下記はチェックすべきです。

  • columns:のnameに必要なカラムが網羅されているか。
  • columns:のnameが取り込みたいデータ形式になっているか。このデータ形式に一致しないデータは取り込まれない。ただし、columnsはあくまでもJavaのデータ型であることに注意してください。 Javaのdoubleには入るけど、redshiftのDOUBLE PRECISION型に入らないようなデータというものが存在します。例えば、Javaのdouble型では文字列のNaNを格納できますが、RedshiftのDOUBLE PRECISION型にはNaNを格納できません(エラーが出てembulkの処理が終了する) columns: で使えるデータ型はこちらを参照してください。

out:outは、出力するredshiftの情報と、embulkを処理する際に発生する一時ファイル(データをチェックし、redshiftに格納される前のデータ)の置き場所となるs3のバケットを指定できます。 まず、type:にRedshiftを指定した上で、接続先となるredshiftの情報を記述しましょう。他のモードの詳細はこちらfilters:filtersオプションは毎回指定する必要があるものではありませんが、指定することで、取り込みたいファイルの中で、特定の条件に該当するレコードのみ抽出してインポートすることができます。

  • 準備 embulkをインストールし、3つのembulkプラグイン embulk-input-s3 , embulk-output-redshift, embulk-filter-row をインストールしましょう(手順はこちら)。 また、特定の条件で入力ファイルの一部のみ ※以下、bundler経由でembulkを使用することを前提に書いています。 ymlファイルを記述する。必要に応じてguessコマンドを使いましょう。

    bundle exec embulk guess kari.yml -o honban.yml
  • エラーが出ないかチェックする。embulk previewコマンドを使うことで、取り込み対象のcsvファイルの一部にだけ処理を実行し、その結果を見ることができます。ここで、ほしい形でデータが取り込めそうか確認しましょう。

    bundle exec embulk preview sample.yml
  • embulkを実行しましょう。実際にembulkによるデータの取り込みを行う際には、下記のコマンドを実行しましょう。

    bundle exec embulk run sample.yml

参考記事

オプション設定項目
bucket:対象のファイルが置いてあるs3のバケット
path_prefix:対象のファイルのs3内のパス
endpoint:対象のファイルが置いてあるs3のエンドポイント
access_key_id:対象のファイルが置いてあるAWSアクセスキー
secret_access_key:対象のファイルが置いてあるAWSシークレットアクセスキー
オプション設定項目
host:接続先のホスト名
user:接続するユーザ名
password:接続するユーザ名のパスワード
database:接続するDB
table:取り込んだデータを格納するテーブル名
mode:新規にテーブルを作成するならばinsertで大丈夫です
オプション設定項目
column_options:このオプションで、取り込むデータのカラム名・データ型を指定できます。ここのデータ型は Redshiftのデータ型をそのまま指定できます。下記のオプションは、embulkを処理する際に発生する一時ファイルの置き場所となるs3のバケットを指定するオプションです。書き込み権限があるIAMユーザ名とバケット名の組み合わせを指定してください。
access_key_id:対象のバケットにアクセスできるAWSアクセスキー
secret_access_key:対象のバケットにアクセスできるAWSシークレットアクセスキー
iam_user_name:対象のバケットにアクセスできるIAMユーザ名
s3_bucket:一時ファイルを保存するバケット名
s3_key_prefix:一時ファイルを保存するs3内のパス
オプション設定項目
type:オプションにrowを指定した上で、下記のオプションを設定しましょう。また、embulkを実行する前に、embulk-filter-row プラグインがインストールされていることを確認しましょう(インストール方法は後述)
condition:ANDもしくはORを指定する。下記conditions:に記述した条件に対する論理結合になります
conditions:各カラムについて、operatorに演算子、argument:に値を入力することで抽出条件を指定できます。文字列の場合は、START_WITH, IS NOT NULLなどの演算子、数値型の場合は、>, <=などの演算子が利用できます。具体的な使用できる抽出条件とデータ型の一覧は公式ドキュメントを参照してください

このページをシェアする:



DATUM STUDIOは、クライアントの事業成長と経営課題解決を最適な形でサポートする、データ・ビジネスパートナーです。
データ分析の分野でお客様に最適なソリューションをご提供します。まずはご相談ください。