Skip to content

アプリケーション内で をクリックすると、お使いのDataRobotバージョンに関する全プラットフォームドキュメントにアクセスできます。

サーバーレスMLOpsエージェントでの監視

DataRobotは、外部システムにデプロイされたモデルのパフォーマンスやドリフト統計情報を監視できます。 これらの外部デプロイモデルには、次のようなものがあります。

  • DataRobotが生成したモデルのDockerコンテナ
  • JavaまたはPythonスコアリングコードとしてエクスポートされたDataRobotモデル
  • カスタムコードモデルや、別のツールで作成されたモデル

このセクションでは、デプロイされたモデルを追跡するために、AWSでDataRobot MLOpsエージェントをスケーリングし、サーバーレスリソースを使用して小規模および大規模なデータキューを処理する方法を説明します。

MLOpsエージェントのアーキテクチャの概要

簡単に言うと、外部にデプロイされた機械学習モデルはDataRobot MLOpsライブラリを活用してレポートレコードを生成し、それらをDataRobotに送信して処理およびレポートします。 DataRobot MLOpsエージェントはこれらのレコードを使用し、DataRobotインスタンスにパイプします。 フラットファイル、AWS SQS、RabbitMQなど、さまざまなスプールソースから読み取るようにエージェントを設定できます(完全なリストについては、予測取込みオプションを参照)。 このトピックでは、サーバーレスMLOpsエージェントのスケーリングによってAWS SQSキューを使用する方法について、キューデータの使用とDataRobotへのレポートのみに焦点を当てて説明します。

標準のソリューションアーキテクチャ

この図の外部モデルは、DataRobotクラスターの外部にデプロイされた、上記のいずれかのタイプのモデルを表しています。 1つのアプローチとして、安定したコンピューティング環境でMLOpsエージェントを実行する方法があります。 これには、サーバーリソースの立ち上げ、リソースへのエージェントのインストール、および使用する新しいデータを継続的にポーリングして使用し、DataRobotに報告することが含まれます。 これは適切なソリューションですが、コストとスケーリングという2つの欠点があります。

サーバーは常に稼働しているため、最小のソリューションでもコストがかかる可能性があります。 多くの要素を持つキューをソリューションが使用する必要がある場合に、スケーリングが問題になります。 たとえば、SQSキューへの書き込みが多いモデルや非常にビジーなモデルなどです。SQSキューに100万の要素がバックログされている場合はどうなるでしょうか? キューが単一エージェントによって完全に処理されるまでの時間は? 複数のエージェントを実行して、同時にデータを使用して送り返すことができます。 EC2 Auto Scalingはこの問題を解決しません。より多くのマシンをスケーリングするためのトリガーメカニズムは、バックログ内のアイテムの実際の量(外部キューから処理する必要があるもの)ではなく、EC2サーバー自体のビジー状態に関連しています。

サーバーレスソリューションアーキテクチャ

サーバーレスアーキテクチャでは、AWS Lambdaを活用してオンデマンドでMLOpsエージェントを作成できます。また、複数のLambda関数を介して追加のエージェントをスケーリングし、バックログに基づいて入力されたキューを使用できます。

MLOpsエージェントコードは保存され、S3から取得され、Lambda関数に取り込まれてインスタンス化されます。 そこで、外部モデルによってデータSQSキューに書き込まれたレコードをポーリングして使用します。 この関数には、追加のMLOpsエージェントを実行するタイミング(およびその数)を指定するロジックが含まれています。 エージェントSQSキューにメッセージを挿入すると、エージェントの作成がトリガーされて、エージェントが「リクルート」されます。

Lambda関数の同時実行は、DynamoDB noSQLデータベースを介して実行中の関数を追跡することで管理されます。 Lambdaサービスには同時実行予約の設定がありますが、スロットが開いたらすぐにLambda関数を実行できるような、多数の空きスロットとして動作するわけではありません。

