Snowflake機械学習 

Snowflake機械学習
Snowparkでユーザクラスタリングの実行

初めましてこんにちは、データ事業本部の中森です!
今回はSnowparkがリリースされたので、早速使ってみました!の記事になります。
UDFと合わせて、Snowflakeで機械学習の推論を試しました。
あれやこれやと手取り足取りSnowflakeのテクニカルサポートの方にお世話になってしまいましたが、とてもサポートが手厚いのでとても大変良かったです。
尚、現時点(2021/06/30)ではSnowparkを用いた機械学習トレーニングはサポートされていないとのことです。

本記事の内容

1.Snowparkを使ってみたい
2.Snowflakeのコンピューティングリソースで推論がしたい
3.PMMLを使ってirisをSnowflakeでクラスタリングする!

参考記事

Snowpark 0.5.0 – com.snowflake.snowpark
Setting Up Visual Studio Code for Snowpark — Snowflake Documentation
Creating User-Defined Functions (UDFs) for DataFrames — Snowflake Documentation
https://docs.snowflake.com/ja/developer-guide/snowpark/creating-udfs.html#reading-files-from-a-udf

使用した環境

・OS:
 ・MacOS BigSur
・VSCode:
 ・Metals: 1.10.6
・Scala:
 ・scala: version 2.12.13
 ・snowpark: 0.6.0
 ・typesafe: 1.4.1
 ・pmml4s: 0.9.11
・Python:
 ・python: 3.8.6
 ・pandas: 1.2.3
 ・sklearn: 0.24.2
 ・sklearn2pmml: 0.73.2
 ・pypmml: 0.9.11

前準備

・VSCodeでSnowparkを扱えるようにする
https://docs.snowflake.com/en/developer-guide/snowpark/quickstart-vscode.html
・iris.dataをダウンロードし、Snowflakeへアップロードする
https://datumstudio.jp/blog/1225_snowflake07/

流れ

今回の記事の流れは

1.sklearnを用いてiris.dataをクラスタリングするモデルをPMML形式で作成する
2.SnowparkからPMMLファイルを利用しiris.dataに対しクラスタリングするUDFを作成する
3.Snowparkを実行しクラスタリングモデルをSnowflakeのデータに対し適用する
4.HAPPY

です

推論用モデルを作成する

まず最初に、sklearnのKMeansを使ってクラスアタリングモデルの実装をやっていきます。
私は試行錯誤しながら作業したかったため、Jupyter Notebookを使用しました。

まずはダウンロードしておいたiris.dataの読み込みから。
ヘッダーは、Snowflakeに取り込んだ際のカラム名と合わせておきます。

import pandas as pd

headers = [
  "SEPAL_LENGTH",
  "SEPAL_WIDTH",
  "PETAL_LENGTH",
  "PETAL_WIDTH",
  "CLASS"
]
iris_df = pd.read_csv("iris.data", names=headers)
iris_df.head()
SEPAL_LENGTH	SEPAL_WIDTH	PETAL_LENGTH	PETAL_WIDTH	CLASS
0	5.1	3.5	1.4	0.2	Iris-setosa
1	4.9	3.0	1.4	0.2	Iris-setosa
2	4.7	3.2	1.3	0.2	Iris-setosa
3	4.6	3.1	1.5	0.2	Iris-setosa
4	5.0	3.6	1.4	0.2	Iris-setosa

読み込み確認までは簡単。

次にPMML形式でモデルを作成していきます。
データを特徴量とターゲットに分けて

iris_X = iris_df[iris_df.columns.difference(["CLASS"])]
iris_y = iris_df["CLASS"]

PMML形式でクラスタリングモデルを作成し

from sklearn.cluster import KMeans
from sklearn2pmml.pipeline import PMMLPipeline

pipeline = PMMLPipeline([
  ("model", KMeans(n_clusters=4, random_state=0))
])
pipeline.fit(iris_X)

PMMLファイルとして書き出します。

from sklearn2pmml import sklearn2pmml

sklearn2pmml(pipeline, "KMeansIris.pmml", with_repr = True)

これでカレントディレクトリにKMeansIris.pmmlファイルができていると思います。
念の為、ちゃんとモデルが生成されているか確認していきましょう。
ファイルを読み込んで、iris_dfに適用してみます。

from pypmml import Model

model = Model.fromFile("KMeansIris.pmml")
result = model.predict(iris_df)

result.head()
array([1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1,
       1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1,
       1, 1, 1, 1, 1, 1, 3, 3, 3, 0, 3, 0, 3, 0, 3, 0, 0, 0, 0, 3, 0, 3,
       0, 0, 3, 0, 3, 0, 3, 3, 3, 3, 3, 3, 3, 0, 0, 0, 0, 3, 0, 3, 3, 3,
       0, 0, 0, 3, 0, 0, 0, 0, 0, 3, 0, 0, 2, 3, 2, 2, 2, 2, 0, 2, 2, 2,
       3, 3, 2, 3, 3, 2, 2, 2, 2, 3, 2, 3, 2, 3, 2, 2, 3, 3, 2, 2, 2, 2,
       2, 3, 3, 2, 2, 2, 3, 2, 2, 2, 3, 2, 2, 2, 3, 3, 2, 3], dtype=int32)

