With Snowflake, you can call out to external APIs from user-defined functions (UDFs). Using a Snowflake scoring pipeline allows you to take advantage of these external API functions—leveraging Snowflake streams and tasks to create a streaming micro-batch ingestion flow that incorporates a DataRobot-hosted model.
There are several requirements and considerations when exploring this approach:
Any API must be fronted by the cloud provider's trusted native API service (in the case of AWS, the AWS API Gateway).
The maximum payload size for synchronous requests is 10 MB for the API Gateway and 6 MB for Lambda (other cloud providers have different limits).
When deciding how to score your models, consider these questions. How does the total infrastructure react when scoring 10 rows vs. 10,000 rows vs. 10 million rows? What kind of load is sent when a small 2-node cluster is vertically scaled to a large 8-node cluster or when it is scaled horizontally to 2 or 3 instances? What happens if a request times out or a resource is unavailable?
Alternatives for executing large batch scoring jobs on Snowflake simply and efficiently are described in the client-request and server-side scoring examples. Generally speaking, this type of scoring is best done as part of ETL or ELT pipelines. Low-volume streaming ingest using internal Snowflake streaming is a suitable application for leveraging external functions with a UDF.
The following demonstrates an ETL pipeline using Snowpipe, Streams, and Tasks within Snowflake. The example scores records through a DataRobot-hosted model using Kaggle's Titanic dataset. It ingests data via a streaming pipeline with objects in an STG schema, scores it against the model, and then loads it to the PUBLIC schema presentation layer. The pipeline uses the following components:
Snowflake:
Tasks, Tables, and External Function UDF objects (to assemble a streaming scoring pipeline for data as it is ingested)
AWS:
Lambda, a serverless compute service that acts as the intermediary between Snowflake and DataRobot (an intermediary is currently a requirement for using an external function).
API Gateway, to provide an endpoint to front the Lambda function.
IAM policies to grant roles and privileges to necessary components.
Incoming data, which is placed in an S3 object store bucket.
An SQS queue.
DataRobot:
The model was built and deployed on the AutoML platform and is available for scoring requests via the DataRobot Prediction API. In this case, the model is served on horizontally scalable DataRobot cluster member hardware, dedicated solely to serving these requests.
The following illustrates the Snowflake external API UDF architecture:
Although a native UDF in Snowflake is written in JavaScript, the external function is executed remotely and can be coded in any language the remote infrastructure supports. It is then coupled with an API integration in Snowflake to expose it as an external UDF. This integration sends the payload to be operated on to an API proxy service (an AWS API Gateway in this case). The Gateway then satisfies this request through the remote service behind it—a microservice backed by a container or by a piece of Lambda code.
This section provides an example of treating the gateway as a proxy for a complete passthrough and sending the scoring request to a DataRobot-hosted prediction engine. Note that in this approach, scalability also includes horizontally scaling prediction engines on the DataRobot cluster.
See the articles mentioned above for additional Lambda-creation workflows to gain familiarity with the environment and process. Create a Lambda named proxy_titanic with a Python 3.7 runtime environment. Leverage an existing IAM role or create a new one with default execution permissions.
Connecting to the DataRobot cluster requires some sensitive information:
The load balancing hostname in front of the DataRobot Prediction Engine (DPE) cluster.
The user's API token.
The deployment for the model to be scored.
The DataRobot key (managed AI Platform users only).
These values can be stored in the Lambda Environment variables section.
Lambda layers let you build Lambda code on top of libraries and separate that code from the delivery package. You don't have to separate the libraries, although using layers simplifies the process of bringing in necessary packages and maintaining code. This example requires the requests and pandas libraries, which are not part of the base Amazon Linux image and must be added via a layer (built from a virtual environment). In this example, the environment used to build the layer is an Amazon Linux EC2 instance; instructions for installing Python 3 on Amazon Linux are available in the AWS documentation.
Navigate to the Lambda service > Layers > Create Layer tab. Provide a name and a link to the file in S3; note that this will be the Object URL of the uploaded ZIP. It is recommended, but not necessary, to set compatible environments, which makes the layer more easily accessible in a dropdown menu when adding it to a Lambda. Save the layer and note its Amazon Resource Name (ARN).
Navigate back to the Lambda and click Layers under the Lambda title; add a layer and provide the ARN from the previous step.
Navigate back to the Lambda code. The following Python code will:
Accept a payload from Snowflake.
Pass the payload to DataRobot's Prediction API for scoring.
Return a Snowflake-compatible response.
```python
import os
import json
#from pandas.io.json import json_normalize
import requests
import pandas as pd
import csv

def lambda_handler(event, context):
    # set default status to OK, no DR API error
    status_code = 200
    dr_error = ""

    # The return value will contain an array of arrays (one inner array per input row).
    array_of_rows_to_return = []

    try:
        # obtain secure environment variables to reach out to DataRobot API
        DR_DPE_HOST = os.environ['dr_dpe_host']
        DR_USER = os.environ['dr_user']
        DR_TOKEN = os.environ['dr_token']
        DR_DEPLOYMENT = os.environ['dr_deployment']
        DR_KEY = os.environ['dr_key']

        # retrieve body containing input rows
        event_body = event["body"]

        # retrieve payload from body
        payload = json.loads(event_body)

        # retrieve row data from payload
        payload_data = payload["data"]

        # map list of lists to expected inputs
        cols = ['row', 'NAME', 'SEX', 'PCLASS', 'FARE', 'CABIN', 'SIBSP', 'EMBARKED', 'PARCH', 'AGE']
        df = pd.DataFrame(payload_data, columns=cols)
        print("record count is: " + str(len(df.index)))

        # assemble and send scoring request
        headers = {
            'Content-Type': 'text/csv; charset=UTF-8',
            'Accept': 'text/csv',
            'datarobot-key': DR_KEY
        }
        response = requests.post(
            DR_DPE_HOST + '/predApi/v1.0/deployments/%s/predictions' % (DR_DEPLOYMENT),
            auth=(DR_USER, DR_TOKEN),
            data=df.to_csv(),
            headers=headers
        )

        # bail if anything other than a successful response occurred
        if response.status_code != 200:
            dr_error = str(response.status_code) + " - " + str(response.content)
            print("dr_error: " + dr_error)
            raise Exception(dr_error)

        # parse the returned CSV and collect the positive-class score for each row
        array_of_rows_to_return = []
        row = 0
        wrapper = csv.reader(response.text.strip().split('\n'))
        header = next(wrapper)
        idx = header.index('SURVIVED_1_PREDICTION')
        for record in wrapper:
            array_of_rows_to_return.append([row, record[idx]])
            row += 1

        # send data back in required snowflake format
        json_compatible_string_to_return = json.dumps({"data": array_of_rows_to_return})

    except Exception as err:
        # 400 implies some type of error.
        status_code = 400
        # Tell caller what this function could not handle.
        json_compatible_string_to_return = 'failed'
        # if the API call failed, update the error message with what happened
        if len(dr_error) > 0:
            print("error")
            json_compatible_string_to_return = 'failed; DataRobot API call request error: ' + dr_error

    # Return the return value and HTTP status code.
    return {
        'statusCode': status_code,
        'body': json_compatible_string_to_return
    }
```
Lambda code for this example is available in GitHub. You can configure a test event to make sure the Lambda acts as expected. A DataRobot payload can be represented for this model with a few JSON records in the following format:
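A hypothetical test event might look like the following; the passenger values are purely illustrative, and the body attribute carries the JSON string that Snowflake would send:

```json
{
  "body": "{\"data\": [[0, \"test passenger 1\", \"male\", 3, 7.25, null, 0, \"S\", 0, 22], [1, \"test passenger 2\", \"female\", 1, 71.28, \"C85\", 1, \"C\", 0, 38]]}"
}
```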
Once this event is created, select it from the test dropdown and click Test. The test returns a 200-level success response with a JSON-encapsulated list of lists, containing the 0-based row number and the returned model value. In this case, that model value is a score towards the positive class of label 1 (e.g., Titanic passenger survivability from a binary classifier model).
You can set additional Lambda configuration under Basic Settings. Lambda serverless costs are based on the RAM allocated and the execution duration; the more RAM allowed, the more virtual CPU is allocated. More RAM lets the Lambda handle and manipulate larger input payloads and process them more quickly. Note that this Lambda defers to DataRobot for the heavier work; it just needs to accommodate the data movement. If a Lambda exits prematurely due to exceeding resources, these values may need to be increased. The timeout default is 3 seconds; if the response from DataRobot for the micro-batch of records the Lambda is responsible for takes longer than that, the Lambda times out and shuts down. DataRobot tested and recommends the following values: 256 MB and a 10-second timeout. Actual usage for each executed Lambda can be found in the associated CloudWatch logs, available under the Monitoring tab of the Lambda.
For a Snowflake-owned IAM user to be granted permission, you must create a role that the user can then assume within the AWS account. In the console, navigate to IAM > Roles > Create role. When asked to Select type of trusted entity, choose Another AWS account and fill in the Account ID box with the AWS Account ID for the currently logged-in account. This can be found in the ARN of other roles, from the My Account menu or from various other places. A Snowflake external ID for the account is applied later.
Proceed through the next screens and save this role as snowflake_external_function_role. Save the role's Amazon Resource Name (ARN).
Navigate to the API Gateway service console and click Create API. Choose to build a REST API with the REST protocol and select New API. Provide a friendly, readable name and click Create API. On the next screen, choose Actions > Create Resource and set the resource name and path to score.
Next, choose Actions > Create Method. In the dropdown menu under the endpoint, choose POST. Select the checkbox next to Use Lambda Proxy Integration, select the previously created Lambda, and save.
Lastly, choose Actions > Deploy API. You must create a stage, such as test, and then click Deploy once you complete the form.
Note
The Invoke URL field on the subsequent editor page will later be used in creating an integration with Snowflake.
Navigate back to the Resources of the created API (in the left menu above Stages). Click on POST under the endpoint to bring up the Method Execution. Click the Method Request, toggle the Authorization dropdown to AWS_IAM, and then click the checkmark to save. Navigate back to the Method Execution and note the ARN within the Method Request.
Navigate to Resource Policy in the left menu. Add a policy that is populated with the AWS account number and the name of the previously created IAM role above (described in the Snowflake documentation).
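A minimal sketch of such a resource policy follows; the account ID, region, API ID, and role name shown are placeholders to replace with your own values:

```json
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Principal": {
        "AWS": "arn:aws:sts::<aws-account-id>:assumed-role/snowflake_external_function_role/snowflake"
      },
      "Action": "execute-api:Invoke",
      "Resource": "arn:aws:execute-api:<region>:<aws-account-id>:<api-id>/*/POST/score"
    }
  ]
}
```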
The API Integration object will map Snowflake to the AWS Account role. Provide the role ARN and set the allowed prefixes to include the Invoke URL from the stage referenced above (a privilege level of accountadmin is required to create an API Integration).
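A sketch of the DDL, using a hypothetical integration name and placeholder ARN and Invoke URL values, might look like this:

```sql
-- run as accountadmin; object names and placeholder values are illustrative
create or replace api integration titanic_external_api_integration
  api_provider = aws_api_gateway
  api_aws_role_arn = 'arn:aws:iam::<aws-account-id>:role/snowflake_external_function_role'
  api_allowed_prefixes = ('https://<api-id>.execute-api.<region>.amazonaws.com/test')
  enabled = true;

-- DESCRIBE exposes API_AWS_IAM_USER_ARN and API_AWS_EXTERNAL_ID, needed for the trust relationship below
describe api integration titanic_external_api_integration;
```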
Configure the Snowflake-to-IAM role trust relationship
Navigate back to the AWS IAM service > Roles and open the snowflake_external_function_role role.
At the bottom of the Summary page, choose the Trust relationships tab and click the Edit trust relationship button. This opens a policy document to edit. As per the Snowflake documentation, edit the Principal attribute AWS key by replacing the existing value with the API_AWS_IAM_USER_ARN from Snowflake. Next to the sts:AssumeRole action, there will be a Condition key with an empty value between curly braces. Inside the braces, paste the following, replacing the API_AWS_EXTERNAL_ID with the value from Snowflake:
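The pasted condition takes roughly this form (the API_AWS_EXTERNAL_ID value shown is a placeholder):

```json
"StringEquals": { "sts:ExternalId": "<API_AWS_EXTERNAL_ID>" }
```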
You can now create the external function inside Snowflake. It will reference the trusted endpoint to invoke via the previously built API integration. Be sure to match the expected parameter value in the function definition to the function the Lambda is expecting.
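A minimal sketch of the function DDL, assuming the hypothetical integration and function names used in these examples and the column order the example Lambda expects, could look like the following (the optional MAX_BATCH_ROWS clause mentioned later caps rows per request):

```sql
create or replace external function udf_titanic_score(
    name varchar, sex varchar, pclass integer, fare float, cabin varchar,
    sibsp integer, embarked varchar, parch integer, age float)
  returns variant
  -- max_batch_rows = 2000          -- optional ceiling on rows per payload
  api_integration = titanic_external_api_integration
  as 'https://<api-id>.execute-api.<region>.amazonaws.com/test/score';
```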
In this case, full payloads received by the Lambda contained roughly 1,860 records and were roughly 0.029 MB in size (suggesting Snowflake may cap them around 0.03 MB).
Whether scoring from an extra small-, small-, or medium-sized compute warehouse on Snowflake, the Lambda concurrency CloudWatch metrics dashboard always showed a concurrent execution peak of 8. Overall, this represents a rather gentle load on scoring infrastructure.
Performance should be satisfactory whether the model is run in the Lambda itself or offset to a DataRobot Prediction Engine. Note that for larger batch jobs and maximum throughput, other methods are still more efficient with time and resources.
Testing against an r4.xlarge Dedicated Prediction Engine on DataRobot produced a rate of roughly 13,800 records for this particular dataset and model.
Snowflake determines payload size and concurrency based on a number of factors. A controllable payload ceiling can be specified with a MAX_BATCH_ROWS value during external function creation. Future options may allow greater control over payload size, concurrency, and scaling with warehouse upsizing.
There are multiple options to bring data into Snowflake using streaming. One option is to use Snowflake's native periodic data-loading capabilities with Snowpipe. By using Snowflake streams and tasks, you can handle new records upon arrival without an external ETL/ELT tool driving the process.
You must create tables to hold the newly arrived records loaded from Snowpipe and to hold the processed and scored records for reporting. In this example, a raw passengers table is created in an STG schema and a scored passengers table is presented in the PUBLIC schema.
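A sketch of the two tables, with hypothetical names and columns matching the Titanic fields the Lambda expects, might be:

```sql
-- raw landing table for Snowpipe loads
create or replace table stg.passengers (
    name varchar, sex varchar, pclass integer, fare float, cabin varchar,
    sibsp integer, embarked varchar, parch integer, age float);

-- presentation table holding scored records
create or replace table public.passengers_scored (
    name varchar, sex varchar, pclass integer, fare float, cabin varchar,
    sibsp integer, embarked varchar, parch integer, age float,
    survival_score float);
```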
Snowflake provides options for loading new data as it arrives. This example applies option 1 (described in the Snowflake documentation) to use a Snowflake SQS queue directly. Note that step 4 to create new file event notifications is required.
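A sketch of an auto-ingest pipe, assuming a hypothetical external stage named @stg.passengers_s3_stage already points at the watched bucket prefix:

```sql
create or replace pipe stg.passengers_pipe auto_ingest = true as
  copy into stg.passengers
  from @stg.passengers_s3_stage
  file_format = (type = 'CSV' skip_header = 1);

-- SHOW PIPES output includes the notification_channel (SQS ARN) used for the S3 event notification
show pipes;
```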
To enable Snowpipe, navigate to the S3 bucket, click the Properties tab > Events tile, and then click Add notification. Create a notification that adds a message to the SQS queue (retrieved from the Snowflake pipe) for every new file arrival.
You can create two types of stream objects in Snowflake—standard and append-only. Standard stream objects capture any type of change to a table; append-only stream objects capture inserted rows. Use the former for general Change Data Capture (CDC) processing. Use the latter (used in this example) for simple new row ingest processing.
In the append-only approach, think of the stream as a table that contains only records that are new since the last time data was selected from it. Once a DML statement that sources the stream runs, the rows it returned are considered consumed and the stream becomes empty. In programming terms, this is similar to a queue.
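An append-only stream on the hypothetical staging table above might be created like this:

```sql
create or replace stream stg.passengers_stream
  on table stg.passengers
  append_only = true;
```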
A task is a step, or series of cascading steps, constructed to perform an ELT operation. Tasks can be scheduled, similar to cron jobs, to run on specific days, at specific times, or at periodic intervals.
The following basic task scores the Titanic passengers through the UDF and loads the scored data to the presentation layer. Every 5 minutes, it checks whether new records exist in the stream; if records are found, the task runs. The task is created in a suspended state; enable it by resuming it. Note that many timing options are available for scheduling based on days, times, or periods.
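A sketch of such a task, using the hypothetical object names from the earlier sketches and an assumed warehouse named scoring_wh:

```sql
create or replace task stg.score_passengers_task
  warehouse = scoring_wh
  schedule = '5 minute'
  when system$stream_has_data('stg.passengers_stream')
as
  insert into public.passengers_scored
  select name, sex, pclass, fare, cabin, sibsp, embarked, parch, age,
         udf_titanic_score(name, sex, pclass, fare, cabin, sibsp, embarked, parch, age)::float
  from stg.passengers_stream;

-- tasks are created suspended; resume to enable
alter task stg.score_passengers_task resume;
```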
The end-to-end pipeline is now complete. Copying a PASSENGERS.csv file (available in GitHub) into the watched bucket runs the pipeline. The file prefix results in the data being ingested into the staging schema, scored through a DataRobot model, and then loaded into the presentation schema, all without any external ETL tooling.