代わりに、Lambda関数の呼び出しを予約の上限を超えてスリープさせ、同時実行のスロットが空くまでスリープ時間を長くしていく仕組みになっています。 これは、何も処理されていないアイドル待機時間が長くなる可能性があるため、データ処理時に望ましい結果になることはめったにありません。

CloudWatchスケジュールは、15分ごとにエージェントSQSキューにメッセージを挿入するように設定されています。 この操作では、最初のLambda関数を実行して、データキュー自体が空でない状態かどうかを確認します。 実行後、削除する必要があります。

Lambda関数の最大実行時間が15分になりました。 その後もバックログが残っている場合、MLOpsエージェントは正常に終了し、別のメッセージをエージェントSQSキューに渡して、別の新しいLambda関数インスタンスをトリガーし、代わりに実行します。

依存アイテムのS3への追加

Lambda関数は、S3からエージェントインストーラーと設定ファイルを取得し、これらのアイテムのためにバケットを作成するか既存のバケットを活用します。 MLOpsエージェントのtarballとその設定は、S3でホストされます。これらは両方とも、環境設定から関数に提供されます。 (このトピックでは、これらのオブジェクトをそれぞれエージェントのアーカイブおよび設定と呼びます。)

s3://my_bucket/community/agent/datarobot-mlops-agent-6.1.3-272.tar.gz
s3://my_bucket/community/agent/mlops.agent.conf.yaml 

データのSQS(Simple Queue Service)キューの作成

このキューは、1対多の外部デプロイモデルによって書き込まれます。 スコアリングされたデータレコードとメタデータは、このキューに送信されます。 MLOpsエージェントはキューから読み取り、モデルの監視用にレコードをDataRobotに送信します。

  1. サービス > SQSに移動します。
  2. 新しいキューの作成をクリックします。
  3. キュー名sqs_mlops_data_queueを指定します。
  4. 標準キューオプションを選択します。
  5. 必要に応じてキュー属性を設定するか、デフォルト設定でキューのクイック作成を選択します。

デフォルトの可視性タイムアウトなど、設定可能なオプションのいくつかに注意してください。 キューから読み込まれた要素は、この期間、他のキューコンシューマーには表示されません。その後、アイテムが正常に処理され、キューから削除できることをコンシューマーが報告しなかった場合、再び表示されます。

もう1つの設定可能なオプションメッセージの保持は、未処理のアイテムがキューに留まることができる期間を定義します。 この時点までに処理されない場合、アイテムは削除されます。 何らかの理由でエージェントがダウンしている場合は、MLOpsエージェントによって使用される前にキューアイテムが失われないように、この値を1週間以上に設定することをお勧めします。

作成時にAmazonリソースネーム(ARN)をメモします。

エージェントSQSキューの作成

このキューを使用して、MLOpsエージェントを実行するLambdaサービスを開始します。 これらのエージェントは、要素をデータキューから取り出し、監視用にDataRobotプラットフォームに報告します。

  1. サービス > SQSに移動します。
  2. 新しいキューの作成をクリックします。
  3. キュー名 sqs_mlops_agent_queueを指定します。 (今回はデータではなくエージェントであることに注意してください。)
  4. 標準キューオプションを選択します。
  5. デフォルトのキュー属性デフォルトの可視性タイムアウトを16分に設定します。
  6. 必要に応じてキュー属性を設定するか、デフォルト設定でキューのクイック作成を選択します。

読み込まれた要素は、デフォルトの可視性タイムアウトで指定された16分間は読み取り可能になりません。 理論的には、成功したLambdaサービス(AWSでは15分に制限)は要素によってトリガーされ、関数から正常に戻ると、SQS要素はキューから完全に削除されます。 何らかの理由で失敗した場合、読み込まれた要素の可視性はキューに戻ります。 このタイムアウトにより、各要素が1回ずつDataRobotに送信されるため、レコードが重複して読み込まれたり送信されたりするのを防ぐことができます。

