PythonSnowflake 

Python Worksheetsで機械学習モデルの構築〜推論までやってみた

こんにちは
DATUM STUDIO カスタマーアナリティクス部の長谷川です。

最近、SnowflakeからPythonやMLに関するアップデートが増えてきましたね。Python Worksheetsは3月にPreviewされ、version管理が出来たりと、改良されてきているような状況です。そして、6月にはステージに保存されたデータを読み込む機能がPreviewされました!
そこで今回は、Snowsight上でPythonを実行する環境Python Worksheetsを用いて、機械学習モデルを構築、ステージに保存。ステージに保存したモデルを読み込み推論する。という流れををやってみました。
ということで、Python Worksheets上でscikit-learnのワインデータセットを用いてRandom Forest Classifierの学習〜推論までをご紹介したいと思います。

1. 環境準備

こちらの公式サイトを参考に以下の手順で進めてください

・Snowsight で Anaconda サービス利用規約を確認して同意。
・(オプション) Anaconda に含まれていない Python ファイルとパッケージをステージに追加。
・使用するウェアハウスを選択。

次はPython Worksheetsの準備です。以下の手順で進めてください。

・Snowsightにサインイン。
・Wotksheetsを選択。
・ Select + » Python Worksheet。
・databaseとschemaを選択。
・実行warehouseの選択。
・ (オプション) [Packages] を選択して、Pythonライブラリを追加。

今回の記事では、以下のライブラリを事前に追加しておいて下さい。

・学習用 Python Worksheets:scikit-learn、joblib
・推論用 Python Worksheets:scikit-learn、joblib

2. 機械学習モデルの学習

それでは、モデルの学習・ステージへの保存から進めていきたいと思います。
今回はAnacondaに含まれているscikit-learnのワインデータセットを用いて、Random Forestによる分類を行います。
あらかじめ、scikit-learn、joblibのライブラリをPackagesより追加して下さい。
コード全体はこちらです。

# モデルの学習からステージへの保存まで
import snowflake.snowpark as snowpark
from snowflake.snowpark.session import Session

import pandas as pd
import io
import joblib

# モデル準備
from sklearn.datasets import load_wine
from sklearn.ensemble import RandomForestClassifier

# 評価指標
from sklearn.metrics import accuracy_score 

# 内部ステージに登録するための関数
def save_file(session, model, path):
    input_stream = io.BytesIO()
    joblib.dump(model, input_stream)
    # upload_streamメソッドで学習したモデルを内部ステージに登録
    session._conn._cursor.upload_stream(input_stream, path) 
    return     

# 実行関数
def main(session: snowpark.Session):
    # ワインデータセットのロードと整形
    wine = load_wine()
    df_local = pd.DataFrame(wine.data, columns=wine.feature_names)
    df_local['class'] = [wine.target_names[i] for i in wine.target]
    df_local.columns = [x.upper() for x in df_local.columns]
    df_snowpark = session.create_dataframe(df_local)
    
    # 学習、検証、テストデータに分割し、テーブルに保存
    weights = [0.7, 0.2, 0.1]
    dfsp_train, dfsp_val, dfsp_test = df_snowpark.random_split(weights, seed=42)
    
    dfsp_train.write.mode('overwrite').saveAsTable('wine_train')
    dfsp_val.write.mode('overwrite').saveAsTable('wine_validation')
    dfsp_test.write.mode('overwrite').saveAsTable('wine_test')

    # 学習、検証データの設定
    Xtrain = dfsp_train.drop('class').to_pandas() 
    ytrain = dfsp_train.select('class').to_pandas()
    Xvalid = dfsp_val.drop('class').to_pandas()
    yvalid = dfsp_val.select('class').to_pandas()
    
    # 学習の実行
    model = RandomForestClassifier(n_estimators=10, criterion='gini', max_depth=5, random_state=42)
    model.fit(Xtrain, ytrain)

    # 検証データを使用し、モデルの精度検証
    predictions = model.predict(Xvalid)
    accuracy = accuracy_score(yvalid, predictions)

    # accuracyの評価結果をreturnで返す
    result = session.create_dataframe([str(accuracy)], schema=['accuracy'])

    # 使用した検証データと予測結果をテーブルに保存
    df_tmp = pd.concat([dfsp_val.to_pandas(), pd.DataFrame(predictions , columns=['PREDICTION']) ], axis=1)
    dfsp_pred = session.create_dataframe(df_tmp)
    dfsp_pred.write.mode('overwrite').saveAsTable('wine_valid_pred')

    # モデルを内部ステージに保存
    model_stage = 'wine_models'
    query = f"""create or replace stage {model_stage}
             directory = (enable = true)
             copy_options = (on_error='skip_file')"""
    session.sql(query).collect()

    # 作成したステージにモデルファイルを保存
    save_file(session, model, f'@{model_stage}/predict_class.joblib')
    
    return result

