チャンキングサービスの静的データセット例¶
要件:DataRobot Python SDKがインストールされていること、およびSAMPLE_DATA_TO_START_PROJECT機能フラグが有効になっていること。
パイプラインヘルパーでは、モジュールレベルのDR_API_TOKENおよびDR_ENDPOINT(下の「認証」セルで設定されたもの)を使用します。
認証¶
In [ ]:
Copied!
import os

# Credentials come from the environment; an unset variable yields "".
DR_API_TOKEN = os.getenv("DR_API_TOKEN", "")
DR_ENDPOINT = os.getenv("DR_ENDPOINT", "")

# Stop right away when either value is missing or empty, before any later
# cell tries to use them.
if not (DR_API_TOKEN and DR_ENDPOINT):
    raise ValueError("Set DR_API_TOKEN and DR_ENDPOINT in the environment or edit this cell.")
import os

# Credentials come from the environment; an unset variable yields "".
DR_API_TOKEN = os.getenv("DR_API_TOKEN", "")
DR_ENDPOINT = os.getenv("DR_ENDPOINT", "")

# Stop right away when either value is missing or empty, before any later
# cell tries to use them.
if not (DR_API_TOKEN and DR_ENDPOINT):
    raise ValueError("Set DR_API_TOKEN and DR_ENDPOINT in the environment or edit this cell.")
ライブラリのインポート¶
In [ ]:
Copied!
from typing import Any, Optional
import datarobot as dr
from datarobot import UseCase
from datarobot.enums import ChunkingPartitionMethod, ChunkingStrategy
from datarobot.models.chunking_service_v2 import ChunkDefinition, DatasetDefinition
from datarobot.models.project import Project
# Configure the DataRobot client with the token and endpoint defined in the
# authentication cell. The result is bound to `_` (presumably so the notebook
# does not echo the client object as cell output).
_ = dr.Client(
    endpoint=DR_ENDPOINT,
    token=DR_API_TOKEN,
)
from typing import Any, Optional
import datarobot as dr
from datarobot import UseCase
from datarobot.enums import ChunkingPartitionMethod, ChunkingStrategy
from datarobot.models.chunking_service_v2 import ChunkDefinition, DatasetDefinition
from datarobot.models.project import Project
# Configure the DataRobot client with the token and endpoint defined in the
# authentication cell. The result is bound to `_` (presumably so the notebook
# does not echo the client object as cell output).
_ = dr.Client(
    endpoint=DR_ENDPOINT,
    token=DR_API_TOKEN,
)
パイプライン関数の設定¶
In [ ]:
Copied!
def add_project_to_use_case(use_case_id: str, project_id: str) -> None:
    """Attach an existing project to an existing Use Case.

    Args:
        use_case_id: ID of the Use Case that receives the project.
        project_id: ID of the project to attach.
    """
    # Fetch the project first, then the Use Case, and link the two.
    linked_project = dr.Project.get(project_id)
    UseCase.get(use_case_id=use_case_id).add(entity=linked_project)
