Skip to content

アプリケーション内で をクリックすると、お使いのDataRobotバージョンに関する全プラットフォームドキュメントにアクセスできます。

スコアリングコード用のApache Spark API

スコアリングコード用のSpark APIは、スコアリングコードJARをSparkクラスターに統合することを容易にするライブラリです。

可用性

以下のSparkバージョンはこの機能をサポートしています。

事前に生成されたスコアリングコードJARとともにライブラリを使用するには、 spark-shell --packagesオプションを活用することが推奨されます。 例:

--packages com.datarobot:scoring-code-spark-api_1.6.0:0.0.20

ライブラリを手動でクラスパスに追加することもできます。 以下のリンクの例を参照してください。

scoring-code-spark-api_3.0.0の使用

Sparkバージョン3.0.0ライブラリを使用するためのワークフローは、scoring-code-spark-api_2.4.3のワークフローと同じですが、 com.datarobot.prediction.spark.*ではなくパッケージcom.datarobot.prediction.spark30.*を使用する必要があります

scoring-code-spark-api_3.0.0と下位互換性のあるJava API

下位互換性のあるJava APIでscoring-code-spark-api_3.0.0を使用するには、パッケージcom.datarobot.prediction.compatible.spark30.*を使用します

scoring-code-spark-api_2.4.3の使用

次のコマンドは、Sparkバージョン2.4.3ライブラリを使用するためのワークフローの概要を示します。

spark-shellでのスコアリング

ローカルファイルシステムのスコアリングコードJARファイルを使用してspark-shell経由でIrisデータセットをスコアリングし、結果をコンソールに出力します。

spark-shell --conf "spark.driver.memory=2g"  --jars ../testcases/iris/5e96ed57a6714a18ad699582.jar --packages com.datarobot:scoring-code-spark-api_2.4.3:0.0.19 
import com.datarobot.prediction.spark.{Model, Predictors}
import org.apache.spark.sql.DataFrame
//Given an Iris dataset
val classificationDF = spark.read.format("com.databricks.spark.csv").option("header", "true").option("quote", "\"").option("escape", "\"").load("../testcases/iris/Iris.csv")
println("Before prediction")
classificationDF.show()
//When execute prediction using the CodeGen model
val model: Model = Predictors.getPredictor("5e96ed57a6714a18ad699582")
val output: DataFrame = model.transform(classificationDF)
//Then I will get the initial dataframe with additional columns with the prediction results
println("After prediction")
output.show() 

ランタイムでのモデルのロード

ランタイムに以下から渡されたスコアリングコードJARを使用してspark-shell経由でIrisデータセットをスコアリングし、結果をコンソールに出力します。

  • DataRobotインスタンス
  • HDFS

備考

フィールドcom.datarobot:datarobot-prediction:に適切なスコアリングコードモデルバージョンを提供します。

spark-shell --conf "spark.driver.memory=2g"  --packages com.datarobot:datarobot-prediction:2.1.4,com.datarobot:scoring-code-spark-api_2.4.3:0.0.19 

モデルURLとユーザートークンでDataRobotインスタンスを使用する場合は、次のコマンドを使用します。

import com.datarobot.prediction.spark.{Model, Predictors}
//Given the Iris dataset and a Scoring Code model URL
val classificationDF = spark.read.format("com.databricks.spark.csv").option("header", "true").option("quote", "\"").option("escape", "\"").load("../testcases/iris/Iris.csv")
println("Before prediction")
classificationDF.show()
//When execute prediction using the CodeGen model
val model = Predictors.getPredictorFromServer("https://app.datarobot.com/projects/5e96ecf8617f2318c8a18149/models/5e96ed57a6714a18ad699582/blueprint","someUserToken")
val output = model.transform(classificationDF)
//Then I will get the initial dataframe with additional columns with the prediction results
println("After prediction")
output.show() 

HDFSを使用する場合は、次のコマンドを使用します。