Lambdaエージェントとエラーを追跡するDynamoDBテーブルの作成

テーブルを使用して、同時にMLOpsエージェントを実行しているLambda関数を追跡します。

  1. AWSのDynamoDB(マネージドNoSQL AWSキー値/ドキュメントストアデータベース)サービスに移動します。
  2. テーブルの作成オプションを選択します。
  3. 新しいテーブルにlambda_mlops_agentsという名前を付け、aws_request_idをプライマリーキーに設定します。
  4. デフォルト設定でテーブルを作成します。
  5. 概要タブに移動し、テーブルのARNをメモします。

別のテーブルを使用して、エージェントエラーを追跡します。

  1. AWSのDynamoDB(マネージドNoSQL AWSキー値/ドキュメントストアデータベース)サービスに移動します。
  2. テーブルの作成オプションを選択します。
  3. 新しいテーブルにlambda_mlops_agents_errorという名前を付け、aws_request_idをプライマリーキーに設定します。
  4. デフォルト設定でテーブルを作成します。
  5. 概要タブに移動し、テーブルのARNをメモします。

2つのテーブルが作成されました。 1つはlambda_mlops_agents、もう1つはlambda_mlops_agents_errorという名前で、両方のARNをメモしておく必要があります。

Lambda関数のIAMロールの設定

以下のセクションでは、Lambda関数のIAMロールを設定する手順について説明します。

データベースのインラインポリシーの追加

  1. Identity and Access Management(IAM)に移動します。
  2. ロールで、ロールの作成を選択します。
  3. ユースケースとしてAWSサービスLambdaを選択し、次のステップ:アクセス権限に移動します。
  4. AWSLambdaBasicExecutionRoleポリシーを検索して追加します。
  5. ロール名lambda_mlops_agent_roleを指定し、ロールの作成をクリックします。
  6. ロールページで、新しく作成したロールでフィルターし、選択します。
  7. アクセス権限タブをクリックし、インラインポリシーの追加を選択します。
  8. サービスを選ぶを選択し、次に「DynamoDB」でフィルターして、返されたオプションから選択します。
  9. [アクション]で、次の権限を選択します。

  10. [リソース]で指定を選択し、[テーブル]オプションにあるARNの追加リンクをクリックします。

  11. 前に作成したDynamoDBテーブルlambda_mlops_agentslambda_mlops_agents_errorのARNを指定します。
  12. ポリシーの確認を選択します。
  13. 名前lambda_mlops_agent_role_dynamodb_policyを入力します。
  14. ポリシーの作成をクリックしてタスクを完了します。

キューのインラインポリシーの追加

  1. Identity and Access Management(IAM)に移動します。
  2. ロールで、ロールの作成を選択します。
  3. ユースケースとしてAWSサービスLambdaを選択し、次のステップ:アクセス権限に移動します。
  4. AWSLambdaBasicExecutionRoleポリシーを検索して追加します。
  5. ロール名lambda_mlops_agent_roleを指定し、ロールの作成をクリックします。
  6. ロールページで、新しく作成したロールでフィルターし、選択します。
  7. アクセス権限タブをクリックし、インラインポリシーの追加を選択します。
  8. サービスを選ぶを選択し、次に「SQS」サービスでフィルターします。
  9. 読み込み書き込みのチェックボックスおよびARNの追加を選択します。
  10. 各SQSキューのARNを追加します。
  11. ポリシーを確認し、lambda_mlops_agent_role_sqs_policyという名前を付けます。

