MLOpsでのSageMakerモデルの監視¶
このトピックでは、AWSで開発およびデプロイされたAWS SageMakerモデルをリアルタイムAPIスコアリング用に監視する方法について概説します。 DataRobotでは、AWSモデルとDataRobot間の直接接続を必要としない、リモートエージェントアーキテクチャを介してモデルを監視できます。 このトピックでは、監視キューにデータを追加する方法について説明します。 キューからデータを使用する方法については、サーバーレスMLOpsエージェントでの監視のトピックを参照してください。
技術アーキテクチャ¶
この記事の手順に従って、上記のデプロイアーキテクチャを構築できます。 以下のリストで各コンポーネントの詳細を確認します。
- APIクライアントは、スコアリング用の元のデータ入力の1行のJSON要求を組み立てます。これは、APIGatewayに公開されたエンドポイントにポストされます。
- API Gatewayはパススルーとして機能し、関連するLambda関数にリクエストを送信して処理します。
- Lambdaのロジックは、元の入力データを処理し、SageMakerエンドポイントを介してスコアリングに必要な形式に解析します。 この例では、XGBoostモデルのヘッダーなしCSVにデータを解析します。 次に、SageMakerエンドポイントが呼び出されます。
- SageMakerエンドポイントは、リアルタイムモデルをホストしているデプロイ済みのEC2インスタンスにリクエストを渡すことで、リクエストを完了します。 コミュニティAIエンジニアリングGitHubリポジトリ(
xgb.deploy
)のAWSコードからのモデルデプロイラインは、このマシンの立ち上げと、トレーニング済みのAWS ECRでホストされたモデルの導入を処理します。 - 元のスコアはLambdaによって処理されます。この例では、しきい値を適用して二値分類ラベルを選択します。
- タイミング、入力データ、モデルの結果がSQSキューに書き込まれます。
- 処理された応答がAPI Gatewayに返されます。
- 処理された応答がクライアントに返されます。
カスタムSageMakerモデルの作成¶
この記事は、AWS GitHubリポジトリのSageMakerノートブックの例に基づいています。
このユースケースは、どの顧客がダイレクトマーケティングキャンペーンに肯定的に反応するかを予測することを目的としています。 コードはSageMaker SDKのv2バージョンで確認するために更新されており、DataRobotコミュニティAIエンジニアリングGitHubリポジトリにあります。
AWS SageMakerでノートブックを完成させると、起動しているml.m4.xlarge
インスタンスでホストされているxgboost-direct-marketing
という名前のSageMakerエンドポイントにモデルがデプロイされます。
備考
エンドポイントは、トレーニング中に提供されたのと同じ順序で、完全に準備され、前処理されたデータ(たとえば、ワンホットエンコーディングが適用済み)を想定しています。
SageMakerエンドポイントをテストするにはいくつかの方法があります。以下のスニペットは、検証セットからレコードをスコアリングできる短いPythonスクリプトです(ターゲット列が削除されました)。
import boto3
import os
import json
runtime = boto3.Session().client('sagemaker-runtime',use_ssl=True)
endpoint_name = 'xgboost-direct-marketing'
payload = '29,2,999,0,1,0,0,1,0,0,0,0,0,0,0,0,0,0,0,1,0,0,0,0,1,0,0,0,0,0,1,0,0,1,0,0,1,0,0,0,1,0,0,0,0,1,0,0,0,0,0,0,0,0,1,0,0,1,0'
response = runtime.invoke_endpoint(EndpointName=endpoint_name,
ContentType='text/csv',
Body=payload)
result = json.loads(response['Body'].read())
print(result)
DataRobotでの外部デプロイの作成¶
DataRobot内で、SageMakerモデルを監視するためにデプロイエントリーを作成する必要があります。 データと統計は、処理、視覚化、および分析のためにこのデプロイに報告されます。 外部デプロイを作成するには、次の手順を実行します。
- モデルレジストリ > モデルパッケージタブに移動します。
- 新規パッケージを追加をクリックします。
-
新しい外部モデルパッケージを選択します。
-
以下に示すように、必要な情報を入力します。
-
コミュニティGitHubから
bank-additonal-full.csv
ファイルをダウンロードします。 bank-additional-full.csv
をトレーニングデータセットとしてアップロードします。 これは一例ですが、この例のモデルは、完全にデプロイする前に100%のデータでトレーニングされていないことに注意してください。 実際の機械学習では、これは考慮すべき点です。- パッケージを作成をクリックしてモデルパッケージの作成を完了し、モデルレジストリに追加します。
-
レジストリでパッケージを見つけて、パッケージのアクションメニューをクリックします。
-
ターゲット監視の有効化と特徴量のドリフト追跡の有効化をオンにして、デプロイを作成を選択します。
必要に応じて、追加の予測環境メタデータを設定できます。 外部モデルが存在するメタデータの詳細(この場合はAWS)。
デプロイの作成が完了したら、ID値をいくつか取得する必要があります。 これらはSageMarketのモデルに関連付けられます。
-
デプロイの下で、予測 > モニタリングタブに移動して、監視コードを表示します。
MLOPS_DEPLOYMENT_ID
とMLOPS_MODEL_ID
の値をコピーします。
備考
MLOPS_DEPLOYMENT_ID
はモデル監視内のエントリーに関連付けられているのに対し、MLOPS_MODEL_ID
はその背後にある実際のスコアリングモデルに提供される識別子であることに注意してください。
MLOPS_DEPLOYMENT_ID
は静的なままにしてください。 ただし、ある時点でSageMakerモデルを置き換えることができます。 その場合、次の2つのアクションのいずれかを実行してください。
- 上記と同じ手順に従って、DataRobotでまったく新しい外部デプロイを作成します。
- 新しいモデルパッケージを登録し、このデプロイで現在ホストされているモデルをその新しいパッケージに置き換えます。
このシナリオでは、新しいMLOPS_MODEL_ID
が割り当てられ、Lambda環境変数の更新に使用します。 さらに、DataRobotの同じMLOPS_DEPLOYMENT_ID
エントリーは、同じエントリーの下に両モデルの統計を表示し、変更がいつ発生したかを記録します。
IAMロールの作成¶
Lambda関数はロールを使用して、SageMakerエンドポイントを介してデータをスコアリングします。 ロールを作成するには、次の手順を実行します。
- AWSコンソール内のIAMサービスに移動します。
- ロールの作成をクリックし、ユースケース用のLambdaを選択します。 次へ:ユーザー権限をクリックします。
-
ポリシーの作成を選択し、JSONタブを選択して、以下のスニペットを貼り付けます。
{ "Version": "2012-10-17", "Statement": [ { "Sid": "VisualEditor0", "Effect": "Allow", "Action": "sagemaker:InvokeEndpoint", "Resource": "*" } ] }
-
ポリシーの確認を選択し、ポリシーに
lambda_sagemaker_execution_policy
という名前を付けます。 - ポリシーの作成をクリックします。
- 上記の手順で作成したロールに、ポリシーをアタッチできるようになりました。 更新ボタンを選択し、文字列
sagemaker
でフィルターします。 - リストからポリシーを選択します。
- 次へ:タグをクリックし、任意のタグを設定して、次へ:レビューを選択します。
- ロールに
lambda_sagemaker_execution_role
という名前を付けて、ロールの作成をクリックします。 - このロールには、LambdaがレポートデータをSQSキューに送信できるように、追加のリソースが必要です。
レポートデータを受信するためのキューを作成するには、サーバーレスMLOpsエージェントでの監視を参照してください。 そのトピックで作成されたキュー
sqs_mlops_data_queue
をここでも使用します。 - 追加のリソースを追加するには、IAMロール
lambda_sagemaker_execution_role
を表示します。 - インラインポリシーの追加を選択し、SQSサービスの検索を実行します。
- リスト、読み取り、書き込みのアクセスレベルを選択します。
- 必要に応じて、読み取りの下のReceiveMessageの選択を解除して、このロールがアイテムをキューから移動しないようにします。
-
リソースを展開して、特定のデータキューのみを使用するようにロールを制限し、キューのARNを入力します。
-
ポリシーの確認をクリックし、
lambda_agent_sqs_write_policy
という名前を付けます。 -
ポリシーを完了するには、ポリシーの作成を選択します。
備考
LambdaがログエントリーをCloudWatchに書き込むには、追加の権限が必要です。
-
ポリシーの添付を選択します。
-
AWSLambdaBasicExecutionRole
でフィルターし、その権限を選択して、ポリシーの添付をクリックします。 そのロールに対する完成した権限は、以下の例のようになります。
Lambda用レイヤーの作成¶
Lambdaはレイヤーを使用できます。レイヤーは、Lambdaが実行時に使用できる追加のライブラリです。
-
DataRobotアプリケーションのMLOpsエージェントライブラリをダウンロードします(プロファイル > 開発者ツールメニュー)。
備考
この例で使用するパッケージは
datarobot_mlops_package-6.3.3-488
です。 numpyとpandasも含まれています。 これらのパッケージはモデルのデータ準備にも使用され、Lambda関数でも同じコードが使用されています。 -
Lambda環境は、Amazon Linux上のPython 3.7です。 Lambdaでレイヤーが確実に機能するようにするには、まずAmazon Linux EC2インスタンスでレイヤーを作成します。 Amazon LinuxにPython 3をインストールする手順はこちらで利用できます。
-
モデルパッケージがサーバーに配置されたら、次の手順を実行します。
gunzip datarobot_mlops_package-6.3.3-488.tar.gz tar -xvf datarobot_mlops_package-6.3.3-488.tar cd datarobot_mlops_package-6.3.3 python3 -m venv my_agent/env source my_agent/env/bin/activate pip install lib/datarobot_mlops-*-py2.py3-none-any.whl deactivate cd my_agent/env mkdir -p python/lib/python3.7/site-packages cp -r lib/python3.7/site-packages/* python/lib/python3.7/site-packages/. zip -r9 ../agent_layer.zip python cd .. aws support s3 cp agent_layer.zip s3://some-bucket/layers/agent613_488_python37.zip
-
AWSで、Lambda > 追加リソース > レイヤーに移動します。
- レイヤーを作成を選択します。
- レイヤー
python37_agent633_488
に名前を付けて、オプションでPython 3.7ランタイムを選択します。 - S3からファイルをアップロードを選択し、ファイル
s3://some-bucket/layers/agent613_488_python37.zip
のS3アドレスを指定します。 - レイヤーを作成を選択し、設定を保存します。
Lambdaの作成¶
次の手順では、SageMakerランタイム呼び出しエンドポイントを呼び出すLambdaの作成方法を概説します。 エンドポイントは、すぐにスコアリングできる元のデータを受け入れます。 ただし、この方法はAPIクライアントには適していません。 次の例は、スコアリングの準備ができているレコードを示しています。
54,3,999,0,1,0,0,0,0,0,0,0,1,0,0,0,0,0,0,1,0,0,0,0,0,0,0,0,1,0,1,0,0,1,0,0,1,0,0,1,0,0,0,0,0,0,0,0,0,1,0,0,0,1,0,0,0,1,0
AWSでの別の例はこちらにあります。 DataRobotでは、クライアントアプリケーションでデータ準備を実行することをお勧めします。
クライアントが使用する実際のデータを処理してスコアリングの準備を整えるLambdaを作成します。 返されたスコアもデコードされるため、アプリケーションの呼び出しがより簡単になります。 以下の手順を実行します。
- コンソールでAWS Lambdaサービスに移動します。
- 関数を作成をクリックします。
- 作成者 > 新規作成を選択します。
- Python 3.7ランタイムを選択します。
- 権限で、デフォルトの実行ロールに
lambda_sagemaker_execution_role
を選択します。 - 関数
lambda-direct-marketing
に名前を付けて、関数を作成を選択します。 - 次の画面で、必要に応じて環境変数を編集します。
- 必要に応じて、
MLOPS_DEPLOYMENT_ID
およびMLOPS_MODEL_ID
環境変数の値を置き換えます。 -
レポートチャネルとして使用するAWS SQSキューのURLを指定します。
名前 値 ENDPOINT_NAME xgboost-direct-marketing MLOPS_DEPLOYMENT_ID 1234567890 MLOPS_MODEL_ID 12345 MLOPS_OUTPUT_TYPE SQS MLOPS_SQS_QUEUE_URL https://sqs.us-east-1.amazonaws.com/1234567/sqs_mlops_data_queue レイヤーを選択する場所は、Lambdaデザイナーウィンドウにもあります。
-
このボックスを選択してから、レイヤーフォームからレイヤーを追加を選択します。
-
カスタムレイヤーを選択し、作成したレイヤーを選択します。
備考
このリストにはLambdaランタイムと一致するランタイムを持つレイヤーのみが表示されますが、レイヤー指定を選択した場合は、Amazonリソースネームでレイヤーを明示的に選択できます。
-
Lambda本体に次のコードを使用します。
import os
import io
import boto3
import json
import csv
import time
import pandas as pd
import numpy as np
from datarobot.mlops.mlops import MLOps
# grab environment variables
ENDPOINT_NAME = os.environ['ENDPOINT_NAME']
runtime= boto3.client('runtime.sagemaker')
def lambda_handler(event, context):
# this is designed to work with only one record, supplied as json
# start the clock
start_time = time.time()
# parse input data
print("Received event: " + json.dumps(event, indent=2))
parsed_event = json.loads(json.dumps(event))
payload_data = parsed_event['data']
data = pd.DataFrame(payload_data, index=[0])
input_data = data
# repeat data steps from training notebook
data['no_previous_contact'] = np.where(data['pdays'] == 999, 1, 0) # Indicator variable to capture when pdays takes a value of 999
data['not_working'] = np.where(np.in1d(data['job'], ['student', 'retired', 'unemployed']), 1, 0) # Indicator for individuals not actively employed
model_data = pd.get_dummies(data)
model_data = model_data.drop(['duration', 'emp.var.rate', 'cons.price.idx', 'cons.conf.idx', 'euribor3m', 'nr.employed'], axis=1)
# xgb sagemaker endpoint features
# order/type required as was deployed in sagemaker notebook
model_features = ['age', 'campaign', 'pdays', 'previous', 'no_previous_contact',
'not_working', 'job_admin.', 'job_blue-collar', 'job_entrepreneur',
'job_housemaid', 'job_management', 'job_retired', 'job_self-employed',
'job_services', 'job_student', 'job_technician', 'job_unemployed',
'job_unknown', 'marital_divorced', 'marital_married', 'marital_single',
'marital_unknown', 'education_basic.4y', 'education_basic.6y',
'education_basic.9y', 'education_high.school', 'education_illiterate',
'education_professional.course', 'education_university.degree',
'education_unknown', 'default_no', 'default_unknown', 'default_yes',
'housing_no', 'housing_unknown', 'housing_yes', 'loan_no',
'loan_unknown', 'loan_yes', 'contact_cellular', 'contact_telephone',
'month_apr', 'month_aug', 'month_dec', 'month_jul', 'month_jun',
'month_mar', 'month_may', 'month_nov', 'month_oct', 'month_sep',
'day_of_week_fri', 'day_of_week_mon', 'day_of_week_thu',
'day_of_week_tue', 'day_of_week_wed', 'poutcome_failure',
'poutcome_nonexistent', 'poutcome_success']
# create base generic single row to score with defaults
feature_dict = { i : 0 for i in model_features }
feature_dict['pdays'] = 999
# get column values from received and processed data
input_features = model_data.columns
# replace value in to be scored record, if input data provided a value
for feature in input_features:
if feature in feature_dict:
feature_dict[feature] = model_data[feature]
# make a csv string to score
payload = pd.DataFrame(feature_dict).to_csv(header=None, index=False).strip('\n').split('\n')[0]
print("payload is:" + str(payload))
# stamp for data prep
prep_time = time.time()
print('data prep took: ' + str(round((prep_time - start_time) * 1000, 1)) + 'ms')
response = runtime.invoke_endpoint(EndpointName=ENDPOINT_NAME,
ContentType='text/csv',
Body=payload)
# process returned data
pred = json.loads(response['Body'].read().decode())
#pred = int(result['predictions'][0]['score'])
# if scored value is > 0.5, then return a 'yes' that the client will subscribe to a term deposit
predicted_label = 'yes' if pred >= 0.5 else 'no'
# initialize mlops monitor
m = MLOps().init()
# MLOPS: report test features and predictions and association_ids
m.report_predictions_data(
features_df=input_data
, class_names = ['yes', 'no']
, predictions = [[pred, 1-pred]] # yes, no
)
# report lambda timings (excluding lambda startup and imports...)
# MLOPS: report deployment metrics: number of predictions and execution time
end_time = time.time()
m.report_deployment_stats(1, (end_time - start_time) * 1000)
print("pred is: " + str(pred))
print("label is: " + str(predicted_label))
return predicted_label
Lambdaのテスト¶
-
Lambda画面の右上隅にあるテストを設定 > イベントをクリックして、テストJSONレコードを設定します。
-
次のJSONレコード形式を使用します。
{ "data": { "age": 56, "job": "housemaid", "marital": "married", "education": "basic.4y", "default": "no", "housing": "no", "loan": "no", "contact": "telephone", "month": "may", "day_of_week": "mon", "duration": 261, "campaign": 1, "pdays": 999, "previous": 0, "poutcome": "nonexistent", "emp.var.rate": 1.1, "cons.price.idx": 93.994, "cons.conf.idx": -36.4, "euribor3m": 4.857, "nr.employed": 5191 } }
-
テストを選択して、LambdaサービスとSageMakerエンドポイントを介してレコードをスコアリングします。
リソース設定とパフォーマンスに関する考慮事項¶
サーバーレスコンピューティングリソースには128MBから10240MBまで割り当てられます。これは、基本設定のLambdaコンソールで変更できます。これにより、各Lambdaの実行中に、部分的なvCPUが6つの完全なvCPUに割り当てられます。 Lambdaのコールドスタートとウォームスタート、およびSageMakerエンドポイントのEC2ホストのサイジングとスケーリングは、このトピックの範囲外になりますが、Lambda自体のリソースはスコアリング前後の処理と全体的なLambdaパフォーマンスに影響を及ぼします。
このコードに128MBを使用すると、処理時間が著しく遅くなりますが、RAMとCPOが大きくなるため、利益の減少が予想されます。 この例では、1706MB(および1つの完全なvCPU)を使用して、良好な結果が得られました。
API Gateway経由でのLambdaの公開¶
- AWSのAPI Gatewayサービスに移動し、APIを作成をクリックします。
- REST APIの構築を選択し、
lambda-direct-marketing-api
という名前を付けます。 - APIを作成を再度クリックします。
- エントリーの[リソース]セクションで、アクション -> リソースの作成を選択します。
predict
に名前を付けて、リソースを作成を選択します。- リソースを強調表示し、アクション -> メソッドを作成を選択し、POSTメソッドを選択します。
-
統合タイプ
Lambda Function
、Lambda関数lambda-direct-marketing
を選択したら、[保存]をクリックします。備考
Lambdaテストイベントで同じペイロードが使用された場合は、クライアントでテストボタンを選択できます。(Lambdaのテストを参照)。
-
次に、アクション > APIをデプロイを選択し、ステージ名(「test」など)を選択し、デプロイをクリックします。
モデルがデプロイされ、デプロイ後に提供される呼び出しURLを介して使用できるようになりました。
公開されたAPIのテスト¶
上記で使用したのと同じテストレコード(Lambdaのテスト)を使用して、HTTPリクエストを介してモデルをスコアリングできます。 以下は、curlとインラインJSONレコードを使用した例です。
想定されていない¶
curl -X POST " <https://tfff5ffffk6.execute-api.us-east-1.amazonaws.com/test/predict> " --data '{"data": {"age": 56,
"job": "housemaid", "marital": "married", "education": "basic.4y", "default": "no", "housing": "no", "loan": "no",
"contact": "telephone", "month": "may", "day\_of\_week": "mon", "duration": 261, "campaign": 1, "pdays": 999,
"previous": 0, "poutcome": "nonexistent", "emp.var.rate": 1.1, "cons.price.idx": 93.994, "cons.conf.idx": -36.4,
"euribor3m": 4.857, "nr.employed": 5191}}'
想定されている¶
curl -X POST " <https://tfff5ffffk6.execute-api.us-east-1.amazonaws.com/test/predict> " --data '{"data": {"age": 34,
"job": "blue-collar", "marital": "married", "education": "high.school", "default": "no", "housing": "yes", "loan": "no",
"contact": "cellular", "month": "may", "day\_of\_week": "tue", "duration": 863, "campaign": 1, "pdays": 3, "previous":
2, "poutcome": "success", "emp.var.rate": -1.8, "cons.price.idx": 92.893, "cons.conf.idx": -46.2, "euribor3m": 1.344,
"nr.employed": 5099.1}}'
DataRobotでのデプロイの確認と監視¶
データがデータキューからDataRobotに報告されると、外部モデルにはモデルとその予測に関連する指標が含まれます。 DataRobot UIからデプロイを選択して、運用サービスの正常性を表示できます。
また、スコアリングリクエストのデータを元のトレーニングセットのデータと比較するデータドリフト指標を表示することもできます。
DataRobotを使用して、独自のモデル(独自のリソースを使用するか、別の場所にデプロイ)を構築、ホスト、監視できるだけでなく、ここに示すように、外部アーキテクチャで作成およびホストされる完全にカスタムのモデルを監視するためにも使用できます。 未処理の特徴量のサービス正常性とドリフト追跡統計に加えて、関連付けIDと実際の結果を持つモデルを使用して、モデルの精度を追跡することもできます。