ちゃんと推論できていそうですね。

依存関係解決のための準備

作成したモデルや各種UDF実行に必要なファイル/ライブラリをjarにしておきます。
配置場所は `src/main/resouces` 下にしています。

モデル

作成したPMMLモデルをjarファイルにします。
プロジェクトルートより

cd src/main/resouces && jar cvf iris.jar path/to/model/KMeansIris.pmml

その他

下記ライブラリについてjarファイルをDL/生成
・pmml4s
・spray-json
・scala-xml
私は mvnrepository からDLしてきました。

https://mvnrepository.com/

Snowparkを使って、UDFを作成しSnowflakeへアップロードする

ここからは、VSCode上で作業していきます。
Snowflake公式チュートリアルを元にMetalsがインストール済みでhello-world PJが作成してあることを前提とします。
まずはbuild.sbtに依存関係を追加していきます。

# 追記
resolvers += "OSGeo Release Repository" at "https://repo.osgeo.org/repository/release/"

libraryDependencies ++= Seq(
    "org.scala-lang.modules" %% "scala-parser-combinators" % "1.1.2",
    "com.snowflake" % "snowpark" % "0.6.0",
    "com.typesafe" % "config" % "1.4.1",
    "org.pmml4s" %% "pmml4s" % "0.9.11"
)

変更を保存したらimport changesを押下し、依存パッケージをダウンロードしていきます。

データに適用してみる

チュートリアルにもありますが、まずはSnowflakeのセッションを作成していきます。
今回は趣味でtypesafeのConfigFactoryを使ってconfigを作成していきます。

snowflake {
    url = "https://{YOUR_SNOWFLAKE_ACCOUNT}.snowflakecomputing.com:443",
    user = "{USER}",
    password = "{PASSWORD}",
    role = "{ROLE}",
    warehouse = "{WAREHOUSE}",
    db = "{DATABASE}",
    schema = "{SCHEMA}"
}

次にPMMLファイルとして出力されたファイルを読み込む部分を作っていきます。

package pmmlmodel
import org.pmml4s.model.Model

class KMeansIrisPMML(modelName: String) {
    def getModel(): Model = {
        val decisionTree: Model = Model.fromFile(s"src/main/resources/$modelName")

        decisionTree
    }
}

セッションを作成し

object Main {
  def main(args: Array[String]): Unit = {
...

  val conf = ConfigFactory.load
  val configs = Map(
    "URL" -> conf.getString("snowflake.url"),
    "USER" -> conf.getString("snowflake.user"),
    "PASSWORD" -> conf.getString("snowflake.password"),
    "ROLE" -> conf.getString("snowflake.role"),
    "WAREHOUSE" -> conf.getString("snowflake.warehouse"),
    "DB" -> conf.getString("snowflake.db"),
    "SCHEMA" -> conf.getString("snowflake.schema")
  )

  val session = Session.builder.configs(configs).create

...
}

依存関係をセッションに含めます。
この時の注意点として、DataFrames のユーザー定義関数(UDFs)の作成 — UDF からのファイルの読み取りにもある通り
Snowparkライブラリはサーバーに UDFs をアップロードして実行します。UDF がファイルからデータを読み取る必要がある場合は、ファイルが UDF とともにアップロードされていることを確認する必要があります。

今回、UDFが学習済みのPMMLモデルファイルを読み込む必要があるため、モデルファイルも依存先に追加していきます。

object Main {
  def main(args: Array[String]): Unit = {
...

  val libPath = new java.io.File("").getAbsolutePath
  session.addDependency(s"$libPath/src/main/resources/pmml4s_2.12-0.9.11.jar")
  session.addDependency(s"$libPath/src/main/resources/spray-json_2.12-1.3.6.jar")
  session.addDependency(s"$libPath/src/main/resources/scala-xml_2.12-1.2.0.jar")
  session.addDependency(s"$libPath/src/main/resources/iris.jar")

...
}

試しに一部のデータが描画できるか見てみましょう。

object Main {
  def main(args: Array[String]): Unit = {
...

  val irisSchema = StructType(
    StructField("sepal_length", DoubleType, nullable = true) ::
    StructField("sepal_width", DoubleType, nullable = true) ::
    StructField("petal_length", DoubleType, nullable = true) ::
    StructField("petal_width", DoubleType, nullable = true) ::
    StructField("class", StringType, nullable = true) ::
    Nil
  )
  val df = session.read.schema(irisSchema).table("iris_data")
  println(df.show())

...
}

実行はVSCodeならエディタ上のMain Objectの上に位置する行に run|debug とある run から実行確認を行います。

---------------------------------------------------------------------------------
|"SEPAL_LENGTH"  |"SEPAL_WIDTH"  |"PETAL_LENGTH"  |"PETAL_WIDTH"  |"CLASS"      |
---------------------------------------------------------------------------------
|5.1             |3.5            |1.4             |0.2            |Iris-setosa  |
|4.9             |3.0            |1.4             |0.2            |Iris-setosa  |
|4.7             |3.2            |1.3             |0.2            |Iris-setosa  |
|4.6             |3.1            |1.5             |0.2            |Iris-setosa  |
|5.0             |3.6            |1.4             |0.2            |Iris-setosa  |
|5.4             |3.9            |1.7             |0.4            |Iris-setosa  |
|4.6             |3.4            |1.4             |0.3            |Iris-setosa  |
|5.0             |3.4            |1.5             |0.2            |Iris-setosa  |
|4.4             |2.9            |1.4             |0.2            |Iris-setosa  |
|4.9             |3.1            |1.5             |0.1            |Iris-setosa  |
---------------------------------------------------------------------------------

結果が返ってきました!
これだけでも、データの前処理をSparkライクに行っていけそうな気配がしますね!

次に、モデルを適用するためのUDFを定義していきます。
実はここでn週間くらい使ってしまったのですが、UDFをアップロードするためモデルの読み込みも関数内で行う必要があります。(当然の話でした)

依存関係が解決できずダメなパターン

モデルをローカルで読み込み、UDF内で使用しようとしてしまっていました。

class SerTestFunc extends Serializable {
  val model: Model = new KMeansIrisPMML("KMeansIris.pmml").getModel()

