Apache Spark API for Scoring Code

The Spark API for Scoring Code is a library that facilitates integrating Scoring Code JARs into Spark clusters.

Availability

The following Spark versions support this feature:

To use the library with pre-generated Scoring Code JARs, DataRobot recommends leveraging the spark-shell --packages option. For example:

--packages com.datarobot:scoring-code-spark-api_1.6.0:0.0.20
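The coordinate in the example above follows a visible pattern: the group com.datarobot, an artifact name that ends in the Spark version, and then the library version. As a minimal sketch, the string could be assembled like this in Python. The helper name spark_api_coordinate is hypothetical and is not part of the library.

```python
def spark_api_coordinate(spark_version: str, library_version: str) -> str:
    """Build the Maven coordinate for the Spark API for Scoring Code library.

    Hypothetical helper: it only formats the pattern seen in this document.
    """
    return f"com.datarobot:scoring-code-spark-api_{spark_version}:{library_version}"


# Coordinates used by the examples in this document.
print(spark_api_coordinate("1.6.0", "0.0.20"))
print(spark_api_coordinate("2.4.3", "0.0.19"))
```

Pass the resulting string to the --packages option of spark-shell or pyspark.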

You can also add the library to the classpath manually. See examples in the links below:

Using scoring-code-spark-api_3.0.0

The workflow for using the Spark version 3.0.0 library is the same as the workflow for scoring-code-spark-api_2.4.3; however, you must use the package com.datarobot.prediction.spark30.* instead of com.datarobot.prediction.spark.*.

Backward-compatible Java API with scoring-code-spark-api_3.0.0

To use scoring-code-spark-api_3.0.0 with the backward-compatible Java API, use the package com.datarobot.prediction.compatible.spark30.*.
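Because the package name changes with the library version, it can help to keep the mapping in one place. The sketch below collects the package names mentioned in this document. The helper scoring_package and its arguments are illustrative assumptions, not part of the library.

```python
# Package names as they appear in this document, keyed by library Spark version.
PACKAGES = {
    "1.6.0": "com.datarobot.prediction.spark16",
    "2.4.3": "com.datarobot.prediction.spark",
    "3.0.0": "com.datarobot.prediction.spark30",
}
# The backward-compatible Java API is only described here for 3.0.0.
COMPATIBLE_PACKAGES = {"3.0.0": "com.datarobot.prediction.compatible.spark30"}


def scoring_package(library_spark_version: str, compatible: bool = False) -> str:
    """Return the package to import for a given library version (hypothetical helper)."""
    table = COMPATIBLE_PACKAGES if compatible else PACKAGES
    try:
        return table[library_spark_version]
    except KeyError:
        raise ValueError(
            f"No package listed for version {library_spark_version!r} "
            f"(compatible={compatible})"
        ) from None


print(scoring_package("3.0.0"))
print(scoring_package("3.0.0", compatible=True))
```

In the pyspark examples, the returned package would replace com.datarobot.prediction.spark in the java_import calls, assuming the class names inside each package are the same.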

Using scoring-code-spark-api_2.4.3

The following commands outline the workflow for using the Spark version 2.4.3 library.

Scoring with spark-shell

Score the Iris dataset via spark-shell using a Scoring Code JAR file from a local file system and print the results to your console.

spark-shell --conf "spark.driver.memory=2g"  --jars ../testcases/iris/5e96ed57a6714a18ad699582.jar --packages com.datarobot:scoring-code-spark-api_2.4.3:0.0.19
import com.datarobot.prediction.spark.{Model, Predictors}
import org.apache.spark.sql.DataFrame
//Given an Iris dataset
val classificationDF = spark.read.format("com.databricks.spark.csv").option("header", "true").option("quote", "\"").option("escape", "\"").load("../testcases/iris/Iris.csv")
println("Before prediction")
classificationDF.show()
//When I execute a prediction using the CodeGen model
val model: Model = Predictors.getPredictor("5e96ed57a6714a18ad699582")
val output: DataFrame = model.transform(classificationDF)
//Then I will get the initial dataframe with additional columns with the prediction results
println("After prediction")
output.show()

Loading models in runtime

Score the Iris dataset via spark-shell and print the results to your console using a Scoring Code JAR passed in runtime from:

  • a DataRobot instance
  • an HDFS

Note

Provide the appropriate Scoring Code model version for the field com.datarobot:datarobot-prediction:<VERSION>.

spark-shell --conf "spark.driver.memory=2g"  --packages com.datarobot:datarobot-prediction:2.1.4,com.datarobot:scoring-code-spark-api_2.4.3:0.0.19

When using a DataRobot instance with a model URL and a user token, use the following command:

import com.datarobot.prediction.spark.{Model, Predictors}
//Given the Iris dataset and a Scoring Code model URL
val classificationDF = spark.read.format("com.databricks.spark.csv").option("header", "true").option("quote", "\"").option("escape", "\"").load("../testcases/iris/Iris.csv")
println("Before prediction")
classificationDF.show()
//When I execute a prediction using the CodeGen model
val model = Predictors.getPredictorFromServer("https://app.datarobot.com/projects/5e96ecf8617f2318c8a18149/models/5e96ed57a6714a18ad699582/blueprint","someUserToken")
val output = model.transform(classificationDF)
//Then I will get the initial dataframe with additional columns with the prediction results
println("After prediction")
output.show()

When using an HDFS, use the following command:

import com.datarobot.prediction.spark.{Model, Predictors}
//Given an Iris dataset
val classificationDF = spark.read.format("com.databricks.spark.csv").option("header", "true").option("quote", "\"").option("escape", "\"").load("testcases/iris/Iris.csv")
println("Before prediction")
classificationDF.show()
//When I execute a prediction using the CodeGen model
val model = Predictors.getPredictorFromHdfs("testcases/iris/5e96ed57a6714a18ad699582.jar", spark, "5e96ed57a6714a18ad699582")
val output = model.transform(classificationDF)
//Then I will get the initial dataframe with additional columns with the prediction results
println("After prediction")
output.show()

Scoring with pyspark

Score the Iris dataset via pyspark shell using a Scoring Code JAR file from a local file system and print the results to your console.

pyspark --conf "spark.driver.memory=2g" --jars ../testcases/iris/5e96ed57a6714a18ad699582.jar --packages com.datarobot:scoring-code-spark-api_2.4.3:0.0.19
from py4j.java_gateway import java_import
# Given an Iris example
inputData = spark.read.format("com.databricks.spark.csv") \
.option("header", "true") \
.option("quote", "\"") \
.option("escape", "\"") \
.load("../testcases/iris/Iris.csv")
print("Before prediction")
inputData.show(n=10)
# When I invoke the transform method on it
java_import(spark._jvm, 'com.datarobot.prediction.Predictors')
java_import(spark._jvm, 'com.datarobot.prediction.spark.CodegenModel')
java_import(spark._jvm, 'com.datarobot.prediction.spark.Predictors')
codeGenModel = spark._jvm.com.datarobot.prediction.spark.Predictors.getPredictor('5e96ed57a6714a18ad699582')
java_dataframe_object = codeGenModel.transform(inputData._jdf)
# Then I will get the initial dataframe with additional columns with the prediction results
from pyspark.sql import DataFrame

output = DataFrame(java_dataframe_object, spark)
print("After prediction")
output.show(n=10)

Loading models in runtime

Score the Iris dataset via the pyspark shell and print the results to your console using a Scoring Code JAR passed in runtime from:

  • a DataRobot instance
  • an HDFS

Note

Provide the appropriate Scoring Code model version for the field com.datarobot:datarobot-prediction:<VERSION>.

pyspark --conf "spark.driver.memory=2g" --jars ../testcases/iris/5e96ed57a6714a18ad699582.jar --packages com.datarobot:datarobot-prediction:2.1.4,com.datarobot:scoring-code-spark-api_2.4.3:0.0.19
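When the datarobot-prediction version changes between models, assembling the launch command from its parts can reduce copy-and-paste mistakes. This is a minimal sketch that only rebuilds the command shown above. The function pyspark_command is a hypothetical helper, and the versions are the ones used in this document's example.

```python
import shlex


def pyspark_command(packages, jars=(), driver_memory="2g"):
    """Return the pyspark launch command as an argument list (hypothetical helper)."""
    command = ["pyspark", "--conf", f"spark.driver.memory={driver_memory}"]
    if jars:
        # --jars takes a comma-separated list of JAR paths.
        command += ["--jars", ",".join(jars)]
    # --packages takes a comma-separated list of Maven coordinates.
    command += ["--packages", ",".join(packages)]
    return command


command = pyspark_command(
    packages=[
        "com.datarobot:datarobot-prediction:2.1.4",  # replace 2.1.4 with your model's version
        "com.datarobot:scoring-code-spark-api_2.4.3:0.0.19",
    ],
    jars=["../testcases/iris/5e96ed57a6714a18ad699582.jar"],
)
print(shlex.join(command))
```

The argument list can be passed to subprocess.run as is, or printed with shlex.join (Python 3.8 or later) and pasted into a terminal.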

When using a DataRobot instance with a model URL and a user token, use the following command:

# Given an Iris example
from py4j.java_gateway import java_import
inputData = spark.read.format("com.databricks.spark.csv") \
.option("header", "true") \
.option("quote", "\"") \
.option("escape", "\"") \
.load("../testcases/iris/Iris.csv")
print("Before prediction")
inputData.show(n=10)
# When I invoke the transform method on it
java_import(spark._jvm, 'com.datarobot.prediction.Predictors')
java_import(spark._jvm, 'com.datarobot.prediction.spark.Model')
java_import(spark._jvm, 'com.datarobot.prediction.spark.Predictors')
codeGenModel = spark._jvm.com.datarobot.prediction.spark.Predictors\
    .getPredictorFromServer("https://app.datarobot.com/projects/5e96ecf8617f2318c8a18149/models/5e96ed57a6714a18ad699582/blueprint"
                            ,"SomeUserToken")
java_dataframe_object = codeGenModel.transform(inputData._jdf)
# Then I will get the initial dataframe with additional columns with the prediction results
from pyspark.sql import DataFrame
output = DataFrame(java_dataframe_object, spark)
print("After prediction")
output.show()
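The example above hardcodes the model URL and the user token. One possible alternative is to build the URL from the project and model IDs and to read the token from the environment. The helpers blueprint_url and read_token, and the variable name DATAROBOT_API_TOKEN, are assumptions made for this sketch. The URL layout is copied from the example.

```python
import os


def blueprint_url(project_id, model_id, host="https://app.datarobot.com"):
    """Build a model URL in the same shape as the example above (hypothetical helper)."""
    return f"{host.rstrip('/')}/projects/{project_id}/models/{model_id}/blueprint"


def read_token(env_var="DATAROBOT_API_TOKEN"):
    """Read the user token from an environment variable (the name is an assumption)."""
    token = os.environ.get(env_var)
    if not token:
        raise RuntimeError(f"Set {env_var} before loading a model from the server")
    return token


url = blueprint_url("5e96ecf8617f2318c8a18149", "5e96ed57a6714a18ad699582")
print(url)
```

The values of url and read_token() can then be passed to getPredictorFromServer in place of the string literals.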

When using an HDFS, use the following command:

# Given an Iris example
from py4j.java_gateway import java_import
inputData = spark.read.format("com.databricks.spark.csv") \
.option("header", "true") \
.option("quote", "\"") \
.option("escape", "\"") \
.load("../testcases/iris/Iris.csv")
print("Before prediction")
inputData.show(n=10)
# When I invoke the transform method on it
java_import(spark._jvm, 'com.datarobot.prediction.Predictors')
java_import(spark._jvm, 'com.datarobot.prediction.spark.Model')
java_import(spark._jvm, 'com.datarobot.prediction.spark.Predictors')
codeGenModel = spark._jvm.com.datarobot.prediction.spark.Predictors\
    .getPredictorFromHdfs("testcases/iris/5e96ed57a6714a18ad699582.jar", spark, "5e96ed57a6714a18ad699582")
java_dataframe_object = codeGenModel.transform(inputData._jdf)
# Then I will get the initial dataframe with additional columns with the prediction results
from pyspark.sql import DataFrame
output = DataFrame(java_dataframe_object, spark)
print("After prediction")
output.show()

Time series bulk predictions with spark-shell

Make time series bulk predictions with spark-shell using a Scoring Code JAR file from the local file system and then print the first 50 results to your console.

Start spark-shell
spark-shell --jars 62fa1f42d9d9741a5b620ab2.jar  --packages com.datarobot:datarobot-prediction:2.2.1 --deploy-mode client --conf "spark.driver.memory=2g"
Make and print predictions
import com.datarobot.prediction.TimeSeriesOptions
import com.datarobot.prediction.spark.Predictors
val predictionDatasetPath = "../testcases/syph_sorted.csv"
val modelId = "62fa1f42d9d9741a5b620ab2"
spark.sparkContext.setLogLevel("ERROR")
val inputDf = spark.read.format("com.databricks.spark.csv").option("header", "true").option("quote", "\"").option("escape", "\"").load(predictionDatasetPath)
val tsOptions = new TimeSeriesOptions.Builder().buildForecastDateRangeRequest("2010-12-05", "2011-01-02")
val model = Predictors.getPredictor(modelId, tsOptions)
val inputFields = inputDf.schema.map(item => item.name).toSet[String]
val df = model.transform(inputDf).drop(inputFields.toArray: _*)
df.show(50)

Time series single forecast point predictions with spark-shell

Make time series single forecast point predictions with spark-shell using a Scoring Code JAR file from the local file system and then print the first 50 results to your console.

Start spark-shell
spark-shell --jars 62fa1f42d9d9741a5b620ab2.jar  --packages com.datarobot:datarobot-prediction:2.2.1 --deploy-mode client --conf "spark.driver.memory=2g"
Make and print predictions
import com.datarobot.prediction.TimeSeriesOptions
import com.datarobot.prediction.spark.Predictors
val predictionDatasetPath = "../testcases/syph_sorted.csv"
val modelId = "62fa1f42d9d9741a5b620ab2"
spark.sparkContext.setLogLevel("ERROR")
val inputDf = spark.read.format("com.databricks.spark.csv").option("header", "true").option("quote", "\"").option("escape", "\"").load(predictionDatasetPath)
val tsOptions = new TimeSeriesOptions.Builder().buildSingleForecastPointRequest("2010-12-05")
val model = Predictors.getPredictor(modelId, tsOptions)
val inputFields = inputDf.schema.map(item => item.name).toSet[String]
val df = model.transform(inputDf).drop(inputFields.toArray: _*)
df.show(50)

Time series single forecast point streaming predictions with spark-shell

Make time series single forecast point predictions with spark-shell using a Scoring Code JAR file from the local file system and stream the results to your console.

Start spark-shell
spark-shell --jars 62fa1f42d9d9741a5b620ab2.jar  --packages com.datarobot:datarobot-prediction:2.2.1 --deploy-mode client --conf "spark.driver.memory=2g"
Make and stream predictions
import com.datarobot.prediction.spark.Predictors
import com.datarobot.prediction.{TimeSeriesOptions}
import org.apache.spark.sql.streaming.{OutputMode}
import org.apache.spark.sql.types._
val inFolder = "../testcases/input" // TODO: write chunks of input data into this folder
val modelId = "62fa1f42d9d9741a5b620ab2"
spark.sparkContext.setLogLevel("OFF")
val schema = (new StructType()
.add("datetime", "string")
.add("location", "string")
.add("y", "string"))
val df = (spark.readStream
.schema(schema)
.csv(inFolder))
val tsOptions = new TimeSeriesOptions.Builder().buildSingleForecastPointRequest()
val model = Predictors.getPredictor(modelId, tsOptions)
val pred = model.transform(df)
(pred.repartition(1).writeStream
.outputMode(OutputMode.Update())
.format("console")
.start()
.awaitTermination())
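The streaming example expects chunks of data to appear in the input folder while the query runs. Spark's file source expects each file to be placed in the watched directory atomically, so a common approach is to write the file elsewhere and then move it in. The sketch below does this in Python. The helper write_chunk, the staging folder, and the sample row values are assumptions. The column order follows the schema in the example (datetime, location, y), and no header row is written because the reader above does not enable the header option.

```python
import csv
import os
import tempfile
import uuid


def write_chunk(rows, in_folder, staging_folder):
    """Write one headerless CSV chunk, then move it into in_folder in a single step."""
    os.makedirs(in_folder, exist_ok=True)
    os.makedirs(staging_folder, exist_ok=True)
    name = f"chunk-{uuid.uuid4().hex}.csv"
    staged = os.path.join(staging_folder, name)
    with open(staged, "w", newline="") as handle:
        csv.writer(handle, lineterminator="\n").writerows(rows)
    final = os.path.join(in_folder, name)
    # os.replace is atomic when both folders are on the same filesystem.
    os.replace(staged, final)
    return final


# Demonstration in a temporary directory; the row values are made up.
base = tempfile.mkdtemp()
path = write_chunk(
    [("2010-12-05", "location_1", "12")],
    in_folder=os.path.join(base, "input"),
    staging_folder=os.path.join(base, "staging"),
)
print(path)
```

In practice, in_folder would be the folder assigned to inFolder in the example, with the staging folder on the same filesystem.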

Time series bulk predictions with pyspark

Make time series bulk predictions with pyspark using a Scoring Code JAR file from the local file system and then print the results to your console.

Start pyspark
pyspark --jars 62fa1f42d9d9741a5b620ab2.jar  --packages com.datarobot:datarobot-prediction:2.2.1 --deploy-mode client --conf "spark.driver.memory=2g"
Make and print predictions
from py4j.java_gateway import java_import
inputData = spark.read.format("com.databricks.spark.csv") \
.option("header", "true") \
.option("quote", "\"") \
.option("escape", "\"") \
.load("../testcases/syph_sorted.csv")
print("Before prediction")
inputData.show(n=10)
# When I invoke the transform method on it
java_import(spark._jvm, 'com.datarobot.prediction.Predictors')
java_import(spark._jvm, 'com.datarobot.prediction.TimeSeriesOptions')
java_import(spark._jvm, 'com.datarobot.prediction.spark.Model')
java_import(spark._jvm, 'com.datarobot.prediction.spark.Predictors')
ts_options = spark._jvm.com.datarobot.prediction.TimeSeriesOptions.Builder().buildForecastDateRangeRequest("2010-12-05", "2011-01-02")
codeGenModel = spark._jvm.com.datarobot.prediction.spark.Predictors\
    .getPredictor("62fa1f42d9d9741a5b620ab2", ts_options)
java_dataframe_object = codeGenModel.transform(inputData._jdf)
# Then I will get the initial dataframe with additional columns with the prediction results
from pyspark.sql import DataFrame
output = DataFrame(java_dataframe_object, spark)
print("After prediction")
output.show()
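The forecast date range in the example is passed as two plain strings. A small check before calling buildForecastDateRangeRequest can catch a malformed or reversed range early. This sketch assumes the YYYY-MM-DD format used in the example. The library may accept other formats, and forecast_date_range is a hypothetical helper.

```python
from datetime import date


def forecast_date_range(start, end):
    """Validate two YYYY-MM-DD strings and return them in normalized form."""
    start_day = date.fromisoformat(start)  # raises ValueError on malformed input
    end_day = date.fromisoformat(end)
    if start_day > end_day:
        raise ValueError(f"Range start {start} is after range end {end}")
    return start_day.isoformat(), end_day.isoformat()


start, end = forecast_date_range("2010-12-05", "2011-01-02")
print(start, end)
```

The validated start and end values can then be passed to buildForecastDateRangeRequest as in the example above.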

Time series single forecast point predictions with pyspark

Make time series single forecast point predictions with pyspark using a Scoring Code JAR file from the local file system and then print the results to your console.

Start pyspark
pyspark --jars 62fa1f42d9d9741a5b620ab2.jar  --packages com.datarobot:datarobot-prediction:2.2.1 --deploy-mode client --conf "spark.driver.memory=2g"
Make and print predictions
from py4j.java_gateway import java_import
inputData = spark.read.format("com.databricks.spark.csv") \
.option("header", "true") \
.option("quote", "\"") \
.option("escape", "\"") \
.load("../testcases/syph_sorted.csv")
print("Before prediction")
inputData.show(n=10)
# When I invoke the transform method on it
java_import(spark._jvm, 'com.datarobot.prediction.Predictors')
java_import(spark._jvm, 'com.datarobot.prediction.TimeSeriesOptions')
java_import(spark._jvm, 'com.datarobot.prediction.spark.Model')
java_import(spark._jvm, 'com.datarobot.prediction.spark.Predictors')
ts_options = spark._jvm.com.datarobot.prediction.TimeSeriesOptions.Builder().buildSingleForecastPointRequest("2010-12-05")
codeGenModel = spark._jvm.com.datarobot.prediction.spark.Predictors\
    .getPredictor("62fa1f42d9d9741a5b620ab2", ts_options)
java_dataframe_object = codeGenModel.transform(inputData._jdf)
# Then I will get the initial dataframe with additional columns with the prediction results
from pyspark.sql import DataFrame
output = DataFrame(java_dataframe_object, spark)
print("After prediction")
output.show()
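The spark-shell time series examples drop the input columns before showing the results, while the pyspark examples show every column. A minimal sketch of the same idea for pyspark follows. The helper prediction_columns and the column name PREDICTION are placeholders, since the actual output column names depend on the model.

```python
def prediction_columns(input_columns, output_columns):
    """Return the output columns that were not part of the input, keeping their order."""
    known = set(input_columns)
    return [name for name in output_columns if name not in known]


# Plain lists stand in for inputData.columns and output.columns.
print(prediction_columns(
    ["datetime", "location", "y"],
    ["datetime", "location", "y", "PREDICTION"],
))
```

With the dataframes from the example, this would be used as output.select(*prediction_columns(inputData.columns, output.columns)).show().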

Using scoring-code-spark-api_1.6.0

The following commands outline the workflow for using the Spark version 1.6.0 library.

Scoring with spark-shell

Score the Iris dataset via spark-shell using a Scoring Code JAR file from a local file system and print the results to your console.

spark-shell --conf "spark.driver.memory=2g"  --packages com.datarobot:scoring-code-spark-api_1.6.0:0.0.20,com.databricks:spark-csv_2.10:1.4.0 --jars ../testcases/iris/5e96ed57a6714a18ad699582.jar
import com.datarobot.prediction.spark16.Predictors
val classificationDF = sqlContext.read.format("com.databricks.spark.csv").option("header", "true").load("../testcases/iris/Iris.csv")
println("Before prediction")
classificationDF.show()
//When I execute a prediction using the CodeGen model
val model = Predictors.getPredictor("5e96ed57a6714a18ad699582")
val output = model.transform(classificationDF)
//Then I will get the initial dataframe with additional columns with the prediction results
println("After prediction")
output.show()

Loading models in runtime

Score the Iris dataset via spark-shell and print the results to your console using a Scoring Code JAR passed in runtime from:

  • a DataRobot instance
  • an HDFS

Note

Provide the appropriate Scoring Code model version for the field com.datarobot:datarobot-prediction:<VERSION>.

spark-shell --conf "spark.driver.memory=2g"  --packages com.datarobot:datarobot-prediction:2.1.4,com.datarobot:scoring-code-spark-api_1.6.0:0.0.20,com.databricks:spark-csv_2.10:1.4.0

When using a DataRobot instance with a model URL and a user token, use the following command:

import com.datarobot.prediction.spark16.Predictors
val classificationDF = sqlContext.read.format("com.databricks.spark.csv").option("header", "true").load("../testcases/iris/Iris.csv")
println("Before prediction")
classificationDF.show()
val model = Predictors.getPredictorFromServer("https://app.datarobot.com/projects/5e96ecf8617f2318c8a18149/models/5e96ed57a6714a18ad699582/blueprint",
  "someUserToken")
val output = model.transform(classificationDF)
//Then I will get the initial dataframe with additional columns with the prediction results
println("After prediction")
output.show()

When using an HDFS, use the following command:

import com.datarobot.prediction.spark16.Predictors
//Given the Iris dataset
val classificationDF = sqlContext.read.format("com.databricks.spark.csv").option("header", "true").load("testcases/iris/Iris.csv")
println("Before prediction")
classificationDF.show()
//When I execute a prediction using the CodeGen model
val model = Predictors.getPredictorFromHdfs("testcases/iris/5e96ed57a6714a18ad699582.jar", sc, "5e96ed57a6714a18ad699582")
val output = model.transform(classificationDF)
//Then I will get the initial dataframe with additional columns with the prediction results
println("After prediction")
output.show()

Scoring with pyspark

Score the Iris dataset via pyspark shell using a Scoring Code JAR file from a local file system and print the results to your console.

To use PySpark 1.6 with external JAR files, you must add them to a Spark driver classpath. For example:

wget https://repo1.maven.org/maven2/com/datarobot/scoring-code-spark-api_1.6.0/0.0.20/scoring-code-spark-api_1.6.0-0.0.20.jar
pyspark --conf "spark.driver.memory=2g" --driver-class-path ./scoring-code-spark-api_1.6.0-0.0.20.jar:../testcases/iris/5e96ed57a6714a18ad699582.jar  --jars ../testcases/iris/5e96ed57a6714a18ad699582.jar --packages com.databricks:spark-csv_2.10:1.4.0,com.datarobot:scoring-code-spark-api_1.6.0:0.0.20
# Given an Iris dataframe and a Scoring Code model for it
from py4j.java_gateway import java_import
from pyspark.sql import DataFrame
inputData = sqlContext.read.format("com.databricks.spark.csv") \
.option("header", "true") \
.load('../testcases/iris/Iris.csv')
inputData.show(20)
# When I get a Scoring Code model via the Spark API for Scoring Code and run the transform method
java_import(sc._jvm, 'com.datarobot.prediction.spark16.Predictors')
java_import(sc._jvm, 'com.datarobot.prediction.spark16.Model')
model = sc._jvm.com.datarobot.prediction.spark16.Predictors.getPredictor()
java_dataframe = model.transform(inputData._jdf)
# Then I will get the initial dataframe with additional columns with the prediction results
output = DataFrame(java_dataframe, sqlContext)
output.show()

Loading models in runtime

Score the Iris dataset via pyspark and print the results to your console using a Scoring Code JAR passed in runtime from:

  • a DataRobot instance
  • an HDFS

wget https://repo1.maven.org/maven2/com/datarobot/scoring-code-spark-api_1.6.0/0.0.20/scoring-code-spark-api_1.6.0-0.0.20.jar
pyspark --conf "spark.driver.memory=2g" --driver-class-path scoring-code-spark-api_1.6.0-0.0.20.jar:../testcases/iris/5e96ed57a6714a18ad699582.jar  --jars ../testcases/iris/5e96ed57a6714a18ad699582.jar --packages com.datarobot:datarobot-prediction:2.1.4,com.databricks:spark-csv_2.10:1.4.0

Note

Provide the appropriate Scoring Code model version for the field com.datarobot:datarobot-prediction:<VERSION>.

When using a DataRobot instance with a model URL and a user token, use the following command:

# Given an Iris dataframe and a Scoring Code model for it
from py4j.java_gateway import java_import
from pyspark.sql import DataFrame
inputData = sqlContext.read.format("com.databricks.spark.csv") \
.option("header", "true") \
.load('../testcases/iris/Iris.csv')
inputData.show(20)
# When I get a Scoring Code model via the Spark API for Scoring Code and run the transform method
java_import(sc._jvm, 'com.datarobot.prediction.spark16.Predictors')
java_import(sc._jvm, 'com.datarobot.prediction.spark16.Model')
model = sc._jvm.com.datarobot.prediction.spark16.Predictors.getPredictorFromServer(
  "https://app.datarobot.com/projects/5e96ecf8617f2318c8a18149/models/5e96ed57a6714a18ad699582/blueprint",
  "someUserToken")
java_dataframe = model.transform(inputData._jdf)
# Then I will get the initial dataframe with additional columns with the prediction results
output = DataFrame(java_dataframe, sqlContext)
output.show()

When using an HDFS, use the following command:

# Given an Iris dataframe and a Scoring Code model for it
from py4j.java_gateway import java_import
from pyspark.sql import DataFrame
inputData = sqlContext.read.format("com.databricks.spark.csv") \
.option("header", "true") \
.load('testcases/iris/Iris.csv')
inputData.show(20)
# When I get a Scoring Code model via the Spark API for Scoring Code and run the transform method
java_import(sc._jvm, 'com.datarobot.prediction.spark16.Predictors')
java_import(sc._jvm, 'com.datarobot.prediction.spark16.Model')
model = sc._jvm.com.datarobot.prediction.spark16.Predictors.getPredictorFromHdfs("testcases/iris/5e96ed57a6714a18ad699582.jar", sc, "5e96ed57a6714a18ad699582")
java_dataframe = model.transform(inputData._jdf)
# Then I will get the initial dataframe with additional columns with the prediction results
output = DataFrame(java_dataframe, sqlContext)
output.show()

Updated December 28, 2022