import com.datarobot.prediction.spark.{Model, Predictors}
//Given an Iris dataset
val classificationDF = spark.read.format("com.databricks.spark.csv").option("header", "true").option("quote", "\"").option("escape", "\"").load("testcases/iris/Iris.csv")
println("Before prediction")
classificationDF.show()
//When execute prediction using the CodeGen model
val model = Predictors.getPredictorFromHdfs("testcases/iris/5e96ed57a6714a18ad699582.jar", spark, "5e96ed57a6714a18ad699582")
val output = model.transform(classificationDF)
//Then I will get the initial dataframe with additional columns with the prediction results
println("After prediction") 

pysparkでのスコアリング

ローカルファイルシステムのスコアリングコードJARファイルを使用してpysparkシェル経由でIrisデータセットをスコアリングし、結果をコンソールに出力します。

pyspark --conf "spark.driver.memory=2g" --jars ../testcases/iris/5e96ed57a6714a18ad699582.jar --packages com.datarobot:scoring-code-spark-api_2.4.3:0.0.19 
from py4j.java_gateway import java_import
# Given an Iris example
inputData = spark.read.format("com.databricks.spark.csv") \
.option("header", "true") \
.option("quote", "\"") \
.option("escape", "\"") \
.load("../testcases/iris/Iris.csv")
print("Before prediction")
inputData.show(n=10)
# When invoke the transform method on it
java_import(spark._jvm, 'com.datarobot.prediction.Predictors')
java_import(spark._jvm, 'com.datarobot.prediction.spark.CodegenModel')
java_import(spark._jvm, 'com.datarobot.prediction.spark.Predictors')
codeGenModel = spark._jvm.com.datarobot.prediction.spark.Predictors.getPredictor('5e96ed57a6714a18ad699582')
java_dataframe_object = codeGenModel.transform(inputData._jdf)
# Then I will get the initial dataframe with additional columns with the prediction results
from pyspark.sql import DataFrame

output = DataFrame(java_dataframe_object, spark)
print("After prediction")
output.show(n=10) 

ランタイムでのモデルのロード

ランタイムに以下から渡されたスコアリングコードJARを使用してpysparkシェル経由でIrisデータセットをスコアリングし、結果をコンソールに出力します。

  • DataRobotインスタンス
  • HDFS

備考

フィールドcom.datarobot:datarobot-prediction:に適切なスコアリングコードモデルバージョンを提供します。

pyspark --conf "spark.driver.memory=2g" --jars ../testcases/iris/5e96ed57a6714a18ad699582.jar --packages com.datarobot:datarobot-prediction:2.1.4,com.datarobot:scoring-code-spark-api_2.4.3:0.0.19 

モデルURLとユーザートークンでDataRobotインスタンスを使用する場合は、次のコマンドを使用します。

# Given an Iris example
from py4j.java_gateway import java_import
inputData = spark.read.format("com.databricks.spark.csv") \
.option("header", "true") \
.option("quote", "\"") \
.option("escape", "\"") \
.load("../testcases/iris/Iris.csv")
print("Before prediction")
inputData.show(n=10)
# When invoke the transform method on it
java_import(spark._jvm, 'com.datarobot.prediction.Predictors')
java_import(spark._jvm, 'com.datarobot.prediction.spark.Model')
java_import(spark._jvm, 'com.datarobot.prediction.spark.Predictors')
codeGenModel = spark._jvm.com.datarobot.prediction.spark.Predictors\
    .getPredictorFromServer("https://app.datarobot.com/projects/5e96ecf8617f2318c8a18149/models/5e96ed57a6714a18ad699582/blueprint"
                            ,"SomeUserToken")
java_dataframe_object = codeGenModel.transform(inputData._jdf)
# Then I will get the initial dataframe with additional columns with the prediction results
from pyspark.sql import DataFrame
output = DataFrame(java_dataframe_object, spark)
print("After prediction")
output.show() 

HDFSを使用する場合は、次のコマンドを使用します。

