Batch prediction use cases¶
The following sections provide end-to-end examples of scoring with the API, for both CSV files and external data sources.
- End-to-end scoring of CSV files from local files
- End-to-end scoring of CSV files on S3
- AI Catalog-to-CSV file scoring
- End-to-end scoring from a JDBC PostgreSQL database
- End-to-end scoring with Snowflake
- End-to-end scoring with Synapse
- End-to-end scoring with BigQuery
Note
These use cases require the DataRobot API client to be installed.
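If the client is not yet set up, a minimal configuration might look like the sketch below. The package name on PyPI is datarobot; the environment variable names used here are assumptions, and you can equally pass the endpoint and token directly, as the examples in this section do.

# Install the client first, for example: pip install datarobot
import os
import datarobot as dr

# A sketch of client setup; assumes the endpoint URL and API token are
# stored in the DATAROBOT_ENDPOINT and DATAROBOT_API_TOKEN environment
# variables rather than hard-coded in the script.
dr.Client(
    endpoint=os.environ["DATAROBOT_ENDPOINT"],
    token=os.environ["DATAROBOT_API_TOKEN"],
)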
End-to-end scoring of CSV files from local files¶
The following example scores a local CSV file, waits for processing to start, and then initiates the download.
import datarobot as dr
dr.Client(
endpoint="https://app.datarobot.com/api/v2",
token="...",
)
deployment_id = "..."
input_file = "to_predict.csv"
output_file = "predicted.csv"
job = dr.BatchPredictionJob.score_to_file(
deployment_id,
input_file,
output_file,
passthrough_columns_set="all"
)
print("started scoring...", job)
job.wait_for_completion()
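score_to_file is a convenience wrapper around the generic score method. The sketch below expresses roughly the same job with explicit intake and output settings; the file and path keys are assumptions about the localFile adapters rather than confirmed parameter names.

# A sketch of the same job via the generic score() interface; the 'file'
# and 'path' keys for the localFile adapters are assumptions.
job = dr.BatchPredictionJob.score(
    deployment_id,
    intake_settings={
        'type': 'localFile',
        'file': input_file,
    },
    output_settings={
        'type': 'localFile',
        'path': output_file,
    },
    passthrough_columns_set='all',
)
job.wait_for_completion()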
Prediction Explanations¶
You can include Prediction Explanations by adding the desired Prediction Explanation parameters to the job configuration:
job = dr.BatchPredictionJob.score_to_file(
deployment_id,
input_file,
output_file,
max_explanations=10,
threshold_high=0.5,
threshold_low=0.15,
)
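Once the job finishes, the explanation columns appear alongside the predictions in the output file. The snippet below is a quick way to inspect them; the EXPLANATION_ column prefix used for selection is an assumption about the output naming.

import pandas as pd

# Inspect the scored output; selecting columns by the "EXPLANATION_"
# prefix is an assumption about how the output columns are named.
scored = pd.read_csv(output_file)
explanation_cols = [c for c in scored.columns if c.upper().startswith("EXPLANATION_")]
print(scored[explanation_cols].head())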
Custom CSV format¶
If your CSV file does not match the default CSV format, you can modify the expected format by setting csv_settings:
job = dr.BatchPredictionJob.score_to_file(
deployment_id,
input_file,
output_file,
csv_settings={
'delimiter': ';',
'quotechar': '\'',
'encoding': 'ms_kanji',
},
)
End-to-end scoring of CSV files on S3¶
import datarobot as dr
dr.Client(
endpoint="https://app.datarobot.com/api/v2",
token="...",
)
deployment_id = "616d01a8ddbd17fc2c75caf4"
credential_id = "..."
s3_csv_input_file = 's3://my-bucket/data/to_predict.csv'
s3_csv_output_file = 's3://my-bucket/data/predicted.csv'
job = dr.BatchPredictionJob.score_s3(
deployment_id,
source_url=s3_csv_input_file,
destination_url=s3_csv_output_file,
credential=credential_id
)
print("started scoring...", job)
job.wait_for_completion()
The same functionality is available for score_azure and score_gcp. You can also specify the credential object itself, instead of a credential ID:
credentials = dr.Credential.get(credential_id)
job = dr.BatchPredictionJob.score_s3(
deployment_id,
source_url=s3_csv_input_file,
destination_url=s3_csv_output_file,
credential=credentials,
)
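If no stored credential exists yet, one can be created up front. The sketch below assumes the Credential.create_s3 helper and uses placeholder key values; check your client version for the exact signature.

# A sketch of creating a stored S3 credential; the AWS key values are
# placeholders you would supply yourself.
credentials = dr.Credential.create_s3(
    name='my-s3-credential',
    aws_access_key_id='...',
    aws_secret_access_key='...',
)

The returned credential object can then be passed as the credential argument, exactly as above.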
Prediction Explanations¶
You can include Prediction Explanations by adding the desired Prediction Explanation parameters to the job configuration:
job = dr.BatchPredictionJob.score_s3(
deployment_id,
source_url=s3_csv_input_file,
destination_url=s3_csv_output_file,
credential=credential_id,
max_explanations=10,
threshold_high=0.5,
threshold_low=0.15,
)
AI Catalog-to-CSV file scoring¶
When using the AI Catalog for intake, you need the dataset_id of an already created dataset.
import datarobot as dr
dr.Client(
endpoint="https://app.datarobot.com/api/v2",
token="...",
)
deployment_id = "616d01a8ddbd17fc2c75caf4"
credential_id = "..."
dataset_id = "..."
dataset = dr.Dataset.get(dataset_id)
job = dr.BatchPredictionJob.score(
deployment_id,
intake_settings={
'type': 'dataset',
'dataset': dataset,
},
output_settings={
'type': 'localFile',
},
)
job.wait_for_completion()
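If the data is not in the AI Catalog yet, it can be registered from a local file first. The sketch below assumes Dataset.create_from_file with a file_path keyword; the exact keyword may differ between client versions.

# A sketch of registering a local file in the AI Catalog before scoring;
# the file_path keyword is an assumption about Dataset.create_from_file.
dataset = dr.Dataset.create_from_file(file_path='to_predict.csv')
print(dataset.id)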
End-to-end scoring from a JDBC PostgreSQL database¶
The following example reads a scoring dataset from the table public.scoring_data and saves the scored data back to public.scored_data (assuming that table already exists).
import datarobot as dr
dr.Client(
endpoint="https://app.datarobot.com/api/v2",
token="...",
)
deployment_id = "616d01a8ddbd17fc2c75caf4"
credential_id = "..."
datastore_id = "..."
intake_settings = {
'type': 'jdbc',
'table': 'scoring_data',
'schema': 'public',
'data_store_id': datastore_id,
'credential_id': credential_id,
}
output_settings = {
'type': 'jdbc',
'table': 'scored_data',
'schema': 'public',
'data_store_id': datastore_id,
'credential_id': credential_id,
'statement_type': 'insert'
}
job = dr.BatchPredictionJob.score(
deployment_id,
passthrough_columns_set='all',
intake_settings=intake_settings,
output_settings=output_settings,
)
print("started scoring...", job)
job.wait_for_completion()
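Both the data store and the credential referenced above can also be created through the client. The sketch below assumes DataDriver.list, DataStore.create, and Credential.create_basic with the keyword names shown, and uses placeholder connection details.

# A sketch of creating the JDBC data store and credential used above;
# the driver lookup, JDBC URL, and login details are placeholders.
postgres_driver = [d for d in dr.DataDriver.list() if 'PostgreSQL' in d.canonical_name][0]
datastore = dr.DataStore.create(
    data_store_type='jdbc',
    canonical_name='my-postgres',
    driver_id=postgres_driver.id,
    jdbc_url='jdbc:postgresql://mydb.example.com:5432/mydb',
)
credential = dr.Credential.create_basic(
    name='my-postgres-credential',
    user='scoring_user',
    password='...',
)
datastore_id = datastore.id
credential_id = credential.id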
More details about JDBC scoring can be found here.
End-to-end scoring with Snowflake¶
The following example reads a scoring dataset from the table PUBLIC.SCORING_DATA and saves the scored data back to PUBLIC.SCORED_DATA (assuming that table already exists).
import datarobot as dr
dr.Client(
endpoint="https://app.datarobot.com/api/v2",
token="...",
)
deployment_id = "616d01a8ddbd17fc2c75caf4"
credential_id = "..."
cloud_storage_credential_id = "..."
datastore_id = "..."
intake_settings = {
'type': 'snowflake',
'table': 'SCORING_DATA',
'schema': 'PUBLIC',
'external_stage': 'my_s3_stage_in_snowflake',
'data_store_id': datastore_id,
'credential_id': credential_id,
'cloud_storage_type': 's3',
'cloud_storage_credential_id': cloud_storage_credential_id
}
output_settings = {
'type': 'snowflake',
'table': 'SCORED_DATA',
'schema': 'PUBLIC',
'statement_type': 'insert',
'external_stage': 'my_s3_stage_in_snowflake',
'data_store_id': datastore_id,
'credential_id': credential_id,
'cloud_storage_type': 's3',
'cloud_storage_credential_id': cloud_storage_credential_id
}
job = dr.BatchPredictionJob.score(
deployment_id,
passthrough_columns_set='all',
intake_settings=intake_settings,
output_settings=output_settings,
)
print("started scoring...", job)
job.wait_for_completion()
More details about Snowflake scoring can be found in the intake and output documentation.
End-to-end scoring with Synapse¶
The following example reads a scoring dataset from the table PUBLIC.SCORING_DATA and saves the scored data back to PUBLIC.SCORED_DATA (assuming that table already exists).
import datarobot as dr
dr.Client(
endpoint="https://app.datarobot.com/api/v2",
token="...",
)
deployment_id = "616d01a8ddbd17fc2c75caf4"
credential_id = "..."
cloud_storage_credential_id = "..."
datastore_id = "..."
intake_settings = {
'type': 'synapse',
'table': 'SCORING_DATA',
'schema': 'PUBLIC',
'external_data_source': 'some_datastore',
'data_store_id': datastore_id,
'credential_id': credential_id,
'cloud_storage_credential_id': cloud_storage_credential_id
}
output_settings = {
'type': 'synapse',
'table': 'SCORED_DATA',
'schema': 'PUBLIC',
'statement_type': 'insert',
'external_data_source': 'some_datastore',
'data_store_id': datastore_id,
'credential_id': credential_id,
'cloud_storage_credential_id': cloud_storage_credential_id
}
job = dr.BatchPredictionJob.score(
deployment_id,
passthrough_columns_set='all',
intake_settings=intake_settings,
output_settings=output_settings,
)
print("started scoring...", job)
job.wait_for_completion()
More details about Synapse scoring can be found in the intake and output documentation.
End-to-end scoring with BigQuery¶
The following example scores data from a BigQuery table and writes the results back to a BigQuery table.
import datarobot as dr
dr.Client(
endpoint="https://app.datarobot.com/api/v2",
token="...",
)
deployment_id = "616d01a8ddbd17fc2c75caf4"
gcs_credential_id = "6166c01ee91fb6641ecd28bd"
intake_settings = {
'type': 'bigquery',
'dataset': 'my-dataset',
'table': 'intake-table',
'bucket': 'my-bucket',
'credential_id': gcs_credential_id,
}
output_settings = {
'type': 'bigquery',
'dataset': 'my-dataset',
'table': 'output-table',
'bucket': 'my-bucket',
'credential_id': gcs_credential_id,
}
job = dr.BatchPredictionJob.score(
deployment=deployment_id,
intake_settings=intake_settings,
output_settings=output_settings,
include_prediction_status=True,
passthrough_columns=["some_col_name"],
)
print("started scoring...", job)
job.wait_for_completion()
More details about BigQuery scoring can be found in the intake and output documentation.
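The GCS credential referenced above can be created from a service-account key through the Credential API. The sketch below assumes a Credential.create_gcp helper that accepts the parsed key as gcp_key; verify the exact signature for your client version.

import json

# A sketch of creating the GCP credential used above; assumes the parsed
# service-account key can be passed as gcp_key, and the key file name is
# a placeholder.
with open('service_account_key.json') as f:
    gcp_key = json.load(f)

gcs_credential = dr.Credential.create_gcp(
    name='my-gcs-credential',
    gcp_key=gcp_key,
)
gcs_credential_id = gcs_credential.id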