Skip to content

アプリケーション内で をクリックすると、お使いのDataRobotバージョンに関する全プラットフォームドキュメントにアクセスできます。

スコアリングコード用のApache Spark API

Spark API for Scoring Codeは、スコアリングコードJARをSparkクラスターに統合することを容易にするライブラリです。

可用性

以下のSparkバージョンはこの機能をサポートしています。

事前に生成されたスコアリングコードJARとともにライブラリを使用するには、spark-shell --packagesオプションを利用します。例:

--packages com.datarobot:scoring-code-spark-api_1.6.0:0.0.20

ライブラリを手動でクラスパスに追加することもできます。以下のリンクの例を参照してください。

scoring-code-spark-api_3.0.0の使用

Sparkバージョン3.0.0ライブラリを使用するためのワークフローは、scoring-code-spark-api_2.4.3のワークフローと同じですが、com.datarobot.prediction.spark.*ではなくパッケージcom.datarobot.prediction.spark30.*を使用する必要があります。

scoring-code-spark-api_3.0.0 と下位互換性のあるJava APIです。

下位互換性のあるJava APIでscoring-code-spark-api_3.0.0を使用するには、パッケージを使用します。com.datarobot.prediction.compatible.spark30.*

scoring-code-spark-api_2.4.3の使用

次のコマンドは、Sparkバージョン2.4.3ライブラリを使用するためのワークフローの概要を示します。

スパークシェルでスコアリング

ローカルファイルシステムのスコアリングコードJARファイルを使用して経由でIrisデータセットをスコアリングし、spark-shell結果をコンソールに出力します。

spark-shell --conf "spark.driver.memory=2g"  --jars ../testcases/iris/5e96ed57a6714a18ad699582.jar --packages com.datarobot:scoring-code-spark-api_2.4.3:0.0.19
import com.datarobot.prediction.spark.{Model, Predictors}
import org.apache.spark.sql.DataFrame
//Given an Iris dataset
val classificationDF = spark.read.format("com.databricks.spark.csv").option("header", "true").option("quote", "\"").option("escape", "\"").load("../testcases/iris/Iris.csv")
println("Before prediction")
classificationDF.show()
//When execute prediction using the CodeGen model
val model: Model = Predictors.getPredictor("5e96ed57a6714a18ad699582")
val output: DataFrame = model.transform(classificationDF)
//Then I will get the initial dataframe with additional columns with the prediction results
println("After prediction")
output.show()

ランタイムでのモデルのロード

ランタイムに以下から渡されたスコアリングコードJARを使用して経由でIrisデータセットをスコアリングし、spark-shell結果をコンソールに出力します。

  • DataRobotインスタンス
  • HDFS

備考

フィールドcom.datarobot:datarobot-prediction:に適切なスコアリングコードモデルバージョンを提供します。

spark-shell --conf "spark.driver.memory=2g"  --packages com.datarobot:datarobot-prediction:2.1.4,com.datarobot:scoring-code-spark-api_2.4.3:0.0.19

モデルURLとユーザートークンでDataRobotインスタンスを使用する場合は、次のコマンドを使用します。

import com.datarobot.prediction.spark.{Model, Predictors}
//Given the Iris dataset and a Scoring Code model URL
val classificationDF = spark.read.format("com.databricks.spark.csv").option("header", "true").option("quote", "\"").option("escape", "\"").load("../testcases/iris/Iris.csv")
println("Before prediction")
classificationDF.show()
//When execute prediction using the CodeGen model
val model = Predictors.getPredictorFromServer("https://app.datarobot.com/projects/5e96ecf8617f2318c8a18149/models/5e96ed57a6714a18ad699582/blueprint","someUserToken")
val output = model.transform(classificationDF)
//Then I will get the initial dataframe with additional columns with the prediction results
println("After prediction")
output.show()

HDFSを使用する場合は、次のコマンドを使用します。

import com.datarobot.prediction.spark.{Model, Predictors}
//Given an Iris dataset
val classificationDF = spark.read.format("com.databricks.spark.csv").option("header", "true").option("quote", "\"").option("escape", "\"").load("testcases/iris/Iris.csv")
println("Before prediction")
classificationDF.show()
//When execute prediction using the CodeGen model
val model = Predictors.getPredictorFromHdfs("testcases/iris/5e96ed57a6714a18ad699582.jar", spark, "5e96ed57a6714a18ad699582")
val output = model.transform(classificationDF)
//Then I will get the initial dataframe with additional columns with the prediction results
println("After prediction")

