Skip to content

How-to:CrewAIテンプレートにデータ登録ツールを追加する

DataRobotのエージェントテンプレートリポジトリのクローンを作成し、CrewAIエージェントテンプレートからエージェントワークフローを作成したら、このテンプレートを修正して、データレジストリの検索、データセットの読み取り、内容の説明を行う簡単なエージェントを作成できます。 このエージェントワークフローは、デプロイ済みのDataRobotグローバルエージェントツールを呼び出して、データレジストリでデータセットを検索し、そのデータセットを読み取ります。

グローバルエージェントツールのデプロイ

この基本ステップでは、レジストリからグローバルツールSearch Data RegistryGet Data Registry Datasetをデプロイしていることを前提としています。

グローバルエージェントツールのデプロイ

これらのツールは他の登録モデルと同じようにデプロイされます。 非構造化モデルであるため、利用可能なデプロイ設定は限られています。ただし、利用可能な設定の1つに予測環境を選択があります。 DataRobotのサーバーレス環境を選択することをお勧めします。

データレジストリのエージェントツールをデプロイしたら、デプロイの概要タブまたはURLからデプロイIDを保存します。 These values are required for agentic workflow development (through a .env file), in the metadata for the custom agentic workflow in Workshop (through a model-metadata.yaml file) and, eventually, in production.

datarobot-agent-templatesリポジトリのクローンを作成する

To start building an agentic workflow, clone the datarobot-agent-templates public repository to DataRobot. このリポジトリには、マルチエージェントフレームワークでのAIエージェントの構築およびデプロイのための、すぐに使えるテンプレートが用意されています。 これらのテンプレートを使用すると、最小限の設定要件で独自のエージェントを簡単に設定できます。 そのためには、次のいずれかを実行します。

git clone --branch 11.3.5 --depth 1 https://github.com/datarobot-community/datarobot-agent-templates.git
cd datarobot-agent-templates 

datarobot-agent-templates version

この基本ステップでは、バージョン11.3.5のdatarobot-agent-templatesリポジトリを使用します。 Ensure that the workspace used for this walkthrough is on that version. Newer versions may not be compatible with the code provided below.

GitHubクローンのURL

GitHubリポジトリのクローン作成の詳細については、GitHubのドキュメントを参照してください。

環境の設定

datarobot-agent-templatesリポジトリを含む新しいディレクトリで、以下のコマンドを使用して、提供されたテンプレート環境ファイル(.env.template)をコピーし、名前を変更します。 このファイルで、必要な環境変数を定義します。

cp .env.template .env 

In the new .env file, enter your DATAROBOT_API_TOKEN and DATAROBOT_ENDPOINT. Then, enter any string to define a PULUMI_CONFIG_PASSPHRASE.

DataRobot credentials in codespaces

DataRobotのCodespaceを使用している場合は、ファイルからDATAROBOT_API_TOKENおよびDATAROBOT_ENDPOINT環境変数を削除します。これらはCodespace環境にすでに存在しているためです。

Next, add the following two environment variables to define the deployment IDs copied from the deployed Search Data Registry and Get Data Registry Dataset global agentic tools.

.env
# Data Registry tool deployment IDs
DATA_REGISTRY_SEARCH_TOOL_DEPLOYMENT_ID=<YOUR_SEARCH_TOOL_DEPLOYMENT_ID>
DATA_REGISTRY_READ_TOOL_DEPLOYMENT_ID=<YOUR_READ_TOOL_DEPLOYMENT_ID> 

これらの値は、.envファイルからキーと値のペアを読み込むために、dotenvを使ってagent.pyファイルにロードされます。

CrewAI実行のクイックスタート

次に、task startを使ってquickstart.pyを実行し、CrewAIのテンプレートを選択します。

task start 

CrewAIのテンプレートを選択するには、1を押し、Enterを押して選択を確定します。

