Chunking Service dynamic dataset example¶
Requirements: the DataRobot Python SDK, API credentials (DR_API_TOKEN and DR_ENDPOINT), a named DataRobot credential for the dataset, and the SAMPLE_DATA_TO_START_PROJECT feature flag enabled.
The sample helpers read the module-level DR_API_TOKEN and DR_ENDPOINT variables, which are set in the Authentication cell below.
Authentication¶
In [ ]:
Copied!
import os

# Read the DataRobot API credentials from the environment.
# Surrounding whitespace is stripped so a blank or padded value counts as
# missing, and a trailing "/" is removed from the endpoint because the sample
# helpers build URLs as f"{DR_ENDPOINT}/datasets/...".
DR_API_TOKEN = os.environ.get("DR_API_TOKEN", "").strip()
DR_ENDPOINT = os.environ.get("DR_ENDPOINT", "").strip().rstrip("/")

# Fail fast: the client created in the next cell needs both values.
if not DR_API_TOKEN or not DR_ENDPOINT:
    raise ValueError("Set DR_API_TOKEN and DR_ENDPOINT in the environment or edit this cell.")
import os

# Read the DataRobot API credentials from the environment.
# Surrounding whitespace is stripped so a blank or padded value counts as
# missing, and a trailing "/" is removed from the endpoint because the sample
# helpers build URLs as f"{DR_ENDPOINT}/datasets/...".
DR_API_TOKEN = os.environ.get("DR_API_TOKEN", "").strip()
DR_ENDPOINT = os.environ.get("DR_ENDPOINT", "").strip().rstrip("/")

# Fail fast: the client created in the next cell needs both values.
if not DR_API_TOKEN or not DR_ENDPOINT:
    raise ValueError("Set DR_API_TOKEN and DR_ENDPOINT in the environment or edit this cell.")
Import libraries¶
In [ ]:
Copied!
import math
import time
from typing import Dict, List, Optional
import datarobot as dr
import pandas as pd
from datarobot import UseCase
from datarobot.enums import ChunkingPartitionMethod, ChunkingStrategy
from datarobot.models.chunking_service_v2 import ChunkDefinition, DatasetDefinition
from datarobot.models.project import Project
from datarobot.utils.waiters import wait_for_async_resolution
# Target size of the dataset sample in bytes: 500 * 1024 * 1024 (500 MiB).
SAMPLE_SIZE = 500 * (1024 * 1024)

# Create the DataRobot client from the credentials set in the Authentication
# cell; the returned object is not used directly, so it is bound to "_".
_ = dr.Client(endpoint=DR_ENDPOINT, token=DR_API_TOKEN)
import math
import time
from typing import Dict, List, Optional
import datarobot as dr
import pandas as pd
from datarobot import UseCase
from datarobot.enums import ChunkingPartitionMethod, ChunkingStrategy
from datarobot.models.chunking_service_v2 import ChunkDefinition, DatasetDefinition
from datarobot.models.project import Project
from datarobot.utils.waiters import wait_for_async_resolution
# Target size of the dataset sample in bytes: 500 * 1024 * 1024 (500 MiB).
SAMPLE_SIZE = 500 * (1024 * 1024)

# Create the DataRobot client from the credentials set in the Authentication
# cell; the returned object is not used directly, so it is bound to "_".
_ = dr.Client(endpoint=DR_ENDPOINT, token=DR_API_TOKEN)
Configure pipeline functions¶
In [ ]:
Copied!
def get_credential_id(credential_name: str) -> str:
    """Return the ID of the first stored credential named *credential_name*.

    Args:
        credential_name: Exact name of the credential to look up.

    Returns:
        The ``credential_id`` of the first match, converted to ``str``.

    Raises:
        ValueError: If no credential returned by ``dr.Credential.list()`` has
            that name.
    """
    # First credential whose name matches exactly, or None when there is none.
    match = next(
        (cred for cred in dr.Credential.list() if cred.name == credential_name),
        None,
    )
    if match is None:
        raise ValueError(f"No credential found with name: {credential_name}")

    print(f"Credential found: {credential_name}")
    return str(match.credential_id)
def add_project_to_use_case(use_case_id: str, project_id: str) -> None:
    """Add an existing project to an existing Use Case.

    Args:
        use_case_id: ID of the Use Case that receives the project.
        project_id: ID of the project to add.
    """
    # Fetch the project first, then the Use Case, and link the two.
    project_entity = dr.Project.get(project_id)
    UseCase.get(use_case_id=use_case_id).add(entity=project_entity)
def create_dataset_definition(
    dataset_id: str,
    dataset_version_id: Optional[str],
    name: str,
    credential_name: str,
) -> DatasetDefinition:
    """Create a dataset definition, analyze it, and return the re-fetched object.

    Args:
        dataset_id: ID of the dataset the definition is created for.
        dataset_version_id: Dataset version to use, or ``None``.
        name: Name given to the new dataset definition.
        credential_name: Name of the stored credential; resolved to an ID with
            ``get_credential_id``.

    Returns:
        The dataset definition as returned by ``DatasetDefinition.get`` after
        ``DatasetDefinition.analyze`` has been called.

    Raises:
        ValueError: If no credential named *credential_name* exists.
    """
    created = DatasetDefinition.create(
        dataset_id,
        dataset_version_id,
        name=name,
        credentials_id=get_credential_id(credential_name),
    )
    definition_id = created.id
    print(f"Created dataset definition: {definition_id}")

    DatasetDefinition.analyze(definition_id)
    # Re-fetch after analyze(); presumably this is what populates dataset_info
    # on the returned object -- confirm against the SDK.
    analyzed = DatasetDefinition.get(definition_id)

    info = analyzed.dataset_info
    if info is None:
        return analyzed

    print(
        f"Dataset info - Total rows: {info.total_rows}, "
        f"Estimated size per row: {info.estimated_size_per_row}"
    )
    return analyzed
def _create_dataset_sample(
    dataset_definition: DatasetDefinition,
    size: float,
    sampling_method: str,
    label: str,
) -> str:
    """Request a dataset sample, wait for it, and confirm it is listed.

    Args:
        dataset_definition: Definition whose ``dataset_props.dataset_id``
            identifies the dataset to sample.
        size: Value sent as the ``size`` argument of the sampling strategy.
        sampling_method: Value sent as ``samplingMethod`` ("rows" or "percent").
        label: Prefix used in the "not found" error message.

    Returns:
        The ``catalogVersionId`` of the new sample, as a string.

    Raises:
        ValueError: If the dataset lists no sample versions, or the new sample
            is not among them.
    """
    client = dr.Client(token=DR_API_TOKEN, endpoint=DR_ENDPOINT)
    dataset_id = dataset_definition.dataset_props.dataset_id

    payload = {
        "samplingStrategy": {
            "directive": "efficient-rowbased-sample",
            "arguments": {"size": size, "samplingMethod": sampling_method},
        }
    }
    response = client.post(f"{DR_ENDPOINT}/datasets/{dataset_id}/samples", json=payload)
    sample_version_id = response.json()["catalogVersionId"]
    # The POST is asynchronous: block on the status URL from the Location header.
    wait_for_async_resolution(client, response.headers["Location"])

    response = client.get(f"{DR_ENDPOINT}/datasets/{dataset_id}/versions/?category=SAMPLE")
    listing = response.json()
    if listing["totalCount"] <= 0:
        raise ValueError(f"No sample version found for dataset {dataset_id}")

    # Keep only versions actually tagged SAMPLE, then check ours is among them.
    # Raised explicitly (not asserted) so the check survives `python -O`.
    listed_sample_ids = {
        version.get("versionId")
        for version in listing.get("data") or ()
        if "SAMPLE" in (version.get("categories") or ())
    }
    if sample_version_id not in listed_sample_ids:
        raise ValueError(f"{label} sample version {sample_version_id} not found in samples")
    return str(sample_version_id)


def create_sample_row_based(dataset_definition: DatasetDefinition) -> str:
    """Create a dataset sample sized as a row count worth about ``SAMPLE_SIZE`` bytes.

    The row count is ``ceil(SAMPLE_SIZE / estimated_size_per_row)``.

    Args:
        dataset_definition: An analyzed dataset definition (``dataset_info`` set).

    Returns:
        The version ID of the created sample, as a string.

    Raises:
        ValueError: If ``dataset_info`` is missing, the estimated row size is
            missing or not positive, or the sample cannot be found afterwards.
    """
    dataset_info = dataset_definition.dataset_info
    if dataset_info is None:
        raise ValueError("Dataset definition has no dataset_info; ensure it has been analyzed")

    estimated_size_per_row = dataset_info.estimated_size_per_row
    # Guard the division below against a missing or zero estimate.
    if not estimated_size_per_row or estimated_size_per_row <= 0:
        raise ValueError(
            "Cannot size a row-based sample: estimated_size_per_row is "
            f"{estimated_size_per_row!r}"
        )

    sample_size_rows = math.ceil(SAMPLE_SIZE / estimated_size_per_row)
    print(f"Row count for {SAMPLE_SIZE / (1024**2):.1f}MB: {sample_size_rows}")
    return _create_dataset_sample(dataset_definition, sample_size_rows, "rows", "Row based")


def create_sample_percentage_based(dataset_definition: DatasetDefinition) -> str:
    """Create a dataset sample sized as a percentage worth about ``SAMPLE_SIZE`` bytes.

    The percentage is ``SAMPLE_SIZE / source_size * 100``, capped at 100 so a
    dataset smaller than ``SAMPLE_SIZE`` does not request more than all of it.

    Args:
        dataset_definition: An analyzed dataset definition (``dataset_info`` set).

    Returns:
        The version ID of the created sample, as a string.

    Raises:
        ValueError: If ``dataset_info`` is missing, the source size is missing
            or not positive, or the sample cannot be found afterwards.
    """
    dataset_info = dataset_definition.dataset_info
    if dataset_info is None:
        raise ValueError("Dataset definition has no dataset_info; ensure it has been analyzed")

    source_size = dataset_info.source_size
    # Guard the division below against a missing or zero size.
    if not source_size or source_size <= 0:
        raise ValueError(
            f"Cannot size a percentage-based sample: source_size is {source_size!r}"
        )

    sample_size_percentage = min(100.0, (SAMPLE_SIZE / source_size) * 100)
    print(
        f"Percentage of dataset size for {SAMPLE_SIZE / (1024**2):.1f}MB: "
        f"{sample_size_percentage:.2f}%"
    )
    return _create_dataset_sample(
        dataset_definition, sample_size_percentage, "percent", "Percentage"
    )
def create_project(
    dataset_id: str,
    target_column: str,
    chunk_definition: ChunkDefinition,
    use_case_id: Optional[str] = None,
    datetime_partition_column: Optional[str] = None,
    project_name: str = "Dynamic Dataset Project with Incremental Learning OTV",
    max_wait: int = 6000,
) -> Project:
    """Create a project from a dataset sample and start incremental-learning Autopilot.

    Args:
        dataset_id: ID of the dataset the project is created from
            (``use_sample_from_dataset=True``).
        target_column: Name of the target column.
        chunk_definition: Chunk definition whose ID is passed to the advanced
            options as ``chunk_definition_id``.
        use_case_id: When truthy, the new project is added to this Use Case.
        datetime_partition_column: When not ``None``, an optimized datetime
            partitioning (``use_time_series=False``) is generated on this
            column and used as the partitioning method.
        project_name: Name of the new project. Defaults to the previously
            hard-coded name.
        max_wait: Value passed as ``max_wait`` to both project creation and
            ``analyze_and_model``. Defaults to the previously hard-coded 6000.

    Returns:
        The created project, after ``analyze_and_model`` has been called.
    """
    project: Project = dr.Project.create_from_dataset(
        dataset_id,
        project_name=project_name,
        use_sample_from_dataset=True,
        max_wait=max_wait,
    )

    # None leaves partitioning at the project defaults.
    project_partitioning_method = None
    if datetime_partition_column is not None:
        print(f"Setting up datetime partitioning for project {project.id}")
        spec = dr.DatetimePartitioningSpecification(
            datetime_partition_column=datetime_partition_column,
            use_time_series=False,
        )
        full_part = dr.DatetimePartitioning.generate_optimized(project.id, spec, target_column)
        project_partitioning_method = dr.helpers.partitioning_methods.DatetimePartitioningId(
            full_part.datetime_partitioning_id, project.id
        )
        print(f"Datetime partitioning set for project {project.id}")
    else:
        print(f"No datetime partitioning; proceeding with defaults for project {project.id}")

    if use_case_id:
        add_project_to_use_case(use_case_id=use_case_id, project_id=project.id)

    advanced_options = dr.helpers.AdvancedOptions(
        incremental_learning_only_mode=True,
        incremental_learning_on_best_model=True,
        chunk_definition_id=chunk_definition.id,
        incremental_learning_early_stopping_rounds=0,
    )
    project.analyze_and_model(
        target=target_column,
        mode=dr.enums.AUTOPILOT_MODE.QUICK,
        partitioning_method=project_partitioning_method,
        advanced_options=advanced_options,
        # NOTE(review): -1 presumably requests the maximum available workers;
        # confirm against the SDK documentation.
        worker_count=-1,
        max_wait=max_wait,
    )
    return project
def create_chunk_definition(
    dataset_definition: DatasetDefinition,
    sorting_columns: List[str],
    target_column: Optional[str] = None,
    datetime_partition_column: Optional[str] = None,
    target_class: Optional[str] = None,
    chunking_partition_method: Optional[ChunkingPartitionMethod] = ChunkingPartitionMethod.RANDOM,
) -> ChunkDefinition:
    """Create a row-based chunk definition, analyze it, and return the re-fetched object.

    Only the arguments relevant to the chosen partition method are forwarded:
    RANDOM forwards none, STRATIFIED forwards ``target_column`` and
    ``target_class``, and any other method forwards only
    ``datetime_partition_column``.

    Args:
        dataset_definition: Dataset definition the chunk definition belongs to.
        sorting_columns: Columns passed as ``order_by_columns``.
        target_column: Target column; used only for STRATIFIED.
        datetime_partition_column: Datetime column; used only when the method
            is neither RANDOM nor STRATIFIED.
        target_class: Target class; used only for STRATIFIED.
        chunking_partition_method: Partition method for the chunk definition.

    Returns:
        The chunk definition as returned by ``ChunkDefinition.get`` after
        ``ChunkDefinition.analyze`` has been called.
    """
    is_random = chunking_partition_method == ChunkingPartitionMethod.RANDOM
    is_stratified = (
        not is_random and chunking_partition_method == ChunkingPartitionMethod.STRATIFIED
    )

    # Blank out whichever arguments the selected method does not use.
    stratify_column: Optional[str] = target_column if is_stratified else None
    stratify_class: Optional[str] = target_class if is_stratified else None
    date_column: Optional[str] = (
        None if (is_random or is_stratified) else datetime_partition_column
    )

    definition_id = dataset_definition.id
    created = ChunkDefinition.create(
        definition_id,
        partition_method=chunking_partition_method,
        chunking_strategy_type=ChunkingStrategy.ROWS,
        target_column=stratify_column,
        target_class=stratify_class,
        datetime_partition_column=date_column,
        order_by_columns=sorting_columns,
    )
    print(f"Created chunk definition: {created.id}")

    ChunkDefinition.analyze(definition_id, created.id)
    analyzed = ChunkDefinition.get(definition_id, created.id)

    chunk_stats = analyzed.chunk_definition_stats
    if chunk_stats is None:
        return analyzed
    print(f"Chunk definition stats - Number of chunks: {chunk_stats.total_number_of_chunks}")
    return analyzed
def run_dynamic_dataset_pipeline(
    dataset_id: str,
    target_column: str,
    sorting_columns: List[str],
    credential_name: str,
    target_class: Optional[str] = None,
    chunking_partition_method: Optional[ChunkingPartitionMethod] = ChunkingPartitionMethod.RANDOM,
    dataset_version_id: Optional[str] = None,
    use_case_id: Optional[str] = None,
    datetime_partition_column: Optional[str] = None,
) -> Project:
    """Run the full flow: dataset definition, row-based sample, chunk definition, project.

    Args:
        dataset_id: ID of the dataset to model on.
        target_column: Name of the target column.
        sorting_columns: Order-by columns for the chunk definition.
        credential_name: Name of the stored credential for the dataset.
        target_class: Target class forwarded to the chunk definition.
        chunking_partition_method: Partition method for the chunk definition.
        dataset_version_id: Dataset version to use, or ``None``.
        use_case_id: Optional Use Case the project is added to.
        datetime_partition_column: Optional datetime column, forwarded to both
            the chunk definition and the project.

    Returns:
        The project returned by ``create_project``.
    """
    started_at = time.time()
    print("Starting dynamic dataset pipeline...")
    definition = create_dataset_definition(
        dataset_id=dataset_id,
        dataset_version_id=dataset_version_id,
        name="Dynamic Dataset",
        credential_name=credential_name,
    )

    print("Creating samples...")
    # The returned sample version ID is not needed here.
    create_sample_row_based(definition)

    print("Creating chunk definition...")
    chunks = create_chunk_definition(
        dataset_definition=definition,
        sorting_columns=sorting_columns,
        target_column=target_column,
        datetime_partition_column=datetime_partition_column,
        target_class=target_class,
        chunking_partition_method=chunking_partition_method,
    )

    print("Creating and starting project...")
    project = create_project(
        dataset_id=dataset_id,
        target_column=target_column,
        chunk_definition=chunks,
        use_case_id=use_case_id,
        datetime_partition_column=datetime_partition_column,
    )

    elapsed = time.time() - started_at
    print(f"Time taken: {elapsed:.1f}s — Project ID: {project.id}")
    return project
def get_credential_id(credential_name: str) -> str:
    """Return the ID of the first stored credential named *credential_name*.

    Args:
        credential_name: Exact name of the credential to look up.

    Returns:
        The ``credential_id`` of the first match, converted to ``str``.

    Raises:
        ValueError: If no credential returned by ``dr.Credential.list()`` has
            that name.
    """
    # First credential whose name matches exactly, or None when there is none.
    match = next(
        (cred for cred in dr.Credential.list() if cred.name == credential_name),
        None,
    )
    if match is None:
        raise ValueError(f"No credential found with name: {credential_name}")

    print(f"Credential found: {credential_name}")
    return str(match.credential_id)
def add_project_to_use_case(use_case_id: str, project_id: str) -> None:
    """Add an existing project to an existing Use Case.

    Args:
        use_case_id: ID of the Use Case that receives the project.
        project_id: ID of the project to add.
    """
    # Fetch the project first, then the Use Case, and link the two.
    project_entity = dr.Project.get(project_id)
    UseCase.get(use_case_id=use_case_id).add(entity=project_entity)
def create_dataset_definition(
    dataset_id: str,
    dataset_version_id: Optional[str],
    name: str,
    credential_name: str,
) -> DatasetDefinition:
    """Create a dataset definition, analyze it, and return the re-fetched object.

    Args:
        dataset_id: ID of the dataset the definition is created for.
        dataset_version_id: Dataset version to use, or ``None``.
        name: Name given to the new dataset definition.
        credential_name: Name of the stored credential; resolved to an ID with
            ``get_credential_id``.

    Returns:
        The dataset definition as returned by ``DatasetDefinition.get`` after
        ``DatasetDefinition.analyze`` has been called.

    Raises:
        ValueError: If no credential named *credential_name* exists.
    """
    created = DatasetDefinition.create(
        dataset_id,
        dataset_version_id,
        name=name,
        credentials_id=get_credential_id(credential_name),
    )
    definition_id = created.id
    print(f"Created dataset definition: {definition_id}")

    DatasetDefinition.analyze(definition_id)
    # Re-fetch after analyze(); presumably this is what populates dataset_info
    # on the returned object -- confirm against the SDK.
    analyzed = DatasetDefinition.get(definition_id)

    info = analyzed.dataset_info
    if info is None:
        return analyzed

    print(
        f"Dataset info - Total rows: {info.total_rows}, "
        f"Estimated size per row: {info.estimated_size_per_row}"
    )
    return analyzed
def _create_dataset_sample(
    dataset_definition: DatasetDefinition,
    size: float,
    sampling_method: str,
    label: str,
) -> str:
    """Request a dataset sample, wait for it, and confirm it is listed.

    Args:
        dataset_definition: Definition whose ``dataset_props.dataset_id``
            identifies the dataset to sample.
        size: Value sent as the ``size`` argument of the sampling strategy.
        sampling_method: Value sent as ``samplingMethod`` ("rows" or "percent").
        label: Prefix used in the "not found" error message.

    Returns:
        The ``catalogVersionId`` of the new sample, as a string.

    Raises:
        ValueError: If the dataset lists no sample versions, or the new sample
            is not among them.
    """
    client = dr.Client(token=DR_API_TOKEN, endpoint=DR_ENDPOINT)
    dataset_id = dataset_definition.dataset_props.dataset_id

    payload = {
        "samplingStrategy": {
            "directive": "efficient-rowbased-sample",
            "arguments": {"size": size, "samplingMethod": sampling_method},
        }
    }
    response = client.post(f"{DR_ENDPOINT}/datasets/{dataset_id}/samples", json=payload)
    sample_version_id = response.json()["catalogVersionId"]
    # The POST is asynchronous: block on the status URL from the Location header.
    wait_for_async_resolution(client, response.headers["Location"])

    response = client.get(f"{DR_ENDPOINT}/datasets/{dataset_id}/versions/?category=SAMPLE")
    listing = response.json()
    if listing["totalCount"] <= 0:
        raise ValueError(f"No sample version found for dataset {dataset_id}")

    # Keep only versions actually tagged SAMPLE, then check ours is among them.
    # Raised explicitly (not asserted) so the check survives `python -O`.
    listed_sample_ids = {
        version.get("versionId")
        for version in listing.get("data") or ()
        if "SAMPLE" in (version.get("categories") or ())
    }
    if sample_version_id not in listed_sample_ids:
        raise ValueError(f"{label} sample version {sample_version_id} not found in samples")
    return str(sample_version_id)


def create_sample_row_based(dataset_definition: DatasetDefinition) -> str:
    """Create a dataset sample sized as a row count worth about ``SAMPLE_SIZE`` bytes.

    The row count is ``ceil(SAMPLE_SIZE / estimated_size_per_row)``.

    Args:
        dataset_definition: An analyzed dataset definition (``dataset_info`` set).

    Returns:
        The version ID of the created sample, as a string.

    Raises:
        ValueError: If ``dataset_info`` is missing, the estimated row size is
            missing or not positive, or the sample cannot be found afterwards.
    """
    dataset_info = dataset_definition.dataset_info
    if dataset_info is None:
        raise ValueError("Dataset definition has no dataset_info; ensure it has been analyzed")

    estimated_size_per_row = dataset_info.estimated_size_per_row
    # Guard the division below against a missing or zero estimate.
    if not estimated_size_per_row or estimated_size_per_row <= 0:
        raise ValueError(
            "Cannot size a row-based sample: estimated_size_per_row is "
            f"{estimated_size_per_row!r}"
        )

    sample_size_rows = math.ceil(SAMPLE_SIZE / estimated_size_per_row)
    print(f"Row count for {SAMPLE_SIZE / (1024**2):.1f}MB: {sample_size_rows}")
    return _create_dataset_sample(dataset_definition, sample_size_rows, "rows", "Row based")


def create_sample_percentage_based(dataset_definition: DatasetDefinition) -> str:
    """Create a dataset sample sized as a percentage worth about ``SAMPLE_SIZE`` bytes.

    The percentage is ``SAMPLE_SIZE / source_size * 100``, capped at 100 so a
    dataset smaller than ``SAMPLE_SIZE`` does not request more than all of it.

    Args:
        dataset_definition: An analyzed dataset definition (``dataset_info`` set).

    Returns:
        The version ID of the created sample, as a string.

    Raises:
        ValueError: If ``dataset_info`` is missing, the source size is missing
            or not positive, or the sample cannot be found afterwards.
    """
    dataset_info = dataset_definition.dataset_info
    if dataset_info is None:
        raise ValueError("Dataset definition has no dataset_info; ensure it has been analyzed")

    source_size = dataset_info.source_size
    # Guard the division below against a missing or zero size.
    if not source_size or source_size <= 0:
        raise ValueError(
            f"Cannot size a percentage-based sample: source_size is {source_size!r}"
        )

    sample_size_percentage = min(100.0, (SAMPLE_SIZE / source_size) * 100)
    print(
        f"Percentage of dataset size for {SAMPLE_SIZE / (1024**2):.1f}MB: "
        f"{sample_size_percentage:.2f}%"
    )
    return _create_dataset_sample(
        dataset_definition, sample_size_percentage, "percent", "Percentage"
    )
def create_project(
    dataset_id: str,
    target_column: str,
    chunk_definition: ChunkDefinition,
    use_case_id: Optional[str] = None,
    datetime_partition_column: Optional[str] = None,
    project_name: str = "Dynamic Dataset Project with Incremental Learning OTV",
    max_wait: int = 6000,
) -> Project:
    """Create a project from a dataset sample and start incremental-learning Autopilot.

    Args:
        dataset_id: ID of the dataset the project is created from
            (``use_sample_from_dataset=True``).
        target_column: Name of the target column.
        chunk_definition: Chunk definition whose ID is passed to the advanced
            options as ``chunk_definition_id``.
        use_case_id: When truthy, the new project is added to this Use Case.
        datetime_partition_column: When not ``None``, an optimized datetime
            partitioning (``use_time_series=False``) is generated on this
            column and used as the partitioning method.
        project_name: Name of the new project. Defaults to the previously
            hard-coded name.
        max_wait: Value passed as ``max_wait`` to both project creation and
            ``analyze_and_model``. Defaults to the previously hard-coded 6000.

    Returns:
        The created project, after ``analyze_and_model`` has been called.
    """
    project: Project = dr.Project.create_from_dataset(
        dataset_id,
        project_name=project_name,
        use_sample_from_dataset=True,
        max_wait=max_wait,
    )

    # None leaves partitioning at the project defaults.
    project_partitioning_method = None
    if datetime_partition_column is not None:
        print(f"Setting up datetime partitioning for project {project.id}")
        spec = dr.DatetimePartitioningSpecification(
            datetime_partition_column=datetime_partition_column,
            use_time_series=False,
        )
        full_part = dr.DatetimePartitioning.generate_optimized(project.id, spec, target_column)
        project_partitioning_method = dr.helpers.partitioning_methods.DatetimePartitioningId(
            full_part.datetime_partitioning_id, project.id
        )
        print(f"Datetime partitioning set for project {project.id}")
    else:
        print(f"No datetime partitioning; proceeding with defaults for project {project.id}")

    if use_case_id:
        add_project_to_use_case(use_case_id=use_case_id, project_id=project.id)

    advanced_options = dr.helpers.AdvancedOptions(
        incremental_learning_only_mode=True,
        incremental_learning_on_best_model=True,
        chunk_definition_id=chunk_definition.id,
        incremental_learning_early_stopping_rounds=0,
    )
    project.analyze_and_model(
        target=target_column,
        mode=dr.enums.AUTOPILOT_MODE.QUICK,
        partitioning_method=project_partitioning_method,
        advanced_options=advanced_options,
        # NOTE(review): -1 presumably requests the maximum available workers;
        # confirm against the SDK documentation.
        worker_count=-1,
        max_wait=max_wait,
    )
    return project
def create_chunk_definition(
    dataset_definition: DatasetDefinition,
    sorting_columns: List[str],
    target_column: Optional[str] = None,
    datetime_partition_column: Optional[str] = None,
    target_class: Optional[str] = None,
    chunking_partition_method: Optional[ChunkingPartitionMethod] = ChunkingPartitionMethod.RANDOM,
) -> ChunkDefinition:
    """Create a row-based chunk definition, analyze it, and return the re-fetched object.

    Only the arguments relevant to the chosen partition method are forwarded:
    RANDOM forwards none, STRATIFIED forwards ``target_column`` and
    ``target_class``, and any other method forwards only
    ``datetime_partition_column``.

    Args:
        dataset_definition: Dataset definition the chunk definition belongs to.
        sorting_columns: Columns passed as ``order_by_columns``.
        target_column: Target column; used only for STRATIFIED.
        datetime_partition_column: Datetime column; used only when the method
            is neither RANDOM nor STRATIFIED.
        target_class: Target class; used only for STRATIFIED.
        chunking_partition_method: Partition method for the chunk definition.

    Returns:
        The chunk definition as returned by ``ChunkDefinition.get`` after
        ``ChunkDefinition.analyze`` has been called.
    """
    is_random = chunking_partition_method == ChunkingPartitionMethod.RANDOM
    is_stratified = (
        not is_random and chunking_partition_method == ChunkingPartitionMethod.STRATIFIED
    )

    # Blank out whichever arguments the selected method does not use.
    stratify_column: Optional[str] = target_column if is_stratified else None
    stratify_class: Optional[str] = target_class if is_stratified else None
    date_column: Optional[str] = (
        None if (is_random or is_stratified) else datetime_partition_column
    )

    definition_id = dataset_definition.id
    created = ChunkDefinition.create(
        definition_id,
        partition_method=chunking_partition_method,
        chunking_strategy_type=ChunkingStrategy.ROWS,
        target_column=stratify_column,
        target_class=stratify_class,
        datetime_partition_column=date_column,
        order_by_columns=sorting_columns,
    )
    print(f"Created chunk definition: {created.id}")

    ChunkDefinition.analyze(definition_id, created.id)
    analyzed = ChunkDefinition.get(definition_id, created.id)

    chunk_stats = analyzed.chunk_definition_stats
    if chunk_stats is None:
        return analyzed
    print(f"Chunk definition stats - Number of chunks: {chunk_stats.total_number_of_chunks}")
    return analyzed
def run_dynamic_dataset_pipeline(
    dataset_id: str,
    target_column: str,
    sorting_columns: List[str],
    credential_name: str,
    target_class: Optional[str] = None,
    chunking_partition_method: Optional[ChunkingPartitionMethod] = ChunkingPartitionMethod.RANDOM,
    dataset_version_id: Optional[str] = None,
    use_case_id: Optional[str] = None,
    datetime_partition_column: Optional[str] = None,
) -> Project:
    """Run the full flow: dataset definition, row-based sample, chunk definition, project.

    Args:
        dataset_id: ID of the dataset to model on.
        target_column: Name of the target column.
        sorting_columns: Order-by columns for the chunk definition.
        credential_name: Name of the stored credential for the dataset.
        target_class: Target class forwarded to the chunk definition.
        chunking_partition_method: Partition method for the chunk definition.
        dataset_version_id: Dataset version to use, or ``None``.
        use_case_id: Optional Use Case the project is added to.
        datetime_partition_column: Optional datetime column, forwarded to both
            the chunk definition and the project.

    Returns:
        The project returned by ``create_project``.
    """
    started_at = time.time()
    print("Starting dynamic dataset pipeline...")
    definition = create_dataset_definition(
        dataset_id=dataset_id,
        dataset_version_id=dataset_version_id,
        name="Dynamic Dataset",
        credential_name=credential_name,
    )

    print("Creating samples...")
    # The returned sample version ID is not needed here.
    create_sample_row_based(definition)

    print("Creating chunk definition...")
    chunks = create_chunk_definition(
        dataset_definition=definition,
        sorting_columns=sorting_columns,
        target_column=target_column,
        datetime_partition_column=datetime_partition_column,
        target_class=target_class,
        chunking_partition_method=chunking_partition_method,
    )

    print("Creating and starting project...")
    project = create_project(
        dataset_id=dataset_id,
        target_column=target_column,
        chunk_definition=chunks,
        use_case_id=use_case_id,
        datetime_partition_column=datetime_partition_column,
    )

    elapsed = time.time() - started_at
    print(f"Time taken: {elapsed:.1f}s — Project ID: {project.id}")
    return project
Configure and run¶
Edit variables, then run. STRATIFIED needs TARGET_CLASS; DATE needs DATETIME_PARTITION_COLUMN.
In [ ]:
Copied!
# --- edit these ---
# Placeholder values: replace them with your own before running this cell.
DATASET_ID = "your-dataset-id"
TARGET_COLUMN = "your_target"
SORTING_COLUMNS = ["TIME", "SERIES"]  # order_by columns for chunking
CREDENTIAL_NAME = "your-credential-name"  # must exist in DataRobot
DATASET_VERSION_ID = None
USE_CASE_ID = None
TARGET_CLASS = None
DATETIME_PARTITION_COLUMN = None
CHUNKING_PARTITION_METHOD = "RANDOM"  # RANDOM | STRATIFIED | DATE

# Resolve the enum member from its upper-cased name.
method = ChunkingPartitionMethod[CHUNKING_PARTITION_METHOD.upper()]

# Each non-random method needs one extra setting; check before any API call.
if not TARGET_CLASS and method == ChunkingPartitionMethod.STRATIFIED:
    raise ValueError("TARGET_CLASS required for STRATIFIED")
if not DATETIME_PARTITION_COLUMN and method == ChunkingPartitionMethod.DATE:
    raise ValueError("DATETIME_PARTITION_COLUMN required for DATE")

project = run_dynamic_dataset_pipeline(
    DATASET_ID,
    TARGET_COLUMN,
    SORTING_COLUMNS,
    CREDENTIAL_NAME,
    target_class=TARGET_CLASS,
    chunking_partition_method=method,
    dataset_version_id=DATASET_VERSION_ID,
    use_case_id=USE_CASE_ID,
    datetime_partition_column=DATETIME_PARTITION_COLUMN,
)
print(f"Project ID: {project.id}")
# --- edit these ---
# Placeholder values: replace them with your own before running this cell.
DATASET_ID = "your-dataset-id"
TARGET_COLUMN = "your_target"
SORTING_COLUMNS = ["TIME", "SERIES"]  # order_by columns for chunking
CREDENTIAL_NAME = "your-credential-name"  # must exist in DataRobot
DATASET_VERSION_ID = None
USE_CASE_ID = None
TARGET_CLASS = None
DATETIME_PARTITION_COLUMN = None
CHUNKING_PARTITION_METHOD = "RANDOM"  # RANDOM | STRATIFIED | DATE

# Resolve the enum member from its upper-cased name.
method = ChunkingPartitionMethod[CHUNKING_PARTITION_METHOD.upper()]

# Each non-random method needs one extra setting; check before any API call.
if not TARGET_CLASS and method == ChunkingPartitionMethod.STRATIFIED:
    raise ValueError("TARGET_CLASS required for STRATIFIED")
if not DATETIME_PARTITION_COLUMN and method == ChunkingPartitionMethod.DATE:
    raise ValueError("DATETIME_PARTITION_COLUMN required for DATE")

project = run_dynamic_dataset_pipeline(
    DATASET_ID,
    TARGET_COLUMN,
    SORTING_COLUMNS,
    CREDENTIAL_NAME,
    target_class=TARGET_CLASS,
    chunking_partition_method=method,
    dataset_version_id=DATASET_VERSION_ID,
    use_case_id=USE_CASE_ID,
    datetime_partition_column=DATETIME_PARTITION_COLUMN,
)
print(f"Project ID: {project.id}")
Optional: percentage-based sample¶
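After create_dataset_definition, you can call create_sample_percentage_based(dataset_definition) in place of create_sample_row_based(dataset_definition) when building a custom flow; run_dynamic_dataset_pipeline itself always uses the row-based sample.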