# Given an Iris example
from py4j.java_gateway import java_import
inputData = spark.read.format("com.databricks.spark.csv") \
.option("header", "true") \
.option("quote", "\"") \
.option("escape", "\"") \
.load("../testcases/iris/Iris.csv")
print("Before prediction")
inputData.show(n=10)
# When invoke the transform method on it
java_import(spark._jvm, 'com.datarobot.prediction.Predictors')
java_import(spark._jvm, 'com.datarobot.prediction.spark.Model')
java_import(spark._jvm, 'com.datarobot.prediction.spark.Predictors')
codeGenModel = spark._jvm.com.datarobot.prediction.spark.Predictors\
    .getPredictorFromHdfs("testcases/iris/5e96ed57a6714a18ad699582.jar", spark, "5e96ed57a6714a18ad699582")
java_dataframe_object = codeGenModel.transform(inputData._jdf)
# Then I will get the initial dataframe with additional columns with the prediction results
from pyspark.sql import DataFrame
output = DataFrame(java_dataframe_object, spark)
print("After prediction")
output.show() 

Time series bulk predictions with spark-shell

Make time series bulk predictions with spark-shell using a Scoring Code JAR file from the local file system and then print the first 50 results to your console.

 shell title="Start spark-shell"
spark-shell --jars 62fa1f42d9d9741a5b620ab2.jar  --packages com.datarobot:datarobot-prediction:2.2.1 --deploy-mode client --conf "spark.driver.memory=2g" 
 shell title="Make and print predictions"
import com.datarobot.prediction.TimeSeriesOptions
import com.datarobot.prediction.spark.Predictors
val predictionDatasetPath = "../testcases/syph_sorted.csv"
val modelId = "62fa1f42d9d9741a5b620ab2"
spark.sparkContext.setLogLevel("ERROR")
val inputDf = spark.read.format("com.databricks.spark.csv").option("header", "true").option("quote", "\"").option("escape", "\"").load(predictionDatasetPath)
val tsOptions = new TimeSeriesOptions.Builder().buildForecastDateRangeRequest("2010-12-05", "2011-01-02")
val model = Predictors.getPredictor(modelId, tsOptions)
val inputFields = inputDf.schema.map(item => item.name).toSet[String]
val df = model.transform(inputDf).drop(inputFields.toArray: _*)
df.show(50) 

Time series single forecast point predictions with spark-shell

Make time series single forecast point predictions with spark-shell using a Scoring Code JAR file from the local file system and then print the first 50 results to your console.

 shell title="Start spark-shell"
spark-shell --jars 62fa1f42d9d9741a5b620ab2.jar  --packages com.datarobot:datarobot-prediction:2.2.1 --deploy-mode client --conf "spark.driver.memory=2g" 
 shell title="Make and print predictions"
import com.datarobot.prediction.TimeSeriesOptions
import com.datarobot.prediction.spark.Predictors
val predictionDatasetPath = "../testcases/syph_sorted.csv"
val modelId = "62fa1f42d9d9741a5b620ab2"
spark.sparkContext.setLogLevel("ERROR")
val inputDf = spark.read.format("com.databricks.spark.csv").option("header", "true").option("quote", "\"").option("escape", "\"").load(predictionDatasetPath)
val tsOptions = new TimeSeriesOptions.Builder().buildSingleForecastPointRequest("2010-12-05")
val model = Predictors.getPredictor(modelId, tsOptions)
val inputFields = inputDf.schema.map(item => item.name).toSet[String]
val df = model.transform(inputDf).drop(inputFields.toArray: _*)
df.show(50) 

Time series single forecast point streaming predictions with spark-shell

Make time series single forecast point predictions with spark-shell using a Scoring Code JAR file from the local file system and stream the results to your console.

 shell title="Start spark-shell"
spark-shell --jars 62fa1f42d9d9741a5b620ab2.jar  --packages com.datarobot:datarobot-prediction:2.2.1 --deploy-mode client --conf "spark.driver.memory=2g" 
 shell title="Make and stream predictions"