Example start command
task start
task: [start] uv run quickstart.py
*****           *          ****        *             *  
*    *  ***   *****  ***   *   *  ***  ****   ***  *****
*    * *   *    *   *   *  ****  *   * *  *  *   *   *  
*****   *** *   *    *** * *   *  ***  ****   ***    *  

--------------------------------------------------------
           Quickstart for DataRobot AI Agents           
--------------------------------------------------------
Checking environment setup for required pre-requisites...
All pre-requisites are installed.


You will now select an agentic framework to use for this project.
For more information on the different agentic frameworks please go to:
  https://github.com/datarobot-community/datarobot-agent-templates/blob/main/docs/getting-started.md

Please select an agentic framework to use:
1. agent_crewai
2. agent_generic_base
3. agent_langgraph
4. agent_llamaindex
5. agent_nat
Enter your choice (1-5): 1 

次に、YEnterの順に押して、前提条件を導入し、選択したエージェントの環境を設定します。

Example install prompt
Would you like to setup the uv python environments and install pre-requisites now?
(y/n): y 

Running these commands configures the environment for the agent_crewai template, removes all unnecessary files, and prepares the virtualenv to install the additional libraries required to run the selected agent template.

インストール済みの環境は、いつでも以下のコマンドを実行することで更新できます。

task install 

必要に応じて、エージェントテンプレートをカスタマイズする前に、変更なしでエージェントを実行します。 コードをテストするには、以下のコマンドを使用します。

task agent:cli START_DEV=1 -- execute --user_prompt 'Hi, how are you?' 

エージェントが要求した場合、構造化クエリーをプロンプトとして送信することもできます。

task agent:cli START_DEV=1 -- execute --user_prompt '{"topic":"Generative AI"}' 

これで、agent_crewai/custom_modelディレクトリでエージェントのコードをカスタマイズできます。 この基本ステップでは、環境は変更しません。

すべてのタスクコマンドを表示する

task startを実行する前に、プロジェクトで使用可能なタスクを表示するには、以下のようにtaskコマンドを実行します。

 task
task: Available tasks for this project:
* default:       ℹ️ Show all available tasks (run `task --list-all` to see hidden tasks)
* install:       Install dependencies for all agent components and infra      (aliases: req, install-all)
* start:         ‼️ Quickstart for DataRobot Agent Templates ‼️ 

task startを実行してフレームワークを選択した後に、プロジェクトで使用可能なタスクを表示するには、以下のようにtaskコマンドを実行します。

 task
task: Available tasks for this project:
* default:                           ℹ️ Show all available tasks (run `task --list-all` to see hidden tasks)
* install:                           🛠️ Install all dependencies for agent and infra
* agent:install:                     🛠️ [agent_crewai] Install agent uv dependencies      (aliases: agent:req)
* agent:add-dependency:              🛠️ [agent_crewai] Add provided packages as a new dependency to an agent
* agent:cli:                         🖥️ [agent_crewai] Run the CLI with provided arguments
* agent:dev:                         🔨 [agent_crewai] Run the development server
* agent:dev-stop:                    🛑 [agent_crewai] Stop the development server
* agent:chainlit:                    🛝 Run the Chainlit playground
* agent:create-docker-context:       🐳 [agent_crewai] Create the template for a local docker_context image
* agent:build-docker-context:        🐳 [agent_crewai] Build the Docker image
* infra:install:                     🛠️ [infra] Install infra uv dependencies
* infra:build:                       🔵 Deploy only playground testing resources with pulumi
* infra:deploy:                      🟢 Deploy all resources with pulumi
* infra:refresh:                     ⚪️ Refresh and sync local pulumi state
* infra:destroy:                     🔴 Teardown all deployed resources with pulumi 
 task --list-all
task: Available tasks for this project:
* build:                             
* default:                           ℹ️ Show all available tasks (run `task --list-all` to see hidden tasks)
* deploy:                            
* destroy:                           
* install:                           🛠️ Install all dependencies for agent and infra
* agent:add-dependency:              🛠️ [agent_crewai] Add provided packages as a new dependency to an agent
* agent:build-docker-context:        🐳 [agent_crewai] Build the Docker image
* agent:chainlit:                    🛝 Run the Chainlit playground
* agent:cli:                         🖥️ [agent_crewai] Run the CLI with provided arguments
* agent:create-docker-context:       🐳 [agent_crewai] Create the template for a local docker_context image
* agent:dev:                         🔨 [agent_crewai] Run the development server
* agent:dev-stop:                    🛑 [agent_crewai] Stop the development server
* agent:install:                     🛠️ [agent_crewai] Install agent uv dependencies      (aliases: agent:req)
* agent:lint:                        
* agent:lint-check:                  
* agent:test:                        
* agent:test-coverage:               
* agent:update:                      
* infra:build:                       🔵 Deploy only playground testing resources with pulumi
* infra:deploy:                      🟢 Deploy all resources with pulumi
* infra:destroy:                     🔴 Teardown all deployed resources with pulumi
* infra:info:                        
* infra:init:                        
* infra:install:                     🛠️ [infra] Install infra uv dependencies
* infra:install-pulumi-plugin:       
* infra:lint:                        
* infra:lint-check:                  
* infra:pulumi:                      
* infra:refresh:                     ⚪️ Refresh and sync local pulumi state
* infra:select:                      
* infra:select-env-stack:            
* infra:test:                        
* infra:test-coverage: 

また、使用可能なすべてのエージェントCLIコマンドを表示するには、task agent:cliを実行します。

CrewAIテンプレートファイルのカスタマイズ

デフォルトのCrewAIテンプレートをカスタマイズして、エージェントワークフローData Registry Search and Summarizeを作成するには、agent_crewai/custom_modelディレクトリを開き、カスタムモデルのアーティファクトに以下の変更を加えます。

  • agent.pyファイルを修正する。
  • tool_deployment.pyファイルを作成する。
  • tool_data_registry_search.pyファイルを作成する。
  • tool_data_registry_read.pyファイルを作成する。
  • model-metadata.yamlファイルを修正する。

agent.pyファイルの修正

CrewAIテンプレートのagent.pyファイルの内容を以下のコードに置き換えます。 これにより、以前の計画、書き込み、および編集のエージェントとタスクが、データレジストリからデータセットを検索および読み取るためのツールに置き換えられ、その後、これらのツールを使用して検索、読み取り、および編集のワークフローを実行するための新しいエージェントとタスクが追加されます。

この基本ステップでのコードのコピー

この基本ステップで、既存のテンプレートを修正するには、大きなコードブロックをコピーする必要があります。 コードスニペットの全内容をコピーするには、スニペットの右上隅にある クリップボードにコピーをクリックします。

修正されたファイル:agent.py
agent.py
  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
import json
import os
from dotenv import load_dotenv
from typing import Any, Generator, Optional, Union
from urllib.parse import urljoin, urlparse

from datarobot_drum import RuntimeParameters

from crewai import LLM, Agent, Crew, Task
from crewai.tools import BaseTool
from crewai_event_listener import CrewAIEventListener
from openai.types.chat import CompletionCreateParams
from ragas import MultiTurnSample
from ragas.messages import AIMessage, HumanMessage, ToolMessage

from datarobot_genai.core.chat.client import ToolClient
from tool_data_registry_search import SearchDataRegistryTool
from tool_data_registry_read import ReadDataRegistryTool

load_dotenv()

class MyAgent:
    """MyAgent is a custom agent that uses CrewAI to search, read, and summarize data from the Data Registry."""

    def __init__(
        self,
        api_key: Optional[str] = None,
        api_base: Optional[str] = None,
        model: Optional[str] = None,
        verbose: Optional[Union[bool, str]] = True,
        timeout: Optional[int] = 90,
        **kwargs: Any,
    ):
        """Initializes the MyAgent class with API key, base URL, model, and verbosity settings.

        Args:
            api_key: Optional[str]: API key for authentication with DataRobot services.
                Defaults to None, in which case it will use the DATAROBOT_API_TOKEN environment variable.
            api_base: Optional[str]: Base URL for the DataRobot API.
                Defaults to None, in which case it will use the DATAROBOT_ENDPOINT environment variable.
            model: Optional[str]: The LLM model to use.
                Defaults to None.
            verbose: Optional[Union[bool, str]]: Whether to enable verbose logging.
                Accepts boolean or string values ("true"/"false"). Defaults to True.
            timeout: Optional[int]: How long to wait for the agent to respond.
                Defaults to 90 seconds.
            **kwargs: Any: Additional keyword arguments passed to the agent.
                Contains any parameters received in the CompletionCreateParams.

        Returns:
            None
        """
        self.api_key = api_key or os.environ.get("DATAROBOT_API_TOKEN")
        self.api_base = (
            api_base
            or os.environ.get("DATAROBOT_ENDPOINT")
            or "https://api.datarobot.com"
        )
        self.model = model
        self.timeout = timeout
        if isinstance(verbose, str):
            self.verbose = verbose.lower() == "true"
        elif isinstance(verbose, bool):
            self.verbose = verbose
        self.event_listener = CrewAIEventListener()

@property
def llm(self) -> LLM:
    """Returns a CrewAI LLM instance configured to use DataRobot's LLM Gateway or a specific deployment.

    For help configuring different LLM backends see:
    https://github.com/datarobot-community/datarobot-agent-templates/blob/main/docs/developing-agents-llm-providers.md
    """
    api_base = urlparse(self.api_base)
    if os.environ.get("LLM_DEPLOYMENT_ID"):
        path = api_base.path
        if "/api/v2/deployments" not in path and "api/v2/genai" not in path:
            # Ensure the API base ends with /api/v2/ for deployments
            if not path.endswith("/"):
                path += "/"
            if not path.endswith("api/v2/"):
                path = urljoin(path, "api/v2/")
        api_base = api_base._replace(path=path)
        api_base_str = api_base.geturl()
        deployment_id = os.environ.get("LLM_DEPLOYMENT_ID")
        return LLM(
            model="openai/gpt-4o-mini",
            api_base=f"{api_base_str}deployments/{deployment_id}/",
            api_key=self.api_key,
            timeout=self.timeout,
        )
    else:
        # Use LLM Gateway
        api_base_str = api_base.geturl()
        if api_base_str.endswith("api/v2/"):
            api_base_str = api_base_str[:-7]  # Remove 'api/v2/'
        elif api_base_str.endswith("api/v2"):
            api_base_str = api_base_str[:-6]  # Remove 'api/v2'
        return LLM(
            model="datarobot/azure/gpt-4o-mini",
            api_base=api_base_str,
            api_key=self.api_key,
            timeout=self.timeout,
        )

@property
def tools_client(self) -> ToolClient:
    return ToolClient(
        api_key=self.api_key,
        base_url=self.api_base,
    )

## Data Registry tools
@property
def tool_data_registry_search(self) -> BaseTool:
    deployment_id = os.environ.get("DATA_REGISTRY_SEARCH_TOOL_DEPLOYMENT_ID")
    if not deployment_id:
        deployment_id = RuntimeParameters.get("DATA_REGISTRY_SEARCH_TOOL_DEPLOYMENT_ID")

    print(
        f"Initializing Data Registry Search Tool with deployment ID: {deployment_id}"
    )
    return SearchDataRegistryTool(
        tool_client=self.tools_client,
        deployment_id=deployment_id
    )

@property
def tool_data_registry_read(self) -> BaseTool:
    deployment_id = os.environ.get("DATA_REGISTRY_READ_TOOL_DEPLOYMENT_ID")
    if not deployment_id:
        deployment_id = RuntimeParameters.get("DATA_REGISTRY_READ_TOOL_DEPLOYMENT_ID")

    print(
        f"Initializing Data Registry Read Tool with deployment ID: {deployment_id}"
    )
    return ReadDataRegistryTool(
        tool_client=self.tools_client,
        deployment_id=deployment_id,
    )

## Data Registry agents and tasks
@property
def agent_data_registry_searcher(self) -> Agent:
    return Agent(
        role="Expert Data Registry Searcher",
        goal="Search for and retrieve relevant files from Data Registry.",
        backstory="You are a meticulous analyst that is skilled at examining lists of files and "
        "determining the most appropriate file based on the context.",
        verbose=self.verbose,
        allow_delegation=False,
        llm=self.llm,
    )

@property
def task_data_registry_search(self) -> Task:
    return Task(
        description=(
            "You should search for a relevant dataset ID in the Data Registry based on the provided dataset topic: "
            "{dataset_topic}."
        ),
        expected_output=(
            "Search for a list of relevant files in the Data Registry and determine the most "
            "relevant dataset ID that matches the given topic. You should return the entire dataset ID."
        ),
        agent=self.agent_data_registry_searcher,
        tools=[self.tool_data_registry_search],
    )

@property
def agent_data_registry_reader(self) -> Agent:
    return Agent(
        role="Expert Data Registry Data Reader and Analyst",
        goal=(
            "Read the data from a file in the Data Registry and summarize the csv data to answer a question."
            "The file is likely relatively large, so you should read it in chunks if necessary to answer the question."
        ),
        backstory="When provided with a dataset name and a dataset ID, you are an expert at using your available"
        "tools to read the data. You always return your responses in a csv style format."
        "You are an expert at analyzing and understanding csv data, especially when comma separated "
        "data is presented to you as a string. When provided with a dataset ID you always use your tools "
        "to read the data, especially data from the Data Registry as you have tools available to you that "
        "can read the data directly from the Data Registry. You take care to not read too much data at once, "
        "and read it in chunks, especially when you don't know the data structure and size in advance. "
        "If necessary, you call available tools many times to read the data in chunks, analyzing the data as you go. "
        "You can quickly analyze the data and provide informative insights to help users understand "
        "the data better. You can also answer questions about the data in a concise manner.",
        verbose=self.verbose,
        allow_delegation=False,
        llm=self.llm,
    )

@property
def task_data_registry_read(self) -> Task:
    return Task(
        description=(
            "Given a dataset ID you should use the dataset ID to read the data from the Data Registry. You then use this "
            "data to answer the question: {question}. "
            "It is critically important that you answer this question. The accuracy of your answer is "
            "paramount. You should always provide a comprehensive and verbose answer."
        ),
        expected_output=(
            "You should return a comprehensive and verbose answer to the question."
        ),
        agent=self.agent_data_registry_reader,
        tools=[self.tool_data_registry_read],
    )

@property
def agent_response_editor(self) -> Agent:
    return Agent(
        role="Expert Editor",
        goal="Summarize a verbose response provided by an Expert Data Reader and Analyst into a concise "
        "and clear answer.",
        backstory="You are an expert editor with a keen eye for detail. When provided with a verbose response from "
        "an Expert Data Reader and Analyst, your job is to summarize the response into a concise "
        "and clear answer. You should focus on clarity and brevity, ensuring that the final output "
        "is easy to understand and directly answers the question posed. You are skilled at distilling "
        "complex information into simple, digestible formats. You always ensure that the final output "
        "is accurate and reflects the key points from the original response.",
        verbose=self.verbose,
        llm=self.llm,
    )

@property
def task_response_edit(self) -> Task:
    return Task(
        description=(
            "You are given a verbose summary that is supposed to answer the question: {question}. You must ensure "
            "that the final output is concise and directly answers the question. It is critically important that "
            "you summarize the verbose response into a clear and concise answer."
        ),
        expected_output="You should return a concise summary answer to the question.",
        agent=self.agent_response_editor,
    )

## Data Registry crew
def crew(self) -> Crew:
    return Crew(
        agents=[self.agent_data_registry_searcher, self.agent_data_registry_reader, self.agent_response_editor],
        tasks=[self.task_data_registry_search, self.task_data_registry_read, self.task_response_edit],
        verbose=self.verbose,
    )

async def invoke(
    self, completion_create_params: CompletionCreateParams
) -> Union[
    Generator[tuple[str, Any | None, dict[str, int]], None, None],
    tuple[str, Any | None, dict[str, int]],
]:
    """Invoke the agent with the provided completion parameters.

    [THIS METHOD IS REQUIRED FOR THE AGENT TO WORK WITH DRUM SERVER]

    Args:
        completion_create_params: The completion request parameters including input topic and settings.
    Returns:
        Union[
            Generator[tuple[str, Any | None, dict[str, int]], None, None],
            tuple[str, Any | None, dict[str, int]],
        ]: For streaming requests, returns a generator yielding tuples of (response_text, pipeline_interactions, usage_metrics).
           For non-streaming requests, returns a single tuple of (response_text, pipeline_interactions, usage_metrics).

    """
    # Retrieve the starting user prompt from the CompletionCreateParams
    user_messages = [
        msg
        for msg in completion_create_params["messages"]
        if msg.get("role") == "user"
    ]
    user_prompt: Any = user_messages[0] if user_messages else {}
    user_prompt_content = user_prompt.get("content", "")

    # Handle both string and JSON inputs
    if isinstance(user_prompt_content, str):
        try:
            inputs = json.loads(user_prompt_content)
            if isinstance(inputs, dict) and "dataset_topic" not in inputs:
                # If it's a dict but doesn't have our expected keys, use the first value
                if inputs:
                    first_key, first_value = next(iter(inputs.items()))
                    inputs = {
                        "dataset_topic": first_value,
                        "question": first_value
                    }
                else:
                    # Fallback: use user_prompt_content or empty string
                    inputs = {
                        "dataset_topic": user_prompt_content if user_prompt_content else "",
                        "question": user_prompt_content if user_prompt_content else ""
                    }
            elif not isinstance(inputs, dict):
                inputs = {
                    "dataset_topic": inputs,
                    "question": inputs
                }
        except json.JSONDecodeError:
            inputs = {
                "dataset_topic": user_prompt_content,
                "question": user_prompt_content
            }
    else:
        inputs = {
            "dataset_topic": str(user_prompt_content),
            "question": str(user_prompt_content)
        }

    print("Running agent with inputs:", inputs)

    # Run the crew with the inputs
    crew_output = self.crew().kickoff(inputs=inputs)

    # Extract the response text from the crew output
    response_text = str(crew_output.raw)

    # Create a list of events from the event listener
    events = self.event_listener.messages
    if len(events) > 0:
        last_message = events[-1].content
        if last_message != response_text:
            events.append(AIMessage(content=response_text))
    else:
        events = None
    # Create pipeline interactions from events
    pipeline_interactions = self.create_pipeline_interactions_from_events(events) if events else None

    # Create usage metrics from crew output
    usage_metrics = {
        "completion_tokens": crew_output.token_usage.completion_tokens,
        "prompt_tokens": crew_output.token_usage.prompt_tokens,
        "total_tokens": crew_output.token_usage.total_tokens,
    }

    return response_text, pipeline_interactions, usage_metrics

@staticmethod
def create_pipeline_interactions_from_events(
    events: list[Union[HumanMessage, AIMessage, ToolMessage]],
) -> MultiTurnSample | None:
    """Convert a list of events into a MultiTurnSample.

    Creates the pipeline interactions for moderations and evaluation
    (e.g. Task Adherence, Agent Goal Accuracy, Tool Call Accuracy)
    """
    if not events:
        return None
    return MultiTurnSample(user_input=events) 

tool_deployment.pyの作成

エージェントワークフローにデプロイ済みのツールを実装するには、agent_crewai/custom_modelディレクトリにtool_deployment.pyファイルを作成します。 ファイルを作成したら、以下の内容を追加します。

修正されたファイル:tool_deployment.py
tool_deployment.py
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
from abc import ABC
from crewai.tools import BaseTool
from datarobot_genai.core.chat.client import ToolClient

class BaseToolWithDeployment(BaseTool, ABC):
    model_config = {
        "arbitrary_types_allowed": True
    }
    """Adds support for arbitrary types in Pydantic models, needed for the ToolClient."""

    tool_client: ToolClient
    """The tool client initialized by the agent, which has access to its authorization context."""

    deployment_id: str
    """The DataRobot deployment ID of the custom model that will execute tool logic.""" 

tool_data_registry_search.pyファイルの作成

agent_crewai/custom_modelディレクトリにtool_data_registry_search.pyファイルを作成します。 更新されたagent.pyファイルは、import文from tool_data_registry_search import SearchDataRegistryToolを使用して、このファイルで定義されているSearchDataRegistryToolクラスをインポートします。 ファイルを作成したら、以下の内容を追加します。

新しいファイル:tool_data_registry_search.py
tool_data_registry_search.py
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
import json
from typing import Dict, Type
from typing import List

from pydantic import BaseModel as PydanticBaseModel, Field

from tool_deployment import BaseToolWithDeployment

class SearchDataRegistryArgs(PydanticBaseModel):
    search_terms: str = Field(
        default="",
        description="Terms for the search. Leave blank to return all datasets."
    )
    limit: int = Field(
        default=2,
        description="The maximum number of datasets to return. Set to -1 to return all."
    )

class SearchDataRegistryTool(BaseToolWithDeployment):
    name: str = "Search Data Registry"
    description: str = (
        "This tool provides a list of all available dataset names and their associated IDs from the Data Registry. "
        "You should always check to see if the dataset you are looking for can be found here. "
        "For future queries, you should use the associated dataset ID instead of the name to avoid ambiguity."
    )
    args_schema: Type[PydanticBaseModel] = SearchDataRegistryArgs

    def _run(self, search_terms: str = "", limit: int = 2) -> List[Dict[str, str]]:

        request_payload = {"search_terms": search_terms, "limit": limit}

        result = self.tool_client.call(
            self.deployment_id,
            payload=request_payload,
        )

        return json.loads(result.data).get("datasets", []) 

tool_data_registry_read.pyファイルの作成

agent_crewai/custom_modelディレクトリにtool_data_registry_read.pyファイルを作成します。 更新されたagent.pyファイルは、import文from tool_data_registry_read import ReadDataRegistryToolを使用して、このファイルで定義されているReadDataRegistryToolクラスをインポートします。 ファイルを作成したら、以下の内容を追加します。

新しいファイル:tool_data_registry_read.py
tool_data_registry_read.py
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
from io import BytesIO
from typing import Type, Union

import pandas as pd
from pydantic import BaseModel as PydanticBaseModel, Field
from tool_deployment import BaseToolWithDeployment

class ReadDataRegistryArgs(PydanticBaseModel):
    dataset_id: str = Field(
        ...,
        description="The ID of the dataset to read from the Data Registry."
    )
    offset: int = Field(
        default=0,
        description="The first row to start reading the dataset from."
    )
    limit: int | None = Field(
        default=20,
        description=(
            "The number of rows to read from the dataset. If not set, will return all rows."
        )
    )

class ReadDataRegistryTool(BaseToolWithDeployment):
    name: str = "Read Data Registry Dataset"
    description: str = (
        "This tool reads the contents of a Data Registry dataset when given the `dataset_id` input parameter. "
        "Use this tool whenever you need to read the contents of a dataset. "
        "Optionally, you can specify `offset` and `limit` to read a slice of the dataset rows."
    )
    args_schema: Type[PydanticBaseModel] = ReadDataRegistryArgs

    def _get_dataframe_from_response(
        self, data: Union[bytes, pd.DataFrame], headers: dict[str, str]
    ) -> pd.DataFrame:
        """Convert response data into a DataFrame.

        Args:
            data (Union[bytes, pd.DataFrame]): The response data, either as bytes or a DataFrame.
            headers (dict[str, str]): The response headers.

        Returns:
            pd.DataFrame: The parsed DataFrame.

        Raises:
            ValueError: If the response data is neither bytes nor a DataFrame.
        """
        if isinstance(data, bytes) and headers.get("Content-Type", "").lower().startswith("text/csv"):
            return pd.read_csv(BytesIO(data))
        if isinstance(data, pd.DataFrame):
            return data
        raise ValueError("The response data must be either bytes or a DataFrame.")

    def _run(self, dataset_id: str, offset: int=0, limit: int | None = None) -> str:
        if not dataset_id:
            raise ValueError("dataset_id is required but was not provided")

        rv = self.tool_client.call(self.deployment_id, payload={
            "dataset_id": dataset_id,
            "offset": offset,
            "limit": limit,
        }, )
        try:
            return self._get_dataframe_from_response(rv.data, rv.response_headers).to_csv()
        except Exception as e:
            raise ValueError(
                f"Could not read dataset with dataset_id '{dataset_id}'. "
                f"Please verify that the dataset_id exists and you have access to it. Error: {e}"
            ) 

model-metadata.yamlファイルの修正

While this step isn't required to use this agent locally or in a codespace, it's important to modify the existing model-metadata.yaml file for use in an agentic playground or in production.

model-metadata.yaml
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
---
name: agent_crewai
type: inference
targetType: agenticworkflow
runtimeParameterDefinitions:
  - fieldName: LLM_DEPLOYMENT_ID
    defaultValue: <YOUR_LLM_DEPLOYMENT_ID>
    type: string
  - fieldName: DATA_REGISTRY_SEARCH_TOOL_DEPLOYMENT_ID
    defaultValue: <YOUR_SEARCH_TOOL_DEPLOYMENT_ID>
    type: string
  - fieldName: DATA_REGISTRY_READ_TOOL_DEPLOYMENT_ID
    defaultValue: <YOUR_READ_TOOL_DEPLOYMENT_ID>
    type: string 

修正されたエージェントワークフローのテスト

agent_crewai/custom_modelディレクトリ内のエージェントワークフローカスタムモデルに必要な変更と追加をすべて行ったら、以下のコマンドでワークフローをテストします。 --user_prompt引数を修正して、データレジストリに存在するデータセットをターゲットにします。

task agent:cli START_DEV=1 -- execute --user_prompt 'Describe a space dataset.' 

構造化クエリーをプロンプトとして送信することもできます。 この場合も、--user_prompt引数を修正して、データレジストリに存在するデータセットをターゲットにします。

task agent:cli START_DEV=1 -- execute --user_prompt '{"dataset_topic":"Space", "question": "Please describe the dataset and show a sample of a few rows"}' 

トラブルシューティング

エージェントワークフローのテスト中に問題が発生した場合は、エージェントの環境と依存関係を更新することをお勧めします。 これを行うには、task install(またはtask setup)、task agent:install、およびtask agent:updateコマンドを使用します。 これらのコマンドやその他のコマンドの詳細については、taskコマンドを使用してください。

テスト中に接続エラーが表示された場合は、コマンドでSTART_DEV=1を使用していることを確認するか、別のターミナルウィンドウでtask agent:devを使用して開発サーバーを個別に起動します。

次のステップ

データレジストリを検索し、データセットを要約するための更新されたエージェントワークフローのテストに問題がなければ、ワークフローをワークショップに送信してエージェントのプレイグラウンドに接続するか、ワークフローをデプロイします。