Migrate a model to a new cluster¶
This notebook demonstrates how to migrate a model from one DataRobot cluster to another of the same version.
Prerequisites¶
- API keys for both the source and destination clusters
- Enable the "Experimental API access" feature flag on the source cluster
- Ensure that you can connect this notebook to the source and destination clusters
- Maintain matching DataRobot version numbers (e.g., v8.0) for both clusters (a connectivity and version check sketch follows the parameter settings below)
The destination cluster must include the following in config.yaml:
app_configuration:
  drenv_override:
    WHITELIST_EXPERIMENTAL_API: true
    EXPERIMENTAL_API_ACCESS: true
import sys
import requests
import argparse
import logging
import time
import json
import os
from datetime import date
import datetime
from timeit import default_timer
import datarobot as dr
import urllib.parse
print("Started: %s" % (str(datetime.datetime.now())))
Started: 2022-11-17 18:56:27.177892
Set parameters¶
When migrating from an on-premises cluster to app.datarobot.com, you cannot connect directly without first activating the "Enable Experimental API access" feature flag.
The snippet below outlines the parameter settings for the source and destination clusters.
# Source cluster settings
# Provide the URL protocol, address (IP or FQDN), and path
# Example: source_host = "https://1.2.3.4/api/v2"
source_host = "<source-cluster-ip-address>"
# Provide an API key from an eligible user with permission on the source cluster
source_apikey = "NjM3NmJmYmYyMDAwMWJmNTVmMWZlN2FmOjFoRnE0bjQ4czByU3dQelVCU2o1RzNZZDM2YnBNbm4zcWF4YkIyN2RvK2c9"
# Provide the project ID on the source cluster
source_project_id = "6376c3d420001bf55f1fe94f"
# Provide the model ID on the source cluster
source_model_id = "6376c8d999ca930cb54bd4d3"
# Destination cluster settings
# Example: destination_host = "https://4.3.2.1/api/v2"
destination_host = "<destination-cluster-ip-address>"
# Provide an API key from an eligible user with the correct permissions on the destination cluster
destination_apikey = "NjM3NTRhNWQ2Zjg2YzI0MGJkNTQyMGE0OnNsSk94UU1lWnRhMXB0RnlMZUpqeWp2bG5xckM0ckRHNTNKTytNYnBzSjg9"
print("Source url: %s | project_id: %s | model_id: %s" % (source_host,source_project_id, source_model_id))
print("Destinastion url: %s" % (destination_host))
# print("Source url: %s | project_id: %s | model_id: %s" % (source_host,source_project_id, source_model_id))
# print("Destinastion url: %s" % (destination_host))
Download the model package¶
Use the following snippet to prepare for model migration by downloading the model package (.mlpkg) file to the specified directory.
# Download package settings
modeldir = "model/"
modelname = source_model_id + ".mlpkg"
modelpath = modeldir + modelname
# Make sure the local download directory exists
os.makedirs(modeldir, exist_ok=True)
# Build the headers and provide a token
headers = {}
headers['Authorization'] = 'Bearer {}'.format(source_apikey)
# Create a new session
session = requests.Session()
session.headers.update(headers)
print("Downloading the mlpkg file from: %s" % source_host)
# Package download
# Makes a request to generate the .mlpkg for download on the target server
# Returns a URL in the location attribute of the response header or None (if errored)
def _request_model_package_download(session, host, pid, mid):
    apiEndpoint = "{}/projects/{}/models/{}/modelPackageFile/prepare/".format(host, pid, mid)
    print("using download apiEndpoint: %s" % apiEndpoint)
    try:
        r = session.post(apiEndpoint)
        r.raise_for_status()
        return r.headers.get('Location')
    except requests.exceptions.HTTPError as err:
        print("Error: %s" % err)
        return None
# Downloads the .mlpkg file locally from the target server
# Returns the binary response to be saved or None (if the download failed)
def get_model_package(session, host, pid, mid):
    location = _request_model_package_download(session, host, pid, mid)
    print("using location: %s" % location)
    if location is None:
        return None
    attempts = 0
    wait_length = 10
    while attempts <= 10:
        try:
            r = session.get(location)
            r.raise_for_status()
            # While the package is still being assembled, the status URL returns JSON
            print(r.json())
            print("sleeping %s seconds" % wait_length)
            time.sleep(wait_length)
            attempts += 1
        except ValueError:
            # The response is no longer JSON, so it is the .mlpkg binary itself
            print("looks like no json, time to download")
            return r
        except requests.exceptions.RequestException:
            attempts += 1
            print("exception, sleeping for 60 seconds")
            time.sleep(60)
    print("Number of check attempts exceeded. Check the target instance to see whether the package is still being assembled.")
    return None