def run_ssp_ai_catalog(
    dataset_id: str,
    target_column: str,
    target_class: Optional[str] = None,
    dataset_version_id: Optional[str] = None,
    use_case_id: Optional[str] = None,
    datetime_partition_column: Optional[str] = None,
    chunking_partition_method: Optional[ChunkingPartitionMethod] = ChunkingPartitionMethod.RANDOM,
) -> Project:
    """Create dataset + chunk definitions and a project with sample-to-start + incremental learning.

    Steps, in order:
      1. Create and analyze a dataset definition for the dataset.
      2. Create and analyze a row-based chunk definition using the chosen
         partition method.
      3. Create a project from the dataset with ``use_sample_from_dataset=True``.
      4. If ``datetime_partition_column`` is given, generate an optimized
         (non-time-series) datetime partitioning for the project.
      5. If ``use_case_id`` is given, add the project to that Use Case.
      6. Start Quick autopilot with incremental-learning advanced options
         that point at the chunk definition.

    Args:
        dataset_id: ID of the dataset to model on.
        target_column: Name of the target column; also used in the project name.
        target_class: Target class forwarded to the chunk definition; only
            used for STRATIFIED chunking.
        dataset_version_id: Optional dataset version for the dataset definition.
        use_case_id: Optional Use Case to attach the new project to.
        datetime_partition_column: Column used for datetime partitioning of
            the project and, for non-RANDOM/non-STRATIFIED chunking, of the
            chunk definition.
        chunking_partition_method: How chunks are partitioned. ``None`` is
            treated as ``ChunkingPartitionMethod.RANDOM``.

    Returns:
        The created project, after ``analyze_and_model`` has been called.
    """
    # Resolve None to RANDOM once so the chunk-definition arguments, the
    # partition method sent to the API, and the project-name label all agree.
    # Previously only the label applied this fallback, so None was labelled
    # "Random" but took the date branch below.
    partition_method = chunking_partition_method or ChunkingPartitionMethod.RANDOM

    dataset_definition = DatasetDefinition.create(dataset_id, dataset_version_id)
    DatasetDefinition.analyze(dataset_definition.id)
    # Re-fetch the definition after requesting the analysis.
    dataset_definition = DatasetDefinition.get(dataset_definition.id)

    # All three keys are always sent; only the ones relevant to the chosen
    # partition method carry a value.
    partition_args: dict[str, Any] = {
        "target_column": None,
        "target_class": None,
        "datetime_partition_column": None,
    }
    if partition_method == ChunkingPartitionMethod.STRATIFIED:
        partition_args["target_column"] = target_column
        partition_args["target_class"] = target_class
    elif partition_method != ChunkingPartitionMethod.RANDOM:
        # Any remaining method (i.e. DATE) partitions on the datetime column.
        partition_args["datetime_partition_column"] = datetime_partition_column

    chunk_definition = ChunkDefinition.create(
        dataset_definition.id,
        partition_method=partition_method,
        chunking_strategy_type=ChunkingStrategy.ROWS,
        **partition_args,
    )
    ChunkDefinition.analyze(dataset_definition.id, chunk_definition.id)
    # Re-fetch the definition after requesting the analysis.
    chunk_definition = ChunkDefinition.get(dataset_definition.id, chunk_definition.id)

    partition_label = partition_method.value.capitalize()
    project: Project = dr.Project.create_from_dataset(
        dataset_id,
        project_name=f"Sample to Start Project {partition_label} - Target: {target_column}",
        use_sample_from_dataset=True,
        max_wait=6000,
    )

    # Datetime partitioning of the project depends only on whether a datetime
    # column was supplied, independent of the chunking partition method.
    project_partitioning_method = None
    if datetime_partition_column is not None:
        print(f"Setting up datetime partitioning for project {project.id}")
        spec = dr.DatetimePartitioningSpecification(
            datetime_partition_column=datetime_partition_column,
            use_time_series=False,
        )
        full_part = dr.DatetimePartitioning.generate_optimized(project.id, spec, target_column)
        project_partitioning_method = dr.helpers.partitioning_methods.DatetimePartitioningId(
            full_part.datetime_partitioning_id, project.id
        )
        print(f"Datetime partitioning set for project {project.id}")

    if use_case_id is not None:
        add_project_to_use_case(use_case_id=use_case_id, project_id=project.id)

    advanced_options = dr.helpers.AdvancedOptions(
        incremental_learning_only_mode=True,
        incremental_learning_on_best_model=True,
        chunk_definition_id=chunk_definition.id,
        incremental_learning_early_stopping_rounds=0,
    )
    project.analyze_and_model(
        target=target_column,
        mode=dr.enums.AUTOPILOT_MODE.QUICK,
        partitioning_method=project_partitioning_method,
        advanced_options=advanced_options,
        # NOTE(review): -1 is understood to request the maximum number of
        # workers available -- confirm against the installed SDK version.
        worker_count=-1,
        max_wait=6000,
    )
    return project
def add_project_to_use_case(use_case_id: str, project_id: str) -> None:
    """Attach an existing project to an existing Use Case.

    Args:
        use_case_id: ID of the Use Case that receives the project.
        project_id: ID of the project to attach.
    """
    # Fetch the project first, then the Use Case, and link the two.
    linked_project = dr.Project.get(project_id)
    UseCase.get(use_case_id=use_case_id).add(entity=linked_project)
def run_ssp_ai_catalog(
    dataset_id: str,
    target_column: str,
    target_class: Optional[str] = None,
    dataset_version_id: Optional[str] = None,
    use_case_id: Optional[str] = None,
    datetime_partition_column: Optional[str] = None,
    chunking_partition_method: Optional[ChunkingPartitionMethod] = ChunkingPartitionMethod.RANDOM,
) -> Project:
    """Create dataset + chunk definitions and a project with sample-to-start + incremental learning.

    Steps, in order:
      1. Create and analyze a dataset definition for the dataset.
      2. Create and analyze a row-based chunk definition using the chosen
         partition method.
      3. Create a project from the dataset with ``use_sample_from_dataset=True``.
      4. If ``datetime_partition_column`` is given, generate an optimized
         (non-time-series) datetime partitioning for the project.
      5. If ``use_case_id`` is given, add the project to that Use Case.
      6. Start Quick autopilot with incremental-learning advanced options
         that point at the chunk definition.

    Args:
        dataset_id: ID of the dataset to model on.
        target_column: Name of the target column; also used in the project name.
        target_class: Target class forwarded to the chunk definition; only
            used for STRATIFIED chunking.
        dataset_version_id: Optional dataset version for the dataset definition.
        use_case_id: Optional Use Case to attach the new project to.
        datetime_partition_column: Column used for datetime partitioning of
            the project and, for non-RANDOM/non-STRATIFIED chunking, of the
            chunk definition.
        chunking_partition_method: How chunks are partitioned. ``None`` is
            treated as ``ChunkingPartitionMethod.RANDOM``.

    Returns:
        The created project, after ``analyze_and_model`` has been called.
    """
    # Resolve None to RANDOM once so the chunk-definition arguments, the
    # partition method sent to the API, and the project-name label all agree.
    # Previously only the label applied this fallback, so None was labelled
    # "Random" but took the date branch below.
    partition_method = chunking_partition_method or ChunkingPartitionMethod.RANDOM

    dataset_definition = DatasetDefinition.create(dataset_id, dataset_version_id)
    DatasetDefinition.analyze(dataset_definition.id)
    # Re-fetch the definition after requesting the analysis.
    dataset_definition = DatasetDefinition.get(dataset_definition.id)

    # All three keys are always sent; only the ones relevant to the chosen
    # partition method carry a value.
    partition_args: dict[str, Any] = {
        "target_column": None,
        "target_class": None,
        "datetime_partition_column": None,
    }
    if partition_method == ChunkingPartitionMethod.STRATIFIED:
        partition_args["target_column"] = target_column
        partition_args["target_class"] = target_class
    elif partition_method != ChunkingPartitionMethod.RANDOM:
        # Any remaining method (i.e. DATE) partitions on the datetime column.
        partition_args["datetime_partition_column"] = datetime_partition_column

    chunk_definition = ChunkDefinition.create(
        dataset_definition.id,
        partition_method=partition_method,
        chunking_strategy_type=ChunkingStrategy.ROWS,
        **partition_args,
    )
    ChunkDefinition.analyze(dataset_definition.id, chunk_definition.id)
    # Re-fetch the definition after requesting the analysis.
    chunk_definition = ChunkDefinition.get(dataset_definition.id, chunk_definition.id)

    partition_label = partition_method.value.capitalize()
    project: Project = dr.Project.create_from_dataset(
        dataset_id,
        project_name=f"Sample to Start Project {partition_label} - Target: {target_column}",
        use_sample_from_dataset=True,
        max_wait=6000,
    )

    # Datetime partitioning of the project depends only on whether a datetime
    # column was supplied, independent of the chunking partition method.
    project_partitioning_method = None
    if datetime_partition_column is not None:
        print(f"Setting up datetime partitioning for project {project.id}")
        spec = dr.DatetimePartitioningSpecification(
            datetime_partition_column=datetime_partition_column,
            use_time_series=False,
        )
        full_part = dr.DatetimePartitioning.generate_optimized(project.id, spec, target_column)
        project_partitioning_method = dr.helpers.partitioning_methods.DatetimePartitioningId(
            full_part.datetime_partitioning_id, project.id
        )
        print(f"Datetime partitioning set for project {project.id}")

    if use_case_id is not None:
        add_project_to_use_case(use_case_id=use_case_id, project_id=project.id)

    advanced_options = dr.helpers.AdvancedOptions(
        incremental_learning_only_mode=True,
        incremental_learning_on_best_model=True,
        chunk_definition_id=chunk_definition.id,
        incremental_learning_early_stopping_rounds=0,
    )
    project.analyze_and_model(
        target=target_column,
        mode=dr.enums.AUTOPILOT_MODE.QUICK,
        partitioning_method=project_partitioning_method,
        advanced_options=advanced_options,
        # NOTE(review): -1 is understood to request the maximum number of
        # workers available -- confirm against the installed SDK version.
        worker_count=-1,
        max_wait=6000,
    )
    return project