S3のインラインポリシーの追加

  1. Identity and Access Management(IAM)に移動します。
  2. ロールで、ロールの作成を選択します。
  3. ユースケースとしてAWSサービスLambdaを選択し、次のステップ:アクセス権限に移動します。
  4. AWSLambdaBasicExecutionRoleポリシーを検索して追加します。
  5. ロール名lambda_mlops_agent_roleを指定し、ロールの作成をクリックします。
  6. ロールページで、新しく作成したロールでフィルターし、選択します。
  7. アクセス権限タブをクリックし、インラインポリシーの追加を選択します。
  8. サービスを選ぶを選択し、次に「S3」サービスでフィルターします。
  9. 読み取り > GetObject権限を選択し、[リソース]で特定のバケットとその中のすべてのオブジェクトを選択します。
  10. ポリシーを確認し、lambda_mlops_agent_role_s3_policyとして保存します。

Python Lambdaの作成

  1. Lambdaサービスの[関数の作成]に移動し、mlops_agent_processorという名前の新しいPython 3.7 Lambdaサービスを最初から作成します。
  2. [アクセス権限]で、既存のロールを使用するを選択し、lambda_mlops_agent_roleを選択します。

Lambda環境変数の作成

Lambdaサービスを設定するには、設定変数と環境変数を設定する必要があります。 Lambdaサービスは定期的に出力を生成しますが、タイムアウトを設定する必要があります(30秒ぐらいで十分です)。 Lambdaサービスでは大量のローカル処理を行わないため、エージェントを効率的に実行し、Lambdaサービスのコストを低く抑えるには512MBあれば十分です(サービスはGB秒単位で課金されます)。 環境変数を設定して、エージェントとその設定が保存されている場所、エージェントがやり取りするキュー、およびターゲットの同時実行数を示します。

Lambda関数コードの入力

lambda_function.pyウィンドウで次のコードを使用します。

from urllib.parse import unquote_plus  
import boto3  
import os  
import time  
from boto3.dynamodb.conditions import Key, Attr  
import subprocess  
import datetime  

def get_approx_data_queue_size(sqs_resource, queue_name):  
    data_queue = sqs_resource.get_queue_by_name(QueueName=queue_name)  
    return int(data_queue.attributes.get('ApproximateNumberOfMessages'))  