import com.datarobot.prediction.spark.Predictors
import com.datarobot.prediction.{TimeSeriesOptions}
import org.apache.spark.sql.streaming.{OutputMode}
import org.apache.spark.sql.types._
val inFolder = "../testcases/input"// todo you need to write chunks of data in this folder
val modelId = "62fa1f42d9d9741a5b620ab2"
spark.sparkContext.setLogLevel("OFF")
val schema = (new StructType()
.add("datetime", "string")
.add("location", "string")
.add("y", "string"))
val df = (spark.readStream
.schema(schema)
.csv(inFolder))
val tsOptions = new TimeSeriesOptions.Builder().buildSingleForecastPointRequest()
val model = Predictors.getPredictor(modelId, tsOptions)
val pred = model.transform(df)
(pred.repartition(1).writeStream
.outputMode(OutputMode.Update())
.format("console")
.start()
.awaitTermination()) 

Time series bulk predictions with pyspark

Make time series bulk predictions with pyspark using a Scoring Code JAR file from the local file system and then print the results to your console.

 shell title="Start pyspark"
pyspark --jars 62fa1f42d9d9741a5b620ab2.jar  --packages com.datarobot:datarobot-prediction:2.2.1 --deploy-mode client --conf "spark.driver.memory=2g" 
 python title="Make and print predictions"
from py4j.java_gateway import java_import
inputData = spark.read.format("com.databricks.spark.csv") \
.option("header", "true") \
.option("quote", "\"") \
.option("escape", "\"") \
.load("../testcases/syph_sorted.csv")
print("Before prediction")
inputData.show(n=10)
# When invoke the transform method on it
java_import(spark._jvm, 'com.datarobot.prediction.Predictors')
java_import(spark._jvm, 'com.datarobot.prediction.TimeSeriesOptions')
java_import(spark._jvm, 'com.datarobot.prediction.spark.Model')
java_import(spark._jvm, 'com.datarobot.prediction.spark.Predictors')
ts_options = spark._jvm.com.datarobot.prediction.TimeSeriesOptions.Builder().buildForecastDateRangeRequest("2010-12-05", "2011-01-02")
codeGenModel = spark._jvm.com.datarobot.prediction.spark.Predictors\
    .getPredictor("62fa1f42d9d9741a5b620ab2", ts_options)
java_dataframe_object = codeGenModel.transform(inputData._jdf)
# Then I will get the initial dataframe with additional columns with the prediction results
from pyspark.sql import DataFrame
output = DataFrame(java_dataframe_object, spark)
print("After prediction")
output.show() 

Time Series Single Forecast Point Predictions with pyspark

Make time series single forecast point predictions with pyspark using a Scoring Code JAR file from the local file system and then print the results to your console.

 shell title="Start pyspark"
pyspark --jars 62fa1f42d9d9741a5b620ab2.jar  --packages com.datarobot:datarobot-prediction:2.2.1 --deploy-mode client --conf "spark.driver.memory=2g" 
 python title="Make and print predictions"
from py4j.java_gateway import java_import
inputData = spark.read.format("com.databricks.spark.csv") \
.option("header", "true") \
.option("quote", "\"") \
.option("escape", "\"") \
.load("../testcases/syph_sorted.csv")
print("Before prediction")
inputData.show(n=10)
# When invoke the transform method on it
java_import(spark._jvm, 'com.datarobot.prediction.Predictors')
java_import(spark._jvm, 'com.datarobot.prediction.TimeSeriesOptions')
java_import(spark._jvm, 'com.datarobot.prediction.spark.Model')
java_import(spark._jvm, 'com.datarobot.prediction.spark.Predictors')
ts_options = spark._jvm.com.datarobot.prediction.TimeSeriesOptions.Builder().buildSingleForecastPointRequest("2010-12-05")
codeGenModel = spark._jvm.com.datarobot.prediction.spark.Predictors\
    .getPredictor("62fa1f42d9d9741a5b620ab2", ts_options)
java_dataframe_object = codeGenModel.transform(inputData._jdf)
# Then I will get the initial dataframe with additional columns with the prediction results
from pyspark.sql import DataFrame
output = DataFrame(java_dataframe_object, spark)
print("After prediction")
output.show() 