設定と実行¶
変数を編集してから実行します。STRATIFIEDにはTARGET_CLASSが必要です。DATEにはDATETIME_PARTITION_COLUMNが必要です。
In [ ]:
Copied!
# --- edit these ---
DATASET_ID = "your-dataset-id"
TARGET_COLUMN = "your_target"
DATASET_VERSION_ID = None
USE_CASE_ID = None
TARGET_CLASS = None
DATETIME_PARTITION_COLUMN = None
CHUNKING_PARTITION_METHOD = "RANDOM"  # RANDOM | STRATIFIED | DATE

# Resolve the configured name (case-insensitive, surrounding whitespace
# ignored) to its enum member. An unknown name is reported as a ValueError
# listing the valid choices, matching the other configuration checks below,
# instead of surfacing as a bare KeyError.
try:
    method = ChunkingPartitionMethod[CHUNKING_PARTITION_METHOD.strip().upper()]
except KeyError:
    valid_methods = ", ".join(member.name for member in ChunkingPartitionMethod)
    raise ValueError(
        f"Unknown CHUNKING_PARTITION_METHOD {CHUNKING_PARTITION_METHOD!r}; "
        f"expected one of: {valid_methods}"
    ) from None

# Validate method-specific settings before any remote work is started.
if method == ChunkingPartitionMethod.DATE and not DATETIME_PARTITION_COLUMN:
    raise ValueError("DATETIME_PARTITION_COLUMN required for DATE")
if method == ChunkingPartitionMethod.STRATIFIED and not TARGET_CLASS:
    raise ValueError("TARGET_CLASS required for STRATIFIED")

project = run_ssp_ai_catalog(
    DATASET_ID,
    TARGET_COLUMN,
    target_class=TARGET_CLASS,
    dataset_version_id=DATASET_VERSION_ID,
    use_case_id=USE_CASE_ID,
    datetime_partition_column=DATETIME_PARTITION_COLUMN,
    chunking_partition_method=method,
)
print(f"Project ID: {project.id}")
# --- edit these ---
DATASET_ID = "your-dataset-id"
TARGET_COLUMN = "your_target"
DATASET_VERSION_ID = None
USE_CASE_ID = None
TARGET_CLASS = None
DATETIME_PARTITION_COLUMN = None
CHUNKING_PARTITION_METHOD = "RANDOM"  # RANDOM | STRATIFIED | DATE

# Resolve the configured name (case-insensitive, surrounding whitespace
# ignored) to its enum member. An unknown name is reported as a ValueError
# listing the valid choices, matching the other configuration checks below,
# instead of surfacing as a bare KeyError.
try:
    method = ChunkingPartitionMethod[CHUNKING_PARTITION_METHOD.strip().upper()]
except KeyError:
    valid_methods = ", ".join(member.name for member in ChunkingPartitionMethod)
    raise ValueError(
        f"Unknown CHUNKING_PARTITION_METHOD {CHUNKING_PARTITION_METHOD!r}; "
        f"expected one of: {valid_methods}"
    ) from None

# Validate method-specific settings before any remote work is started.
if method == ChunkingPartitionMethod.DATE and not DATETIME_PARTITION_COLUMN:
    raise ValueError("DATETIME_PARTITION_COLUMN required for DATE")
if method == ChunkingPartitionMethod.STRATIFIED and not TARGET_CLASS:
    raise ValueError("TARGET_CLASS required for STRATIFIED")

project = run_ssp_ai_catalog(
    DATASET_ID,
    TARGET_COLUMN,
    target_class=TARGET_CLASS,
    dataset_version_id=DATASET_VERSION_ID,
    use_case_id=USE_CASE_ID,
    datetime_partition_column=DATETIME_PARTITION_COLUMN,
    chunking_partition_method=method,
)
print(f"Project ID: {project.id}")