def lambda_handler(event, context):  

    request_id = context.aws_request_id  

    # the lambda environment is coming with openjdk version "1.8.0_201"  
    #os.system("java -version")  

    try:  
        # get and parse environment values  
        ENV_AGENT_BUCKET = os.environ['dr_mlops_agent_bucket']  
        ENV_AGENT_ZIP = os.environ['dr_mlops_agent_zip']  
        ENV_AGENT_CONFIG = os.environ['dr_mlops_agent_config']  
        ENV_TARGET_CONCURRENCY = int(os.environ['dr_mlops_agent_target_concurrency'])  
        ENV_DATA_QUEUE = os.environ['data_queue']  
        ENV_AGENT_QUEUE = os.environ['agent_queue']  

        agent_config = os.path.basename(ENV_AGENT_CONFIG)  

        # datarobot_mlops_package-6.3.3-488.tar.gz  
        agent_zip = os.path.basename(ENV_AGENT_ZIP)  

        # datarobot_mlops_package-6.3.3-488  
        temp_agent_dir = agent_zip.split(".tar")[0]  

        # datarobot_mlops_package-6.3.3  
        temp_agent_dir = temp_agent_dir.split("-")  
        agent_dir = temp_agent_dir[0] + '-' + temp_agent_dir[1]  

    except:   
        raise Exception("Problem retrieving and parsing environment variables!")  

    # lambda max runtime allowed (15 minute AWS maximum duration, recommended value to use here is 14 for MAXIMUM_MINUTES)  
    MAXIMUM_MINUTES = 14  
    start_time_epoch = int(time.time())  
    time_epoch_15m_ago = start_time_epoch - int(60 * 14.7)  
    time_epoch_60m_ago = start_time_epoch - 60 * 60  
    max_time_epoch = start_time_epoch + 60 * int(MAXIMUM_MINUTES)  

    # check number of items in data queue to process  
    sqs = boto3.resource('sqs')  
    approx_data_queue_size = get_approx_data_queue_size(sqs, ENV_DATA_QUEUE)  

    # exit immediately if data queue has nothing to process  
    if approx_data_queue_size == 0:  
        print('nothing to process, queue is empty.')  
        return None  

    # connect to database  
    dynamodb = boto3.resource('dynamodb')  

    # count running agents in dynamo in the last 15 minutes  
    table = dynamodb.Table('lambda_mlops_agents')  
    response = table.scan(  
        FilterExpression=Attr('start_time_epoch').gte(time_epoch_15m_ago)  
    )  
    agent_count = int(response['Count'])  
    print ('agent count started and running in the last 15 minutes is: ' + str(agent_count))  

    # count error agent records in dynamo in the last hour  
    error_table = dynamodb.Table('lambda_mlops_agents_error')  
    response = error_table.scan(  
        FilterExpression=Attr('start_time_epoch').gte(time_epoch_60m_ago)  
    )  
    error_count = int(response['Count'])  
    print ('agent errors count in the past 60 minutes: ' + str(error_count))  

    # exit immediately if there has been an error in the last 60 minutes  
    if error_count > 0:  
        print('exiting - lambda agents have errored within the last hour.')  
        return None  

    # create agent queue in case recruitment is needed  
    agent_queue = sqs.get_queue_by_name(QueueName=ENV_AGENT_QUEUE)  

    # exit immediately if target concurrent lambda count has already been reached  
    if agent_count >= ENV_TARGET_CONCURRENCY:  
        print('exiting without creating a new agent, already hit target concurrency of: ' + str(ENV_TARGET_CONCURRENCY))  
        return None  
    else:  
        # how many items does it take to be in the queue backlog for each additional agent to recruit?  
        SQS_QUEUE_ITEMS_PER_LAMBDA = 500  

        # add agent record to table for this lambda instance  
        table.put_item(Item= {'aws_request_id': request_id, 'start_time_epoch':  start_time_epoch})  

        # total lambdas, minimum of queue size / items per lambda or target concurrency, -1 for this lambda, - current running agent count  
        lambdas_to_recruit = min(-(-approx_data_queue_size // SQS_QUEUE_ITEMS_PER_LAMBDA), ENV_TARGET_CONCURRENCY) - 1 - agent_count  
        if lambdas_to_recruit < 0:  
            lambdas_to_recruit = 0  

        for x in range(lambdas_to_recruit):  
            print('adding new agent: ' + str(x))  
            agent_queue.send_message(MessageBody='{ request_id: "' + request_id + '", source: "lambda_new_' + str(x) + '" }')  

    # install agent  
    try:  
        # switch to a local workspace  
        os.chdir("/tmp")  

        # get agent zip if it is not already here, and install it  
        if os.path.isfile("/tmp/" + agent_zip) == False:  
            print('agent does not exist.  installing...')  
            print("bucket: " + ENV_AGENT_BUCKET + " agent zip: " + ENV_AGENT_ZIP)  
            # get agent zip  
            s3 = boto3.resource('s3')  
            s3.Bucket(ENV_AGENT_BUCKET).download_file(ENV_AGENT_ZIP, agent_zip)  

            # unzip contents  
            os.system('tar -xvf ' + agent_zip + ' 2> /dev/null')  

            # replace config  
            os.chdir('/tmp/' + agent_dir + '/conf')  
            s3.Bucket(ENV_AGENT_BUCKET).download_file(ENV_AGENT_CONFIG, agent_config)  

        else:  
            print('agent already exists, hot agent!')  

    except:   
        raise Exception("Problem installing the agent!")  

    # start the agent  
    os.chdir('/tmp/' + agent_dir)  
    os.system('bin/start-agent.sh')  

    time.sleep(5)  
    output = subprocess.check_output("head logs/mlops.agent.log", shell=True)  
    print('head mlops.agent.log --- \n' + str(output))  

    output = subprocess.check_output("head logs/mlops.agent.out", shell=True)  
    print('head mlops.agent.out --- \n' + str(output))  

    output = subprocess.check_output("tail -3 logs/mlops.agent.log | grep 'Fail\|Error\|ERROR' | wc -l", shell=True)  
    print('tail -3 logs/mlops.agent.log log count --- \n' + str(int(output)) + ' errors.')  

    # write to error_table if agent is failing  
    if int(output) > 0:  
        error_table.put_item(Item= {'aws_request_id': request_id, 'start_time_epoch':  start_time_epoch})  
        print('exiting - lambda agent errored.')  

        # stop agent and clean up  
        os.system('bin/stop-agent.sh ')  
        os.system('rm -rf bin/PID.agent')  

        # remove dynamo record for this lambda  
        table.delete_item(Key= {'aws_request_id': request_id})  

        return None  

    # time to let the agent do its thing...  
    current_epoch = int(time.time())  

    # while there is still time left to run the Lambda and there are items on the queue  
    while current_epoch < max_time_epoch and approx_data_queue_size > 0:  
        time.sleep(10)  
        current_epoch = int(time.time())  
        approx_data_queue_size = get_approx_data_queue_size(sqs, ENV_DATA_QUEUE)  
        print(datetime.datetime.now(datetime.timezone.utc).strftime("%Y-%m-%d %H:%M:%S") + \  
            ' - approx_data_queue_size: ' + str(approx_data_queue_size))  
        #output = subprocess.check_output("tail logs/mlops.agent.log", shell=True)  
        #print('tail mlops.agent.log --- \n' + str(output))  

    # exited processing loop, time to stop agent and clean up  
    os.system('bin/stop-agent.sh ')  
    os.system('rm -rf bin/PID.agent')  

    # remove dynamo record for this lambda  
    table.delete_item(Key= {'aws_request_id': request_id})  

    # if we ran out of time and there are more items in the backlog, recruit a replacement for this lambda  
    if current_epoch > max_time_epoch:  
        print('ran out of time...')  
        # if there are still elements to process  
        if approx_data_queue_size > 0:  
            print('adding replacement agent...')  
            agent_queue.send_message(MessageBody='{ request_id: "' + request_id + '", source: "lambda_replacement" }') 

Lambda関数のSQSトリガーの作成

  1. +トリガーを追加をクリックして、Lambda関数を開始するトリガーを作成します。
  2. トリガー設定で「SQS」を検索します。
  3. sqs_mlops_agent_queueを選択します。
  4. バッチサイズ1を指定します。
  5. トリガーを有効にして追加します。

エージェントアカウントの設定

mlops.agent.conf.yamlでDataRobotアカウントのAPIトークンを設定します。 将来アカウントが無効化されるリスクを回避するために、特定のユーザーではなくサービスアカウントに対してトークンを使用することをお勧めします。 また、管理者がこのアカウントをAPIレート制限から除外することも推奨します。これは、ユーザーのプロファイル設定で行えます。

キュープロセッサのテスト

次のセクションでは、キュープロセッサをテストするために、MLOpsエージェントユーザーと外部デプロイを設定する方法について説明します。

IAM MLOpsエージェントユーザーの作成

このユーザーを使用して、データキューを満たすレコードを生成します。

  1. IAMサービスに移動してユーザーを追加を選択します。
  2. ユーザーにcommunity_agent_user_iamという名前を付けます。
  3. プログラムによるアクセスのチェックボックスを選択します。
  4. [アクセス権限]ページをクリックします。
  5. 既存のポリシーを直接アタッチを選択し、「SQS」でフィルターします。
  6. AmazonSQSFullAccessチェックボックスを選択し、次の手順をクリックしてユーザーを作成します。

正常に作成されると、アクセスキーIDとシークレットアクセスキーが表示されます。 両方の値を保存します。

外部デプロイの作成

クライアント環境の解凍されたMLOpsエージェントディレクトリ(datarobot-mlops-agent-6.1.3など)内で、この手順と次の手順を実行します。

  1. 解凍されたMLOpsエージェントディレクトリ内からPython wheelファイルをインストールします。

    pip install lib/datarobot_mlops-*-py2.py3-none-any.whl --user 
    
  2. conf/mlops.agent.conf.yamlを同様に更新する必要があります。 mlopsURLの値には、アプリサーバーの値を指定します(例:https://app.datarobot.com)。

  3. apiTokenを更新します。

  4. channelConfigsに移動し、FS_SPOOLと関連するspoolDirectoryPath行をコメントアウトします。

  5. SQSデータキューのURLを使用して、代わりにSQSチャネル値を指定します。

    - type: "SQS_SPOOL"
    - details: {name: "sqsSpool", queueUrl: " <https://sqs.us-east-1.amazonaws.com/1234567/sqs_mlops_data_queue> "} 
    
  6. 使用するモデルは、examples/python/BinaryClassificationExampleです。 このディレクトリに移動し、デプロイを作成します。 これにより、MLOPS_DEPLOYMENT_IDMLOPS_MODEL_IDが生成されます。これらの値をメモします。

スクリプトを介してSQSキューにレコードを送信する

  1. 同じサンプルディレクトリから、run_example.shスクリプトを編集します。

  2. 次の行をスクリプトに追加して、レコードデータを追跡するためのターゲットSQSキューと認証用のAWS認証情報を指定します。

      # values created from create_deployment.sh
      export MLOPS_DEPLOYMENT_ID=5e123456789012
      export MLOPS_MODEL_ID=5e123456789032
    
      # where the tracking records will be sent
      export MLOPS_OUTPUT_TYPE=SQS
      export MLOPS_SQS_QUEUE_URL=' <https://sqs.us-east-1.amazonaws.com/1234567/sqs_mlops_data_queue> '
    
      # credentials
      export AWS_DEFAULT_REGION=us-east-1
      export AWS_ACCESS_KEY_ID='AKIAABCDEABCDEABCDEPY'
      export AWS_SECRET_ACCESS_KEY='GpzABCDEABCDEABCDEl6dcm' 
    
  3. 既存のexport MLOPS_OUTPUT_TYPE=OUTPUT_DIR行をコメントアウトします。

  4. サンプルスクリプトを実行します。 キューにレコードが正常に入力され、エージェントプロセスがそれを使用してDataRobotに報告します。

スケジュールの作成

最後の手順では、AWS CloudWatchサービスを使用して、スケジュールに従ってLambda関数を実行します。 キューにレコードがあるかどうかを確認し、その後、処理を開始し、必要に応じてスケールアップします。

  1. AWSで、CloudWatchサービスに移動します。
  2. 次に、イベント > ルールをクリックし、ルールの作成をクリックします。
  3. スケジュールをクリックし、ルールを15分ごとに実行するように設定します。
  4. 次にターゲットの追加をクリックします。
  5. ドロップダウンから、SQSキューオプションを選択します。
  6. sqs_mlops_agent_queueを選択し、詳細の設定をクリックします。

  7. sqs_mlops_agent_queue_trigger_15mの名前を入力します。

  8. ルールを有効にして、ルールの作成を選択します。

この手順により、15分ごとに実行されるルールが有効になります。 必要に応じて、代わりにcron式を使用して概数で(n分00秒に)実行します。

まとめ

サーバーレスアーキテクチャでは、有効にしたスケジュールで1つのMLOpsエージェントをスピンアップします。 定義されたロジックは、キューに多くのアイテムがあると判断した場合、キューを処理する「仲間」を募集します。 このアーキテクチャを導入すると、コンピューティングは必要な場合にのみ課金されるため、キューを処理するための実際のコストが最小限に抑えられます。 柔軟なスケーリングオプションを設定することで、必要に応じてスケールアップし、大量の需要に対応できます。


更新しました December 21, 2022
Back to top