start = default_timer()
output = get_model_package(session, source_host, source_project_id, source_model_id)
if output is None:
    raise Exception("download failed")
print("Saving data to: %s" % modelpath)
with open(modelpath, 'wb') as f:
    f.write(output.content)
print('%s took %s seconds to download %s megs' % (modelpath, default_timer() - start, str(round(os.path.getsize(modelpath) / (1024 * 1024), 2))))
Upload the model package¶
# Upload settings
modeldir = "model/"
modelname = source_model_id + ".mlpkg"
modelpath = modeldir + modelname
# Build the headers for the destination cluster and start a new session
headers = {}
headers['Authorization'] = 'Bearer {}'.format(destination_apikey)
session = requests.Session()
session.headers.update(headers)
# Package upload
# Makes a request to upload the .mlpkg file to the target server
# Returns a URL in the location attribute of the response header or None (if errored)
def _request_package_upload(session, host, fileLocation):
    apiEndpoint = "{}/modelPackages/fromFile/".format(host)
    print("using upload apiEndpoint: %s" % apiEndpoint)
    try:
        with open(fileLocation, 'rb') as f:
            r = session.post(apiEndpoint, files={'file': f})
        r.raise_for_status()
        return r.headers.get('Location')
    except requests.exceptions.HTTPError as err:
        print("ERROR: %s" % err)
        return None
# Uploads the .mlpkg file to the target server
# Returns the ID, importance, and name of the new model package, or (None, None, None) if the upload failed
def upload_model_package(session, host, fileLocation):
    location = _request_package_upload(session, host, fileLocation)
    print("Location: %s" % location)
    if location is None:
        return None, None, None
    attempts = 0
    wait_length = 10
    while attempts < 10:
        try:
            r = session.get(location)
            r.raise_for_status()
            data = r.json()
            # Check if you get a status or if it's redirected to the package object
            if data.get('status') is not None:
                print(data)
            else:
                print("Model Package Uploaded")
                return data.get('id'), data.get('importance'), data.get('name')
            attempts += 1
            print("sleeping %s seconds" % wait_length)
            time.sleep(wait_length)
        except requests.exceptions.RequestException:
            attempts += 1
            print("exception, sleeping 60 seconds")
            time.sleep(60)
    print("ERROR: Number of check attempts exceeded. Check the target instance for errors.")
    return None, None, None
# Upload the .mlpkg
start = default_timer()
print("Uploading file: %s to: %s" % (modelpath,destination_host) )
destination_model_id, destination_model_importance, destination_model_name = upload_model_package(session, destination_host, modelpath)
if destination_model_id is None:
print("upload failed")
else:
print("Upload of %s took %s seconds" % (destination_model_id, default_timer() - start))
Get the dedicated prediction engine ID¶
dpeEndpoint = "{}/predictionServers/".format(destination_host)
prediction_environment_id = None
print("using deploy dpeEndpoint: %s" % dpeEndpoint)
try:
    r = session.get(dpeEndpoint)
    r.raise_for_status()
    # The prediction servers list returns a paginated JSON body; take the ID of the first server
    servers = r.json().get('data', [])
    if servers:
        prediction_environment_id = servers[0].get('id')
except requests.exceptions.HTTPError as err:
    print("Error: %s" % err)
    raise Exception("Error: %s" % err)
print("Found DPE Id: %s" % prediction_environment_id)
Create a deployment¶
Use the snippet below to deploy the model package.
# Returns the deployment creation response text or None
def deploy_model(session, pid, mid, imp):
    apiEndpoint = "{}/deployments/fromModelPackage/".format(destination_host)
    print("using deploy apiEndpoint: %s" % apiEndpoint)
    body_payload = {
        "label": "%s" % (destination_model_name),
        "description": "Cloned from: %s" % (urllib.parse.urlparse(destination_host).netloc),
        "modelPackageId": mid,
        "importance": imp
    }
    # Attach the prediction environment if one was found in the previous step
    if pid is not None:
        body_payload["predictionEnvironmentId"] = pid
    try:
        r = session.post(
            apiEndpoint,
            data=json.dumps(body_payload),
            headers={'Content-Type': 'application/json', 'Accept': 'application/json'}
        )
        r.raise_for_status()
        return r.text
    except requests.exceptions.HTTPError as err:
        print("ERROR: %s" % err)
        print(r.text)
        print(r.headers)
        return None
start = default_timer()
if destination_model_importance is None:
    destination_model_importance = "LOW"
output = deploy_model(session, prediction_environment_id, destination_model_id, destination_model_importance)
print("Deployment of: %s took: %s seconds" % (output, default_timer() - start))