Snowflakeのエラーをslackに通知する
データエンジニア部の向井です。
Snowflakeでデータパイプラインを運用する際、データの不整合が起こりETLが異常終了する場合があります。
このような不具合発生時に迅速に対応するために、今回はSnowflakeのSQL, Task, Snowpipeが異常終了した際にSlackに通知を送ってみたいと思います。
目次
Slack通知を行う外部関数を設定する
Snowflake単体では外部のAPIを叩くことはできないので、外部関数を用いてSlackのWebhookから投稿を行います。外部関数の作成にはAWS lambda, API Gatewayを用います。詳細な手順は公式ドキュメントを参照ください。
今回はlambdaを以下のように設定します。
import urllib3
import json
http = urllib3.PoolManager()
url = <webhook url>
def lambda_handler(event, context):
event_body = event["body"]
payload = json.loads(event_body)
rows = payload["data"]
for row in rows:
row_number = row[0]
text = row[1]
msg = {
"channel": <channel name>,
"username": "test_user",
"text": text,
"icon_emoji": ""
}
encoded_msg = json.dumps(msg).encode('utf-8')
resp = http.request('POST',url, body=encoded_msg)
print({
"message": text,
"status_code": resp.status,
"response": resp.data
})
json_compatible_string_to_return = event_body
return {
"statusCode": resp.status,
"headers": {
"Content-Type": "*/*"
},
"body": event_body,
"isBase64Encoded": False
}
API Integrationが済んだ後、Snowflakeのコンソール画面から外部関数を作成します。
create or replace external function post_slack(text varchar)
returns variant
api_integration = <integration name>
as <api endpoint>;
外部関数を実行します。
select post_slack('test from snowflake');Slackを確認すると、指定したチャンネルに投稿されていることが確認できました。

