Create external vector databases with code¶
The following notebook outlines how to build, validate, and register an external vector database with the DataRobot platform using DataRobot's Python client. It is designed for use with DataRobot Notebooks; DataRobot recommends downloading this notebook and uploading it for use in the platform.
Setup¶
The following steps outline the necessary configuration to integrate external vector databases with the DataRobot platform.
This workflow uses the following feature flags. Contact your DataRobot representative or administrator for information on enabling these features.
- Enable Notebooks Filesystem Management
- Enable Proxy models
- Enable Public Network Access for all Custom Models
- Enable Monitoring Support for Generative Models
- Enable Custom Inference Models
1. Enable the notebook filesystem for this notebook in the notebook sidebar.
2. Set the notebook session timeout to 180 minutes.
3. Restart the notebook container using at least a "Medium" (16GB RAM) instance.
4. Optionally, upload your documents archive to the notebook filesystem.
Install libraries¶
Install the following libraries:
# Upgrade pip to fix langchain installation issues
!pip install --upgrade pip setuptools
!pip install "langchain" \
"langchain-community" \
"langchain-chroma" \
"sentence-transformers==3.0.0" \
"datarobotx" \
"unstructured" \
"pysqlite3-binary"
# replace sqlite3 with pysqlite3 to fix chroma issues
__import__('pysqlite3')
import sys
sys.modules['sqlite3'] = sys.modules.pop('pysqlite3')
import datarobot as dr
import datarobotx as drx
from datarobot.models.genai.vector_database import CustomModelVectorDatabaseValidation
from datarobot.models.genai.vector_database import VectorDatabase
Connect to DataRobot¶
Read more about options for connecting to DataRobot from the Python client.
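For example, a minimal connection sketch using an API token is shown below; the endpoint URL and token placeholder are assumptions, and inside DataRobot Notebooks you can typically rely on the session's built-in authentication instead.
import datarobot as dr
# Minimal connection sketch: replace the token placeholder with your own API key.
# Self-managed users should point the endpoint at their own instance rather than app.datarobot.com.
dr.Client(
    endpoint="https://app.datarobot.com/api/v2",
    token="ADD_VALUE_HERE",
)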
Download sample data¶
This example references a sample dataset made from the DataRobot English documentation. To experiment with your own data, modify this section and/or the "Load and split text" section to reference your local dataset.
Note: If you are a self-managed user, you must modify code samples that reference app.datarobot.com to use the appropriate URL for your instance.
import requests, zipfile, io
SOURCE_DOCUMENTS_ZIP_URL = "https://s3.amazonaws.com/datarobot_public_datasets/ai_accelerators/datarobot_english_documentation_5th_December.zip"
UNZIPPED_DOCS_DIR = "datarobot_english_documentation"
STORAGE_DIR = "storage"
r = requests.get(SOURCE_DOCUMENTS_ZIP_URL)
z = zipfile.ZipFile(io.BytesIO(r.content))
z.extractall(f"{STORAGE_DIR}/")
Load and split text¶
Next, load the DataRobot documentation dataset and split it into chunks. If you are applying this recipe to a different use case, consider the following:
- Use additional or alternative document loaders.
- Filter out extraneous and noisy documents.
- Choose an appropriate chunk_size and chunk_overlap. These are counted in characters, not tokens; a token-based alternative is sketched after this list.
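If you would rather budget chunks by tokens instead of characters, the splitter can be constructed from a tiktoken encoder. The sketch below is an optional alternative that is not used in the rest of this notebook; it assumes the tiktoken package is installed.
from langchain.text_splitter import RecursiveCharacterTextSplitter
# Optional token-based alternative to the character-based splitter used below.
# Assumes tiktoken is installed; chunk_size and chunk_overlap are then measured in tokens.
token_splitter = RecursiveCharacterTextSplitter.from_tiktoken_encoder(
    encoding_name="cl100k_base",
    chunk_size=128,
    chunk_overlap=0,
)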
import re
from langchain_community.document_loaders import DirectoryLoader
from langchain.text_splitter import RecursiveCharacterTextSplitter
SOURCE_DOCUMENTS_DIR = f"{STORAGE_DIR}/{UNZIPPED_DOCS_DIR}/"
SOURCE_DOCUMENTS_FILTER = "**/*.txt"
loader = DirectoryLoader(f"{SOURCE_DOCUMENTS_DIR}", glob=SOURCE_DOCUMENTS_FILTER)
splitter = RecursiveCharacterTextSplitter(
chunk_size=128,
chunk_overlap=0,
)
print(f"Loading {SOURCE_DOCUMENTS_DIR} directory")
data = loader.load()
print(f"Splitting {len(data)} documents")
docs = splitter.split_documents(data)
for doc in docs:
doc.metadata['source'] = re.sub(
rf'{STORAGE_DIR}/{UNZIPPED_DOCS_DIR}/datarobot_docs/en/(.+)\.md',
r'https://docs.datarobot.com/en/docs/\1.html',
doc.metadata['source']
)
doc.metadata["category"] = doc.metadata["source"].split("|")[-1].replace(".txt", "")
print(f"Created {len(docs)} documents")
Create a vector database from documents (using ChromaDB with metadata filtering)¶
Use the following cell to build a vector database from the DataRobot documentation dataset. Note that this notebook uses ChromaDB, an open source, embedded vector store with metadata filtering support that is compatible with DataRobot Notebooks. Additionally, this notebook uses the open source HuggingFace jina-embedding-t-en-v1 embeddings model.
from datetime import datetime
from langchain_chroma import Chroma
from langchain_community.embeddings.sentence_transformer import (SentenceTransformerEmbeddings)
CHROMADB_DATA_PATH = f"{STORAGE_DIR}/chromadb"
CHROMADB_EMBEDDING_CACHE_FOLDER = STORAGE_DIR + '/sentencetransformers'
CHROMADB_EMBEDDING_FUNCTION = SentenceTransformerEmbeddings(model_name="jinaai/jina-embedding-t-en-v1", cache_folder=CHROMADB_EMBEDDING_CACHE_FOLDER)
def create_chromadb_from_documents(docs, embedding_function, persist_directory):
start_time = datetime.now()
print(f'>>> BEGIN ({start_time.strftime("%H:%M:%S")}): Creating ChromaDB from documents')
print(f'Embedding function: {embedding_function}')
print(f'ChromaDB data directory: {persist_directory}')
print(' ')
print(f'Documents for loading: {len(docs)}')
db = Chroma.from_documents(docs, embedding_function, persist_directory=persist_directory)
end_time = datetime.now()
print(' ')
print(f'>>> END ({end_time.strftime("%H:%M:%S")}): Creating ChromaDB from documents')
print(f'Loaded {len(docs)} documents.')
print(f"Chroma VectorDB now has {db._collection.count()} documents")
total_elapsed_min = (end_time - start_time).total_seconds() / 60
document_average_sec = (end_time - start_time).total_seconds() / len(docs)
print("Total Elapsed", "%.2f" % total_elapsed_min, "minutes")
print("Document Average", "%.2f" % document_average_sec, "seconds")
return db
print(f"Created {len(docs)} documents")
db = create_chromadb_from_documents(docs, CHROMADB_EMBEDDING_FUNCTION, CHROMADB_DATA_PATH)
print(db._collection.count())
Test the vector database¶
Use the following cell to test the vector database by performing a similarity search with metadata filtering; it returns the top five documents matching the provided query.
question = "What is MLOps?"
top_k = 5
metadata_filter = {"category": {"$eq": "index"}}
results_with_scores = db.similarity_search_with_score(
question,
k=top_k,
filter=metadata_filter,
)
print(len(results_with_scores))
for doc, score in results_with_scores:
print("********************************************************************************")
print(" ")
print("----------")
print(f"METADATA: {doc.metadata}, Score: {score}")
print(" ")
print("----------")
print(f"CONTENT: {doc.page_content}")
print(" ")
Define hooks for deploying an unstructured custom model¶
The following cell defines the hooks required to deploy an unstructured custom model: one for loading the knowledge base and one for using it to score requests.
import os
def load_model(input_dir):
"""Custom model hook for loading our knowledge base."""
import os
print("Loading model")
STORAGE_DIR = "storage"
CHROMADB_DATA_PATH = f"{STORAGE_DIR}/chromadb"
CHROMADB_EMBEDDING_CACHE_FOLDER = CHROMADB_DATA_PATH + '/sentencetransformers'
CHROMADB_EMBEDDING_MODEL_NAME = 'jinaai/jina-embedding-t-en-v1'
# https://docs.trychroma.com/troubleshooting#sqlite
__import__('pysqlite3')
import sys
sys.modules['sqlite3'] = sys.modules.pop('pysqlite3')
from langchain_chroma import Chroma
    from langchain_community.embeddings.sentence_transformer import SentenceTransformerEmbeddings
print(f'CHROMADB_DATA_PATH = {CHROMADB_DATA_PATH}')
print(f'CHROMADB_EMBEDDING_CACHE_FOLDER = {CHROMADB_EMBEDDING_CACHE_FOLDER}')
print(f'CHROMADB_EMBEDDING_MODEL_NAME = {CHROMADB_EMBEDDING_MODEL_NAME}')
CHROMADB_EMBEDDING_FUNCTION = SentenceTransformerEmbeddings(model_name="jinaai/jina-embedding-t-en-v1", cache_folder=CHROMADB_EMBEDDING_CACHE_FOLDER)
db = Chroma(persist_directory=CHROMADB_DATA_PATH, embedding_function=CHROMADB_EMBEDDING_FUNCTION)
    print(f'Loaded ChromaDB with {db._collection.count()} chunks')
return db
def score_unstructured(model, data, query, **kwargs) -> str:
"""Custom model hook for retrieving relevant docs with our knowledge base.
    When requesting predictions from the deployment, pass a JSON-encoded dictionary
    with the following keys:
    - 'question' the question to be passed to the vector store retriever
    - 'filter' the metadata filter to be passed to the vector store retriever
    - 'k' the number of results to return
datarobot-user-models (DRUM) handles loading the model and calling
this function with the appropriate parameters.
Returns:
--------
rv : str
Json dictionary with keys:
        - 'question' the user's original question
        - 'relevant' the page content of the retrieved documents
        - 'metadata' the metadata (including the similarity score) for each retrieved document
        - 'error' an error message if an exception occurred while handling the request
"""
import json
try:
data_dict = json.loads(data)
question = data_dict['question']
top_k = data_dict.get("k", 10)
metadata_filter = data_dict.get("filter", None)
results_with_scores = model.similarity_search_with_score(
question,
k=top_k,
filter=metadata_filter,
)
print(f'Returned {len(results_with_scores)} results')
relevant, metadata = [], []
for doc, score in results_with_scores:
relevant.append(doc.page_content)
doc.metadata["similarity_score"] = score
metadata.append(doc.metadata)
rv = {
"question": question,
"relevant": relevant,
"metadata": metadata,
}
except Exception as e:
rv = {'error': f"{e.__class__.__name__}: {str(e)}"}
return json.dumps(rv), {"mimetype": "application/json", "charset": "utf8"}
Test hooks locally¶
Before proceeding with deployment, use the cell below to test that the custom model hooks function correctly.
import json
# Test the hooks locally
score_unstructured(
load_model(f"{STORAGE_DIR}"),
json.dumps(
{
"question": "How do I replace a custom model on an existing custom environment?",
"filter": {"category": {"$eq": "index"}},
"k": 1,
}
),
None,
)
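Because score_unstructured returns a tuple of a JSON string and a mimetype dictionary, you can unpack and parse the result to inspect the retrieved chunks, as in the short example below.
# Unpack the (json_string, mimetype_dict) tuple returned by score_unstructured
# and pretty-print the parsed response.
response_json, _ = score_unstructured(
    load_model(f"{STORAGE_DIR}"),
    json.dumps({"question": "What is MLOps?", "k": 3}),
    None,
)
print(json.dumps(json.loads(response_json), indent=2))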
Deploy the knowledge base¶
The cell below uses a convenience method that does the following:
- Builds a new custom model environment containing the contents of storage/chromadb/ (the directory passed as the model argument).
- Assembles a new custom model with the provided hooks.
- Deploys an unstructured custom model to DataRobot.
- Returns an object that can be used to make predictions.
This example uses a pre-built environment. You can also provide an environment_id to reuse an existing custom model environment, which shortens iteration cycles when only the custom model hooks change. You can view your account's existing pre-built environments in the DataRobot Custom Model Workshop.
dr.ExecutionEnvironment.list("Python 3.11")
[ExecutionEnvironment('[DataRobot][NVIDIA] Python 3.11 GenAI'), ExecutionEnvironment('[DataRobot] Python 3.11 GenAI'), ExecutionEnvironment('[GenAI] Python 3.11 with Moderations')]
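Indexing the list with [-1], as in the cell below, depends on the order in which environments are returned. As a sketch, you can instead select an environment by name; the name used here is an assumption based on the example output above, so adjust it for your account.
# Select a pre-built environment by name instead of relying on list order.
# The environment name below is an assumption; adjust it for your account.
environments = dr.ExecutionEnvironment.list("Python 3.11")
genai_env = next(env for env in environments if env.name == "[DataRobot] Python 3.11 GenAI")
print(genai_env.id)
You can then pass genai_env.id to the environment_id parameter in the deploy call.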
deployment = drx.deploy(
model="storage/chromadb/",
name="External DR Knowledge Base",
hooks={
"score_unstructured": score_unstructured,
"load_model": load_model
},
extra_requirements=["langchain_chroma", "pysqlite3-binary"],
    # Reuse an existing environment if you only want to change the hook code,
    # not the requirements.
environment_id=dr.ExecutionEnvironment.list("Python 3.11")[-1].id,
)
Test the deployment¶
Test that the deployment can successfully provide responses to questions.
#deployment = drx.Deployment("ADD_VALUE_HERE")
deployment.predict_unstructured(
{
"question": "How do I replace a custom model on an existing custom environment?",
"filter": {"category": {"$eq": "index"}},
"k": 1,
}
)
Validate and create the vector database from the external deployment¶
The following methods validate the external deployment and register it as a vector database in DataRobot.
This example associates a Use Case with the validation and creates the vector database within that Use Case.
Set use_case_id to the ID of an existing Use Case, or uncomment the line below to create a new one.
use_case_id = "ADD_VALUE_HERE"
use_case = dr.UseCase.get(use_case_id)
# UNCOMMENT if you wish to create a new UseCase
# use_case = dr.UseCase.create()
The CustomModelVectorDatabaseValidation.create function executes the validation of the vector database. Be sure to provide the deployment ID.
external_vdb_validation = CustomModelVectorDatabaseValidation.create(
prompt_column_name="question",
target_column_name="relevant",
deployment_id=deployment.dr_deployment.id,
use_case=use_case,
wait_for_completion=True
)
assert external_vdb_validation.validation_status == "PASSED"
After validation completes, use VectorDatabase.create_from_custom_model() to integrate the vector database. You must provide the Use Case (or Use Case ID), a name for the external vector database, and the validation ID returned from the previous cell.
vdb = VectorDatabase.create_from_custom_model(
name="DR External Vector Database",
use_case=use_case,
validation_id=external_vdb_validation.id
)
assert vdb.execution_status == "COMPLETED"
print(f"Vector Database ID: {vdb.id}")
This vector database ID can now be used in the GenAI E2E walkthrough to create an LLM blueprint backed by the external vector database.
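As a rough sketch of that next step, the GenAI classes in the DataRobot Python client can reference this vector database when creating an LLM blueprint. The playground name, LLM choice, and parameters below are assumptions; follow the GenAI E2E walkthrough for the authoritative flow.
from datarobot.models.genai.llm import LLMDefinition
from datarobot.models.genai.llm_blueprint import LLMBlueprint
from datarobot.models.genai.playground import Playground
# Hypothetical sketch only: the playground name and the selected LLM are assumptions.
playground = Playground.create(name="External VDB playground", use_case=use_case)
llms = LLMDefinition.list(use_case=use_case, as_dict=False)
llm_blueprint = LLMBlueprint.create(
    playground=playground,
    name="Blueprint with external vector database",
    llm=llms[0],
    vector_database=vdb,
)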