After deploying the Data Registry agentic tools, save each deployment ID from the deployment's **Overview** tab or from its URL. These values are required during agentic workflow development (through a `.env` file), in the metadata for the custom agentic workflow in the Workshop (through a `model-metadata.yaml` file), and, eventually, in production.
To start building an agentic workflow, clone the datarobot-agent-templates public repository to DataRobot. This repository provides ready-to-use templates for building and deploying AI agents with multi-agent frameworks. These templates make it easy to set up your own agent with minimal configuration requirements. To do so, do one of the following:
This foundational step uses version 11.3.5 of the datarobot-agent-templates repository. Ensure that the workspace used for this walkthrough is on that version; newer versions may not be compatible with the code provided below.
Next, add the following two environment variables to define the deployment IDs copied from the deployed Search Data Registry and Get Data Registry Dataset global agentic tools.
`.env`

```
# Data Registry tool deployment IDs
DATA_REGISTRY_SEARCH_TOOL_DEPLOYMENT_ID=<YOUR_SEARCH_TOOL_DEPLOYMENT_ID>
DATA_REGISTRY_READ_TOOL_DEPLOYMENT_ID=<YOUR_READ_TOOL_DEPLOYMENT_ID>
```
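Before wiring these IDs into the agent, it can be worth confirming that they load correctly. A minimal check, assuming only the `.env` file above and the `python-dotenv` package the template already uses:

```python
import os

from dotenv import load_dotenv

# Load variables from the .env file into the process environment.
load_dotenv()

# Fail fast if either Data Registry tool deployment ID is missing or still a placeholder.
for name in (
    "DATA_REGISTRY_SEARCH_TOOL_DEPLOYMENT_ID",
    "DATA_REGISTRY_READ_TOOL_DEPLOYMENT_ID",
):
    value = os.environ.get(name)
    if not value or value.startswith("<"):
        raise RuntimeError(f"{name} is not set; copy the ID from the tool deployment.")
    print(f"{name}={value}")
```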
Running these commands configures the environment for the `agent_crewai` template, removes all files the other templates use, and prepares the virtualenv, installing the additional libraries required to run the selected agent template.
```python
import json
import os
from typing import Any, Generator, Optional, Union
from urllib.parse import urljoin, urlparse

from crewai import LLM, Agent, Crew, Task
from crewai.tools import BaseTool
from crewai_event_listener import CrewAIEventListener
from datarobot_drum import RuntimeParameters
from dotenv import load_dotenv
from openai.types.chat import CompletionCreateParams
from ragas import MultiTurnSample
from ragas.messages import AIMessage, HumanMessage, ToolMessage

from datarobot_genai.core.chat.client import ToolClient
from tool_data_registry_read import ReadDataRegistryTool
from tool_data_registry_search import SearchDataRegistryTool

load_dotenv()


class MyAgent:
    """MyAgent is a custom agent that uses CrewAI to search, read, and summarize data from the Data Registry."""

    def __init__(
        self,
        api_key: Optional[str] = None,
        api_base: Optional[str] = None,
        model: Optional[str] = None,
        verbose: Optional[Union[bool, str]] = True,
        timeout: Optional[int] = 90,
        **kwargs: Any,
    ):
        """Initializes the MyAgent class with API key, base URL, model, and verbosity settings.

        Args:
            api_key: Optional[str]: API key for authentication with DataRobot services.
                Defaults to None, in which case it will use the DATAROBOT_API_TOKEN environment variable.
            api_base: Optional[str]: Base URL for the DataRobot API.
                Defaults to None, in which case it will use the DATAROBOT_ENDPOINT environment variable.
            model: Optional[str]: The LLM model to use. Defaults to None.
            verbose: Optional[Union[bool, str]]: Whether to enable verbose logging.
                Accepts boolean or string values ("true"/"false"). Defaults to True.
            timeout: Optional[int]: How long to wait for the agent to respond. Defaults to 90 seconds.
            **kwargs: Any: Additional keyword arguments passed to the agent.
                Contains any parameters received in the CompletionCreateParams.

        Returns:
            None
        """
        self.api_key = api_key or os.environ.get("DATAROBOT_API_TOKEN")
        self.api_base = (
            api_base
            or os.environ.get("DATAROBOT_ENDPOINT")
            or "https://api.datarobot.com"
        )
        self.model = model
        self.timeout = timeout
        if isinstance(verbose, str):
            self.verbose = verbose.lower() == "true"
        elif isinstance(verbose, bool):
            self.verbose = verbose
        self.event_listener = CrewAIEventListener()

    @property
    def llm(self) -> LLM:
        """Returns a CrewAI LLM instance configured to use DataRobot's LLM Gateway or a specific deployment.

        For help configuring different LLM backends see:
        https://github.com/datarobot-community/datarobot-agent-templates/blob/main/docs/developing-agents-llm-providers.md
        """
        api_base = urlparse(self.api_base)
        if os.environ.get("LLM_DEPLOYMENT_ID"):
            path = api_base.path
            if "/api/v2/deployments" not in path and "api/v2/genai" not in path:
                # Ensure the API base ends with /api/v2/ for deployments
                if not path.endswith("/"):
                    path += "/"
                if not path.endswith("api/v2/"):
                    path = urljoin(path, "api/v2/")
                api_base = api_base._replace(path=path)
            api_base_str = api_base.geturl()
            deployment_id = os.environ.get("LLM_DEPLOYMENT_ID")
            return LLM(
                model="openai/gpt-4o-mini",
                api_base=f"{api_base_str}deployments/{deployment_id}/",
                api_key=self.api_key,
                timeout=self.timeout,
            )
        else:
            # Use LLM Gateway
            api_base_str = api_base.geturl()
            if api_base_str.endswith("api/v2/"):
                api_base_str = api_base_str[:-7]  # Remove 'api/v2/'
            elif api_base_str.endswith("api/v2"):
                api_base_str = api_base_str[:-6]  # Remove 'api/v2'
            return LLM(
                model="datarobot/azure/gpt-4o-mini",
                api_base=api_base_str,
                api_key=self.api_key,
                timeout=self.timeout,
            )

    @property
    def tools_client(self) -> ToolClient:
        return ToolClient(
            api_key=self.api_key,
            base_url=self.api_base,
        )

    ## Data Registry tools
    @property
    def tool_data_registry_search(self) -> BaseTool:
        deployment_id = os.environ.get("DATA_REGISTRY_SEARCH_TOOL_DEPLOYMENT_ID")
        if not deployment_id:
            deployment_id = RuntimeParameters.get("DATA_REGISTRY_SEARCH_TOOL_DEPLOYMENT_ID")
        print(f"Initializing Data Registry Search Tool with deployment ID: {deployment_id}")
        return SearchDataRegistryTool(tool_client=self.tools_client, deployment_id=deployment_id)

    @property
    def tool_data_registry_read(self) -> BaseTool:
        deployment_id = os.environ.get("DATA_REGISTRY_READ_TOOL_DEPLOYMENT_ID")
        if not deployment_id:
            deployment_id = RuntimeParameters.get("DATA_REGISTRY_READ_TOOL_DEPLOYMENT_ID")
        print(f"Initializing Data Registry Read Tool with deployment ID: {deployment_id}")
        return ReadDataRegistryTool(
            tool_client=self.tools_client,
            deployment_id=deployment_id,
        )

    ## Data Registry agents and tasks
    @property
    def agent_data_registry_searcher(self) -> Agent:
        return Agent(
            role="Expert Data Registry Searcher",
            goal="Search for and retrieve relevant files from Data Registry.",
            backstory="You are a meticulous analyst that is skilled at examining lists of files and "
            "determining the most appropriate file based on the context.",
            verbose=self.verbose,
            allow_delegation=False,
            llm=self.llm,
        )

    @property
    def task_data_registry_search(self) -> Task:
        return Task(
            description=(
                "You should search for a relevant dataset ID in the Data Registry based on the provided dataset topic: "
                "{dataset_topic}."
            ),
            expected_output=(
                "Search for a list of relevant files in the Data Registry and determine the most "
                "relevant dataset ID that matches the given topic. You should return the entire dataset ID."
            ),
            agent=self.agent_data_registry_searcher,
            tools=[self.tool_data_registry_search],
        )

    @property
    def agent_data_registry_reader(self) -> Agent:
        return Agent(
            role="Expert Data Registry Data Reader and Analyst",
            goal=(
                "Read the data from a file in the Data Registry and summarize the csv data to answer a question. "
                "The file is likely relatively large, so you should read it in chunks if necessary to answer the question."
            ),
            backstory="When provided with a dataset name and a dataset ID, you are an expert at using your available "
            "tools to read the data. You always return your responses in a csv style format. "
            "You are an expert at analyzing and understanding csv data, especially when comma separated "
            "data is presented to you as a string. "
            "When provided with a dataset ID you always use your tools "
            "to read the data, especially data from the Data Registry as you have tools available to you that "
            "can read the data directly from the Data Registry. You take care to not read too much data at once, "
            "and read it in chunks, especially when you don't know the data structure and size in advance. "
            "If necessary, you call available tools many times to read the data in chunks, analyzing the data as you go. "
            "You can quickly analyze the data and provide informative insights to help users understand "
            "the data better. You can also answer questions about the data in a concise manner.",
            verbose=self.verbose,
            allow_delegation=False,
            llm=self.llm,
        )

    @property
    def task_data_registry_read(self) -> Task:
        return Task(
            description=(
                "Given a dataset ID you should use the dataset ID to read the data from the Data Registry. You then use this "
                "data to answer the question: {question}. "
                "It is critically important that you answer this question. The accuracy of your answer is "
                "paramount. You should always provide a comprehensive and verbose answer."
            ),
            expected_output=(
                "You should return a comprehensive and verbose answer to the question."
            ),
            agent=self.agent_data_registry_reader,
            tools=[self.tool_data_registry_read],
        )

    @property
    def agent_response_editor(self) -> Agent:
        return Agent(
            role="Expert Editor",
            goal="Summarize a verbose response provided by an Expert Data Reader and Analyst into a concise "
            "and clear answer.",
            backstory="You are an expert editor with a keen eye for detail. When provided with a verbose response from "
            "an Expert Data Reader and Analyst, your job is to summarize the response into a concise "
            "and clear answer. You should focus on clarity and brevity, ensuring that the final output "
            "is easy to understand and directly answers the question posed. You are skilled at distilling "
            "complex information into simple, digestible formats. You always ensure that the final output "
            "is accurate and reflects the key points from the original response.",
            verbose=self.verbose,
            llm=self.llm,
        )

    @property
    def task_response_edit(self) -> Task:
        return Task(
            description=(
                "You are given a verbose summary that is supposed to answer the question: {question}. You must ensure "
                "that the final output is concise and directly answers the question. It is critically important that "
                "you summarize the verbose response into a clear and concise answer."
            ),
            expected_output="You should return a concise summary answer to the question.",
            agent=self.agent_response_editor,
        )

    ## Data Registry crew
    def crew(self) -> Crew:
        return Crew(
            agents=[
                self.agent_data_registry_searcher,
                self.agent_data_registry_reader,
                self.agent_response_editor,
            ],
            tasks=[
                self.task_data_registry_search,
                self.task_data_registry_read,
                self.task_response_edit,
            ],
            verbose=self.verbose,
        )

    async def invoke(
        self, completion_create_params: CompletionCreateParams
    ) -> Union[
        Generator[tuple[str, Any | None, dict[str, int]], None, None],
        tuple[str, Any | None, dict[str, int]],
    ]:
        """Invoke the agent with the provided completion parameters.

        [THIS METHOD IS REQUIRED FOR THE AGENT TO WORK WITH DRUM SERVER]

        Args:
            completion_create_params: The completion request parameters
                including input topic and settings.

        Returns:
            Union[
                Generator[tuple[str, Any | None, dict[str, int]], None, None],
                tuple[str, Any | None, dict[str, int]],
            ]: For streaming requests, returns a generator yielding tuples of
            (response_text, pipeline_interactions, usage_metrics). For non-streaming
            requests, returns a single tuple of (response_text, pipeline_interactions,
            usage_metrics).
        """
        # Retrieve the starting user prompt from the CompletionCreateParams
        user_messages = [
            msg for msg in completion_create_params["messages"] if msg.get("role") == "user"
        ]
        user_prompt: Any = user_messages[0] if user_messages else {}
        user_prompt_content = user_prompt.get("content", "")

        # Handle both string and JSON inputs
        if isinstance(user_prompt_content, str):
            try:
                inputs = json.loads(user_prompt_content)
                if isinstance(inputs, dict) and "dataset_topic" not in inputs:
                    # If it's a dict but doesn't have our expected keys, use the first value
                    if inputs:
                        first_key, first_value = next(iter(inputs.items()))
                        inputs = {"dataset_topic": first_value, "question": first_value}
                    else:
                        # Fallback: use user_prompt_content or empty string
                        inputs = {
                            "dataset_topic": user_prompt_content if user_prompt_content else "",
                            "question": user_prompt_content if user_prompt_content else "",
                        }
                elif not isinstance(inputs, dict):
                    inputs = {"dataset_topic": inputs, "question": inputs}
            except json.JSONDecodeError:
                inputs = {"dataset_topic": user_prompt_content, "question": user_prompt_content}
        else:
            inputs = {
                "dataset_topic": str(user_prompt_content),
                "question": str(user_prompt_content),
            }

        print("Running agent with inputs:", inputs)

        # Run the crew with the inputs
        crew_output = self.crew().kickoff(inputs=inputs)

        # Extract the response text from the crew output
        response_text = str(crew_output.raw)

        # Create a list of events from the event listener
        events = self.event_listener.messages
        if len(events) > 0:
            last_message = events[-1].content
            if last_message != response_text:
                events.append(AIMessage(content=response_text))
        else:
            events = None

        # Create pipeline interactions from events
        pipeline_interactions = (
            self.create_pipeline_interactions_from_events(events) if events else None
        )

        # Create usage metrics from crew output
        usage_metrics = {
            "completion_tokens": crew_output.token_usage.completion_tokens,
            "prompt_tokens": crew_output.token_usage.prompt_tokens,
            "total_tokens": crew_output.token_usage.total_tokens,
        }
        return response_text, pipeline_interactions, usage_metrics

    @staticmethod
    def create_pipeline_interactions_from_events(
        events: list[Union[HumanMessage, AIMessage, ToolMessage]],
    ) -> MultiTurnSample | None:
        """Convert a list of events into a MultiTurnSample.

        Creates the pipeline interactions for moderations and evaluation
        (e.g. Task Adherence, Agent Goal Accuracy, Tool Call Accuracy)
        """
        if not events:
            return None
        return MultiTurnSample(user_input=events)
```
`tool_deployment.py`

```python
from abc import ABC

from crewai.tools import BaseTool

from datarobot_genai.core.chat.client import ToolClient


class BaseToolWithDeployment(BaseTool, ABC):
    model_config = {"arbitrary_types_allowed": True}
    """Adds support for arbitrary types in Pydantic models, needed for the ToolClient."""

    tool_client: ToolClient
    """The tool client initialized by the agent, which has access to its authorization context."""

    deployment_id: str
    """The DataRobot deployment ID of the custom model that will execute tool logic."""
```
`tool_data_registry_search.py`

```python
import json
from typing import Dict, List, Type

from pydantic import BaseModel as PydanticBaseModel, Field

from tool_deployment import BaseToolWithDeployment


class SearchDataRegistryArgs(PydanticBaseModel):
    search_terms: str = Field(
        default="",
        description="Terms for the search. Leave blank to return all datasets.",
    )
    limit: int = Field(
        default=2,
        description="The maximum number of datasets to return. Set to -1 to return all.",
    )


class SearchDataRegistryTool(BaseToolWithDeployment):
    name: str = "Search Data Registry"
    description: str = (
        "This tool provides a list of all available dataset names and their associated IDs from the Data Registry. "
        "You should always check to see if the dataset you are looking for can be found here. "
        "For future queries, you should use the associated dataset ID instead of the name to avoid ambiguity."
    )
    args_schema: Type[PydanticBaseModel] = SearchDataRegistryArgs

    def _run(self, search_terms: str = "", limit: int = 2) -> List[Dict[str, str]]:
        request_payload = {"search_terms": search_terms, "limit": limit}
        result = self.tool_client.call(
            self.deployment_id,
            payload=request_payload,
        )
        return json.loads(result.data).get("datasets", [])
```
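To sanity-check the search tool on its own, outside of CrewAI, it can be constructed and called directly. A minimal sketch, assuming the environment variables from the `.env` file are set; calling `_run()` exercises the deployed tool directly, bypassing CrewAI's argument validation, and the search term here is only illustrative:

```python
import os

from dotenv import load_dotenv

from datarobot_genai.core.chat.client import ToolClient
from tool_data_registry_search import SearchDataRegistryTool

load_dotenv()

tool = SearchDataRegistryTool(
    tool_client=ToolClient(
        api_key=os.environ["DATAROBOT_API_TOKEN"],
        base_url=os.environ.get("DATAROBOT_ENDPOINT", "https://api.datarobot.com"),
    ),
    deployment_id=os.environ["DATA_REGISTRY_SEARCH_TOOL_DEPLOYMENT_ID"],
)

# Returns the "datasets" list from the tool's JSON response; the exact record
# fields depend on the deployed Search Data Registry tool.
print(tool._run(search_terms="readmissions", limit=5))  # illustrative search term
```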
`tool_data_registry_read.py`

```python
from io import BytesIO
from typing import Type, Union

import pandas as pd
from pydantic import BaseModel as PydanticBaseModel, Field

from tool_deployment import BaseToolWithDeployment


class ReadDataRegistryArgs(PydanticBaseModel):
    dataset_id: str = Field(..., description="The ID of the dataset to read from the Data Registry.")
    offset: int = Field(default=0, description="The first row to start reading the dataset from.")
    limit: int | None = Field(
        default=20,
        description=("The number of rows to read from the dataset. If not set, will return all rows."),
    )


class ReadDataRegistryTool(BaseToolWithDeployment):
    name: str = "Read Data Registry Dataset"
    description: str = (
        "This tool reads the contents of a Data Registry dataset when given the `dataset_id` input parameter. "
        "Use this tool whenever you need to read the contents of a dataset. "
        "Optionally, you can specify `offset` and `limit` to read a slice of the dataset rows."
    )
    args_schema: Type[PydanticBaseModel] = ReadDataRegistryArgs

    def _get_dataframe_from_response(
        self, data: Union[bytes, pd.DataFrame], headers: dict[str, str]
    ) -> pd.DataFrame:
        """Convert response data into a DataFrame.

        Args:
            data (Union[bytes, pd.DataFrame]): The response data, either as bytes or a DataFrame.
            headers (dict[str, str]): The response headers.

        Returns:
            pd.DataFrame: The parsed DataFrame.

        Raises:
            ValueError: If the response data is neither bytes nor a DataFrame.
        """
        if isinstance(data, bytes) and headers.get("Content-Type", "").lower().startswith("text/csv"):
            return pd.read_csv(BytesIO(data))
        if isinstance(data, pd.DataFrame):
            return data
        raise ValueError("The response data must be either bytes or a DataFrame.")

    def _run(self, dataset_id: str, offset: int = 0, limit: int | None = None) -> str:
        if not dataset_id:
            raise ValueError("dataset_id is required but was not provided")
        rv = self.tool_client.call(
            self.deployment_id,
            payload={
                "dataset_id": dataset_id,
                "offset": offset,
                "limit": limit,
            },
        )
        try:
            return self._get_dataframe_from_response(rv.data, rv.response_headers).to_csv()
        except Exception as e:
            raise ValueError(
                f"Could not read dataset with dataset_id '{dataset_id}'. "
                f"Please verify that the dataset_id exists and you have access to it. Error: {e}"
            )
```
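With all four modules in place, the full workflow can be exercised end to end: the searcher resolves a dataset ID, the reader pages through it with `offset` and `limit`, and the editor condenses the answer. A minimal local smoke test, assuming the agent class above is saved as `agent.py` (module name assumed; adjust to your template's layout), the `.env` file is populated, and an LLM backend is reachable. The payload shape follows the `invoke()` method's handling of the first user message:

```python
import asyncio
import json

from agent import MyAgent  # module name assumed; adjust to match your template

agent = MyAgent(verbose=True)

# invoke() parses the first "user" message; sending a JSON object supplies both
# the {dataset_topic} for the search task and the {question} for the read task.
params = {
    "messages": [
        {
            "role": "user",
            "content": json.dumps(
                {
                    "dataset_topic": "hospital readmissions",  # illustrative topic
                    "question": "How many rows does the dataset contain?",
                }
            ),
        }
    ]
}

response_text, pipeline_interactions, usage_metrics = asyncio.run(agent.invoke(params))
print(response_text)
print(usage_metrics)
```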
While this step isn't required to run the agent locally or in a codespace, you must modify the existing `model-metadata.yaml` file to use the agent in an agentic playground or in production.
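For example, the two tool deployment IDs can be surfaced as runtime parameters so that the `RuntimeParameters.get(...)` fallback in the `tool_data_registry_*` properties above can resolve them when the environment variables aren't set. A sketch of the additions to `model-metadata.yaml` (the field names match the code above; `type: string` is an assumption here, and the rest of the file stays as generated by the template):

```yaml
runtimeParameterDefinitions:
  - fieldName: DATA_REGISTRY_SEARCH_TOOL_DEPLOYMENT_ID
    type: string  # type assumed; holds the Search Data Registry tool deployment ID
    description: Deployment ID of the deployed Search Data Registry global tool.
  - fieldName: DATA_REGISTRY_READ_TOOL_DEPLOYMENT_ID
    type: string  # type assumed; holds the Get Data Registry Dataset tool deployment ID
    description: Deployment ID of the deployed Get Data Registry Dataset global tool.
```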