scoring-code-spark-api_1.6.0の使用

次のコマンドは、Sparkバージョン1.6.0ライブラリを使用するためのワークフローの概要を示します。

spark-shellでのスコアリング

ローカルファイルシステムのスコアリングコードJARファイルを使用してspark-shell経由でIrisデータセットをスコアリングし、結果をコンソールに出力します。

spark-shell --conf "spark.driver.memory=2g"  --packages com.datarobot:scoring-code-spark-api_1.6.0:0.0.20,com.databricks:spark-csv_2.10:1.4.0 --jars ../testcases/iris/5e96ed57a6714a18ad699582.jar 
import com.datarobot.prediction.spark16.Predictors
val classificationDF = sqlContext.read.format("com.databricks.spark.csv").option("header", "true").load("../testcases/iris/Iris.csv")
println("Before prediction")
classificationDF.show()
//When execute prediction using the CodeGen model
val model = Predictors.getPredictor("5e96ed57a6714a18ad699582")
val output = model.transform(classificationDF)
//Then I will get the initial dataframe with additional columns with the prediction results
println("After prediction")
output.show() 

ランタイムでのモデルのロード

ランタイムに以下から渡されたスコアリングコードJARを使用してspark-shell経由でIrisデータセットをスコアリングし、結果をコンソールに出力します。

  • DataRobotインスタンス
  • HDFS

備考

フィールドcom.datarobot:datarobot-prediction:に適切なスコアリングコードモデルバージョンを提供します。

spark-shell --conf "spark.driver.memory=2g"  --packages com.datarobot:datarobot-prediction:2.1.4,com.datarobot:scoring-code-spark-api_1.6.0:0.0.20,com.databricks:spark-csv_2.10:1.4.0 

モデルURLとユーザートークンでDataRobotインスタンスを使用する場合は、次のコマンドを使用します。

import com.datarobot.prediction.spark16.Predictors
val classificationDF = sqlContext.read.format("com.databricks.spark.csv").option("header", "true").load("../testcases/iris/Iris.csv")
println("Before prediction")
classificationDF.show()
val model = Predictors.getPredictorFromServer("https://app.datarobot.com/projects/5e96ecf8617f2318c8a18149/models/5e96ed57a6714a18ad699582/blueprint",
  "someUserToken")
val output = model.transform(classificationDF)
//Then I will get the initial dataframe with additional columns with the prediction results
println("After prediction")
output.show() 

HDFSを使用する場合は、次のコマンドを使用します。

import com.datarobot.prediction.spark16.Predictors
//Given the Iris dataset
val classificationDF = sc.read.format("com.databricks.spark.csv").option("header", "true").load("testcases/iris/Iris.csv")
println("Before prediction")
classificationDF.show()
//When execute prediction using the CodeGen model
val model = Predictors.getPredictorFromHdfs("testcases/iris/5e96ed57a6714a18ad699582.jar", sparkContext, "5e96ed57a6714a18ad699582")
val output = model.transform(classificationDF)
//Then I will get the initial dataframe with additional columns with the prediction results
println("After prediction")
output.show() 

pysparkでのスコアリング

ローカルファイルシステムのスコアリングコードJARファイルを使用してpysparkシェル経由でIrisデータセットをスコアリングし、結果をコンソールに出力します。

PySpark 1.6を外部JARファイルで使用するには、Sparkドライバーのクラスパスに追加する必要があります。 例:

 wget https://repo1.maven.org/maven2/com/datarobot/scoring-code-spark-api_1.6.0/0.0.20/scoring-code-spark-api_1.6.0-0.0.20.jar
 pyspark --conf "spark.driver.memory=2g" --driver-class-path ./scoring-code-spark-api_1.6.0-0.0.20.jar:../testcases/iris/5e96ed57a6714a18ad699582.jar  --jars ../testcases/iris/5e96ed57a6714a18ad699582.jar --packages com.databricks:spark-csv_2.10:1.4.0,com.datarobot:scoring-code-spark-api_1.6.0:0.0.20 
