セットアップ¶
以下の手順では、ベクターデータベースをDataRobotプラットフォームと連携させるために必要な設定について説明します。
このワークフローは以下の機能フラグを使用します。これらの機能を有効にするには、DataRobotの担当者または管理者にお問い合わせください。
- すべてのカスタムモデルでパブリックネットワークへのアクセスを有効にする(プレミアム)
- 生成モデルの監視サポートを有効にする
- Enable Custom Inference Models
- Enable GenAI Experimentation
DataRobot NotebooksではなくCodespaceを使用して、このノートブックがファイルシステムにアクセスできるようにします。
次のセクションで説明するパッケージがまだCodespaceの環境イメージにない場合は、
pip installを使用してインストールします。ノートブックのセッションタイムアウトを180分に設定します。
少なくとも"Medium"(16GB RAM)インスタンスを使用して、ノートブックコンテナを再起動します。
ライブラリのインストール¶
次のライブラリをインストールします。
!pip install "langchain-community==0.4.1" \
"langchain_text_splitters==1.0.0" \
"qdrant-client==1.16.1" \
"sentence-transformers==5.1.2" \
"datarobotx==0.2.0" \
"cloudpickle==2.2.1"
import datarobot as dr
import datarobotx as drx
from datarobot.models.genai.vector_database import CustomModelVectorDatabaseValidation
from datarobot.models.genai.vector_database import VectorDatabase
import os
from pathlib import Path
from qdrant_client import QdrantClient, models
from sentence_transformers import SentenceTransformer
from langchain_community.document_loaders import TextLoader
from langchain_text_splitters import RecursiveCharacterTextSplitter
import json
import requests
import zipfile
import io
import re
DataRobotに接続する¶
PythonクライアントからDataRobotに接続するためのオプションの詳細をご確認ください。
Download sample data¶
この例では、DataRobotの英語ドキュメントから作成されたサンプルデータセットを参照しています。こちらではご自身のデータをご自由にお使いください。
注:セルフマネージドAIプラットフォームでは、app.datarobot.comを参照するコードサンプルは、インスタンスに適したURLに変更する必要があります。
# Configuration
QDRANT_DATA_PATH = "qdrant"
EMBEDDING_MODEL_NAME = "all-MiniLM-L6-v2"
COLLECTION_NAME = "my_documents"
SOURCE_DOCUMENTS_ZIP_URL = "https://s3.amazonaws.com/datarobot_public_datasets/ai_accelerators/datarobot_english_documentation_5th_December.zip"
UNZIPPED_DOCS_DIR = "datarobot_english_documentation"
# Verify docs exist or download them
doc_path = Path(UNZIPPED_DOCS_DIR)
if doc_path.exists():
txt_files = list(doc_path.rglob("*.txt"))
print(f"✓ Found {len(txt_files)} .txt files in {UNZIPPED_DOCS_DIR}/")
if txt_files:
print(f" Sample files:")
for f in txt_files[:5]:
print(f" - {f}")
else:
# Download some example docs
r = requests.get(SOURCE_DOCUMENTS_ZIP_URL)
z = zipfile.ZipFile(io.BytesIO(r.content))
z.extractall()
ドキュメントからベクターデータベースを作成する¶
以下のセルを使って、DataRobotのドキュメントデータセットからベクターデータベースを構築します。このノートブックはQdrantを使用しています。これは、メタデータのフィルター機能を備えたオープンソースのベクターデータベースです。さらに、このノートブックでは、HuggingFaceのall-MiniLM-L6-v2 embeddings model(こちらもオープンソース)を使用しています。
# Create vectorDB
def create_database():
"""Create and populate the Qdrant database"""
# Initialize encoder
encoder = SentenceTransformer(EMBEDDING_MODEL_NAME)
# Initialize Qdrant client
client = QdrantClient(path=QDRANT_DATA_PATH)
try:
# Create collection
print("Creating collection...")
client.create_collection(
collection_name=COLLECTION_NAME,
vectors_config=models.VectorParams(
size=encoder.get_sentence_embedding_dimension(),
distance=models.Distance.COSINE,
),
)
# Load text files
print(f"Loading documents from {UNZIPPED_DOCS_DIR}/...")
docs = []
doc_path = Path(UNZIPPED_DOCS_DIR)
for file_path in doc_path.rglob("*.txt"):
loader = TextLoader(str(file_path))
loaded_docs = loader.load()
docs.extend(loaded_docs)
print(f"Loaded {len(docs)} documents")
# Split documents into chunks
splitter = RecursiveCharacterTextSplitter(chunk_size=512, chunk_overlap=50)
split_docs = splitter.split_documents(docs)
print(f"Split into {len(split_docs)} chunks")
# Process metadata
for doc in split_docs:
# Convert file path to documentation URL
doc.metadata['source'] = re.sub(
r'datarobot_english_documentation/datarobot_docs/en/(.+)\.(txt|md)',
r'https://docs.datarobot.com/en/docs/\1.html',
doc.metadata.get('source', '')
)
# Extract category from source
doc.metadata["category"] = doc.metadata["source"].split("/")[-1].split(".")[0]
# Batch encode all documents
print("Encoding all documents (this may take a few minutes)...")
all_contents = [doc.page_content for doc in split_docs]
all_vectors = encoder.encode(
all_contents,
show_progress_bar=False,
batch_size=32,
convert_to_numpy=True
)
# Create points
print("Creating point structures...")
points_to_upload = [
models.PointStruct(
id=idx,
vector=all_vectors[idx].tolist(),
payload={
"content": doc.page_content,
"source": doc.metadata.get("source", ""),
"category": doc.metadata.get("category", ""),
**doc.metadata
}
)
for idx, doc in enumerate(split_docs)
]
# Upload to Qdrant
print(f"Uploading {len(points_to_upload)} points to Qdrant...")
client.upload_points(
collection_name=COLLECTION_NAME,
points=points_to_upload
)
# Verify
collection_info = client.get_collection(collection_name=COLLECTION_NAME)
print(f"✓ Collection '{COLLECTION_NAME}' created with {collection_info.points_count} points")
print(f"✓ Data saved to: {QDRANT_DATA_PATH}/")
finally:
# ALWAYS close the client
client.close()
print("✓ Database client closed")
# Run the creation (only once)
create_database()
ベクターデータベースのテスト¶
以下のセルを使って、モデルに類似度検索を実行させ、ベクターデータベースをテストします。指定したクエリーに一致する上位5つのドキュメントが返されます。
# Test a query to the vectorDB
def test_database():
"""Test the database with a sample query"""
encoder = SentenceTransformer(EMBEDDING_MODEL_NAME)
client = QdrantClient(path=QDRANT_DATA_PATH)
try:
question = "What is MLOps?"
query_vector = encoder.encode(question).tolist()
results = client.query_points(
collection_name=COLLECTION_NAME,
query=query_vector,
limit=5,
).points
print(f"Found {len(results)} results for: '{question}'\n")
for hit in results:
print(f"Score: {hit.score:.4f}")
print(f"Content: {hit.payload.get('content', '')[:200]}...")
print(f"Source: {hit.payload.get('source', '')}")
print(f"Category: {hit.payload.get('category', '')}\n")
finally:
# ALWAYS close the client
client.close()
print("✓ Test client closed")
# Run the test
test_database()
非構造化カスタムモデルをデプロイするためのフックを定義する¶
以下のセルは、非構造化カスタムモデルのデプロイに使用されるメソッドを定義します。これらには、カスタムモデルのロードと、スコアリングのためのモデルの使用が含まれます。このノートブックでは、ベクターデータベースがDataRobotのインフラストラクチャに読み込まれます。あるいは、load_model内でプロキシエンドポイントを使用し、実際にベクターデータベースが保存されている場所を指すことも可能です。
def load_model(input_dir):
"""Custom model hook for loading our Qdrant knowledge base."""
import os
from qdrant_client import QdrantClient
from sentence_transformers import SentenceTransformer
print("Loading model")
EMBEDDING_MODEL_NAME = 'all-MiniLM-L6-v2'
COLLECTION_NAME = "my_documents"
# When deploying model="qdrant/", the files are at input_dir/qdrant/
# not directly at input_dir
if input_dir:
QDRANT_DATA_PATH = os.path.join(input_dir, "qdrant")
else:
QDRANT_DATA_PATH = "qdrant"
print(f'QDRANT_DATA_PATH = {QDRANT_DATA_PATH}')
print(f'EMBEDDING_MODEL_NAME = {EMBEDDING_MODEL_NAME}')
print(f'COLLECTION_NAME = {COLLECTION_NAME}')
# Initialize the embedding model (downloads if needed)
encoder = SentenceTransformer(EMBEDDING_MODEL_NAME)
# Initialize Qdrant client
client = QdrantClient(path=QDRANT_DATA_PATH)
# Get collection info to verify it loaded
collection_info = client.get_collection(collection_name=COLLECTION_NAME)
print(f'Loaded Qdrant collection "{COLLECTION_NAME}" with {collection_info.points_count} points')
return {
"client": client,
"encoder": encoder,
"collection_name": COLLECTION_NAME
}
def score_unstructured(model, data, **kwargs) -> str:
"""Custom model hook for retrieving relevant docs with our Qdrant knowledge base.
When requesting predictions from the deployment, pass a dictionary
with the following keys:
- 'question' the question to be passed to the vector store retriever
- 'filter' any metadata filter
- 'k' the number of results to return (default: 10)
datarobot-user-models (DRUM) handles loading the model and calling
this function with the appropriate parameters.
Returns:
--------
rv : str
Json dictionary with keys:
- 'question' user's original question
- 'relevant' the retrieved document contents
- 'metadata' - metadata for each document including similarity scores
- 'error' - error message if exception in handling request
"""
import json
from qdrant_client import models
try:
# Loading data
data_dict = json.loads(data)
question = data_dict['question']
metadata_filter = data_dict.get("filter", None)
top_k = data_dict.get("k", 10)
# Defining info
client = model["client"]
encoder = model["encoder"]
collection_name = model["collection_name"]
# Encode the question
query_vector = encoder.encode(question).tolist()
# Build query parameters
query_params = {
"collection_name": collection_name,
"query": query_vector,
"limit": top_k,
}
# Only add filter if it exists
if metadata_filter is not None:
query_params["query_filter"] = models.Filter(**metadata_filter)
# Perform the search
results = client.query_points(**query_params).points
print(f'Returned {len(results)} results')
relevant, metadata = [], []
for hit in results:
# Extract the content from payload
content = hit.payload.get('content', '')
relevant.append(content)
# Add similarity score to metadata
hit_metadata = dict(hit.payload)
hit_metadata["similarity_score"] = hit.score
metadata.append(hit_metadata)
rv = {
"question": question,
"relevant": relevant,
"metadata": metadata,
}
except Exception as e:
rv = {'error': f"{e.__class__.__name__}: {str(e)}"}
return json.dumps(rv), {"mimetype": "application/json", "charset": "utf8"}
ローカルでフックをテストする¶
デプロイに進む前に、以下のセルを使用して、カスタムモデルフックが正しく機能することを確認します。
# Testing to ensure they work locally before deploying
def test_hooks():
"""Test the DataRobot hooks locally"""
# Load the model - pass None so it uses "qdrant" as the path
model = load_model(None)
try:
# Test scoring
print("=" * 80)
print("TEST: Basic search")
print("=" * 80)
result = score_unstructured(
model,
json.dumps({
"question": "What is MLOps?",
"filter": models.Filter(
must=[
models.FieldCondition(
key="category",
match=models.MatchValue(value="datarobot_docs|en|more-info|eli5")
)
]
).model_dump(), # converting to be json serializable (will be convert back before scoring)
"k": 3,
})
)
response = json.loads(result[0])
print(f"Question: {response['question']}")
print(f"Found {len(response['relevant'])} results:\n")
for idx, (content, meta) in enumerate(zip(response['relevant'], response['metadata'])):
print(f"{idx+1}. Score: {meta['similarity_score']:.4f}")
print(f" Source: {meta.get('source', 'N/A')}")
print(f" Category: {meta.get('category', 'N/A')}")
print(f" Content: {content[:150]}...")
print()
finally:
# ALWAYS close the client after testing
model["client"].close()
print("✓ Test client closed")
# Run the test
test_hooks()
ナレッジベースのデプロイ¶
以下のセルでは、次のことを行う便利なメソッドを使用しています。
qdrant/の内容を含む新しいカスタムモデル環境を構築する。- 提供されたフックで新しいカスタムモデルを構築する。
- 非構造化カスタムモデルをDataRobotにデプロイする。
- 予測に使用できるオブジェクトを返す。
この例では、事前構築済みの環境を使用します。
また、environment_idを指定し、代わりに既存のカスタムモデル環境を使用することで、カスタムモデルフックでの反復サイクルを短縮できます。DataRobotワークショップで、お使いのアカウントの既存の構築済み環境を確認してください。
# Get GenAI environment
genai_environment = dr.ExecutionEnvironment.list(search_for="[GenAI] Python 3.11")[0]
# Deploy ONLY the qdrant/ folder (not the parent directory)
deployment = drx.deploy(
model="qdrant/",
name="Qdrant Vector Database",
hooks={
"score_unstructured": score_unstructured,
"load_model": load_model
},
extra_requirements=[
"qdrant-client",
"sentence-transformers",
],
environment_id=genai_environment.id
)
print(f"✓ Deployment complete!")
print(f"Deployment ID: {deployment.dr_deployment.id}")
デプロイのテスト¶
datarobot-predictライブラリを使用して、デプロイが質問への回答を正常に提供できることをテストします。
# Get GenAI environment
genai_environment = dr.ExecutionEnvironment.list(search_for="[GenAI] Python 3.11")[0]
# Deploy ONLY the qdrant/ folder (not the parent directory)
deployment = drx.deploy(
model="qdrant/",
name="Qdrant Vector Database",
hooks={
"score_unstructured": score_unstructured,
"load_model": load_model
},
extra_requirements=[
"qdrant-client",
"sentence-transformers",
],
environment_id=genai_environment.id
)
print(f"✓ Deployment complete!")
print(f"Deployment ID: {deployment.dr_deployment.id}")
# Now with metadata filtering
data = {
"question": "How do I replace a custom model on an existing custom environment?",
"filter": models.Filter(
must=[
models.FieldCondition(
key="category",
match=models.MatchValue(value="datarobot_docs|en|modeling|special-workflows|cml|cml-custom-env")
)
]
).model_dump(),
"k": 5,
}
# Prediction request
content, response_headers = predict_unstructured(
deployment=deployment.dr_deployment,
data=data,
)
# Check output
content
ベクターデータベースを検証して作成¶
これらのメソッドはベクターデータベースを実行、検証、統合します。
この例では、ユースケースと検証を関連付け、そのユースケース内にベクターデータベースを作成します。
use_case_idを設定して既存のユースケースを指定するか、その名前で新しいユースケースを作成します。
# Use current use case (assuming your working in a DataRobot Codespace)
use_case_id = os.environ['DATAROBOT_DEFAULT_USE_CASE']
use_case = dr.UseCase.get(use_case_id)
# UNCOMMENT if you want to create a new Use Case
# use_case = dr.UseCase.create()
CustomModelVectorDatabaseValidation.createは、ベクターデータベースの検証を実行します。デプロイIDを必ず入力します。
external_vdb_validation = CustomModelVectorDatabaseValidation.create(
prompt_column_name="question",
target_column_name="relevant",
deployment_id=deployment.dr_deployment.id,
use_case=use_case,
wait_for_completion=True
)
external_vdb_validation
assert external_vdb_validation.validation_status == "PASSED"
検証が完了したら、VectorDatabase.create_from_custom_model()を使用してベクターデータベースを連携します。ユースケース名(またはユースケースID)、外部ベクターデータベースの名前、および前のセルから返される検証IDを指定する必要があります。
vdb = VectorDatabase.create_from_custom_model(
name="Qdrant Vector Database",
use_case=use_case,
validation_id=external_vdb_validation.id
)
vdb
assert vdb.execution_status == "COMPLETED"
print(f"Vector Database ID: {vdb.id}")
このベクターデータベースIDは、LLMブループリントと組み合わせてRAGワークフローの作成に使用できるようになりました。