pysparkでスコアリング

ローカルファイルシステムのスコアリングコードJARファイルを使用してシェル経由でIrisデータセットをスコアリングし、pyspark結果をコンソールに出力します。

pyspark --conf "spark.driver.memory=2g" --jars ../testcases/iris/5e96ed57a6714a18ad699582.jar --packages com.datarobot:scoring-code-spark-api_2.4.3:0.0.19
from py4j.java_gateway import java_import
# Given an Iris example
inputData = spark.read.format("com.databricks.spark.csv") \
.option("header", "true") \
.option("quote", "\"") \
.option("escape", "\"") \
.load("../testcases/iris/Iris.csv")
print("Before prediction")
inputData.show(n=10)
# When invoke the transform method on it
java_import(spark._jvm, 'com.datarobot.prediction.Predictors')
java_import(spark._jvm, 'com.datarobot.prediction.spark.CodegenModel')
java_import(spark._jvm, 'com.datarobot.prediction.spark.Predictors')
codeGenModel = spark._jvm.com.datarobot.prediction.spark.Predictors.getPredictor('5e96ed57a6714a18ad699582')
java_dataframe_object = codeGenModel.transform(inputData._jdf)
# Then I will get the initial dataframe with additional columns with the prediction results
from pyspark.sql import DataFrame

output = DataFrame(java_dataframe_object, spark)
print("After prediction")
output.show(n=10)

ランタイムでのモデルのロード

ランタイムに以下から渡されたスコアリングコードJARを使用してシェル経由でIrisデータセットをスコアリングし、pyspark結果をコンソールに出力します。

  • DataRobotインスタンス
  • HDFS

備考

フィールドcom.datarobot:datarobot-prediction:に適切なスコアリングコードモデルバージョンを提供します。

pyspark --conf "spark.driver.memory=2g" --jars ../testcases/iris/5e96ed57a6714a18ad699582.jar --packages com.datarobot:datarobot-prediction:2.1.4,com.datarobot:scoring-code-spark-api_2.4.3:0.0.19

モデルURLとユーザートークンでDataRobotインスタンスを使用する場合は、次のコマンドを使用します。

# Given an Iris example
from py4j.java_gateway import java_import
inputData = spark.read.format("com.databricks.spark.csv") \
.option("header", "true") \
.option("quote", "\"") \
.option("escape", "\"") \
.load("../testcases/iris/Iris.csv")
print("Before prediction")
inputData.show(n=10)
# When invoke the transform method on it
java_import(spark._jvm, 'com.datarobot.prediction.Predictors')
java_import(spark._jvm, 'com.datarobot.prediction.spark.Model')
java_import(spark._jvm, 'com.datarobot.prediction.spark.Predictors')
codeGenModel = spark._jvm.com.datarobot.prediction.spark.Predictors\
    .getPredictorFromServer("https://app.datarobot.com/projects/5e96ecf8617f2318c8a18149/models/5e96ed57a6714a18ad699582/blueprint"
                            ,"SomeUserToken")
java_dataframe_object = codeGenModel.transform(inputData._jdf)
# Then I will get the initial dataframe with additional columns with the prediction results
from pyspark.sql import DataFrame
output = DataFrame(java_dataframe_object, spark)
print("After prediction")
output.show()

HDFSを使用する場合は、次のコマンドを使用します。

# Given an Iris example
from py4j.java_gateway import java_import
inputData = spark.read.format("com.databricks.spark.csv") \
.option("header", "true") \
.option("quote", "\"") \
.option("escape", "\"") \
.load("../testcases/iris/Iris.csv")
print("Before prediction")
inputData.show(n=10)
# When invoke the transform method on it
java_import(spark._jvm, 'com.datarobot.prediction.Predictors')
java_import(spark._jvm, 'com.datarobot.prediction.spark.Model')
java_import(spark._jvm, 'com.datarobot.prediction.spark.Predictors')
codeGenModel = spark._jvm.com.datarobot.prediction.spark.Predictors\
    .getPredictorFromHdfs("testcases/iris/5e96ed57a6714a18ad699582.jar", spark, "5e96ed57a6714a18ad699582")