  val irisTransformationFunc = (
    sepal_length: Double,
    sepal_width: Double,
    petal_length: Double,
    petal_width: Double) => {
      val v = Array[Double](sepal_length, sepal_width, petal_length, petal_width)
      model.predict(v).head.asInstanceOf[String]
    }  
}

依存関係が解決でき、成功するパターン

モデルをsessionに追加したjarファイルから読み込むように、UDF内で定義すると上手くいきます。

var resourceName = "/KMeansIris.pmml"

ここではjarファイルの中身を展開した時のPMMLファイルパスを指定するようにします。

class SerTestFunc extends Serializable {
  val irisTransformationFunc = (
    sepal_length: Double,
    sepal_width: Double,
    petal_length: Double,
    petal_width: Double) => {
      import java.io._
      var resourceName = "/KMeansIris.pmml"
      var inputStream = classOf[com.snowflake.snowpark.DataFrame]
        .getResourceAsStream(resourceName)
      val model = Model.fromInputStream(inputStream)
      val v = Array[Double](sepal_length, sepal_width, petal_length, petal_width)
      model.predict(v).head.asInstanceOf[String]
    }  
}

さて、いよいよ実際にモデルを適用してみましょう!
Spark DataFrameと同様に遅延評価されるので

println(df)

のようなことをしても、データフレームの中身が評価され値が表示されることはありません。

object Main {
  def main(args: Array[String]): Unit = {
    val df = getIrisDf(session)

    val testFunc = new SerTestFunc
    val irisTransformationUDF = udf(testFunc.irisTransformationFunc)
  
    val dfFitted = df.withColumn(
      "label",
      irisTransformationUDF(
        col("sepal_length"), col("sepal_width"), col("petal_length"), col("petal_width"))
    )

    println(dfFitted.show(150))
...
-----------------------------------------------------------------------------------------------
|"SEPAL_LENGTH"  |"SEPAL_WIDTH"  |"PETAL_LENGTH"  |"PETAL_WIDTH"  |"CLASS"          |"LABEL"  |
-----------------------------------------------------------------------------------------------
|5.1             |3.5            |1.4             |0.2            |Iris-setosa      |1        |
|4.9             |3.0            |1.4             |0.2            |Iris-setosa      |1        |
|4.7             |3.2            |1.3             |0.2            |Iris-setosa      |1        |
|4.6             |3.1            |1.5             |0.2            |Iris-setosa      |1        |
|5.0             |3.6            |1.4             |0.2            |Iris-setosa      |1        |
|5.4             |3.9            |1.7             |0.4            |Iris-setosa      |1        |
|4.6             |3.4            |1.4             |0.3            |Iris-setosa      |1        |
...

おーーーーーーーー!!
ラベルの整合性や対応番号について確認していないので細かい片手落ち感は否めないですが、UDFを使って推論ができました!!

まだまだ荒削りですが、snowflake.dataframeがspark.dataframeのように扱えるため、推論だけでなくトレーニングもできるようになると、更にSnowflakeの活用の場が広がりそうです!

Snowflakeの導入をご検討のお客様は、下記フォームよりお気軽にお問い合わせください。




DATUM STUDIOは、クライアントの事業成長と経営課題解決を最適な形でサポートする、データ・ビジネスパートナーです。
データ分析の分野でお客様に最適なソリューションをご提供します。まずはご相談ください。

このページをシェアする: