Build and host a ChromaDB vector database¶
The following notebook provides an example of how you can build, validate, and register a vector database to the DataRobot platform using DataRobot's Python client. It describes how to load in and host a ChromaDB in-memory vector store, with metadata filtering, within a custom model. This notebook is designed for use with DataRobot Notebooks; DataRobot recommends downloading this notebook and uploading it for use in the platform.
Note that when using ChromaDB-hosted documents with custom models, maximum file size is 1GB per file.
Setup¶
The following steps outline the necessary configuration to integrate vector databases with the DataRobot platform.
This workflow uses the following feature flags. Contact your DataRobot representative or administrator for information on enabling these features.
- Enable Public Network Access for all Custom Models (Premium)
- Enable Monitoring Support for Generative Models
- Enable Custom Inference Models
- Enable GenAI Experimentation
Use a codespace, not a DataRobot Notebook, to ensure this notebook has access to a filesystem.
Set the notebook session timeout to 180 minutes.
Restart the notebook container and increase the resource type to the largest available.
Optionally, upload your documents archive to the notebook filesystem.
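Given the 1GB-per-file limit noted above, it can help to check file sizes before hosting anything. The following is a minimal sketch using only the standard library. The helper name `find_oversized_files` is hypothetical, and reading "1GB" as 1024**3 bytes is an assumption.

```python
from pathlib import Path

# Assumption: "1GB" is interpreted as 1024**3 bytes; adjust if your platform states otherwise.
MAX_FILE_BYTES = 1024 ** 3


def find_oversized_files(root, limit=MAX_FILE_BYTES):
    """Return sorted (path, size_in_bytes) pairs for files under `root` larger than `limit`."""
    oversized = []
    for path in Path(root).rglob("*"):
        if path.is_file():
            size = path.stat().st_size
            if size > limit:
                oversized.append((str(path), size))
    return sorted(oversized)
```

Running `find_oversized_files("storage")` before building the database, and again on the folder you plan to deploy, should return an empty list if every file is within the limit.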
Install libraries¶
Install the following libraries:
!pip install "langchain-community==0.4.1" \
"langchain_text_splitters==1.0.0" \
"qdrant-client==1.16.1" \
"sentence-transformers==5.1.2" \
"datarobotx==0.2.0" \
"cloudpickle==2.2.1"
import datarobot as dr
import datarobotx as drx
from datarobot.models.genai.vector_database import CustomModelVectorDatabaseValidation
from datarobot.models.genai.vector_database import VectorDatabase
import os
from pathlib import Path
from qdrant_client import QdrantClient, models
from sentence_transformers import SentenceTransformer
from langchain_community.document_loaders import TextLoader
from langchain_text_splitters import RecursiveCharacterTextSplitter
import json
import requests
import zipfile
import io
import re
Connect to DataRobot¶
Read more about options for connecting to DataRobot from the Python client.
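One common option is to pass the endpoint and API token explicitly. The sketch below assumes the credentials are stored in the `DATAROBOT_ENDPOINT` and `DATAROBOT_API_TOKEN` environment variables. The helper names are hypothetical, and the default endpoint shown applies to the managed cloud only.

```python
import os


def resolve_datarobot_credentials(env=None):
    """Read the endpoint and API token from environment variables."""
    env = os.environ if env is None else env
    # Self-managed users should set DATAROBOT_ENDPOINT to their own instance URL.
    endpoint = env.get("DATAROBOT_ENDPOINT", "https://app.datarobot.com/api/v2")
    token = env.get("DATAROBOT_API_TOKEN")
    if not token:
        raise ValueError("Set DATAROBOT_API_TOKEN before connecting")
    return endpoint, token


def connect(env=None):
    """Create a DataRobot client from the resolved credentials."""
    # Imported lazily so the helper above works even without the client installed.
    import datarobot as dr

    endpoint, token = resolve_datarobot_credentials(env)
    return dr.Client(endpoint=endpoint, token=token)
```

Calling `connect()` once near the top of the notebook is enough; later calls such as `dr.UseCase.get(...)` then use that client.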
Download sample data¶
This example references a sample dataset made from the DataRobot English documentation. To experiment with your own data, modify this section and/or the "Load and split text" section to reference your local dataset.
Note: If you are a self-managed user, you must modify code samples that reference app.datarobot.com to the appropriate URL for your instance.
import requests, zipfile, io
SOURCE_DOCUMENTS_ZIP_URL = "https://s3.amazonaws.com/datarobot_public_datasets/ai_accelerators/datarobot_english_documentation_5th_December.zip"
UNZIPPED_DOCS_DIR = "datarobot_english_documentation"
STORAGE_DIR = "storage"
r = requests.get(SOURCE_DOCUMENTS_ZIP_URL)
r.raise_for_status()  # fail early if the download did not succeed
z = zipfile.ZipFile(io.BytesIO(r.content))
z.extractall(f"{STORAGE_DIR}/")
Load and split text¶
Next, load the DataRobot documentation dataset and split it into chunks. If you are applying this recipe to a different use case, consider the following:
- Use additional or alternative document loaders.
- Filter out extraneous and noisy documents.
- Choose an appropriate chunk_size and chunk_overlap. These are counted by number of characters, not tokens.
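To make the character-based counting concrete, here is a deliberately simplified sketch of fixed-size chunking with overlap. It is not how RecursiveCharacterTextSplitter works internally (that splitter prefers to break on separators such as paragraphs and sentences); the function name is hypothetical and the example only illustrates what the two parameters measure.

```python
def chunk_by_characters(text, chunk_size, chunk_overlap=0):
    """Split `text` into pieces of at most `chunk_size` characters.

    Consecutive pieces share `chunk_overlap` characters.
    """
    if chunk_size <= 0:
        raise ValueError("chunk_size must be positive")
    if not 0 <= chunk_overlap < chunk_size:
        raise ValueError("chunk_overlap must be in [0, chunk_size)")

    step = chunk_size - chunk_overlap
    chunks = []
    start = 0
    while start < len(text):
        chunks.append(text[start:start + chunk_size])
        if start + chunk_size >= len(text):
            break  # the last chunk already reaches the end of the text
        start += step
    return chunks


print(chunk_by_characters("abcdefghij", chunk_size=4, chunk_overlap=1))
# ['abcd', 'defg', 'ghij']
```

Because sizes are in characters, a chunk_size of 128 typically corresponds to far fewer than 128 tokens for the embedding model.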
import re
from langchain_community.document_loaders import DirectoryLoader, TextLoader
from langchain_text_splitters import RecursiveCharacterTextSplitter

SOURCE_DOCUMENTS_DIR = f"{STORAGE_DIR}/{UNZIPPED_DOCS_DIR}/"
SOURCE_DOCUMENTS_FILTER = "**/*.txt"
# TextLoader reads plain text and avoids the optional `unstructured` dependency
# that DirectoryLoader's default loader class requires
loader = DirectoryLoader(
    SOURCE_DOCUMENTS_DIR,
    glob=SOURCE_DOCUMENTS_FILTER,
    loader_cls=TextLoader,
)
splitter = RecursiveCharacterTextSplitter(
    chunk_size=128,
    chunk_overlap=0,
)
print(f"Loading {SOURCE_DOCUMENTS_DIR} directory")
data = loader.load()
print(f"Splitting {len(data)} documents")
docs = splitter.split_documents(data)
for doc in docs:
    # Rewrite the local file path as a documentation URL where the pattern matches
    doc.metadata['source'] = re.sub(
        rf'{STORAGE_DIR}/{UNZIPPED_DOCS_DIR}/datarobot_docs/en/(.+)\.md',
        r'https://docs.datarobot.com/en/docs/\1.html',
        doc.metadata['source']
    )
    doc.metadata["category"] = doc.metadata["source"].split("|")[-1].replace(".txt", "")
print(f"Created {len(docs)} documents")
Create a vector database from documents¶
Use the following cell to build a vector database from the DataRobot documentation dataset. Note that the code in this notebook builds the store with the Qdrant client in local mode (QdrantClient(path=...)), an open source vector store with metadata filtering support. The embeddings come from an open source sentence-transformers model, which must be the same model that the load_model hook loads later (all-MiniLM-L6-v2).
# Create vectorDB
# These values must match the ones used in the load_model hook later in this notebook
EMBEDDING_MODEL_NAME = 'all-MiniLM-L6-v2'
COLLECTION_NAME = "my_documents"
QDRANT_DATA_PATH = "qdrant"

def create_database():
    """Create and populate the Qdrant database"""
    # Initialize encoder
    encoder = SentenceTransformer(EMBEDDING_MODEL_NAME)
    # Initialize Qdrant client
    client = QdrantClient(path=QDRANT_DATA_PATH)
    try:
        # Create collection
        print("Creating collection...")
        client.create_collection(
            collection_name=COLLECTION_NAME,
            vectors_config=models.VectorParams(
                size=encoder.get_sentence_embedding_dimension(),
                distance=models.Distance.COSINE,
            ),
        )
        # Load text files (the archive was extracted under STORAGE_DIR)
        doc_path = Path(STORAGE_DIR) / UNZIPPED_DOCS_DIR
        print(f"Loading documents from {doc_path}/...")
        docs = []
        for file_path in doc_path.rglob("*.txt"):
            loader = TextLoader(str(file_path))
            loaded_docs = loader.load()
            docs.extend(loaded_docs)
        print(f"Loaded {len(docs)} documents")
        # Split documents into chunks
        splitter = RecursiveCharacterTextSplitter(chunk_size=512, chunk_overlap=50)
        split_docs = splitter.split_documents(docs)
        print(f"Split into {len(split_docs)} chunks")
        # Process metadata
        for doc in split_docs:
            # Convert file path to documentation URL
            doc.metadata['source'] = re.sub(
                r'datarobot_english_documentation/datarobot_docs/en/(.+)\.(txt|md)',
                r'https://docs.datarobot.com/en/docs/\1.html',
                doc.metadata.get('source', '')
            )
            # Extract category from source
            doc.metadata["category"] = doc.metadata["source"].split("/")[-1].split(".")[0]
        # Batch encode all documents
        print("Encoding all documents (this may take a few minutes)...")
        all_contents = [doc.page_content for doc in split_docs]
        all_vectors = encoder.encode(
            all_contents,
            show_progress_bar=False,
            batch_size=32,
            convert_to_numpy=True
        )
        # Create points
        print("Creating point structures...")
        points_to_upload = [
            models.PointStruct(
                id=idx,
                vector=all_vectors[idx].tolist(),
                payload={
                    "content": doc.page_content,
                    "source": doc.metadata.get("source", ""),
                    "category": doc.metadata.get("category", ""),
                    **doc.metadata
                }
            )
            for idx, doc in enumerate(split_docs)
        ]
        # Upload to Qdrant
        print(f"Uploading {len(points_to_upload)} points to Qdrant...")
        client.upload_points(
            collection_name=COLLECTION_NAME,
            points=points_to_upload
        )
        # Verify
        collection_info = client.get_collection(collection_name=COLLECTION_NAME)
        print(f"✓ Collection '{COLLECTION_NAME}' created with {collection_info.points_count} points")
        print(f"✓ Data saved to: {QDRANT_DATA_PATH}/")
    finally:
        # ALWAYS close the client
        client.close()
        print("✓ Database client closed")

# Run the creation (only once)
create_database()
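The category value stored with each chunk is what the metadata filters later in this notebook match against, so it is worth seeing how it is derived. The sketch below repeats the same string operations on a hypothetical file path, modeled on the category values used in the filter examples, and assumes the URL substitution leaves the path unchanged.

```python
def category_from_source(source):
    """Mirror the metadata step: last path component, cut at the first dot."""
    return source.split("/")[-1].split(".")[0]


# Hypothetical path for illustration only
example_source = "storage/datarobot_english_documentation/datarobot_docs|en|more-info|eli5.txt"
print(category_from_source(example_source))
# datarobot_docs|en|more-info|eli5
```

Note that the value is cut at the first dot, so a file name containing extra dots would yield a shortened category.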
Test the vector database¶
Use the following cell to test the vector database by having the model perform a similarity search with metadata filtering; it will return the top five documents matching the query provided.
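The collection above is created with cosine distance, so the score attached to each result is a cosine similarity, where higher means more similar. As a rough illustration of what that number measures, here is a standard-library sketch on toy vectors; the vectors are made up and far smaller than real embeddings.

```python
import math


def cosine_similarity(a, b):
    """Cosine of the angle between two equal-length vectors."""
    if len(a) != len(b):
        raise ValueError("vectors must have the same length")
    dot = sum(x * y for x, y in zip(a, b))
    norm_a = math.sqrt(sum(x * x for x in a))
    norm_b = math.sqrt(sum(y * y for y in b))
    if norm_a == 0 or norm_b == 0:
        raise ValueError("cosine similarity is undefined for zero vectors")
    return dot / (norm_a * norm_b)


# Toy 2-dimensional "embeddings" (hypothetical values)
same_direction = cosine_similarity([1.0, 0.0], [2.0, 0.0])
orthogonal = cosine_similarity([1.0, 0.0], [0.0, 3.0])
print(same_direction, orthogonal)
```

Vectors pointing the same way score close to 1 regardless of length, while unrelated (orthogonal) vectors score close to 0.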
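question = "What is MLOps?"
top_k = 5
# Qdrant filter: keep only chunks whose "category" payload field matches this value
metadata_filter = models.Filter(
    must=[
        models.FieldCondition(
            key="category",
            match=models.MatchValue(value="datarobot_docs|en|more-info|eli5"),
        )
    ]
)
# Model, path, and collection name match the values in the load_model hook below
encoder = SentenceTransformer("all-MiniLM-L6-v2")
client = QdrantClient(path="qdrant")
try:
    results = client.query_points(
        collection_name="my_documents",
        query=encoder.encode(question).tolist(),
        query_filter=metadata_filter,
        limit=top_k,
    ).points
    print(len(results))
    for hit in results:
        print("*" * 80)
        print("----------")
        metadata = {key: value for key, value in hit.payload.items() if key != "content"}
        print(f"METADATA: {metadata}, Score: {hit.score}")
        print("----------")
        print(f"CONTENT: {hit.payload.get('content', '')}")
        print(" ")
finally:
    # Local-mode Qdrant locks its storage folder, so release it for later cells
    client.close()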
Define hooks for deploying an unstructured custom model¶
The following cell defines the methods used to deploy an unstructured custom model. These include loading the custom model and using the model for scoring.
def load_model(input_dir):
    """Custom model hook for loading our Qdrant knowledge base."""
    import os
    print("Loading model")
    EMBEDDING_MODEL_NAME = 'all-MiniLM-L6-v2'
    COLLECTION_NAME = "my_documents"
    from qdrant_client import QdrantClient
    from sentence_transformers import SentenceTransformer
    # When deploying model="qdrant/", the files are at input_dir/qdrant/
    # not directly at input_dir
    if input_dir:
        QDRANT_DATA_PATH = os.path.join(input_dir, "qdrant")
    else:
        QDRANT_DATA_PATH = "qdrant"
    print(f'QDRANT_DATA_PATH = {QDRANT_DATA_PATH}')
    print(f'EMBEDDING_MODEL_NAME = {EMBEDDING_MODEL_NAME}')
    print(f'COLLECTION_NAME = {COLLECTION_NAME}')
    # Initialize the embedding model (downloads if needed)
    encoder = SentenceTransformer(EMBEDDING_MODEL_NAME)
    # Initialize Qdrant client
    client = QdrantClient(path=QDRANT_DATA_PATH)
    # Get collection info to verify it loaded
    collection_info = client.get_collection(collection_name=COLLECTION_NAME)
    print(f'Loaded Qdrant collection "{COLLECTION_NAME}" with {collection_info.points_count} points')
    return {
        "client": client,
        "encoder": encoder,
        "collection_name": COLLECTION_NAME
    }

def score_unstructured(model, data, **kwargs) -> str:
    """Custom model hook for retrieving relevant docs with our Qdrant knowledge base.

    When requesting predictions from the deployment, pass a dictionary
    with the following keys:
    - 'question' the question to be passed to the vector store retriever
    - 'filter' any metadata filter
    - 'k' the number of results to return (default: 10)

    datarobot-user-models (DRUM) handles loading the model and calling
    this function with the appropriate parameters.

    Returns:
    --------
    rv : str
        Json dictionary with keys:
        - 'question' user's original question
        - 'relevant' the retrieved document contents
        - 'metadata' - metadata for each document including similarity scores
        - 'error' - error message if exception in handling request
    """
    import json
    from qdrant_client import models
    try:
        # Loading data
        data_dict = json.loads(data)
        question = data_dict['question']
        metadata_filter = data_dict.get("filter", None)
        top_k = data_dict.get("k", 10)
        # Defining info
        client = model["client"]
        encoder = model["encoder"]
        collection_name = model["collection_name"]
        # Encode the question
        query_vector = encoder.encode(question).tolist()
        # Build query parameters
        query_params = {
            "collection_name": collection_name,
            "query": query_vector,
            "limit": top_k,
        }
        # Only add filter if it exists
        if metadata_filter is not None:
            query_params["query_filter"] = models.Filter(**metadata_filter)
        # Perform the search
        results = client.query_points(**query_params).points
        print(f'Returned {len(results)} results')
        relevant, metadata = [], []
        for hit in results:
            # Extract the content from payload
            content = hit.payload.get('content', '')
            relevant.append(content)
            # Add similarity score to metadata
            hit_metadata = dict(hit.payload)
            hit_metadata["similarity_score"] = hit.score
            metadata.append(hit_metadata)
        rv = {
            "question": question,
            "relevant": relevant,
            "metadata": metadata,
        }
    except Exception as e:
        rv = {'error': f"{e.__class__.__name__}: {str(e)}"}
    return json.dumps(rv), {"mimetype": "application/json", "charset": "utf8"}
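The request body that score_unstructured expects is plain JSON, so it can also be assembled without the Qdrant client. The sketch below builds a payload with the three documented keys. The helper name is hypothetical, and the hand-written filter dictionary is an assumption: it mirrors the must / key / match / value fields used by the Filter, FieldCondition, and MatchValue models elsewhere in this notebook, in a reduced form that models.Filter(**filter) is expected to accept.

```python
import json


def build_request(question, category=None, k=10):
    """Serialize a request for the score_unstructured hook."""
    payload = {"question": question, "k": k}
    if category is not None:
        # Reduced form of a Qdrant filter: every "must" condition has to match
        payload["filter"] = {
            "must": [
                {"key": "category", "match": {"value": category}},
            ]
        }
    return json.dumps(payload)


request_body = build_request(
    "What is MLOps?",
    category="datarobot_docs|en|more-info|eli5",
    k=3,
)
print(request_body)
```

Omitting the category leaves the "filter" key out entirely, which the hook treats as an unfiltered search.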
Test hooks locally¶
Before proceeding with deployment, use the cell below to test that the custom model hooks function correctly.
# Testing to ensure they work locally before deploying
def test_hooks():
    """Test the DataRobot hooks locally"""
    # Load the model - pass None so it uses "qdrant" as the path
    model = load_model(None)
    try:
        # Test scoring
        print("=" * 80)
        print("TEST: Basic search")
        print("=" * 80)
        result = score_unstructured(
            model,
            json.dumps({
                "question": "What is MLOps?",
                "filter": models.Filter(
                    must=[
                        models.FieldCondition(
                            key="category",
                            match=models.MatchValue(value="datarobot_docs|en|more-info|eli5")
                        )
                    ]
                ).model_dump(),  # convert to a JSON-serializable dict (converted back before scoring)
                "k": 3,
            })
        )
        response = json.loads(result[0])
        print(f"Question: {response['question']}")
        print(f"Found {len(response['relevant'])} results:\n")
        for idx, (content, meta) in enumerate(zip(response['relevant'], response['metadata'])):
            print(f"{idx+1}. Score: {meta['similarity_score']:.4f}")
            print(f"   Source: {meta.get('source', 'N/A')}")
            print(f"   Category: {meta.get('category', 'N/A')}")
            print(f"   Content: {content[:150]}...")
            print()
    finally:
        # ALWAYS close the client after testing
        model["client"].close()
        print("✓ Test client closed")

# Run the test
test_hooks()
Deploy the knowledge base¶
The cell below uses a convenience method that does the following:
- Builds a new custom model environment containing the contents of storage/deploy/.
- Assembles a new custom model with the provided hooks.
- Deploys an unstructured custom model to DataRobot.
- Returns an object that can be used to make predictions.
This example uses a pre-built environment.
You can also provide an environment_id and instead use an existing custom model environment for shorter iteration cycles on the custom model hooks. See your account's existing pre-built environments from the DataRobot Workshop.
# Get GenAI environment
genai_environment = dr.ExecutionEnvironment.list(search_for="[GenAI] Python 3.11")[0]
# Deploy ONLY the qdrant/ folder (not the parent directory)
deployment = drx.deploy(
    model="qdrant/",
    name="Qdrant Vector Database",
    hooks={
        "score_unstructured": score_unstructured,
        "load_model": load_model
    },
    extra_requirements=[
        "qdrant-client",
        "sentence-transformers",
    ],
    environment_id=genai_environment.id
)
print("✓ Deployment complete!")
print(f"Deployment ID: {deployment.dr_deployment.id}")
Test the deployment¶
Test that the deployment can successfully provide responses to questions. The datarobot-predict library can facilitate this.
from datarobot_predict.deployment import predict_unstructured
# Get a datarobot deployment
# Data to pass
data = {
    "question": "What time series forecasting capabilities does DataRobot have?",
    "k": 3,
}
# Prediction request
content, response_headers = predict_unstructured(
    deployment=deployment.dr_deployment,
    data=data,
)
# Check output
content
# Now with metadata filtering
data = {
    "question": "How do I replace a custom model on an existing custom environment?",
    "filter": models.Filter(
        must=[
            models.FieldCondition(
                key="category",
                match=models.MatchValue(value="datarobot_docs|en|modeling|special-workflows|cml|cml-custom-env")
            )
        ]
    ).model_dump(),
    "k": 5,
}
# Prediction request
content, response_headers = predict_unstructured(
    deployment=deployment.dr_deployment,
    data=data,
)
# Check output
content
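The response follows the structure documented in the score_unstructured docstring: either 'question', 'relevant', and 'metadata' keys, or a single 'error' key. The sketch below turns such a response into short summary rows. The helper name and the sample response are hypothetical, and because the exact type returned by predict_unstructured may vary, the helper accepts either an already-parsed dictionary or a JSON string or bytes.

```python
import json


def summarize_response(content, preview_chars=80):
    """Return (score, source, preview) rows, or raise if the hook reported an error."""
    body = content if isinstance(content, dict) else json.loads(content)
    if "error" in body:
        raise RuntimeError(body["error"])
    rows = []
    for text, meta in zip(body["relevant"], body["metadata"]):
        rows.append((meta.get("similarity_score"), meta.get("source", "N/A"), text[:preview_chars]))
    return rows


# Made-up response for illustration; real values come from the deployment
sample = json.dumps({
    "question": "What is MLOps?",
    "relevant": ["MLOps is a set of practices for deploying and monitoring models."],
    "metadata": [{"source": "example.txt", "category": "example", "similarity_score": 0.75}],
})
for score, source, preview in summarize_response(sample):
    print(f"{score:.2f}  {source}  {preview}")
```

Checking for the 'error' key first matters because the hook catches exceptions and returns them in the response body instead of failing the request.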
Validate and create the vector database¶
These methods execute, validate, and integrate the vector database.
This example associates a Use Case with the validation and creates the vector database within that Use Case.
Set use_case_id to the ID of an existing Use Case, or uncomment the last line of the cell to create a new Use Case instead.
use_case_id = "ADD_VALUE_HERE"
use_case = dr.UseCase.get(use_case_id)
# UNCOMMENT if you want to create a new Use Case
# use_case = dr.UseCase.create()
The CustomModelVectorDatabaseValidation.create function executes the validation of the vector database. Be sure to provide the deployment ID.
external_vdb_validation = CustomModelVectorDatabaseValidation.create(
    prompt_column_name="question",
    target_column_name="relevant",
    deployment_id=deployment.dr_deployment.id,
    use_case=use_case,
    wait_for_completion=True
)
assert external_vdb_validation.validation_status == "PASSED"
After validation completes, use VectorDatabase.create_from_custom_model() to integrate the vector database. You must provide the Use Case name (or Use Case ID), a name for the external vector database, and the validation ID returned from the previous cell.
vdb = VectorDatabase.create_from_custom_model(
    name="DR Vector Database",
    use_case=use_case,
    validation_id=external_vdb_validation.id
)
assert vdb.execution_status == "COMPLETED"
print(f"Vector Database ID: {vdb.id}")
This vector database ID can now be used in the GenAI E2E how-to to create the LLM blueprint with a vector database.