java_dataframe_object = codeGenModel.transform(inputData._jdf)
# Then I will get the initial dataframe with additional columns with the prediction results
from pyspark.sql import DataFrame
output = DataFrame(java_dataframe_object, spark)
print("After prediction")
output.show()

scoring-code-spark-api_1.6.0を使用する

次のコマンドは、Sparkバージョン1.6.0ライブラリを使用するためのワークフローの概要を示します。

スパークシェルでスコアリング

ローカルファイルシステムのスコアリングコードJARファイルを使用して経由でIrisデータセットをスコアリングし、spark-shell結果をコンソールに出力します。

spark-shell --conf "spark.driver.memory=2g"  --packages com.datarobot:scoring-code-spark-api_1.6.0:0.0.20,com.databricks:spark-csv_2.10:1.4.0 --jars ../testcases/iris/5e96ed57a6714a18ad699582.jar
import com.datarobot.prediction.spark16.Predictors
val classificationDF = sqlContext.read.format("com.databricks.spark.csv").option("header", "true").load("../testcases/iris/Iris.csv")
println("Before prediction")
classificationDF.show()
//When execute prediction using the CodeGen model
val model = Predictors.getPredictor("5e96ed57a6714a18ad699582")
val output = model.transform(classificationDF)
//Then I will get the initial dataframe with additional columns with the prediction results
println("After prediction")
output.show()

ランタイムでのモデルのロード

ランタイムに以下から渡されたスコアリングコードJARを使用して経由でIrisデータセットをスコアリングし、spark-shell結果をコンソールに出力します。

  • DataRobotインスタンス
  • HDFS

備考

フィールドcom.datarobot:datarobot-prediction:に適切なスコアリングコードモデルバージョンを提供します。

spark-shell --conf "spark.driver.memory=2g"  --packages com.datarobot:datarobot-prediction:2.1.4,com.datarobot:scoring-code-spark-api_1.6.0:0.0.20,com.databricks:spark-csv_2.10:1.4.0

モデルURLとユーザートークンでDataRobotインスタンスを使用する場合は、次のコマンドを使用します。

import com.datarobot.prediction.spark16.Predictors
val classificationDF = sqlContext.read.format("com.databricks.spark.csv").option("header", "true").load("../testcases/iris/Iris.csv")
println("Before prediction")
classificationDF.show()
val model = Predictors.getPredictorFromServer("https://app.datarobot.com/projects/5e96ecf8617f2318c8a18149/models/5e96ed57a6714a18ad699582/blueprint",
  "someUserToken")
val output = model.transform(classificationDF)
//Then I will get the initial dataframe with additional columns with the prediction results
println("After prediction")
output.show()

HDFSを使用する場合は、次のコマンドを使用します。

import com.datarobot.prediction.spark16.Predictors
//Given the Iris dataset
val classificationDF = sc.read.format("com.databricks.spark.csv").option("header", "true").load("testcases/iris/Iris.csv")
println("Before prediction")
classificationDF.show()
//When execute prediction using the CodeGen model
val model = Predictors.getPredictorFromHdfs("testcases/iris/5e96ed57a6714a18ad699582.jar", sparkContext, "5e96ed57a6714a18ad699582")
val output = model.transform(classificationDF)
//Then I will get the initial dataframe with additional columns with the prediction results
println("After prediction")
output.show()

pysparkでスコアリング

ローカルファイルシステムのスコアリングコードJARファイルを使用してシェル経由でIrisデータセットをスコアリングし、pyspark結果をコンソールに出力します。

PySpark 1.6を外部JARファイルで使用するには、Sparkドライバーのクラスパスに追加する必要があります。例:

wget https://repo1.maven.org/maven2/com/datarobot/scoring-code-spark-api_1.6.0/0.0.20/scoring-code-spark-api_1.6.0-0.0.20.jar
pyspark --conf "spark.driver.memory=2g" --driver-class-path ./scoring-code-spark-api_1.6.0-0.0.20.jar:../testcases/iris/5e96ed57a6714a18ad699582.jar  --jars ../testcases/iris/5e96ed57a6714a18ad699582.jar --packages com.databricks:spark-csv_2.10:1.4.0,com.datarobot:scoring-code-spark-api_1.6.0:0.0.20
# Given an Iris dataframe and a Scoring Code model for it
from py4j.java_gateway import java_import
from pyspark.sql import DataFrame
inputData = sqlContext.read.format("com.databricks.spark.csv") \
.option("header", "true") \
.load('../testcases/iris/Iris.csv')
inputData.show(20)
# When invoke get a Scoring Code model via Spark API for Scoring Code and run transform method
java_import(sc._jvm, 'com.datarobot.prediction.spark16.Predictors')
java_import(sc._jvm, 'com.datarobot.prediction.spark16.Model')
model = sc._jvm.com.datarobot.prediction.spark16.Predictors.getPredictor()
java_dataframe = model.transform(inputData._jdf)
# Then I will get the initial dataframe with additional columns with the prediction results
output = DataFrame(java_dataframe, sqlContext)
output.show()

ランタイムでのモデルのロード

ランタイムに以下から渡されたスコアリングコードJARを使用して経由でIrisデータセットをスコアリングし、pyspark結果をコンソールに出力します。

  • DataRobotインスタンス
  • HDFS
wget https://repo1.maven.org/maven2/com/datarobot/scoring-code-spark-api_1.6.0/0.0.20/scoring-code-spark-api_1.6.0-0.0.20.jar
pyspark --conf "spark.driver.memory=2g" --driver-class-path scoring-code-spark-api_1.6.0-0.0.20.jar:../testcases/iris/5e96ed57a6714a18ad699582.jar  --jars ../testcases/iris/5e96ed57a6714a18ad699582.jar --packages com.datarobot:datarobot-prediction:2.1.4,com.databricks:spark-csv_2.10:1.4.0

備考

フィールドcom.datarobot:datarobot-prediction:に適切なスコアリングコードモデルバージョンを提供します。

モデルURLとユーザートークンでDataRobotインスタンスを使用する場合は、次のコマンドを使用します。

# Given an Iris dataframe and a Scoring Code model for it
from py4j.java_gateway import java_import
from pyspark.sql import DataFrame
inputData = sqlContext.read.format("com.databricks.spark.csv") \
.option("header", "true") \
.load('../testcases/iris/Iris.csv')
inputData.show(20)
# When invoke get a Scoring Code model via Spark API for Scoring Code and run transform method
java_import(sc._jvm, 'com.datarobot.prediction.spark16.Predictors')
java_import(sc._jvm, 'com.datarobot.prediction.spark16.Model')
model = sc._jvm.com.datarobot.prediction.spark16.Predictors.getPredictorFromServer(
  "https://app.datarobot.com/projects/5e96ecf8617f2318c8a18149/models/5e96ed57a6714a18ad699582/blueprint",
  "someUserToken")
java_dataframe = model.transform(inputData._jdf)
# Then I will get the initial dataframe with additional columns with the prediction results
output = DataFrame(java_dataframe, sqlContext)
output.show()

HDFSを使用する場合は、次のコマンドを使用します。

# Given an Iris dataframe and a Scoring Code model for it
from py4j.java_gateway import java_import
from pyspark.sql import DataFrame
inputData = sqlContext.read.format("com.databricks.spark.csv") \
.option("header", "true") \
.load('testcases/iris/Iris.csv')
inputData.show(20)
# When invoke get a Scoring Code model via Spark API for Scoring Code and run transform method
java_import(sc._jvm, 'com.datarobot.prediction.spark16.Predictors')
java_import(sc._jvm, 'com.datarobot.prediction.spark16.Model')
model = sc._jvm.com.datarobot.prediction.spark16.Predictors.getPredictorFromHdfs("testcases/iris/5e96ed57a6714a18ad699582.jar", sc, "5e96ed57a6714a18ad699582")
java_dataframe = model.transform(inputData._jdf)
# Then I will get the initial dataframe with additional columns with the prediction results
output = DataFrame(java_dataframe, sqlContext)
output.show()

更新しました February 22, 2022
Back to top