# Given an Iris dataframe and a Scoring Code model for it
from py4j.java_gateway import java_import
from pyspark.sql import DataFrame
inputData = sqlContext.read.format("com.databricks.spark.csv") \
.option("header", "true") \
.load('../testcases/iris/Iris.csv')
inputData.show(20)
# When invoke get a Scoring Code model via Spark API for Scoring Code and run transform method
java_import(sc._jvm, 'com.datarobot.prediction.spark16.Predictors')
java_import(sc._jvm, 'com.datarobot.prediction.spark16.Model')
model = sc._jvm.com.datarobot.prediction.spark16.Predictors.getPredictor()
java_dataframe = model.transform(inputData._jdf)
# Then I will get the initial dataframe with additional columns with the prediction results
output = DataFrame(java_dataframe, sqlContext)
output.show() 

ランタイムでのモデルのロード

ランタイムに以下から渡されたスコアリングコードJARを使用してpyspark経由でIrisデータセットをスコアリングし、結果をコンソールに出力します。

  • DataRobotインスタンス
  • HDFS
wget https://repo1.maven.org/maven2/com/datarobot/scoring-code-spark-api_1.6.0/0.0.20/scoring-code-spark-api_1.6.0-0.0.20.jar
pyspark --conf "spark.driver.memory=2g" --driver-class-path scoring-code-spark-api_1.6.0-0.0.20.jar:../testcases/iris/5e96ed57a6714a18ad699582.jar  --jars ../testcases/iris/5e96ed57a6714a18ad699582.jar --packages com.datarobot:datarobot-prediction:2.1.4,com.databricks:spark-csv_2.10:1.4.0 

備考

フィールドcom.datarobot:datarobot-prediction:に適切なスコアリングコードモデルバージョンを提供します。

モデルURLとユーザートークンでDataRobotインスタンスを使用する場合は、次のコマンドを使用します。

# Given an Iris dataframe and a Scoring Code model for it
from py4j.java_gateway import java_import
from pyspark.sql import DataFrame
inputData = sqlContext.read.format("com.databricks.spark.csv") \
.option("header", "true") \
.load('../testcases/iris/Iris.csv')
inputData.show(20)
# When invoke get a Scoring Code model via Spark API for Scoring Code and run transform method
java_import(sc._jvm, 'com.datarobot.prediction.spark16.Predictors')
java_import(sc._jvm, 'com.datarobot.prediction.spark16.Model')
model = sc._jvm.com.datarobot.prediction.spark16.Predictors.getPredictorFromServer(
  "https://app.datarobot.com/projects/5e96ecf8617f2318c8a18149/models/5e96ed57a6714a18ad699582/blueprint",
  "someUserToken")
java_dataframe = model.transform(inputData._jdf)
# Then I will get the initial dataframe with additional columns with the prediction results
output = DataFrame(java_dataframe, sqlContext)
output.show() 

HDFSを使用する場合は、次のコマンドを使用します。

# Given an Iris dataframe and a Scoring Code model for it
from py4j.java_gateway import java_import
from pyspark.sql import DataFrame
inputData = sqlContext.read.format("com.databricks.spark.csv") \
.option("header", "true") \
.load('testcases/iris/Iris.csv')
inputData.show(20)
# When invoke get a Scoring Code model via Spark API for Scoring Code and run transform method
java_import(sc._jvm, 'com.datarobot.prediction.spark16.Predictors')
java_import(sc._jvm, 'com.datarobot.prediction.spark16.Model')
model = sc._jvm.com.datarobot.prediction.spark16.Predictors.getPredictorFromHdfs("testcases/iris/5e96ed57a6714a18ad699582.jar", sc, "5e96ed57a6714a18ad699582")
java_dataframe = model.transform(inputData._jdf)
# Then I will get the initial dataframe with additional columns with the prediction results
output = DataFrame(java_dataframe, sqlContext)
output.show() 

更新しました January 11, 2023
Back to top