How-to: Add Data Registry tools to the CrewAI template

After you clone the DataRobot agentic templates repository and create an agentic workflow from the CrewAI agent template, you can modify that template to create a simple agent that searches the Data Registry, reads a dataset, and explains its contents. This agentic workflow calls deployed DataRobot global agentic tools to search the Data Registry for a dataset and then read that dataset.

Deploy the global agentic tools

This walkthrough assumes you've deployed the Search Data Registry and Get Data Registry Dataset global tools from Registry.

Deploying the global agentic tools

These tools are deployed the same way as any other registered model. Because they are unstructured models, the available deployment settings are limited; however, one of the available settings is Choose prediction environment. Selecting a DataRobot Serverless prediction environment is recommended.

After deploying the Data Registry agentic tools, save the deployment IDs from the deployment Overview tab or URL. These values are required for agentic workflow development (through a .env file), in the metadata for the custom agentic workflow in Workshop (through a model-metadata.yaml file), and, eventually, in production.

Clone the datarobot-agent-templates repository

To start building an agentic workflow, clone the datarobot-agent-templates public repository. This repository provides ready-to-use templates for building and deploying AI agents with multi-agent frameworks. These templates streamline the process of setting up your own agents with minimal configuration requirements. To do this, run the following commands:

git clone --branch 11.3.5 --depth 1 https://github.com/datarobot-community/datarobot-agent-templates.git
cd datarobot-agent-templates

datarobot-agent-templates version

This walkthrough uses version 11.3.5 of the datarobot-agent-templates repository. Ensure that the workspace used for this walkthrough is on that version. Newer versions may not be compatible with the code provided below.

GitHub clone URL

For more information on cloning a GitHub repository, see the GitHub documentation.

Set up the environment

In the new directory containing the datarobot-agent-templates repository, use the command below to copy and rename the provided template environment file (.env.template). In this file, define the necessary environment variables.

cp .env.template .env

In the new .env file, enter your DATAROBOT_API_TOKEN and DATAROBOT_ENDPOINT. Then, enter any string to define a PULUMI_CONFIG_PASSPHRASE.
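For example, the relevant entries in .env might look like the following. The values shown are placeholders; the endpoint shown assumes the US multi-tenant SaaS environment, so substitute your own cluster's URL if it differs.

```
DATAROBOT_API_TOKEN=<YOUR_API_TOKEN>
DATAROBOT_ENDPOINT=https://app.datarobot.com/api/v2
PULUMI_CONFIG_PASSPHRASE=any-string-you-choose
```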

DataRobot credentials in codespaces

If you are using a DataRobot codespace, remove the DATAROBOT_API_TOKEN and DATAROBOT_ENDPOINT environment variables from the file, as they already exist in the codespace environment.

Next, add the following two environment variables to define the deployment IDs copied from the deployed Search Data Registry and Get Data Registry Dataset global agentic tools.

.env
# Data Registry tool deployment IDs
DATA_REGISTRY_SEARCH_TOOL_DEPLOYMENT_ID=<YOUR_SEARCH_TOOL_DEPLOYMENT_ID>
DATA_REGISTRY_READ_TOOL_DEPLOYMENT_ID=<YOUR_READ_TOOL_DEPLOYMENT_ID>

These values are loaded into the agent.py file using dotenv to read the key-value pairs from the .env file.
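As a minimal sketch of that pattern: load_dotenv() (from the python-dotenv package) populates os.environ from the .env file, after which the agent's tool properties read the IDs. The deployment ID below is hypothetical, and the variable is seeded directly so the sketch runs without a .env file.

```python
import os

# In agent.py, load_dotenv() reads .env and populates os.environ.
# Here the variable is seeded directly so the sketch is self-contained.
os.environ.setdefault(
    "DATA_REGISTRY_SEARCH_TOOL_DEPLOYMENT_ID", "68a0000000000000deadbeef"  # hypothetical ID
)

# This mirrors how the tool properties in agent.py resolve the deployment ID.
search_tool_deployment_id = os.environ.get("DATA_REGISTRY_SEARCH_TOOL_DEPLOYMENT_ID")
if not search_tool_deployment_id:
    raise RuntimeError("DATA_REGISTRY_SEARCH_TOOL_DEPLOYMENT_ID is not set")
print(search_tool_deployment_id)
```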

Run the CrewAI quickstart

Next, use task start to run quickstart.py, selecting the CrewAI template.

task start

To select the CrewAI template, press 1, and then press Enter to confirm your selection.

Example start command
task start
task: [start] uv run quickstart.py
*****           *          ****        *             *  
*    *  ***   *****  ***   *   *  ***  ****   ***  *****
*    * *   *    *   *   *  ****  *   * *  *  *   *   *  
*****   *** *   *    *** * *   *  ***  ****   ***    *  

--------------------------------------------------------
           Quickstart for DataRobot AI Agents           
--------------------------------------------------------
Checking environment setup for required pre-requisites...
All pre-requisites are installed.


You will now select an agentic framework to use for this project.
For more information on the different agentic frameworks please go to:
  https://github.com/datarobot-community/datarobot-agent-templates/blob/main/docs/getting-started.md

Please select an agentic framework to use:
1. agent_crewai
2. agent_generic_base
3. agent_langgraph
4. agent_llamaindex
5. agent_nat
Enter your choice (1-5): 1

Next, press Y and then Enter to install prerequisites and set up environments for the selected agent:

Example install prompt
Would you like to setup the uv python environments and install pre-requisites now?
(y/n): y

Running these commands configures the environment for the agent_crewai template, removes the files the template doesn't need, and prepares the virtual environment by installing the additional libraries required to run the selected agent template.

You can refresh the installed environment at any time by running:

task install

Optionally, before customizing the agent template, run the agent without modification. To test the code, use the following command:

task agent:cli START_DEV=1 -- execute --user_prompt 'Hi, how are you?'

You can also send a structured query as a prompt if your agent requires it.

task agent:cli START_DEV=1 -- execute --user_prompt '{"topic":"Generative AI"}'

Now you can customize the code of your agent in the agent_crewai/custom_model directory. In this walkthrough, the environment remains unchanged.

View all task commands

Before running task start, to view available tasks for the project, run the task command as shown below:

 task
task: Available tasks for this project:
* default:       ℹ️ Show all available tasks (run `task --list-all` to see hidden tasks)
* install:       Install dependencies for all agent components and infra      (aliases: req, install-all)
* start:         ‼️ Quickstart for DataRobot Agent Templates ‼️

After running task start and selecting a framework, to view available tasks for the project, run the task command as shown below:

 task
task: Available tasks for this project:
* default:                           ℹ️ Show all available tasks (run `task --list-all` to see hidden tasks)
* install:                           🛠️ Install all dependencies for agent and infra
* agent:install:                     🛠️ [agent_crewai] Install agent uv dependencies      (aliases: agent:req)
* agent:add-dependency:              🛠️ [agent_crewai] Add provided packages as a new dependency to an agent
* agent:cli:                         🖥️ [agent_crewai] Run the CLI with provided arguments
* agent:dev:                         🔨 [agent_crewai] Run the development server
* agent:dev-stop:                    🛑 [agent_crewai] Stop the development server
* agent:chainlit:                    🛝 Run the Chainlit playground
* agent:create-docker-context:       🐳 [agent_crewai] Create the template for a local docker_context image
* agent:build-docker-context:        🐳 [agent_crewai] Build the Docker image
* infra:install:                     🛠️ [infra] Install infra uv dependencies
* infra:build:                       🔵 Deploy only playground testing resources with pulumi
* infra:deploy:                      🟢 Deploy all resources with pulumi
* infra:refresh:                     ⚪️ Refresh and sync local pulumi state
* infra:destroy:                     🔴 Teardown all deployed resources with pulumi
 task --list-all
task: Available tasks for this project:
* build:                             
* default:                           ℹ️ Show all available tasks (run `task --list-all` to see hidden tasks)
* deploy:                            
* destroy:                           
* install:                           🛠️ Install all dependencies for agent and infra
* agent:add-dependency:              🛠️ [agent_crewai] Add provided packages as a new dependency to an agent
* agent:build-docker-context:        🐳 [agent_crewai] Build the Docker image
* agent:chainlit:                    🛝 Run the Chainlit playground
* agent:cli:                         🖥️ [agent_crewai] Run the CLI with provided arguments
* agent:create-docker-context:       🐳 [agent_crewai] Create the template for a local docker_context image
* agent:dev:                         🔨 [agent_crewai] Run the development server
* agent:dev-stop:                    🛑 [agent_crewai] Stop the development server
* agent:install:                     🛠️ [agent_crewai] Install agent uv dependencies      (aliases: agent:req)
* agent:lint:                        
* agent:lint-check:                  
* agent:test:                        
* agent:test-coverage:               
* agent:update:                      
* infra:build:                       🔵 Deploy only playground testing resources with pulumi
* infra:deploy:                      🟢 Deploy all resources with pulumi
* infra:destroy:                     🔴 Teardown all deployed resources with pulumi
* infra:info:                        
* infra:init:                        
* infra:install:                     🛠️ [infra] Install infra uv dependencies
* infra:install-pulumi-plugin:       
* infra:lint:                        
* infra:lint-check:                  
* infra:pulumi:                      
* infra:refresh:                     ⚪️ Refresh and sync local pulumi state
* infra:select:                      
* infra:select-env-stack:            
* infra:test:                        
* infra:test-coverage:  

In addition, to view all available agent CLI commands, run task agent:cli.

Customize the CrewAI template files

To customize the default CrewAI template to create a Data Registry Search and Summarize agentic workflow, open the agent_crewai/custom_model directory and make the following changes to the custom model artifacts:

  • Modify the agent.py file.
  • Create a tool_deployment.py file.
  • Create a tool_data_registry_search.py file.
  • Create a tool_data_registry_read.py file.
  • Modify the model-metadata.yaml file.

Modify the agent.py file

Replace the contents of the CrewAI template's agent.py file with the code below. This replaces the previous planning, writing, and editing agents and tasks with tools to search and read datasets from the Data Registry, followed by new agents and tasks to use these tools to carry out the search, read, and edit workflow.

Copy code from this walkthrough

This walkthrough requires copying large code blocks to modify the existing template. To copy the full contents of a code snippet, click Copy to clipboard in the upper-right corner of the snippet.

Modified file: agent.py
import json
import os
from dotenv import load_dotenv
from typing import Any, Generator, Optional, Union
from urllib.parse import urljoin, urlparse

from datarobot_drum import RuntimeParameters

from crewai import LLM, Agent, Crew, Task
from crewai.tools import BaseTool
from crewai_event_listener import CrewAIEventListener
from openai.types.chat import CompletionCreateParams
from ragas import MultiTurnSample
from ragas.messages import AIMessage, HumanMessage, ToolMessage

from datarobot_genai.core.chat.client import ToolClient
from tool_data_registry_search import SearchDataRegistryTool
from tool_data_registry_read import ReadDataRegistryTool

load_dotenv()

class MyAgent:
    """MyAgent is a custom agent that uses CrewAI to search, read, and summarize data from the Data Registry."""

    def __init__(
        self,
        api_key: Optional[str] = None,
        api_base: Optional[str] = None,
        model: Optional[str] = None,
        verbose: Optional[Union[bool, str]] = True,
        timeout: Optional[int] = 90,
        **kwargs: Any,
    ):
        """Initializes the MyAgent class with API key, base URL, model, and verbosity settings.

        Args:
            api_key: Optional[str]: API key for authentication with DataRobot services.
                Defaults to None, in which case it will use the DATAROBOT_API_TOKEN environment variable.
            api_base: Optional[str]: Base URL for the DataRobot API.
                Defaults to None, in which case it will use the DATAROBOT_ENDPOINT environment variable.
            model: Optional[str]: The LLM model to use.
                Defaults to None.
            verbose: Optional[Union[bool, str]]: Whether to enable verbose logging.
                Accepts boolean or string values ("true"/"false"). Defaults to True.
            timeout: Optional[int]: How long to wait for the agent to respond.
                Defaults to 90 seconds.
            **kwargs: Any: Additional keyword arguments passed to the agent.
                Contains any parameters received in the CompletionCreateParams.

        Returns:
            None
        """
        self.api_key = api_key or os.environ.get("DATAROBOT_API_TOKEN")
        self.api_base = (
            api_base
            or os.environ.get("DATAROBOT_ENDPOINT")
            or "https://api.datarobot.com"
        )
        self.model = model
        self.timeout = timeout
        if isinstance(verbose, str):
            self.verbose = verbose.lower() == "true"
        elif isinstance(verbose, bool):
            self.verbose = verbose
        self.event_listener = CrewAIEventListener()

    @property
    def llm(self) -> LLM:
        """Returns a CrewAI LLM instance configured to use DataRobot's LLM Gateway or a specific deployment.

        For help configuring different LLM backends see:
        https://github.com/datarobot-community/datarobot-agent-templates/blob/main/docs/developing-agents-llm-providers.md
        """
        api_base = urlparse(self.api_base)
        if os.environ.get("LLM_DEPLOYMENT_ID"):
            path = api_base.path
            if "/api/v2/deployments" not in path and "api/v2/genai" not in path:
                # Ensure the API base ends with /api/v2/ for deployments
                if not path.endswith("/"):
                    path += "/"
                if not path.endswith("api/v2/"):
                    path = urljoin(path, "api/v2/")
            api_base = api_base._replace(path=path)
            api_base_str = api_base.geturl()
            deployment_id = os.environ.get("LLM_DEPLOYMENT_ID")
            return LLM(
                model="openai/gpt-4o-mini",
                api_base=f"{api_base_str}deployments/{deployment_id}/",
                api_key=self.api_key,
                timeout=self.timeout,
            )
        else:
            # Use LLM Gateway
            api_base_str = api_base.geturl()
            if api_base_str.endswith("api/v2/"):
                api_base_str = api_base_str[:-7]  # Remove 'api/v2/'
            elif api_base_str.endswith("api/v2"):
                api_base_str = api_base_str[:-6]  # Remove 'api/v2'
            return LLM(
                model="datarobot/azure/gpt-4o-mini",
                api_base=api_base_str,
                api_key=self.api_key,
                timeout=self.timeout,
            )

    @property
    def tools_client(self) -> ToolClient:
        return ToolClient(
            api_key=self.api_key,
            base_url=self.api_base,
        )

    ## Data Registry tools
    @property
    def tool_data_registry_search(self) -> BaseTool:
        deployment_id = os.environ.get("DATA_REGISTRY_SEARCH_TOOL_DEPLOYMENT_ID")
        if not deployment_id:
            deployment_id = RuntimeParameters.get("DATA_REGISTRY_SEARCH_TOOL_DEPLOYMENT_ID")

        print(
            f"Initializing Data Registry Search Tool with deployment ID: {deployment_id}"
        )
        return SearchDataRegistryTool(
            tool_client=self.tools_client,
            deployment_id=deployment_id,
        )

    @property
    def tool_data_registry_read(self) -> BaseTool:
        deployment_id = os.environ.get("DATA_REGISTRY_READ_TOOL_DEPLOYMENT_ID")
        if not deployment_id:
            deployment_id = RuntimeParameters.get("DATA_REGISTRY_READ_TOOL_DEPLOYMENT_ID")

        print(
            f"Initializing Data Registry Read Tool with deployment ID: {deployment_id}"
        )
        return ReadDataRegistryTool(
            tool_client=self.tools_client,
            deployment_id=deployment_id,
        )

    ## Data Registry agents and tasks
    @property
    def agent_data_registry_searcher(self) -> Agent:
        return Agent(
            role="Expert Data Registry Searcher",
            goal="Search for and retrieve relevant files from Data Registry.",
            backstory="You are a meticulous analyst that is skilled at examining lists of files and "
            "determining the most appropriate file based on the context.",
            verbose=self.verbose,
            allow_delegation=False,
            llm=self.llm,
        )

    @property
    def task_data_registry_search(self) -> Task:
        return Task(
            description=(
                "You should search for a relevant dataset ID in the Data Registry based on the provided dataset topic: "
                "{dataset_topic}."
            ),
            expected_output=(
                "Search for a list of relevant files in the Data Registry and determine the most "
                "relevant dataset ID that matches the given topic. You should return the entire dataset ID."
            ),
            agent=self.agent_data_registry_searcher,
            tools=[self.tool_data_registry_search],
        )

    @property
    def agent_data_registry_reader(self) -> Agent:
        return Agent(
            role="Expert Data Registry Data Reader and Analyst",
            goal=(
                "Read the data from a file in the Data Registry and summarize the csv data to answer a question. "
                "The file is likely relatively large, so you should read it in chunks if necessary to answer the question."
            ),
            backstory="When provided with a dataset name and a dataset ID, you are an expert at using your available "
            "tools to read the data. You always return your responses in a csv style format. "
            "You are an expert at analyzing and understanding csv data, especially when comma separated "
            "data is presented to you as a string. When provided with a dataset ID you always use your tools "
            "to read the data, especially data from the Data Registry as you have tools available to you that "
            "can read the data directly from the Data Registry. You take care to not read too much data at once, "
            "and read it in chunks, especially when you don't know the data structure and size in advance. "
            "If necessary, you call available tools many times to read the data in chunks, analyzing the data as you go. "
            "You can quickly analyze the data and provide informative insights to help users understand "
            "the data better. You can also answer questions about the data in a concise manner.",
            verbose=self.verbose,
            allow_delegation=False,
            llm=self.llm,
        )

    @property
    def task_data_registry_read(self) -> Task:
        return Task(
            description=(
                "Given a dataset ID you should use the dataset ID to read the data from the Data Registry. You then use this "
                "data to answer the question: {question}. "
                "It is critically important that you answer this question. The accuracy of your answer is "
                "paramount. You should always provide a comprehensive and verbose answer."
            ),
            expected_output=(
                "You should return a comprehensive and verbose answer to the question."
            ),
            agent=self.agent_data_registry_reader,
            tools=[self.tool_data_registry_read],
        )

    @property
    def agent_response_editor(self) -> Agent:
        return Agent(
            role="Expert Editor",
            goal="Summarize a verbose response provided by an Expert Data Reader and Analyst into a concise "
            "and clear answer.",
            backstory="You are an expert editor with a keen eye for detail. When provided with a verbose response from "
            "an Expert Data Reader and Analyst, your job is to summarize the response into a concise "
            "and clear answer. You should focus on clarity and brevity, ensuring that the final output "
            "is easy to understand and directly answers the question posed. You are skilled at distilling "
            "complex information into simple, digestible formats. You always ensure that the final output "
            "is accurate and reflects the key points from the original response.",
            verbose=self.verbose,
            llm=self.llm,
        )

    @property
    def task_response_edit(self) -> Task:
        return Task(
            description=(
                "You are given a verbose summary that is supposed to answer the question: {question}. You must ensure "
                "that the final output is concise and directly answers the question. It is critically important that "
                "you summarize the verbose response into a clear and concise answer."
            ),
            expected_output="You should return a concise summary answer to the question.",
            agent=self.agent_response_editor,
        )

    ## Data Registry crew
    def crew(self) -> Crew:
        return Crew(
            agents=[self.agent_data_registry_searcher, self.agent_data_registry_reader, self.agent_response_editor],
            tasks=[self.task_data_registry_search, self.task_data_registry_read, self.task_response_edit],
            verbose=self.verbose,
        )

    async def invoke(
        self, completion_create_params: CompletionCreateParams
    ) -> Union[
        Generator[tuple[str, Any | None, dict[str, int]], None, None],
        tuple[str, Any | None, dict[str, int]],
    ]:
        """Invoke the agent with the provided completion parameters.

        [THIS METHOD IS REQUIRED FOR THE AGENT TO WORK WITH DRUM SERVER]

        Args:
            completion_create_params: The completion request parameters including input topic and settings.
        Returns:
            Union[
                Generator[tuple[str, Any | None, dict[str, int]], None, None],
                tuple[str, Any | None, dict[str, int]],
            ]: For streaming requests, returns a generator yielding tuples of (response_text, pipeline_interactions, usage_metrics).
               For non-streaming requests, returns a single tuple of (response_text, pipeline_interactions, usage_metrics).
        """
        # Retrieve the starting user prompt from the CompletionCreateParams
        user_messages = [
            msg
            for msg in completion_create_params["messages"]
            if msg.get("role") == "user"
        ]
        user_prompt: Any = user_messages[0] if user_messages else {}
        user_prompt_content = user_prompt.get("content", "")

        # Handle both string and JSON inputs
        if isinstance(user_prompt_content, str):
            try:
                inputs = json.loads(user_prompt_content)
                if isinstance(inputs, dict) and "dataset_topic" not in inputs:
                    # If it's a dict but doesn't have our expected keys, use the first value
                    if inputs:
                        first_key, first_value = next(iter(inputs.items()))
                        inputs = {
                            "dataset_topic": first_value,
                            "question": first_value,
                        }
                    else:
                        # Fallback: use user_prompt_content or empty string
                        inputs = {
                            "dataset_topic": user_prompt_content if user_prompt_content else "",
                            "question": user_prompt_content if user_prompt_content else "",
                        }
                elif not isinstance(inputs, dict):
                    inputs = {
                        "dataset_topic": inputs,
                        "question": inputs,
                    }
            except json.JSONDecodeError:
                inputs = {
                    "dataset_topic": user_prompt_content,
                    "question": user_prompt_content,
                }
        else:
            inputs = {
                "dataset_topic": str(user_prompt_content),
                "question": str(user_prompt_content),
            }

        print("Running agent with inputs:", inputs)

        # Run the crew with the inputs
        crew_output = self.crew().kickoff(inputs=inputs)

        # Extract the response text from the crew output
        response_text = str(crew_output.raw)

        # Create a list of events from the event listener
        events = self.event_listener.messages
        if len(events) > 0:
            last_message = events[-1].content
            if last_message != response_text:
                events.append(AIMessage(content=response_text))
        else:
            events = None
        # Create pipeline interactions from events
        pipeline_interactions = self.create_pipeline_interactions_from_events(events) if events else None

        # Create usage metrics from crew output
        usage_metrics = {
            "completion_tokens": crew_output.token_usage.completion_tokens,
            "prompt_tokens": crew_output.token_usage.prompt_tokens,
            "total_tokens": crew_output.token_usage.total_tokens,
        }

        return response_text, pipeline_interactions, usage_metrics

    @staticmethod
    def create_pipeline_interactions_from_events(
        events: list[Union[HumanMessage, AIMessage, ToolMessage]],
    ) -> MultiTurnSample | None:
        """Convert a list of events into a MultiTurnSample.

        Creates the pipeline interactions for moderations and evaluation
        (e.g. Task Adherence, Agent Goal Accuracy, Tool Call Accuracy)
        """
        if not events:
            return None
        return MultiTurnSample(user_input=events)

Create the tool_deployment.py file

To implement deployed tools in an agentic workflow, create a tool_deployment.py file in the agent_crewai/custom_model directory. After you create the file, add the contents below.

New file: tool_deployment.py
from abc import ABC
from crewai.tools import BaseTool
from datarobot_genai.core.chat.client import ToolClient

class BaseToolWithDeployment(BaseTool, ABC):
    model_config = {
        "arbitrary_types_allowed": True
    }
    """Adds support for arbitrary types in Pydantic models, needed for the ToolClient."""

    tool_client: ToolClient
    """The tool client initialized by the agent, which has access to its authorization context."""

    deployment_id: str
    """The DataRobot deployment ID of the custom model that will execute tool logic."""

Create the tool_data_registry_search.py file

Create a tool_data_registry_search.py file in the agent_crewai/custom_model directory. The updated agent.py file imports the SearchDataRegistryTool class defined in this file using an import statement: from tool_data_registry_search import SearchDataRegistryTool. After you create the file, add the contents below.

New file: tool_data_registry_search.py
import json
from typing import Dict, List, Type

from pydantic import BaseModel as PydanticBaseModel, Field

from tool_deployment import BaseToolWithDeployment

class SearchDataRegistryArgs(PydanticBaseModel):
    search_terms: str = Field(
        default="",
        description="Terms for the search. Leave blank to return all datasets."
    )
    limit: int = Field(
        default=2,
        description="The maximum number of datasets to return. Set to -1 to return all."
    )

class SearchDataRegistryTool(BaseToolWithDeployment):
    name: str = "Search Data Registry"
    description: str = (
        "This tool provides a list of all available dataset names and their associated IDs from the Data Registry. "
        "You should always check to see if the dataset you are looking for can be found here. "
        "For future queries, you should use the associated dataset ID instead of the name to avoid ambiguity."
    )
    args_schema: Type[PydanticBaseModel] = SearchDataRegistryArgs

    def _run(self, search_terms: str = "", limit: int = 2) -> List[Dict[str, str]]:

        request_payload = {"search_terms": search_terms, "limit": limit}

        result = self.tool_client.call(
            self.deployment_id,
            payload=request_payload,
        )

        return json.loads(result.data).get("datasets", [])

Create the tool_data_registry_read.py file

Create a tool_data_registry_read.py file in the agent_crewai/custom_model directory. The updated agent.py file imports the ReadDataRegistryTool class defined in this file using an import statement: from tool_data_registry_read import ReadDataRegistryTool. After you create the file, add the contents below.

New file: tool_data_registry_read.py
from io import BytesIO
from typing import Type, Union

import pandas as pd
from pydantic import BaseModel as PydanticBaseModel, Field
from tool_deployment import BaseToolWithDeployment

class ReadDataRegistryArgs(PydanticBaseModel):
    dataset_id: str = Field(
        ...,
        description="The ID of the dataset to read from the Data Registry."
    )
    offset: int = Field(
        default=0,
        description="The first row to start reading the dataset from."
    )
    limit: int | None = Field(
        default=20,
        description=(
            "The number of rows to read from the dataset. If not set, will return all rows."
        )
    )

class ReadDataRegistryTool(BaseToolWithDeployment):
    name: str = "Read Data Registry Dataset"
    description: str = (
        "This tool reads the contents of a Data Registry dataset when given the `dataset_id` input parameter. "
        "Use this tool whenever you need to read the contents of a dataset. "
        "Optionally, you can specify `offset` and `limit` to read a slice of the dataset rows."
    )
    args_schema: Type[PydanticBaseModel] = ReadDataRegistryArgs

    def _get_dataframe_from_response(
        self, data: Union[bytes, pd.DataFrame], headers: dict[str, str]
    ) -> pd.DataFrame:
        """Convert response data into a DataFrame.

        Args:
            data (Union[bytes, pd.DataFrame]): The response data, either as bytes or a DataFrame.
            headers (dict[str, str]): The response headers.

        Returns:
            pd.DataFrame: The parsed DataFrame.

        Raises:
            ValueError: If the response data is neither bytes nor a DataFrame.
        """
        if isinstance(data, bytes) and headers.get("Content-Type", "").lower().startswith("text/csv"):
            return pd.read_csv(BytesIO(data))
        if isinstance(data, pd.DataFrame):
            return data
        raise ValueError("The response data must be either bytes or a DataFrame.")

    def _run(self, dataset_id: str, offset: int = 0, limit: int | None = None) -> str:
        if not dataset_id:
            raise ValueError("dataset_id is required but was not provided")

        rv = self.tool_client.call(
            self.deployment_id,
            payload={
                "dataset_id": dataset_id,
                "offset": offset,
                "limit": limit,
            },
        )
        try:
            return self._get_dataframe_from_response(rv.data, rv.response_headers).to_csv()
        except Exception as e:
            raise ValueError(
                f"Could not read dataset with dataset_id '{dataset_id}'. "
                f"Please verify that the dataset_id exists and you have access to it. Error: {e}"
            )

Modify the model-metadata.yaml file

While this step isn't required to use this agent locally or in a codespace, it's important to modify the existing model-metadata.yaml file for use in an agentic playground or in production.

model-metadata.yaml
---
name: agent_crewai
type: inference
targetType: agenticworkflow
runtimeParameterDefinitions:
  - fieldName: LLM_DEPLOYMENT_ID
    defaultValue: <YOUR_LLM_DEPLOYMENT_ID>
    type: string
  - fieldName: DATA_REGISTRY_SEARCH_TOOL_DEPLOYMENT_ID
    defaultValue: <YOUR_SEARCH_TOOL_DEPLOYMENT_ID>
    type: string
  - fieldName: DATA_REGISTRY_READ_TOOL_DEPLOYMENT_ID
    defaultValue: <YOUR_READ_TOOL_DEPLOYMENT_ID>
    type: string

Test the modified agentic workflow

After making all necessary modifications and additions to the agentic workflow custom model in the agent_crewai/custom_model directory, test the workflow with the following command. Modify the --user_prompt argument to target a dataset present in the Data Registry.

task agent:cli START_DEV=1 -- execute --user_prompt 'Describe a space dataset.'

You can also send a structured query as a prompt. Again, modify the --user_prompt argument to target a dataset present in the Data Registry.

task agent:cli START_DEV=1 -- execute --user_prompt '{"dataset_topic":"Space", "question": "Please describe the dataset and show a sample of a few rows"}'

Troubleshooting

If you encounter issues while testing an agentic workflow, it can be helpful to update agent environments and dependencies. To do this, you can use the task install (or task setup), task agent:install, and task agent:update commands. For more information on these commands, and others, use the task command.

If you see a connection error when testing, ensure you're using START_DEV=1 in the command, or start the dev server separately with task agent:dev in a different terminal window.

Next steps

After successfully testing the updated agentic workflow for searching the Data Registry and summarizing a dataset, send the workflow to Workshop to connect it to an agentic playground, or deploy the workflow.