スコアリングコード用のApache Spark API¶
スコアリングコード用のSpark APIは、スコアリングコードJARをSparkクラスターに統合することを容易にするライブラリです。
可用性¶
以下のSparkバージョンはこの機能をサポートしています。
- Apache Spark 3.0.0
- Apache Spark 2.4.3
- Apache Spark 1.6.0
事前に生成されたスコアリングコードJARとともにライブラリを使用するには、
spark-shell --packages
オプションを活用することが推奨されます。 例:
--packages com.datarobot:scoring-code-spark-api_1.6.0:0.0.20
ライブラリを手動でクラスパスに追加することもできます。 以下のリンクの例を参照してください。
- https://mvnrepository.com/artifact/com.datarobot/scoring-code-spark-api_3.0.0
- https://mvnrepository.com/artifact/com.datarobot/scoring-code-spark-api_2.4.3
- https://mvnrepository.com/artifact/com.datarobot/scoring-code-spark-api_1.6.0
scoring-code-spark-api_3.0.0の使用¶
Sparkバージョン3.0.0ライブラリを使用するためのワークフローは、scoring-code-spark-api_2.4.3のワークフローと同じですが、
com.datarobot.prediction.spark.*
ではなくパッケージcom.datarobot.prediction.spark30.*
を使用する必要があります
scoring-code-spark-api_3.0.0と下位互換性のあるJava API¶
下位互換性のあるJava APIでscoring-code-spark-api_3.0.0を使用するには、パッケージcom.datarobot.prediction.compatible.spark30.*
を使用します
scoring-code-spark-api_2.4.3の使用¶
次のコマンドは、Sparkバージョン2.4.3ライブラリを使用するためのワークフローの概要を示します。
spark-shell
でのスコアリング¶
ローカルファイルシステムのスコアリングコードJARファイルを使用してspark-shell
経由でIrisデータセットをスコアリングし、結果をコンソールに出力します。
spark-shell --conf "spark.driver.memory=2g" --jars ../testcases/iris/5e96ed57a6714a18ad699582.jar --packages com.datarobot:scoring-code-spark-api_2.4.3:0.0.19
import com.datarobot.prediction.spark.{Model, Predictors}
import org.apache.spark.sql.DataFrame
//Given an Iris dataset
val classificationDF = spark.read.format("com.databricks.spark.csv").option("header", "true").option("quote", "\"").option("escape", "\"").load("../testcases/iris/Iris.csv")
println("Before prediction")
classificationDF.show()
//When execute prediction using the CodeGen model
val model: Model = Predictors.getPredictor("5e96ed57a6714a18ad699582")
val output: DataFrame = model.transform(classificationDF)
//Then I will get the initial dataframe with additional columns with the prediction results
println("After prediction")
output.show()
ランタイムでのモデルのロード¶
ランタイムに以下から渡されたスコアリングコードJARを使用してspark-shell
経由でIrisデータセットをスコアリングし、結果をコンソールに出力します。
- DataRobotインスタンス
- HDFS
備考
フィールドcom.datarobot:datarobot-prediction:
に適切なスコアリングコードモデルバージョンを提供します。
spark-shell --conf "spark.driver.memory=2g" --packages com.datarobot:datarobot-prediction:2.1.4,com.datarobot:scoring-code-spark-api_2.4.3:0.0.19
モデルURLとユーザートークンでDataRobotインスタンスを使用する場合は、次のコマンドを使用します。
import com.datarobot.prediction.spark.{Model, Predictors}
//Given the Iris dataset and a Scoring Code model URL
val classificationDF = spark.read.format("com.databricks.spark.csv").option("header", "true").option("quote", "\"").option("escape", "\"").load("../testcases/iris/Iris.csv")
println("Before prediction")
classificationDF.show()
//When execute prediction using the CodeGen model
val model = Predictors.getPredictorFromServer("https://app.datarobot.com/projects/5e96ecf8617f2318c8a18149/models/5e96ed57a6714a18ad699582/blueprint","someUserToken")
val output = model.transform(classificationDF)
//Then I will get the initial dataframe with additional columns with the prediction results
println("After prediction")
output.show()
HDFSを使用する場合は、次のコマンドを使用します。
import com.datarobot.prediction.spark.{Model, Predictors}
//Given an Iris dataset
val classificationDF = spark.read.format("com.databricks.spark.csv").option("header", "true").option("quote", "\"").option("escape", "\"").load("testcases/iris/Iris.csv")
println("Before prediction")
classificationDF.show()
//When execute prediction using the CodeGen model
val model = Predictors.getPredictorFromHdfs("testcases/iris/5e96ed57a6714a18ad699582.jar", spark, "5e96ed57a6714a18ad699582")
val output = model.transform(classificationDF)
//Then I will get the initial dataframe with additional columns with the prediction results
println("After prediction")
pyspark
でのスコアリング¶
ローカルファイルシステムのスコアリングコードJARファイルを使用してpyspark
シェル経由でIrisデータセットをスコアリングし、結果をコンソールに出力します。
pyspark --conf "spark.driver.memory=2g" --jars ../testcases/iris/5e96ed57a6714a18ad699582.jar --packages com.datarobot:scoring-code-spark-api_2.4.3:0.0.19
from py4j.java_gateway import java_import
# Given an Iris example
inputData = spark.read.format("com.databricks.spark.csv") \
.option("header", "true") \
.option("quote", "\"") \
.option("escape", "\"") \
.load("../testcases/iris/Iris.csv")
print("Before prediction")
inputData.show(n=10)
# When invoke the transform method on it
java_import(spark._jvm, 'com.datarobot.prediction.Predictors')
java_import(spark._jvm, 'com.datarobot.prediction.spark.CodegenModel')
java_import(spark._jvm, 'com.datarobot.prediction.spark.Predictors')
codeGenModel = spark._jvm.com.datarobot.prediction.spark.Predictors.getPredictor('5e96ed57a6714a18ad699582')
java_dataframe_object = codeGenModel.transform(inputData._jdf)
# Then I will get the initial dataframe with additional columns with the prediction results
from pyspark.sql import DataFrame
output = DataFrame(java_dataframe_object, spark)
print("After prediction")
output.show(n=10)
ランタイムでのモデルのロード¶
ランタイムに以下から渡されたスコアリングコードJARを使用してpyspark
シェル経由でIrisデータセットをスコアリングし、結果をコンソールに出力します。
- DataRobotインスタンス
- HDFS
備考
フィールドcom.datarobot:datarobot-prediction:
に適切なスコアリングコードモデルバージョンを提供します。
pyspark --conf "spark.driver.memory=2g" --jars ../testcases/iris/5e96ed57a6714a18ad699582.jar --packages com.datarobot:datarobot-prediction:2.1.4,com.datarobot:scoring-code-spark-api_2.4.3:0.0.19
モデルURLとユーザートークンでDataRobotインスタンスを使用する場合は、次のコマンドを使用します。
# Given an Iris example
from py4j.java_gateway import java_import
inputData = spark.read.format("com.databricks.spark.csv") \
.option("header", "true") \
.option("quote", "\"") \
.option("escape", "\"") \
.load("../testcases/iris/Iris.csv")
print("Before prediction")
inputData.show(n=10)
# When invoke the transform method on it
java_import(spark._jvm, 'com.datarobot.prediction.Predictors')
java_import(spark._jvm, 'com.datarobot.prediction.spark.Model')
java_import(spark._jvm, 'com.datarobot.prediction.spark.Predictors')
codeGenModel = spark._jvm.com.datarobot.prediction.spark.Predictors\
.getPredictorFromServer("https://app.datarobot.com/projects/5e96ecf8617f2318c8a18149/models/5e96ed57a6714a18ad699582/blueprint"
,"SomeUserToken")
java_dataframe_object = codeGenModel.transform(inputData._jdf)
# Then I will get the initial dataframe with additional columns with the prediction results
from pyspark.sql import DataFrame
output = DataFrame(java_dataframe_object, spark)
print("After prediction")
output.show()
HDFSを使用する場合は、次のコマンドを使用します。
# Given an Iris example
from py4j.java_gateway import java_import
inputData = spark.read.format("com.databricks.spark.csv") \
.option("header", "true") \
.option("quote", "\"") \
.option("escape", "\"") \
.load("../testcases/iris/Iris.csv")
print("Before prediction")
inputData.show(n=10)
# When invoke the transform method on it
java_import(spark._jvm, 'com.datarobot.prediction.Predictors')
java_import(spark._jvm, 'com.datarobot.prediction.spark.Model')
java_import(spark._jvm, 'com.datarobot.prediction.spark.Predictors')
codeGenModel = spark._jvm.com.datarobot.prediction.spark.Predictors\
.getPredictorFromHdfs("testcases/iris/5e96ed57a6714a18ad699582.jar", spark, "5e96ed57a6714a18ad699582")
java_dataframe_object = codeGenModel.transform(inputData._jdf)
# Then I will get the initial dataframe with additional columns with the prediction results
from pyspark.sql import DataFrame
output = DataFrame(java_dataframe_object, spark)
print("After prediction")
output.show()
Time series bulk predictions with spark-shell
¶
Make time series bulk predictions with spark-shell
using a Scoring Code JAR file from the local file system and then print the first 50
results to your console.
shell title="Start spark-shell"
spark-shell --jars 62fa1f42d9d9741a5b620ab2.jar --packages com.datarobot:datarobot-prediction:2.2.1 --deploy-mode client --conf "spark.driver.memory=2g"
shell title="Make and print predictions"
import com.datarobot.prediction.TimeSeriesOptions
import com.datarobot.prediction.spark.Predictors
val predictionDatasetPath = "../testcases/syph_sorted.csv"
val modelId = "62fa1f42d9d9741a5b620ab2"
spark.sparkContext.setLogLevel("ERROR")
val inputDf = spark.read.format("com.databricks.spark.csv").option("header", "true").option("quote", "\"").option("escape", "\"").load(predictionDatasetPath)
val tsOptions = new TimeSeriesOptions.Builder().buildForecastDateRangeRequest("2010-12-05", "2011-01-02")
val model = Predictors.getPredictor(modelId, tsOptions)
val inputFields = inputDf.schema.map(item => item.name).toSet[String]
val df = model.transform(inputDf).drop(inputFields.toArray: _*)
df.show(50)
Time series single forecast point predictions with spark-shell
¶
Make time series single forecast point predictions with spark-shell
using a Scoring Code JAR file from the local file system and then print the first 50
results to your console.
shell title="Start spark-shell"
spark-shell --jars 62fa1f42d9d9741a5b620ab2.jar --packages com.datarobot:datarobot-prediction:2.2.1 --deploy-mode client --conf "spark.driver.memory=2g"
shell title="Make and print predictions"
import com.datarobot.prediction.TimeSeriesOptions
import com.datarobot.prediction.spark.Predictors
val predictionDatasetPath = "../testcases/syph_sorted.csv"
val modelId = "62fa1f42d9d9741a5b620ab2"
spark.sparkContext.setLogLevel("ERROR")
val inputDf = spark.read.format("com.databricks.spark.csv").option("header", "true").option("quote", "\"").option("escape", "\"").load(predictionDatasetPath)
val tsOptions = new TimeSeriesOptions.Builder().buildSingleForecastPointRequest("2010-12-05")
val model = Predictors.getPredictor(modelId, tsOptions)
val inputFields = inputDf.schema.map(item => item.name).toSet[String]
val df = model.transform(inputDf).drop(inputFields.toArray: _*)
df.show(50)
Time series single forecast point streaming predictions with spark-shell
¶
Make time series single forecast point predictions with spark-shell
using a Scoring Code JAR file from the local file system and stream the results to your console.
shell title="Start spark-shell"
spark-shell --jars 62fa1f42d9d9741a5b620ab2.jar --packages com.datarobot:datarobot-prediction:2.2.1 --deploy-mode client --conf "spark.driver.memory=2g"
shell title="Make and stream predictions"
import com.datarobot.prediction.spark.Predictors
import com.datarobot.prediction.{TimeSeriesOptions}
import org.apache.spark.sql.streaming.{OutputMode}
import org.apache.spark.sql.types._
val inFolder = "../testcases/input"// todo you need to write chunks of data in this folder
val modelId = "62fa1f42d9d9741a5b620ab2"
spark.sparkContext.setLogLevel("OFF")
val schema = (new StructType()
.add("datetime", "string")
.add("location", "string")
.add("y", "string"))
val df = (spark.readStream
.schema(schema)
.csv(inFolder))
val tsOptions = new TimeSeriesOptions.Builder().buildSingleForecastPointRequest()
val model = Predictors.getPredictor(modelId, tsOptions)
val pred = model.transform(df)
(pred.repartition(1).writeStream
.outputMode(OutputMode.Update())
.format("console")
.start()
.awaitTermination())
Time series bulk predictions with pyspark
¶
Make time series bulk predictions with pyspark
using a Scoring Code JAR file from the local file system and then print the results to your console.
shell title="Start pyspark"
pyspark --jars 62fa1f42d9d9741a5b620ab2.jar --packages com.datarobot:datarobot-prediction:2.2.1 --deploy-mode client --conf "spark.driver.memory=2g"
python title="Make and print predictions"
from py4j.java_gateway import java_import
inputData = spark.read.format("com.databricks.spark.csv") \
.option("header", "true") \
.option("quote", "\"") \
.option("escape", "\"") \
.load("../testcases/syph_sorted.csv")
print("Before prediction")
inputData.show(n=10)
# When invoke the transform method on it
java_import(spark._jvm, 'com.datarobot.prediction.Predictors')
java_import(spark._jvm, 'com.datarobot.prediction.TimeSeriesOptions')
java_import(spark._jvm, 'com.datarobot.prediction.spark.Model')
java_import(spark._jvm, 'com.datarobot.prediction.spark.Predictors')
ts_options = spark._jvm.com.datarobot.prediction.TimeSeriesOptions.Builder().buildForecastDateRangeRequest("2010-12-05", "2011-01-02")
codeGenModel = spark._jvm.com.datarobot.prediction.spark.Predictors\
.getPredictor("62fa1f42d9d9741a5b620ab2", ts_options)
java_dataframe_object = codeGenModel.transform(inputData._jdf)
# Then I will get the initial dataframe with additional columns with the prediction results
from pyspark.sql import DataFrame
output = DataFrame(java_dataframe_object, spark)
print("After prediction")
output.show()
Time Series Single Forecast Point Predictions with pyspark
¶
Make time series single forecast point predictions with pyspark
using a Scoring Code JAR file from the local file system and then print the results to your console.
shell title="Start pyspark"
pyspark --jars 62fa1f42d9d9741a5b620ab2.jar --packages com.datarobot:datarobot-prediction:2.2.1 --deploy-mode client --conf "spark.driver.memory=2g"
python title="Make and print predictions"
from py4j.java_gateway import java_import
inputData = spark.read.format("com.databricks.spark.csv") \
.option("header", "true") \
.option("quote", "\"") \
.option("escape", "\"") \
.load("../testcases/syph_sorted.csv")
print("Before prediction")
inputData.show(n=10)
# When invoke the transform method on it
java_import(spark._jvm, 'com.datarobot.prediction.Predictors')
java_import(spark._jvm, 'com.datarobot.prediction.TimeSeriesOptions')
java_import(spark._jvm, 'com.datarobot.prediction.spark.Model')
java_import(spark._jvm, 'com.datarobot.prediction.spark.Predictors')
ts_options = spark._jvm.com.datarobot.prediction.TimeSeriesOptions.Builder().buildSingleForecastPointRequest("2010-12-05")
codeGenModel = spark._jvm.com.datarobot.prediction.spark.Predictors\
.getPredictor("62fa1f42d9d9741a5b620ab2", ts_options)
java_dataframe_object = codeGenModel.transform(inputData._jdf)
# Then I will get the initial dataframe with additional columns with the prediction results
from pyspark.sql import DataFrame
output = DataFrame(java_dataframe_object, spark)
print("After prediction")
output.show()
scoring-code-spark-api_1.6.0の使用¶
次のコマンドは、Sparkバージョン1.6.0ライブラリを使用するためのワークフローの概要を示します。
spark-shell
でのスコアリング¶
ローカルファイルシステムのスコアリングコードJARファイルを使用してspark-shell
経由でIrisデータセットをスコアリングし、結果をコンソールに出力します。
spark-shell --conf "spark.driver.memory=2g" --packages com.datarobot:scoring-code-spark-api_1.6.0:0.0.20,com.databricks:spark-csv_2.10:1.4.0 --jars ../testcases/iris/5e96ed57a6714a18ad699582.jar
import com.datarobot.prediction.spark16.Predictors
val classificationDF = sqlContext.read.format("com.databricks.spark.csv").option("header", "true").load("../testcases/iris/Iris.csv")
println("Before prediction")
classificationDF.show()
//When execute prediction using the CodeGen model
val model = Predictors.getPredictor("5e96ed57a6714a18ad699582")
val output = model.transform(classificationDF)
//Then I will get the initial dataframe with additional columns with the prediction results
println("After prediction")
output.show()
ランタイムでのモデルのロード¶
ランタイムに以下から渡されたスコアリングコードJARを使用してspark-shell
経由でIrisデータセットをスコアリングし、結果をコンソールに出力します。
- DataRobotインスタンス
- HDFS
備考
フィールドcom.datarobot:datarobot-prediction:
に適切なスコアリングコードモデルバージョンを提供します。
spark-shell --conf "spark.driver.memory=2g" --packages com.datarobot:datarobot-prediction:2.1.4,com.datarobot:scoring-code-spark-api_1.6.0:0.0.20,com.databricks:spark-csv_2.10:1.4.0
モデルURLとユーザートークンでDataRobotインスタンスを使用する場合は、次のコマンドを使用します。
import com.datarobot.prediction.spark16.Predictors
val classificationDF = sqlContext.read.format("com.databricks.spark.csv").option("header", "true").load("../testcases/iris/Iris.csv")
println("Before prediction")
classificationDF.show()
val model = Predictors.getPredictorFromServer("https://app.datarobot.com/projects/5e96ecf8617f2318c8a18149/models/5e96ed57a6714a18ad699582/blueprint",
"someUserToken")
val output = model.transform(classificationDF)
//Then I will get the initial dataframe with additional columns with the prediction results
println("After prediction")
output.show()
HDFSを使用する場合は、次のコマンドを使用します。
import com.datarobot.prediction.spark16.Predictors
//Given the Iris dataset
val classificationDF = sc.read.format("com.databricks.spark.csv").option("header", "true").load("testcases/iris/Iris.csv")
println("Before prediction")
classificationDF.show()
//When execute prediction using the CodeGen model
val model = Predictors.getPredictorFromHdfs("testcases/iris/5e96ed57a6714a18ad699582.jar", sparkContext, "5e96ed57a6714a18ad699582")
val output = model.transform(classificationDF)
//Then I will get the initial dataframe with additional columns with the prediction results
println("After prediction")
output.show()
pyspark
でのスコアリング¶
ローカルファイルシステムのスコアリングコードJARファイルを使用してpyspark
シェル経由でIrisデータセットをスコアリングし、結果をコンソールに出力します。
PySpark 1.6を外部JARファイルで使用するには、Sparkドライバーのクラスパスに追加する必要があります。 例:
wget https://repo1.maven.org/maven2/com/datarobot/scoring-code-spark-api_1.6.0/0.0.20/scoring-code-spark-api_1.6.0-0.0.20.jar
pyspark --conf "spark.driver.memory=2g" --driver-class-path ./scoring-code-spark-api_1.6.0-0.0.20.jar:../testcases/iris/5e96ed57a6714a18ad699582.jar --jars ../testcases/iris/5e96ed57a6714a18ad699582.jar --packages com.databricks:spark-csv_2.10:1.4.0,com.datarobot:scoring-code-spark-api_1.6.0:0.0.20
# Given an Iris dataframe and a Scoring Code model for it
from py4j.java_gateway import java_import
from pyspark.sql import DataFrame
inputData = sqlContext.read.format("com.databricks.spark.csv") \
.option("header", "true") \
.load('../testcases/iris/Iris.csv')
inputData.show(20)
# When invoke get a Scoring Code model via Spark API for Scoring Code and run transform method
java_import(sc._jvm, 'com.datarobot.prediction.spark16.Predictors')
java_import(sc._jvm, 'com.datarobot.prediction.spark16.Model')
model = sc._jvm.com.datarobot.prediction.spark16.Predictors.getPredictor()
java_dataframe = model.transform(inputData._jdf)
# Then I will get the initial dataframe with additional columns with the prediction results
output = DataFrame(java_dataframe, sqlContext)
output.show()
ランタイムでのモデルのロード¶
ランタイムに以下から渡されたスコアリングコードJARを使用してpyspark
経由でIrisデータセットをスコアリングし、結果をコンソールに出力します。
- DataRobotインスタンス
- HDFS
wget https://repo1.maven.org/maven2/com/datarobot/scoring-code-spark-api_1.6.0/0.0.20/scoring-code-spark-api_1.6.0-0.0.20.jar
pyspark --conf "spark.driver.memory=2g" --driver-class-path scoring-code-spark-api_1.6.0-0.0.20.jar:../testcases/iris/5e96ed57a6714a18ad699582.jar --jars ../testcases/iris/5e96ed57a6714a18ad699582.jar --packages com.datarobot:datarobot-prediction:2.1.4,com.databricks:spark-csv_2.10:1.4.0
備考
フィールドcom.datarobot:datarobot-prediction:
に適切なスコアリングコードモデルバージョンを提供します。
モデルURLとユーザートークンでDataRobotインスタンスを使用する場合は、次のコマンドを使用します。
# Given an Iris dataframe and a Scoring Code model for it
from py4j.java_gateway import java_import
from pyspark.sql import DataFrame
inputData = sqlContext.read.format("com.databricks.spark.csv") \
.option("header", "true") \
.load('../testcases/iris/Iris.csv')
inputData.show(20)
# When invoke get a Scoring Code model via Spark API for Scoring Code and run transform method
java_import(sc._jvm, 'com.datarobot.prediction.spark16.Predictors')
java_import(sc._jvm, 'com.datarobot.prediction.spark16.Model')
model = sc._jvm.com.datarobot.prediction.spark16.Predictors.getPredictorFromServer(
"https://app.datarobot.com/projects/5e96ecf8617f2318c8a18149/models/5e96ed57a6714a18ad699582/blueprint",
"someUserToken")
java_dataframe = model.transform(inputData._jdf)
# Then I will get the initial dataframe with additional columns with the prediction results
output = DataFrame(java_dataframe, sqlContext)
output.show()
HDFSを使用する場合は、次のコマンドを使用します。
# Given an Iris dataframe and a Scoring Code model for it
from py4j.java_gateway import java_import
from pyspark.sql import DataFrame
inputData = sqlContext.read.format("com.databricks.spark.csv") \
.option("header", "true") \
.load('testcases/iris/Iris.csv')
inputData.show(20)
# When invoke get a Scoring Code model via Spark API for Scoring Code and run transform method
java_import(sc._jvm, 'com.datarobot.prediction.spark16.Predictors')
java_import(sc._jvm, 'com.datarobot.prediction.spark16.Model')
model = sc._jvm.com.datarobot.prediction.spark16.Predictors.getPredictorFromHdfs("testcases/iris/5e96ed57a6714a18ad699582.jar", sc, "5e96ed57a6714a18ad699582")
java_dataframe = model.transform(inputData._jdf)
# Then I will get the initial dataframe with additional columns with the prediction results
output = DataFrame(java_dataframe, sqlContext)
output.show()