ストアドプロシージャでSQLが失敗したときの処理を行う
SnowflakeのストアドプロシージャではJavaScriptのtry-catch文が実行できるため、ドキュメントにあるようにSQLのエラーハンドリングを行うことが出来ます。
前項で作成した外部関数post_slack()を呼び出すようにプロシージャを作成します。
create or replace procedure raise_error(query varchar)
returns varchar not null
language javascript
as
$$
var result = "";
try {
snowflake.execute( {sqlText: QUERY} );
result = "Succeeded";
}
catch (err) {
result = "Failed: Code: " + err.code + "\n State: " + err.state;
result += "\n Message: " + err.message;
result += "\nStack Trace:\n" + err.stackTraceTxt;
result = result.replace(/'/g,"\\'")
snowflake.execute({sqlText: `select post_slack('${result}');`});
}
return result;
$$
;
SQLのエラー文字列にクォーテーションが入っているのでエスケープしてあげる必要があります。
では存在しないテーブルをクエリし、エラーを出してみます。
call raise_error('select * from hoge;');
こちらもSlackに通知が飛んでいることが確認できました。
taskが落ちた際に通知したい
SnowflakeでETLを行う場合はtaskを用いる事が多いかと思います。taskからもストアドプロシージャを呼び出せるため、前項と同様にエラーハンドリングを行うことが出来ます。
適当なtaskを作ります。
create or replace task test_task
warehouse = <warehouse name>
schedule = '1 minute'
as
call raise_error('select * from fuga;');
タスクを実行待ち状態にします。
alter task test_task suspend;
しばらく待つと、1分おきにSlackに通知が飛んできます。
Snowpipeが失敗したときに通知したい
Snowpipeについても同様にエラー通知を出してみたいと思います。残念ながらSnowpipeではプロシージャを使うことは出来ないので、別の方法を考えます。
information_schemaのcopy_history関数をクエリすることでSnowpipeのロード履歴を参照することができますが、定期的にクエリするたびにウェアハウスが起動してお金がもったいないので、今回はSnowpipe REST APIをAWS lambdaから叩いてロード履歴を確認したいと思います。
Snowpipeの作成
適当なテーブルを作り、AWS S3から読みこめるようにSnowpipeを構成します。詳細な手順は公式ドキュメントに譲ります。
create table test_table ( id integer, text varchar(10) ); create or replace pipe test_pipe auto_ingest=true as copy into test_table from @<external stage> file_format = (type = 'CSV' skip_header=1);
以下のテストデータをS3にアップロードし、Snowflakeに読み込まれていることを確認します。
test_table.csv id,text 1,hoge
Snowpipe REST apiの確認
Snowpipe REST apiを使用するにはキーペア認証が必要になるため、ローカルでRSA鍵を発行します。詳細な手順はドキュメントを参照ください。pythonコードをローカルマシンで実行して試しにAPIを叩いてみます。
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives.serialization import load_pem_private_key
from cryptography.hazmat.primitives.serialization import Encoding
from cryptography.hazmat.primitives.serialization import PrivateFormat
from cryptography.hazmat.primitives.serialization import NoEncryption
from snowflake.ingest import SimpleIngestManager
from datetime import timedelta
import datetime
with open("./rsa_key.p8", "rb") as pem_in:
pemlines = pem_in.read()
private_key_obj = load_pem_private_key(pemlines,
password=None,
backend=default_backend())
private_key_text = private_key_obj.private_bytes(
Encoding.PEM, PrivateFormat.PKCS8, NoEncryption()).decode('utf-8')
ingest_manager = SimpleIngestManager(account='<account-name>',
host='<account-name>.snowflakecomputing.com',
user='<user-name>',
pipe='<pipe-name>',
private_key=private_key_text)
history_resp = ingest_manager.get_history()
print(history_resp)
date = datetime.datetime.utcnow() - timedelta(days=10)
history_range_resp = ingest_manager.get_history_range(date.isoformat() + 'Z')
print(history_range_resp)
以下のようなjsonが返されていることがわかります。
{'pipe': '<pipe_name>', 'completeResult': True, 'nextBeginMark': '2_-1', 'files': [], 'statistics': {'activeFilesCount': 0}}
{'files': [{'path': 'test_table.csv', 'stageLocation': 's3://<bucket-name>/', 'fileSize': 15, 'timeReceived': '2021-10-14T02:10:03.976Z', 'lastInsertTime': '2021-10-14T02:10:24.434Z', 'rowsInserted': 1, 'rowsParsed': 1, 'errorsSeen': 0, 'errorLimit': 1, 'complete': True, 'status': 'LOADED'}], 'startTimeInclusive': '2021-10-08T16:16:59.815Z', 'endTimeExclusive': '2021-10-18T16:16:59.929Z', 'rangeStartTime': '2021-10-14T02:10:24.434Z', 'rangeEndTime': '2021-10-14T02:10:24.434Z', 'pipe': '<pipe-name>', 'completeResult': 'true'}ドキュメントに明示されてはいませんが、SimpleIngestManagerクラスのget_historyメソッドがinsertReport、get_history_rangeメソッドがloadHistoryScanに対応していそうです。監視要件に沿ってお好みで選びましょう。
後はこのコードをlambdaにデプロイしていきたいと思います。
lambdaでSnowpipeを定常監視
今回はlambdaでsnowflake-ingestのライブラリを用いるため、コンテナパッケージでデプロイを行いたいと思います。詳細な手順はAWSのドキュメントを参照ください。
lambdaを以下のようにします。snowflake-ingest-managerがssl証明書を更新してしまうので、slackのpost時に新しくデフォルトの証明書を設定し直す必要があります。認証情報はsecrets managerに格納します。必要に応じて暗号化等を行ってください。
from snowflake.ingest import SimpleIngestManager
import boto3
import json
import urllib3
import ssl
url = <webhook url>
secret_name = <secrets manager arn>
region_name = "ap-northeast-1"
def handler(event, context):
# Create a Secrets Manager client
session = boto3.session.Session()
client = session.client(
service_name='secretsmanager',
region_name=region_name
)
get_secret_value_response = client.get_secret_value(
SecretId=secret_name
)
private_key_text = get_secret_value_response['SecretString']
# snowflake connect
ingest_manager = SimpleIngestManager(account=<account name>,
host=<host name>,
user=<user name>,
pipe=<pipe name>,
private_key=private_key_text)
history_resp = ingest_manager.get_history()
# post slack
ctx = ssl.create_default_context()
http = urllib3.PoolManager(ssl_context=ctx)
text = ''
for file_repo in history_resp['files']:
if(file_repo['errorsSeen']>=file_repo['errorLimit']):
text += 'snowpipe failed:\n'
for key, value in file_repo.items():
text += f' {key}:{value}\n'
msg = {
"channel": "<channel name>",
"username": "test_user",
"text": text,
"icon_emoji": ""
}
encoded_msg = json.dumps(msg).encode('utf-8')
resp = http.request('POST',url, body=encoded_msg)
print({
"message": text,
"status_code": resp.status,
"response": resp.data
})
return {
"statusCode": 200,
"headers": {
"Content-Type": "*/*"
},
"body": text,
"isBase64Encoded": False
}デプロイが完了したのでSnowpipeを失敗させてみます。test_table.textをvarchar(10)と定義しているので、text列に10文字以上の文字列を含んだcsvを連携してみます。
`test_table2.csv id,text 2,hogehogehoge
アップロードして1分前後待ち、AWSコンソールからlambdaをテストしてみます。
lambdaが正常終了し、

slackにも投稿されていることがわかります。このlambdaをcloudWatchで定期実行すればSnowpipeを定常監視することができます。
おわりに
今回は
・標準のSQL
・task
・Snowpipe
の3つについてエラーが起こった際にslackへと通知を行ってみました。
Snowflakeの運用についてご相談がございましたら、DATUM STUDIOまでお気軽にご連絡ください。