Apache Spark API for Scoring Code¶
The Spark API for Scoring Code is a library that facilitates integrating Scoring Code JARs into Spark clusters.
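At its core, the library wraps a Scoring Code model in a Spark transformer: a Predictors factory loads a Model, and calling transform on a DataFrame returns that DataFrame with prediction columns appended. A minimal sketch of the pattern, assuming the Scoring Code JAR is already on the classpath and inputDF is an existing DataFrame (package names vary by Spark version, as described below):
import com.datarobot.prediction.spark.{Model, Predictors}
// Load the Scoring Code model by its model ID
val model: Model = Predictors.getPredictor("5e96ed57a6714a18ad699582")
// Score: the input DataFrame is returned with prediction columns appended
val output = model.transform(inputDF)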
Availability¶
The following Spark versions support this feature:
- Apache Spark 3.0.0
- Apache Spark 2.4.3
- Apache Spark 1.6.0
To use the library with pre-generated Scoring Code JARs, DataRobot recommends leveraging the spark-shell --packages option. For example:
--packages com.datarobot:scoring-code-spark-api_1.6.0:0.0.20
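A complete invocation that also places a Scoring Code model JAR on the classpath might look like the following sketch (the JAR path is illustrative):
spark-shell --jars path/to/model.jar --packages com.datarobot:scoring-code-spark-api_1.6.0:0.0.20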
You can also add the library to the classpath manually. See examples in the links below:
- https://mvnrepository.com/artifact/com.datarobot/scoring-code-spark-api_3.0.0
- https://mvnrepository.com/artifact/com.datarobot/scoring-code-spark-api_2.4.3
- https://mvnrepository.com/artifact/com.datarobot/scoring-code-spark-api_1.6.0
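You can also declare the same coordinates through a build tool. A minimal sbt sketch, assuming the Spark 2.4.3 artifact and the version used elsewhere on this page (check the Maven links above for current releases):
// build.sbt -- illustrative coordinates; pick the artifact matching your Spark version
libraryDependencies += "com.datarobot" % "scoring-code-spark-api_2.4.3" % "0.0.19"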
Using scoring-code-spark-api_3.0.0¶
The workflow for using the Spark version 3.0.0 library is the same as the workflow for scoring-code-spark-api_2.4.3; however, you need to use the package com.datarobot.prediction.spark30.* instead of com.datarobot.prediction.spark.*.
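For example, the Spark 2.4.3 snippet shown later on this page would become the following sketch (assuming classificationDF is loaded as in the examples below):
import com.datarobot.prediction.spark30.{Model, Predictors}
// Same workflow as 2.4.3; only the package differs
val model: Model = Predictors.getPredictor("5e96ed57a6714a18ad699582")
val output = model.transform(classificationDF)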
Backward-compatible Java API with scoring-code-spark-api_3.0.0¶
To use scoring-code-spark-api_3.0.0 with the backward-compatible Java API, use the package com.datarobot.prediction.compatible.spark30.*
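For example, the import then becomes the following (a sketch, assuming the same Model and Predictors entry points as the other packages):
import com.datarobot.prediction.compatible.spark30.{Model, Predictors}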
Using scoring-code-spark-api_2.4.3¶
The following commands outline the workflow for using the Spark version 2.4.3 library.
Scoring with spark-shell¶
Score the Iris dataset via spark-shell using a Scoring Code JAR file from a local file system and print the results to your console.
spark-shell --conf "spark.driver.memory=2g" --jars ../testcases/iris/5e96ed57a6714a18ad699582.jar --packages com.datarobot:scoring-code-spark-api_2.4.3:0.0.19
import com.datarobot.prediction.spark.{Model, Predictors}
import org.apache.spark.sql.DataFrame
//Given an Iris dataset
val classificationDF = spark.read.format("com.databricks.spark.csv").option("header", "true").option("quote", "\"").option("escape", "\"").load("../testcases/iris/Iris.csv")
println("Before prediction")
classificationDF.show()
// When executing a prediction using the CodeGen model
val model: Model = Predictors.getPredictor("5e96ed57a6714a18ad699582")
val output: DataFrame = model.transform(classificationDF)
// Then the initial dataframe is returned with additional columns containing the prediction results
println("After prediction")
output.show()
Loading models at runtime¶
Score the Iris dataset via spark-shell and print the results to your console using a Scoring Code JAR passed at runtime from:
- a DataRobot instance
- HDFS
Note
Provide the appropriate Scoring Code model version for the field com.datarobot:datarobot-prediction:<VERSION>.
spark-shell --conf "spark.driver.memory=2g" --packages com.datarobot:datarobot-prediction:2.1.4,com.datarobot:scoring-code-spark-api_2.4.3:0.0.19
When using a DataRobot instance with a model URL and a user token, use the following command:
import com.datarobot.prediction.spark.{Model, Predictors}
//Given the Iris dataset and a Scoring Code model URL
val classificationDF = spark.read.format("com.databricks.spark.csv").option("header", "true").option("quote", "\"").option("escape", "\"").load("../testcases/iris/Iris.csv")
println("Before prediction")
classificationDF.show()
// When executing a prediction using the CodeGen model
val model = Predictors.getPredictorFromServer("https://app.datarobot.com/projects/5e96ecf8617f2318c8a18149/models/5e96ed57a6714a18ad699582/blueprint","someUserToken")
val output = model.transform(classificationDF)
// Then the initial dataframe is returned with additional columns containing the prediction results
println("After prediction")
output.show()
When using HDFS, use the following command:
import com.datarobot.prediction.spark.{Model, Predictors}
//Given an Iris dataset
val classificationDF = spark.read.format("com.databricks.spark.csv").option("header", "true").option("quote", "\"").option("escape", "\"").load("testcases/iris/Iris.csv")
println("Before prediction")
classificationDF.show()
// When executing a prediction using the CodeGen model
val model = Predictors.getPredictorFromHdfs("testcases/iris/5e96ed57a6714a18ad699582.jar", spark, "5e96ed57a6714a18ad699582")
val output = model.transform(classificationDF)
// Then the initial dataframe is returned with additional columns containing the prediction results
println("After prediction")
output.show()
Scoring with pyspark¶
Score the Iris dataset via the pyspark shell using a Scoring Code JAR file from a local file system and print the results to your console.
pyspark --conf "spark.driver.memory=2g" --jars ../testcases/iris/5e96ed57a6714a18ad699582.jar --packages com.datarobot:scoring-code-spark-api_2.4.3:0.0.19
from py4j.java_gateway import java_import
# Given an Iris example
inputData = spark.read.format("com.databricks.spark.csv") \
.option("header", "true") \
.option("quote", "\"") \
.option("escape", "\"") \
.load("../testcases/iris/Iris.csv")
print("Before prediction")
inputData.show(n=10)
# When invoking the transform method on it
java_import(spark._jvm, 'com.datarobot.prediction.Predictors')
java_import(spark._jvm, 'com.datarobot.prediction.spark.CodegenModel')
java_import(spark._jvm, 'com.datarobot.prediction.spark.Predictors')
codeGenModel = spark._jvm.com.datarobot.prediction.spark.Predictors.getPredictor('5e96ed57a6714a18ad699582')
java_dataframe_object = codeGenModel.transform(inputData._jdf)
# Then the initial dataframe is returned with additional columns containing the prediction results
from pyspark.sql import DataFrame
output = DataFrame(java_dataframe_object, spark)
print("After prediction")
output.show(n=10)
Loading models at runtime¶
Score the Iris dataset via the pyspark shell and print the results to your console using a Scoring Code JAR passed at runtime from:
- a DataRobot instance
- HDFS
Note
Provide the appropriate Scoring Code model version for the field com.datarobot:datarobot-prediction:<VERSION>.
pyspark --conf "spark.driver.memory=2g" --jars ../testcases/iris/5e96ed57a6714a18ad699582.jar --packages com.datarobot:datarobot-prediction:2.1.4,com.datarobot:scoring-code-spark-api_2.4.3:0.0.19
When using a DataRobot instance with a model URL and a user token, use the following command:
# Given an Iris example
from py4j.java_gateway import java_import
inputData = spark.read.format("com.databricks.spark.csv") \
.option("header", "true") \
.option("quote", "\"") \
.option("escape", "\"") \
.load("../testcases/iris/Iris.csv")
print("Before prediction")
inputData.show(n=10)
# When invoking the transform method on it
java_import(spark._jvm, 'com.datarobot.prediction.Predictors')
java_import(spark._jvm, 'com.datarobot.prediction.spark.Model')
java_import(spark._jvm, 'com.datarobot.prediction.spark.Predictors')
codeGenModel = spark._jvm.com.datarobot.prediction.spark.Predictors \
.getPredictorFromServer("https://app.datarobot.com/projects/5e96ecf8617f2318c8a18149/models/5e96ed57a6714a18ad699582/blueprint", "SomeUserToken")
java_dataframe_object = codeGenModel.transform(inputData._jdf)
# Then the initial dataframe is returned with additional columns containing the prediction results
from pyspark.sql import DataFrame
output = DataFrame(java_dataframe_object, spark)
print("After prediction")
output.show()
When using HDFS, use the following command:
# Given an Iris example
from py4j.java_gateway import java_import
inputData = spark.read.format("com.databricks.spark.csv") \
.option("header", "true") \
.option("quote", "\"") \
.option("escape", "\"") \
.load("../testcases/iris/Iris.csv")
print("Before prediction")
inputData.show(n=10)
# When invoking the transform method on it
java_import(spark._jvm, 'com.datarobot.prediction.Predictors')
java_import(spark._jvm, 'com.datarobot.prediction.spark.Model')
java_import(spark._jvm, 'com.datarobot.prediction.spark.Predictors')
codeGenModel = spark._jvm.com.datarobot.prediction.spark.Predictors\
.getPredictorFromHdfs("testcases/iris/5e96ed57a6714a18ad699582.jar", spark, "5e96ed57a6714a18ad699582")
java_dataframe_object = codeGenModel.transform(inputData._jdf)
# Then the initial dataframe is returned with additional columns containing the prediction results
from pyspark.sql import DataFrame
output = DataFrame(java_dataframe_object, spark)
print("After prediction")
output.show()
Time series bulk predictions with spark-shell¶
Make time series bulk predictions with spark-shell using a Scoring Code JAR file from the local file system and then print the first 50 results to your console.
spark-shell --jars 62fa1f42d9d9741a5b620ab2.jar --packages com.datarobot:datarobot-prediction:2.2.1 --deploy-mode client --conf "spark.driver.memory=2g"
import com.datarobot.prediction.TimeSeriesOptions
import com.datarobot.prediction.spark.Predictors
val predictionDatasetPath = "../testcases/syph_sorted.csv"
val modelId = "62fa1f42d9d9741a5b620ab2"
spark.sparkContext.setLogLevel("ERROR")
val inputDf = spark.read.format("com.databricks.spark.csv").option("header", "true").option("quote", "\"").option("escape", "\"").load(predictionDatasetPath)
val tsOptions = new TimeSeriesOptions.Builder().buildForecastDateRangeRequest("2010-12-05", "2011-01-02")
val model = Predictors.getPredictor(modelId, tsOptions)
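// Collect the input column names so they can be dropped below, leaving only the prediction columns in the output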
val inputFields = inputDf.schema.map(item => item.name).toSet[String]
val df = model.transform(inputDf).drop(inputFields.toArray: _*)
df.show(50)
Time series single forecast point predictions with spark-shell¶
Make time series single forecast point predictions with spark-shell using a Scoring Code JAR file from the local file system and then print the first 50 results to your console.
spark-shell --jars 62fa1f42d9d9741a5b620ab2.jar --packages com.datarobot:datarobot-prediction:2.2.1 --deploy-mode client --conf "spark.driver.memory=2g"
import com.datarobot.prediction.TimeSeriesOptions
import com.datarobot.prediction.spark.Predictors
val predictionDatasetPath = "../testcases/syph_sorted.csv"
val modelId = "62fa1f42d9d9741a5b620ab2"
spark.sparkContext.setLogLevel("ERROR")
val inputDf = spark.read.format("com.databricks.spark.csv").option("header", "true").option("quote", "\"").option("escape", "\"").load(predictionDatasetPath)
val tsOptions = new TimeSeriesOptions.Builder().buildSingleForecastPointRequest("2010-12-05")
val model = Predictors.getPredictor(modelId, tsOptions)
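// Collect the input column names so they can be dropped below, leaving only the prediction columns in the output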
val inputFields = inputDf.schema.map(item => item.name).toSet[String]
val df = model.transform(inputDf).drop(inputFields.toArray: _*)
df.show(50)
Time series single forecast point streaming predictions with spark-shell¶
Make time series single forecast point predictions with spark-shell using a Scoring Code JAR file from the local file system and stream the results to your console.
spark-shell --jars 62fa1f42d9d9741a5b620ab2.jar --packages com.datarobot:datarobot-prediction:2.2.1 --deploy-mode client --conf "spark.driver.memory=2g"
import com.datarobot.prediction.spark.Predictors
import com.datarobot.prediction.{TimeSeriesOptions}
import org.apache.spark.sql.streaming.{OutputMode}
import org.apache.spark.sql.types._
val inFolder = "../testcases/input" // write chunks of input CSV data into this folder for the stream to pick up
val modelId = "62fa1f42d9d9741a5b620ab2"
spark.sparkContext.setLogLevel("OFF")
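// File-based streaming sources require the input schema to be declared up front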
val schema = (new StructType()
.add("datetime", "string")
.add("location", "string")
.add("y", "string"))
val df = (spark.readStream
.schema(schema)
.csv(inFolder))
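// No explicit forecast point is passed here; presumably it is derived from the incoming data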
val tsOptions = new TimeSeriesOptions.Builder().buildSingleForecastPointRequest()
val model = Predictors.getPredictor(modelId, tsOptions)
val pred = model.transform(df)
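// Coalesce to a single partition and stream each micro-batch of scored rows to the console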
(pred.repartition(1).writeStream
.outputMode(OutputMode.Update())
.format("console")
.start()
.awaitTermination())
Time series bulk predictions with pyspark¶
Make time series bulk predictions with pyspark using a Scoring Code JAR file from the local file system and then print the results to your console.
pyspark --jars 62fa1f42d9d9741a5b620ab2.jar --packages com.datarobot:datarobot-prediction:2.2.1 --deploy-mode client --conf "spark.driver.memory=2g"
from py4j.java_gateway import java_import
inputData = spark.read.format("com.databricks.spark.csv") \
.option("header", "true") \
.option("quote", "\"") \
.option("escape", "\"") \
.load("../testcases/syph_sorted.csv")
print("Before prediction")
inputData.show(n=10)
# When invoking the transform method on it
java_import(spark._jvm, 'com.datarobot.prediction.Predictors')
java_import(spark._jvm, 'com.datarobot.prediction.TimeSeriesOptions')
java_import(spark._jvm, 'com.datarobot.prediction.spark.Model')
java_import(spark._jvm, 'com.datarobot.prediction.spark.Predictors')
ts_options = spark._jvm.com.datarobot.prediction.TimeSeriesOptions.Builder().buildForecastDateRangeRequest("2010-12-05", "2011-01-02")
codeGenModel = spark._jvm.com.datarobot.prediction.spark.Predictors\
.getPredictor("62fa1f42d9d9741a5b620ab2", ts_options)
java_dataframe_object = codeGenModel.transform(inputData._jdf)
# Then the initial dataframe is returned with additional columns containing the prediction results
from pyspark.sql import DataFrame
output = DataFrame(java_dataframe_object, spark)
print("After prediction")
output.show()
Time series single forecast point predictions with pyspark¶
Make time series single forecast point predictions with pyspark using a Scoring Code JAR file from the local file system and then print the results to your console.
pyspark --jars 62fa1f42d9d9741a5b620ab2.jar --packages com.datarobot:datarobot-prediction:2.2.1 --deploy-mode client --conf "spark.driver.memory=2g"
from py4j.java_gateway import java_import
inputData = spark.read.format("com.databricks.spark.csv") \
.option("header", "true") \
.option("quote", "\"") \
.option("escape", "\"") \
.load("../testcases/syph_sorted.csv")
print("Before prediction")
inputData.show(n=10)
# When invoking the transform method on it
java_import(spark._jvm, 'com.datarobot.prediction.Predictors')
java_import(spark._jvm, 'com.datarobot.prediction.TimeSeriesOptions')
java_import(spark._jvm, 'com.datarobot.prediction.spark.Model')
java_import(spark._jvm, 'com.datarobot.prediction.spark.Predictors')
ts_options = spark._jvm.com.datarobot.prediction.TimeSeriesOptions.Builder().buildSingleForecastPointRequest("2010-12-05")
codeGenModel = spark._jvm.com.datarobot.prediction.spark.Predictors\
.getPredictor("62fa1f42d9d9741a5b620ab2", ts_options)
java_dataframe_object = codeGenModel.transform(inputData._jdf)
# Then the initial dataframe is returned with additional columns containing the prediction results
from pyspark.sql import DataFrame
output = DataFrame(java_dataframe_object, spark)
print("After prediction")
output.show()
Using scoring-code-spark-api_1.6.0¶
The following commands outline the workflow for using the Spark version 1.6.0 library.
Scoring with spark-shell¶
Score the Iris dataset via spark-shell using a Scoring Code JAR file from a local file system and print the results to your console.
spark-shell --conf "spark.driver.memory=2g" --packages com.datarobot:scoring-code-spark-api_1.6.0:0.0.20,com.databricks:spark-csv_2.10:1.4.0 --jars ../testcases/iris/5e96ed57a6714a18ad699582.jar
import com.datarobot.prediction.spark16.Predictors
val classificationDF = sqlContext.read.format("com.databricks.spark.csv").option("header", "true").load("../testcases/iris/Iris.csv")
println("Before prediction")
classificationDF.show()
// When executing a prediction using the CodeGen model
val model = Predictors.getPredictor("5e96ed57a6714a18ad699582")
val output = model.transform(classificationDF)
// Then the initial dataframe is returned with additional columns containing the prediction results
println("After prediction")
output.show()
Loading models at runtime¶
Score the Iris dataset via spark-shell and print the results to your console using a Scoring Code JAR passed at runtime from:
- a DataRobot instance
- HDFS
Note
Provide the appropriate Scoring Code model version for the field com.datarobot:datarobot-prediction:<VERSION>.
spark-shell --conf "spark.driver.memory=2g" --packages com.datarobot:datarobot-prediction:2.1.4,com.datarobot:scoring-code-spark-api_1.6.0:0.0.20,com.databricks:spark-csv_2.10:1.4.0
When using a DataRobot instance with a model URL and a user token, use the following command:
import com.datarobot.prediction.spark16.Predictors
val classificationDF = sqlContext.read.format("com.databricks.spark.csv").option("header", "true").load("../testcases/iris/Iris.csv")
println("Before prediction")
classificationDF.show()
val model = Predictors.getPredictorFromServer("https://app.datarobot.com/projects/5e96ecf8617f2318c8a18149/models/5e96ed57a6714a18ad699582/blueprint",
"someUserToken")
val output = model.transform(classificationDF)
//Then I will get the initial dataframe with additional columns with the prediction results
println("After prediction")
output.show()
When using HDFS, use the following command:
import com.datarobot.prediction.spark16.Predictors
//Given the Iris dataset
val classificationDF = sqlContext.read.format("com.databricks.spark.csv").option("header", "true").load("testcases/iris/Iris.csv")
println("Before prediction")
classificationDF.show()
// When executing a prediction using the CodeGen model
val model = Predictors.getPredictorFromHdfs("testcases/iris/5e96ed57a6714a18ad699582.jar", sc, "5e96ed57a6714a18ad699582")
val output = model.transform(classificationDF)
// Then the initial dataframe is returned with additional columns containing the prediction results
println("After prediction")
output.show()
Scoring with pyspark¶
Score the Iris dataset via the pyspark shell using a Scoring Code JAR file from a local file system and print the results to your console.
To use PySpark 1.6 with external JAR files, you must add them to the Spark driver classpath. For example:
wget https://repo1.maven.org/maven2/com/datarobot/scoring-code-spark-api_1.6.0/0.0.20/scoring-code-spark-api_1.6.0-0.0.20.jar
pyspark --conf "spark.driver.memory=2g" --driver-class-path ./scoring-code-spark-api_1.6.0-0.0.20.jar:../testcases/iris/5e96ed57a6714a18ad699582.jar --jars ../testcases/iris/5e96ed57a6714a18ad699582.jar --packages com.databricks:spark-csv_2.10:1.4.0,com.datarobot:scoring-code-spark-api_1.6.0:0.0.20
# Given an Iris dataframe and a Scoring Code model for it
from py4j.java_gateway import java_import
from pyspark.sql import DataFrame
inputData = sqlContext.read.format("com.databricks.spark.csv") \
.option("header", "true") \
.load('../testcases/iris/Iris.csv')
inputData.show(20)
# When getting a Scoring Code model via the Spark API for Scoring Code and running its transform method
java_import(sc._jvm, 'com.datarobot.prediction.spark16.Predictors')
java_import(sc._jvm, 'com.datarobot.prediction.spark16.Model')
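# No model ID is passed here; presumably the single Scoring Code JAR on the classpath is resolved automatically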
model = sc._jvm.com.datarobot.prediction.spark16.Predictors.getPredictor()
java_dataframe = model.transform(inputData._jdf)
# Then the initial dataframe is returned with additional columns containing the prediction results
output = DataFrame(java_dataframe, sqlContext)
output.show()
Loading models at runtime¶
Score the Iris dataset via the pyspark shell and print the results to your console using a Scoring Code JAR passed at runtime from:
- a DataRobot instance
- HDFS
wget https://repo1.maven.org/maven2/com/datarobot/scoring-code-spark-api_1.6.0/0.0.20/scoring-code-spark-api_1.6.0-0.0.20.jar
pyspark --conf "spark.driver.memory=2g" --driver-class-path scoring-code-spark-api_1.6.0-0.0.20.jar:../testcases/iris/5e96ed57a6714a18ad699582.jar --jars ../testcases/iris/5e96ed57a6714a18ad699582.jar --packages com.datarobot:datarobot-prediction:2.1.4,com.databricks:spark-csv_2.10:1.4.0
Note
Provide the appropriate Scoring Code model version for the field com.datarobot:datarobot-prediction:<VERSION>.
When using a DataRobot instance with a model URL and a user token, use the following command:
# Given an Iris dataframe and a Scoring Code model for it
from py4j.java_gateway import java_import
from pyspark.sql import DataFrame
inputData = sqlContext.read.format("com.databricks.spark.csv") \
.option("header", "true") \
.load('../testcases/iris/Iris.csv')
inputData.show(20)
# When getting a Scoring Code model via the Spark API for Scoring Code and running its transform method
java_import(sc._jvm, 'com.datarobot.prediction.spark16.Predictors')
java_import(sc._jvm, 'com.datarobot.prediction.spark16.Model')
model = sc._jvm.com.datarobot.prediction.spark16.Predictors.getPredictorFromServer(
"https://app.datarobot.com/projects/5e96ecf8617f2318c8a18149/models/5e96ed57a6714a18ad699582/blueprint",
"someUserToken")
java_dataframe = model.transform(inputData._jdf)
# Then the initial dataframe is returned with additional columns containing the prediction results
output = DataFrame(java_dataframe, sqlContext)
output.show()
When using HDFS, use the following command:
# Given an Iris dataframe and a Scoring Code model for it
from py4j.java_gateway import java_import
from pyspark.sql import DataFrame
inputData = sqlContext.read.format("com.databricks.spark.csv") \
.option("header", "true") \
.load('testcases/iris/Iris.csv')
inputData.show(20)
# When getting a Scoring Code model via the Spark API for Scoring Code and running its transform method
java_import(sc._jvm, 'com.datarobot.prediction.spark16.Predictors')
java_import(sc._jvm, 'com.datarobot.prediction.spark16.Model')
model = sc._jvm.com.datarobot.prediction.spark16.Predictors.getPredictorFromHdfs("testcases/iris/5e96ed57a6714a18ad699582.jar", sc, "5e96ed57a6714a18ad699582")
java_dataframe = model.transform(inputData._jdf)
# Then the initial dataframe is returned with additional columns containing the prediction results
output = DataFrame(java_dataframe, sqlContext)
output.show()