handler functionについては、デフォルト設定されているmainのまま実行してください。
処理としては、特段難しいことはしておらず、下記の通りです。

1.sklearn.datasetsからwine_dataのロード
2.学習・検証・テストにデータを分割し、テーブルとして保存
3.モデルの構築
4.モデルの保存

今回は簡単のために、前処理などは実施していませんが、前処理を含めpipeline化するなどすれば、保存するモデルを一つですみそうです。2023年6月にPreviewとなったsnowflake.snowpark.filesでは、Worksheetからの保存は出来ないようなので、今回はこちらを参考に別の方法でモデルの保存を行っています。

3. モデルを用いた推論

それでは、先ほどステージに登録したモデルを用いて、推論を行いたいと思います。
あらかじめ、scikit-learn、joblibのライブラリをPackagesより追加して下さい。
コード全体はこちらです。

# 内部ステージからモデルの読み込み・推論まで
import snowflake.snowpark as snowpark
from snowflake.snowpark.session import Session
from snowflake.snowpark.files import SnowflakeFile

import joblib
import pandas as pd

# モデル準備
from sklearn.ensemble import RandomForestClassifier

# 実行関数
def main(session: snowpark.Session): 
    # 内部ステージからモデルをロード
    model_stage = 'wine_models'
    with SnowflakeFile.open(f'@{model_stage}/predict_class.joblib','rb', require_scoped_url = False) as file:
        model = joblib.load(file)

    # テストデータの読み込み
    dfsp_test = session.table('wine_test') 
    inputs = dfsp_test.drop('class').to_pandas()
    
    # モデルによる推論
    predictions = model.predict(inputs)

    # 推論結果の出力
    df_tmp = pd.concat([dfsp_test.to_pandas(), pd.DataFrame(predictions , columns=['PREDICTION']) ], axis=1)
    dfsp_test_pred = session.create_dataframe(df_tmp)
    dfsp_test_pred.write.mode('overwrite').saveAsTable('wine_test_pred')
    
    return dfsp_test_pred

handler functionについては、デフォルト設定されているmainのまま実行してください。内部ステージに保存したモデルの読み込みは下記のコードで実施しており、Pythonのopen()関数とよく似たsnowflake.snowpark.files.SnowflakeFile.open()を用います。’require_scoped_url = False’についてはこちらに公式ドキュメントがあり、今回のように自身が所有するファイルへのアクセスを簡易に行うための引数です。

 # 内部ステージからモデルをロード
    model_stage = 'wine_models'
    with SnowflakeFile.open(f"@{model_stage}/predict_class.joblib","rb", require_scoped_url = False) as file:
        model = joblib.load(file)

4. まとめ

今回は、Snowsight上でワインデータセットを用いたRandom Forest Classiferの学習、推論までを行いました。
main関数の結果をreturnするだけでは、探索的データ分析には向かない印象ですが、機械学習をちょっとやってみるくらいであればよいのではないかなと思います。
Summit前に書いているこの記事では、全容は分かりませんが、MLスキーマのアップデートも控えていたり、Streamlit in Snowflakeと楽しみがまだまだありますね!
これからのSnowflakeにも期待です!

このページをシェアする:



DATUM STUDIOは、クライアントの事業成長と経営課題解決を最適な形でサポートする、データ・ビジネスパートナーです。
データ分析の分野でお客様に最適なソリューションをご提供します。まずはご相談ください。