Monitor with serverless MLOps agents

DataRobot can monitor model performance and drift statistics for models deployed on external systems. These externally deployed models can be:

  • Docker containers of DataRobot-generated models
  • DataRobot models exported as Java or Python Scoring Code
  • Custom-coded models or models created in another tool

This section shows how to scale DataRobot MLOps agents on AWS to handle small and large data queues using serverless resources to track deployed models.

High-level MLOps agent architecture

At a high level, an externally deployed ML model leverages the DataRobot MLOps library to produce reporting records and send them to DataRobot to process and report on. The DataRobot MLOps agent consumes these records and pipes them into a DataRobot instance. You can configure the agent to read from various spool sources, including flat files, AWS SQS, RabbitMQ, and so on (for the full list, see Prediction intake options). This topic describes how serverless MLOps agents can scale to consume an AWS SQS Queue, focusing solely on the queue data consumption and reporting to DataRobot.

Standard solution architecture

The External Model in this diagram represents a model of any of the types mentioned above, deployed outside of the DataRobot cluster. One approach is to run the MLOps agent on a standing compute environment: stand up a server resource, install the agent on it, and have it continually poll for new data to consume and report back to DataRobot. This is an adequate solution, but it presents two drawbacks: cost and scaling.

The server is always up and running, which can be costly for even the smallest solutions. Scaling becomes an issue when the solution needs to consume a queue with many elements, for example, many models or very busy models writing to the SQS queue. What if the SQS queue has a million elements backlogged? How long would a single agent take to fully process the queue? You can run multiple agents to consume and send back data concurrently, but EC2 auto-scaling does not solve this problem: its triggering mechanisms relate to how busy the EC2 server itself is, rather than the actual number of items in the external queue it needs to process.

Serverless solution architecture

A serverless architecture can leverage AWS Lambda to create an MLOps agent on demand, and scale additional agents via multiple Lambda functions to consume a populated queue based on its backlog.

The MLOps agent code is stored in S3 and retrieved by a Lambda function, which instantiates the agent. The agent then polls and consumes records written by the External Model to a data SQS queue. The function includes logic that specifies when to run additional MLOps agents (and how many). Inserting messages into the agent SQS queue "recruits" agents by triggering their creation.

Concurrency of Lambda functions is managed by tracking which functions are in a running state in a DynamoDB NoSQL table. Although the Lambda service has a concurrency reservation setting, it does not behave like a pool of open slots where a queued invocation starts immediately once a slot frees up.

Instead, invocations beyond the reservation limit are throttled and retried with increasing delays until concurrency becomes available. This is rarely the desired behavior when processing data, because it can lead to periods of idle wait time in which nothing is processed.

A CloudWatch schedule inserts a message into the agent SQS queue every 15 minutes. Each message triggers an initial Lambda function that checks whether the data queue is in a non-empty state; once the function completes, the triggering message is cleared from the agent queue.

A Lambda function has a maximum duration of 15 minutes. If the backlog still remains after that time, the MLOps agent gracefully terminates and passes another message to the Agent SQS queue to trigger another new Lambda function instance to take its place.

Add dependent items to S3

The Lambda function retrieves the agent installer and configuration files from S3. Create a new bucket, or use an existing one, to host these items. The MLOps agent tarball and its configuration are hosted in S3, and both are provided to the function via its environment configuration. (This topic refers to these objects as the agent archive and config, respectively.)

s3://my_bucket/community/agent/datarobot-mlops-agent-6.1.3-272.tar.gz
s3://my_bucket/community/agent/mlops.agent.conf.yaml
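If you prefer to script this verification, a quick boto3 check can confirm both objects are in place before wiring up the Lambda function. This is a sketch; the bucket and key names below are the example paths from above and should be adjusted to your layout.

```python
def s3_key_exists(bucket: str, key: str) -> bool:
    """Return True if the object exists in the bucket (HEAD request, no download)."""
    import boto3
    import botocore.exceptions
    s3 = boto3.client("s3")
    try:
        s3.head_object(Bucket=bucket, Key=key)
        return True
    except botocore.exceptions.ClientError:
        return False

# Example paths from above; adjust to your bucket layout.
AGENT_BUCKET = "my_bucket"
AGENT_KEYS = [
    "community/agent/datarobot-mlops-agent-6.1.3-272.tar.gz",
    "community/agent/mlops.agent.conf.yaml",
]
```

Call `s3_key_exists(AGENT_BUCKET, key)` for each entry in `AGENT_KEYS`; both should return `True` before continuing.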

Create data Simple Queue Service (SQS) queue

This queue is written to by one to many externally deployed models. Scored data records and metadata are sent to this queue. An MLOps agent reads from the queue and directs the records to DataRobot for model monitoring.

  1. Navigate to Services > SQS.
  2. Click Create New Queue.
  3. Provide the queue name: sqs_mlops_data_queue.
  4. Choose the Standard Queue option.
  5. Configure Queue Attributes if desired, or choose the Quick-Create Queue with defaults.

Notice some of the configurable options, such as Default visibility timeout. Once an element is read from the queue, it is invisible to other queue consumers for this amount of time. It becomes visible again if the consumer does not report back that the item was processed successfully and can be deleted from the queue.

Another configurable option, Message retention, defines how long an unprocessed item is allowed to stay in the queue; once this period elapses, the item is removed. In case agents are down for some reason, it is a good idea to set this value to a week or longer so that queue items are not lost before an MLOps agent consumes them.

Make note of the Amazon Resource Name (ARN) upon creation.
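The console steps above can also be scripted. Below is a minimal boto3 sketch, assuming the queue name from step 3 and the one-week retention suggested above; `create_queue` defaults to a Standard queue.

```python
def retention_seconds(days: int) -> int:
    """Convert days to the seconds value SQS expects for MessageRetentionPeriod."""
    return days * 24 * 60 * 60

def create_data_queue(queue_name: str = "sqs_mlops_data_queue",
                      retention_days: int = 7) -> str:
    """Create a standard SQS queue with an extended retention period.

    Returns the queue URL; look up the ARN from the queue's attributes.
    """
    import boto3
    sqs = boto3.client("sqs")
    resp = sqs.create_queue(
        QueueName=queue_name,
        Attributes={"MessageRetentionPeriod": str(retention_seconds(retention_days))},
    )
    return resp["QueueUrl"]
```

SQS caps retention at 14 days (1,209,600 seconds), so a week comfortably fits within the limit.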

Create agent SQS queue

This queue is used to kick off Lambda services which will run MLOps agents. These agents dequeue elements from the Data Queue and report them to the DataRobot platform for monitoring.

  1. Navigate to Services > SQS.
  2. Click Create New Queue.
  3. Provide the queue name: sqs_mlops_agent_queue. (Notice that it is agent and not data this time.)
  4. Choose the Standard Queue option.
  5. Set the default queue attribute Default Visibility Timeout to 16 minutes.
  6. Configure Queue Attributes if desired, or choose the Quick-Create Queue with defaults.

A read element does not become readable again for the 16 minutes specified by the Default Visibility Timeout. In theory, a Lambda function (AWS limit of 15 minutes) will have been triggered by the element, and a successful return from the function removes the SQS element from the queue permanently. If the function fails for some reason, the element becomes visible in the queue again. This timeout allows each element to be sent to DataRobot once, and prevents any record from being read and sent in duplicate.
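As with the data queue, this can be scripted. A boto3 sketch, assuming the queue name from step 3 and the 16-minute visibility timeout from step 5:

```python
VISIBILITY_TIMEOUT_SECONDS = 16 * 60  # one minute past the 15-minute Lambda maximum

def create_agent_queue(queue_name: str = "sqs_mlops_agent_queue") -> str:
    """Create the standard agent queue with a 16-minute visibility timeout.

    Returns the queue URL; look up the ARN from the queue's attributes.
    """
    import boto3
    sqs = boto3.client("sqs")
    resp = sqs.create_queue(
        QueueName=queue_name,
        Attributes={"VisibilityTimeout": str(VISIBILITY_TIMEOUT_SECONDS)},
    )
    return resp["QueueUrl"]
```

The extra minute over the Lambda maximum ensures a running function always finishes (or times out) before its triggering message becomes visible again.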

Create DynamoDB tables to track Lambda agents and errors

A table is used to track concurrent Lambda functions running MLOps agents.

  1. Navigate to the DynamoDB (managed NoSQL AWS key-value/document store database) service on AWS.
  2. Select the Create table option.
  3. Name a new table lambda_mlops_agents and set a primary key of aws_request_id.
  4. Create the table with default settings.
  5. Navigate to the Overview tab and make note of the ARN for the table.

Another table is used to track agent errors.

  1. Navigate to the DynamoDB (managed NoSQL AWS key-value/document store database) service on AWS.
  2. Select the Create table option.
  3. Name a new table lambda_mlops_agents_error and set a primary key of aws_request_id.
  4. Create the table with default settings.
  5. Navigate to the Overview tab and make note of the ARN for the table.

You should now have two tables, lambda_mlops_agents and lambda_mlops_agents_error, with the ARN of each noted.
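Both tables can also be created with boto3. A sketch; it uses on-demand billing to avoid capacity planning, whereas the console default uses provisioned capacity.

```python
TABLE_NAMES = ["lambda_mlops_agents", "lambda_mlops_agents_error"]

def create_tracking_table(table_name: str) -> str:
    """Create a table keyed on aws_request_id and return its ARN."""
    import boto3
    dynamodb = boto3.client("dynamodb")
    resp = dynamodb.create_table(
        TableName=table_name,
        KeySchema=[{"AttributeName": "aws_request_id", "KeyType": "HASH"}],
        AttributeDefinitions=[
            {"AttributeName": "aws_request_id", "AttributeType": "S"}
        ],
        BillingMode="PAY_PER_REQUEST",  # on-demand; console defaults to provisioned
    )
    return resp["TableDescription"]["TableArn"]
```

Calling `create_tracking_table(name)` for each entry in `TABLE_NAMES` returns the two ARNs needed for the IAM policy below.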

Configure the IAM role for a Lambda function

The following sections provide steps for configuring the IAM role for a Lambda function.

Add an inline policy for the database

  1. Navigate to Identity and Access Management (IAM).
  2. Under Roles, select Create role.
  3. Select AWS service and Lambda as a use case and then navigate to Next: Permissions.
  4. Search for and add the AWSLambdaBasicExecutionRole policy.
  5. Provide the Role name lambda_mlops_agent_role, and then click on Create role.
  6. In the Roles page, filter on the newly created role; choose it.
  7. Click on the Permissions tab, and then select Add inline policy.
  8. Select choose a service, then filter on “DynamoDB” and select it from the returned options.
  9. Under Actions, choose the privileges that the Lambda function requires; the function code in this topic uses the Scan, PutItem, and DeleteItem actions.

  10. Under Resources, choose Specific and click the Add ARN link under the table option.

  11. Specify the ARN of the DynamoDB lambda_mlops_agents and lambda_mlops_agents_error tables created previously.
  12. Choose Review policy.
  13. Provide the name lambda_mlops_agent_role_dynamodb_policy.
  14. Complete the task by clicking Create policy.

Add an inline policy for the queues

  1. Navigate to Identity and Access Management (IAM).
  2. In the Roles page, filter on the lambda_mlops_agent_role role created previously; choose it.
  3. Click on the Permissions tab, and then select Add inline policy.
  4. Select choose a service, then filter on the “SQS” service.
  5. Select the Read and Write checkboxes and Add ARN.
  6. Add the ARN of each SQS queue.

Add an inline policy for S3

  1. Navigate to Identity and Access Management (IAM).
  2. In the Roles page, filter on the lambda_mlops_agent_role role created previously; choose it.
  3. Click on the Permissions tab, and then select Add inline policy.
  4. Select choose a service, then filter on the “S3” service.
  5. Choose the Read > GetObject privilege, and under Resources choose the specific bucket and all objects in it.
  6. Review the policy and save it as lambda_mlops_agent_role_s3_policy.
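The same inline policy can be expressed as a policy document and attached with boto3. A sketch, assuming the role and policy names used above and a placeholder bucket name:

```python
import json

def s3_policy_document(bucket: str) -> dict:
    """Read-only access to the bucket's objects (GetObject on all keys)."""
    return {
        "Version": "2012-10-17",
        "Statement": [{
            "Effect": "Allow",
            "Action": "s3:GetObject",
            "Resource": f"arn:aws:s3:::{bucket}/*",
        }],
    }

def attach_inline_policy(role_name: str, policy_name: str, document: dict) -> None:
    """Attach the document to the role as an inline policy."""
    import boto3
    iam = boto3.client("iam")
    iam.put_role_policy(
        RoleName=role_name,
        PolicyName=policy_name,
        PolicyDocument=json.dumps(document),
    )
```

For example: `attach_inline_policy("lambda_mlops_agent_role", "lambda_mlops_agent_role_s3_policy", s3_policy_document("my_bucket"))`.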

Create Python Lambda

  1. Navigate to the Lambda service Create function and, from scratch, create a new Python 3.7 Lambda service named mlops_agent_processor.
  2. Under Permissions, choose Use an existing role and select lambda_mlops_agent_role.

Create the Lambda environment variables

To configure the Lambda service, set its basic configuration and environment variables. Set the timeout to the 15-minute maximum: the function deliberately keeps the agent running until either the queue drains or time runs out. The function doesn't do a large amount of local processing, so 512MB of memory should be sufficient to run the agent efficiently and at a low Lambda service cost (the service is billed by GB-seconds). Set environment variables to indicate the location where the agent and its configuration are stored, the queues the agent interacts with, and the target concurrency.
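The settings can be applied with boto3 as sketched below. The variable names match those read by the Lambda handler in the next section; the bucket, keys, and concurrency value are placeholder assumptions to adjust for your environment.

```python
# Environment variables read by the Lambda handler (values are placeholders).
LAMBDA_ENV = {
    "dr_mlops_agent_bucket": "my_bucket",
    "dr_mlops_agent_zip": "community/agent/datarobot-mlops-agent-6.1.3-272.tar.gz",
    "dr_mlops_agent_config": "community/agent/mlops.agent.conf.yaml",
    "dr_mlops_agent_target_concurrency": "5",
    "data_queue": "sqs_mlops_data_queue",
    "agent_queue": "sqs_mlops_agent_queue",
}

def apply_lambda_config(function_name: str = "mlops_agent_processor") -> None:
    """Set timeout, memory, and environment variables on the function."""
    import boto3
    client = boto3.client("lambda")
    client.update_function_configuration(
        FunctionName=function_name,
        Timeout=900,       # seconds; the 15-minute Lambda maximum
        MemorySize=512,    # MB; agent workload is mostly I/O
        Environment={"Variables": LAMBDA_ENV},
    )
```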

Populate the Lambda function code

Use the following code in the lambda_function.py window.

from urllib.parse import unquote_plus  
import boto3  
import os  
import time  
from boto3.dynamodb.conditions import Key, Attr  
import subprocess  
import datetime  

def get_approx_data_queue_size(sqs_resource, queue_name):  
    data_queue = sqs_resource.get_queue_by_name(QueueName=queue_name)  
    return int(data_queue.attributes.get('ApproximateNumberOfMessages'))  

def lambda_handler(event, context):  

    request_id = context.aws_request_id  

    # the lambda environment is coming with openjdk version "1.8.0_201"  
    #os.system("java -version")  

    try:  
        # get and parse environment values  
        ENV_AGENT_BUCKET = os.environ['dr_mlops_agent_bucket']  
        ENV_AGENT_ZIP = os.environ['dr_mlops_agent_zip']  
        ENV_AGENT_CONFIG = os.environ['dr_mlops_agent_config']  
        ENV_TARGET_CONCURRENCY = int(os.environ['dr_mlops_agent_target_concurrency'])  
        ENV_DATA_QUEUE = os.environ['data_queue']  
        ENV_AGENT_QUEUE = os.environ['agent_queue']  

        agent_config = os.path.basename(ENV_AGENT_CONFIG)  

        # datarobot_mlops_package-6.3.3-488.tar.gz  
        agent_zip = os.path.basename(ENV_AGENT_ZIP)  

        # datarobot_mlops_package-6.3.3-488  
        temp_agent_dir = agent_zip.split(".tar")[0]  

        # datarobot_mlops_package-6.3.3  
        temp_agent_dir = temp_agent_dir.split("-")  
        agent_dir = temp_agent_dir[0] + '-' + temp_agent_dir[1]  

    except Exception:  
        raise Exception("Problem retrieving and parsing environment variables!")  

    # lambda max runtime allowed (15 minute AWS maximum duration, recommended value to use here is 14 for MAXIMUM_MINUTES)  
    MAXIMUM_MINUTES = 14  
    start_time_epoch = int(time.time())  
    time_epoch_15m_ago = start_time_epoch - int(60 * 14.7)  
    time_epoch_60m_ago = start_time_epoch - 60 * 60  
    max_time_epoch = start_time_epoch + 60 * int(MAXIMUM_MINUTES)  

    # check number of items in data queue to process  
    sqs = boto3.resource('sqs')  
    approx_data_queue_size = get_approx_data_queue_size(sqs, ENV_DATA_QUEUE)  

    # exit immediately if data queue has nothing to process  
    if approx_data_queue_size == 0:  
        print('nothing to process, queue is empty.')  
        return None  

    # connect to database  
    dynamodb = boto3.resource('dynamodb')  

    # count running agents in dynamo in the last 15 minutes  
    table = dynamodb.Table('lambda_mlops_agents')  
    response = table.scan(  
        FilterExpression=Attr('start_time_epoch').gte(time_epoch_15m_ago)  
    )  
    agent_count = int(response['Count'])  
    print ('agent count started and running in the last 15 minutes is: ' + str(agent_count))  

    # count error agent records in dynamo in the last hour  
    error_table = dynamodb.Table('lambda_mlops_agents_error')  
    response = error_table.scan(  
        FilterExpression=Attr('start_time_epoch').gte(time_epoch_60m_ago)  
    )  
    error_count = int(response['Count'])  
    print ('agent errors count in the past 60 minutes: ' + str(error_count))  

    # exit immediately if there has been an error in the last 60 minutes  
    if error_count > 0:  
        print('exiting - lambda agents have errored within the last hour.')  
        return None  

    # create agent queue in case recruitment is needed  
    agent_queue = sqs.get_queue_by_name(QueueName=ENV_AGENT_QUEUE)  

    # exit immediately if target concurrent lambda count has already been reached  
    if agent_count >= ENV_TARGET_CONCURRENCY:  
        print('exiting without creating a new agent, already hit target concurrency of: ' + str(ENV_TARGET_CONCURRENCY))  
        return None  
    else:  
        # how many items does it take to be in the queue backlog for each additional agent to recruit?  
        SQS_QUEUE_ITEMS_PER_LAMBDA = 500  

        # add agent record to table for this lambda instance  
        table.put_item(Item= {'aws_request_id': request_id, 'start_time_epoch':  start_time_epoch})  

        # total lambdas, minimum of queue size / items per lambda or target concurrency, -1 for this lambda, - current running agent count  
        lambdas_to_recruit = min(-(-approx_data_queue_size // SQS_QUEUE_ITEMS_PER_LAMBDA), ENV_TARGET_CONCURRENCY) - 1 - agent_count  
        if lambdas_to_recruit < 0:  
            lambdas_to_recruit = 0  

        for x in range(lambdas_to_recruit):  
            print('adding new agent: ' + str(x))  
            agent_queue.send_message(MessageBody='{ request_id: "' + request_id + '", source: "lambda_new_' + str(x) + '" }')  

    # install agent  
    try:  
        # switch to a local workspace  
        os.chdir("/tmp")  

        # get agent zip if it is not already here, and install it  
        if not os.path.isfile("/tmp/" + agent_zip):  
            print('agent does not exist.  installing...')  
            print("bucket: " + ENV_AGENT_BUCKET + " agent zip: " + ENV_AGENT_ZIP)  
            # get agent zip  
            s3 = boto3.resource('s3')  
            s3.Bucket(ENV_AGENT_BUCKET).download_file(ENV_AGENT_ZIP, agent_zip)  

            # unzip contents  
            os.system('tar -xvf ' + agent_zip + ' 2> /dev/null')  

            # replace config  
            os.chdir('/tmp/' + agent_dir + '/conf')  
            s3.Bucket(ENV_AGENT_BUCKET).download_file(ENV_AGENT_CONFIG, agent_config)  

        else:  
            print('agent already exists, hot agent!')  

    except Exception:  
        raise Exception("Problem installing the agent!")  

    # start the agent  
    os.chdir('/tmp/' + agent_dir)  
    os.system('bin/start-agent.sh')  

    time.sleep(5)  
    output = subprocess.check_output("head logs/mlops.agent.log", shell=True)  
    print('head mlops.agent.log --- \n' + str(output))  

    output = subprocess.check_output("head logs/mlops.agent.out", shell=True)  
    print('head mlops.agent.out --- \n' + str(output))  

    output = subprocess.check_output("tail -3 logs/mlops.agent.log | grep 'Fail\|Error\|ERROR' | wc -l", shell=True)  
    print('tail -3 logs/mlops.agent.log log count --- \n' + str(int(output)) + ' errors.')  

    # write to error_table if agent is failing  
    if int(output) > 0:  
        error_table.put_item(Item= {'aws_request_id': request_id, 'start_time_epoch':  start_time_epoch})  
        print('exiting - lambda agent errored.')  

        # stop agent and clean up  
        os.system('bin/stop-agent.sh ')  
        os.system('rm -rf bin/PID.agent')  

        # remove dynamo record for this lambda  
        table.delete_item(Key= {'aws_request_id': request_id})  

        return None  

    # time to let the agent do its thing...  
    current_epoch = int(time.time())  

    # while there is still time left to run the Lambda and there are items on the queue  
    while current_epoch < max_time_epoch and approx_data_queue_size > 0:  
        time.sleep(10)  
        current_epoch = int(time.time())  
        approx_data_queue_size = get_approx_data_queue_size(sqs, ENV_DATA_QUEUE)  
        print(datetime.datetime.now(datetime.timezone.utc).strftime("%Y-%m-%d %H:%M:%S") + \  
            ' - approx_data_queue_size: ' + str(approx_data_queue_size))  
        #output = subprocess.check_output("tail logs/mlops.agent.log", shell=True)  
        #print('tail mlops.agent.log --- \n' + str(output))  

    # exited processing loop, time to stop agent and clean up  
    os.system('bin/stop-agent.sh ')  
    os.system('rm -rf bin/PID.agent')  

    # remove dynamo record for this lambda  
    table.delete_item(Key= {'aws_request_id': request_id})  

    # if we ran out of time and there are more items in the backlog, recruit a replacement for this lambda  
    if current_epoch >= max_time_epoch:  
        print('ran out of time...')  
        # if there are still elements to process  
        if approx_data_queue_size > 0:  
            print('adding replacement agent...')  
            agent_queue.send_message(MessageBody='{ request_id: "' + request_id + '", source: "lambda_replacement" }')

Create the Lambda function SQS trigger

  1. Click +Add trigger to create a trigger to initiate the Lambda function.
  2. Search for "SQS" under Trigger configuration.
  3. Choose sqs_mlops_agent_queue.
  4. Specify a batch size of 1.
  5. Enable and add the trigger.
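To verify the trigger is wired up, you can drop a test message onto the agent queue by hand. A sketch; the message body is an arbitrary placeholder, since the handler only reacts to the arrival of a message, not its contents:

```python
TEST_MESSAGE = '{ "source": "manual_test" }'

def kick_off_agent(queue_name: str = "sqs_mlops_agent_queue") -> None:
    """Send one message to the agent queue to fire the Lambda trigger."""
    import boto3
    sqs = boto3.resource("sqs")
    queue = sqs.get_queue_by_name(QueueName=queue_name)
    queue.send_message(MessageBody=TEST_MESSAGE)
```

After calling `kick_off_agent()`, the function's CloudWatch logs should show either "nothing to process, queue is empty." or the agent startup output.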

Configure the agent account

Set an API token for a DataRobot account in mlops.agent.conf.yaml. It is best to use the token for a service account rather than a particular user to avoid the risk of the account being deactivated in the future. It is also best for the admin to exempt this account from any API rate limiting, which can be done in the user's profile settings.

Test the queue processor

The following sections explain how to set up an MLOps agent user and an external deployment in order to test the queue processor.

Create an IAM MLOps agent user

This user will be used for generating records to fill the data queue.

  1. Navigate to the IAM service and choose Add user.
  2. Name the user community_agent_user_iam.
  3. Select the checkbox for Programmatic access.
  4. Click on the Permissions page.
  5. Choose Attach existing policies and filter on “SQS”.
  6. Choose the AmazonSQSFullAccess checkbox and click through next steps to create the user.

Upon successful creation, the Access key ID and Secret access key are displayed. Save both of these values.

Create an external deployment

Complete this step and the following steps in a client environment, within the unzipped MLOps agent directory (for example, datarobot-mlops-agent-6.1.3).

  1. Install the Python wheel file from within the unzipped MLOps agent directory.

    pip install lib/datarobot_mlops-*-py2.py3-none-any.whl --user
    
  2. The conf/mlops.agent.conf.yaml must be updated as well. Specify the mlopsURL value to be that of the app server, for instance https://app.datarobot.com.

  3. Update the apiToken.

  4. Navigate to channelConfigs and comment out the FS_SPOOL and associated spoolDirectoryPath line.

  5. Specify SQS channel values to take its place, using the URL of the SQS data queue.

    - type: "SQS_SPOOL"
      details: {name: "sqsSpool", queueUrl: "https://sqs.us-east-1.amazonaws.com/1234567/sqs_mlops_data_queue"}

  6. The model used is examples/python/BinaryClassificationExample. Change into this directory and create the deployment. This produces an MLOPS_DEPLOYMENT_ID and an MLOPS_MODEL_ID; make note of these values.

Send records to the SQS queue via a script

  1. From the same example directory, edit the run_example.sh script.

  2. Add the following lines to the script to specify the target SQS queue for tracking record data and the AWS credentials for authentication:

      # values created from create_deployment.sh
      export MLOPS_DEPLOYMENT_ID=5e123456789012
      export MLOPS_MODEL_ID=5e123456789032
    
      # where the tracking records will be sent
      export MLOPS_OUTPUT_TYPE=SQS
      export MLOPS_SQS_QUEUE_URL='https://sqs.us-east-1.amazonaws.com/1234567/sqs_mlops_data_queue'
    
      # credentials
      export AWS_DEFAULT_REGION=us-east-1
      export AWS_ACCESS_KEY_ID='AKIAABCDEABCDEABCDEPY'
      export AWS_SECRET_ACCESS_KEY='GpzABCDEABCDEABCDEl6dcm'
    
  3. Comment out the existing export MLOPS_OUTPUT_TYPE=OUTPUT_DIR line.

  4. Run the example script. It should successfully populate records into the queue, which the agent process will then consume and report back to DataRobot.
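While the example script runs, you can watch the data queue drain using the same attribute the Lambda function polls. A sketch:

```python
def approximate_backlog(queue_name: str = "sqs_mlops_data_queue") -> int:
    """Approximate number of messages still waiting in the data queue."""
    import boto3
    sqs = boto3.resource("sqs")
    queue = sqs.get_queue_by_name(QueueName=queue_name)
    return int(queue.attributes.get("ApproximateNumberOfMessages", "0"))
```

A count that rises as the script runs and falls back to zero afterward confirms the end-to-end flow: records in, agents consuming, and results reported to DataRobot.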

Create a schedule

The last step is to run the Lambda function on a schedule using the AWS CloudWatch service. The scheduled run checks whether any records are in the queue, then begins processing them and scaling up as needed.

  1. In AWS, navigate to the CloudWatch service.
  2. Then click on Events > Rules, and click Create rule.
  3. Click Schedule, and set the rule to run every 15 minutes.
  4. Next, click Add target.
  5. From the dropdown, choose the SQS queue option.
  6. Select sqs_mlops_agent_queue, and then click Configure details.

  7. Provide the name of sqs_mlops_agent_queue_trigger_15m.

  8. Enable the rule, and select Create rule.

This procedure enables a rule that executes every 15 minutes. If desired, use a cron expression instead to run at round times (at the top of the minute).
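The equivalent setup can be scripted with boto3, using a cron expression that fires at the top of every 15th minute. The queue ARN below is a placeholder; note also that EventBridge needs permission, granted through an SQS queue policy allowing events.amazonaws.com, before it can send messages to the queue.

```python
SCHEDULE_EXPRESSION = "cron(0/15 * * * ? *)"  # top of every 15th minute

def create_schedule(
    rule_name: str = "sqs_mlops_agent_queue_trigger_15m",
    queue_arn: str = "arn:aws:sqs:us-east-1:1234567:sqs_mlops_agent_queue",
) -> None:
    """Create an enabled CloudWatch Events rule targeting the agent queue."""
    import boto3
    events = boto3.client("events")
    events.put_rule(
        Name=rule_name,
        ScheduleExpression=SCHEDULE_EXPRESSION,
        State="ENABLED",
    )
    events.put_targets(
        Rule=rule_name,
        Targets=[{"Id": "agent-queue", "Arn": queue_arn}],
    )
```

Using `rate(15 minutes)` instead of the cron expression reproduces the console behavior, which runs relative to when the rule was created rather than on the clock.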

Wrap-up

The serverless architecture spins up a single MLOps agent on the schedule you enabled. If the defined logic determines that the queue has many items, it recruits additional agents to help process the queue. With this architecture in place, the actual cost of processing the queue is minimal, since compute is only paid for when necessary. You can configure the flexible scaling options to scale up and handle a large amount of demand, should the need arise.


Updated July 7, 2022