監視エージェントのユースケース¶
監視エージェントを適用する方法の例については、以下の監視ユースケースを参照してください。
- 大規模監視を有効にする
- 大規模なワークロードに対する高度なエージェントメモリーチューニングの実行
- チャレンジャーの精度をレポート
- 指標のレポート
- Sparkの環境のモニタリング
- MLOps CLIを使用したモニタリング
大規模監視を有効にする¶
大規模な監視をサポートするために、MLOpsライブラリは、クライアント側で元のデータから統計情報を計算する方法を提供します。 そして、DataRobot MLOpsサービスに元の特徴量と予測値を報告する代わりに、クライアントは特徴量と予測データを含まない匿名化された統計情報を報告することができます。 クライアント側で計算された予測データの統計情報を報告することは、元データの報告と比較して、特に大規模な場合(数十億行の特徴量と予測値)に最適(かつ高パフォーマンスな)方法です。 また、クライアント側の集計では、特徴量値の集計を送信するだけなので、実際の特徴量値を公開したくない環境に適しています。
大規模監視機能を有効にするには、特徴量型の設定のいずれかを行う必要があります。 この設定ではデータセットの特徴量型を指定しますが、コード内でプログラム的に(セッターを使用して)、または環境変数を定義することによって行います。
備考
コード内のプログラムおよび環境設定を定義してこれらの設定を設定する場合は、環境変数が優先されます。
大規模な監視を設定するには、以下の環境変数を使用します。
特徴量 | 説明 |
---|---|
MLOPS_FEATURE_TYPES_FILENAME |
JSON形式のデータセットの特徴量タイプを含むファイルへのパス。 例: "/tmp/feature_types.json" |
MLOPS_FEATURE_TYPES_JSON |
JSONにはデータセットの特徴量タイプが含まれます。 例: [{"name": "feature_name_f1","feature_type": "date", "format": "%m-%d-%y",}] |
オプション設定 | |
MLOPS_STATS_AGGREGATION_MAX_RECORDS |
集計するデータセット内のレコードの最大数。 例: 10000 |
MLOPS_STATS_AGGREGATION_PREDICTION_TS_COLUMN_NAME |
クライアント側で集計するデータセットの予測タイムスタンプ列名。 例: "ts" |
MLOPS_STATS_AGGREGATION_PREDICTION_TS_COLUMN_FORMAT |
データセット内の予測タイムスタンプ値の形式。 例: "%Y-%m-%d %H:%M:%S.%f" |
MLOPS_STATS_AGGREGATION_SEGMENT_ATTRIBUTES |
データドリフトと精度のセグメント化された分析用のデータセットをセグメント化するために使用するカスタム属性。 例: "country" |
MLOPS_STATS_AGGREGATION_AUTO_SAMPLING_PERCENTAGE |
アルゴリズムサンプリングを使用してDataRobotにレポートする元のデータの割合。 この設定は、元の特徴量、予測値、実測値のサンプルを提供することで、チャレンジャーモデルと精度の追跡をサポートします。 残りのデータは集計形式で送信されます。 さらに、サンプリングするデータを含む入力データの列を識別するようにMLOPS_ASSOCIATION_ID_COLUMN_NAME を定義する必要があります。 例: 20 |
MLOPS_ASSOCIATION_ID_COLUMN_NAME |
自動サンプリングと精度追跡に必要な関連付けIDを含む列。 例: `"rowID" |
次のコードスニペットは、大規模な監視設定をプログラムで設定する方法を示しています。
mlops = MLOps() \
.set_stats_aggregation_feature_types_filename("/tmp/feature_types.json") \
.set_aggregation_max_records(10000) \
.set_prediction_timestamp_column("ts", "yyyy-MM-dd HH:mm:ss") \
.set_segment_attributes("country") \
.set_auto_sampling_percentage(20) \
.set_association_id_column_name("rowID") \
.init()
mlops = MLOps() \
.set_stats_aggregation_feature_types_json([{"name": "feature_name_f1","feature_type": "date", "format": "%m-%d-%y",}]) \
.set_aggregation_max_records(10000) \
.set_prediction_timestamp_column("ts", "yyyy-MM-dd HH:mm:ss") \
.set_segment_attributes("country") \
.set_auto_sampling_percentage(20) \
.set_association_id_column_name("rowID") \
.init()
手動サンプリング
チャレンジャーモデルと精度監視をサポートするには、元の特徴量、予測値、実測値を送信する必要がありますが、自動サンプリング機能を使用する必要はありません。 元のデータの小さいサンプルを手動でレポートし、残りのデータを集計形式で送信できます。
予測のタイムスタンプ
MLOPS_STATS_AGGREGATION_PREDICTION_TS_COLUMN_NAME
とMLOPS_STATS_AGGREGATION_PREDICTION_TS_COLUMN_FORMAT
の環境変数を指定しない場合、現在の現地時間に基づいてタイムスタンプが生成されます。
大規模な監視機能は、Python、Javaソフトウェア開発キット(SDK)、およびMLOps Spark Utilsライブラリで利用できます。
report_predictions_data()
への呼び出しを次の呼び出しに置き換えます。
report_aggregated_predictions_data(
self,
features_df,
predictions,
class_names,
deployment_id,
model_id
)
reportPredictionsData()
への呼び出しを次の呼び出しに置き換えます。
reportAggregatePredictionsData(
Map<String,
List<Object>> featureData,
List<?> predictions,
List<String> classNames
)
reportPredictions()
の呼び出しを predictionStatisticsParameters.report()
の呼び出しに置き換えます。
predictionStatisticsParameters.report()
関数には、次のビルダーコンストラクタがあります。
PredictionStatisticsParameters.Builder()
.setChannelConfig(channelConfig)
.setFeatureTypes(featureTypes)
.setDataFrame(df)
.build();
predictionStatisticsParameters.report();
ヒント
このユースケースの例は、エージェント.tar
ファイルの examples/java/PredictionStatsSparkUtilsExample
にあります。
サポートされている特徴量タイプのマッピング¶
現在、大規模な監視では、数値およびカテゴリーの特徴量がサポートされています。 この監視方法を設定する際には、各特徴量名を対応する特徴量タイプ(数値またはカテゴリー)にマッピングする必要があります。 特徴量タイプを特徴量名にマッピングする場合、スコアリングコードモデル用のメソッドと、それ以外のすべてのモデル用のメソッドがあります。
多くの場合、モデルは既存のアクセス方法を使用して特徴量名と特徴量タイプを出力できます。ただし、アクセスできない場合は、集約する各特徴量を手動でNumeric
またはCategorical
に分類する必要があります。
predictionStatisticsParameters
のsetFeatureTypes
メソッドを使用して、特徴量タイプ(Numeric
またはCategorical
)を各特徴量名にマッピングします。
getFeatures
クエリーを使用してPredictor
オブジェクトで特徴量を取得した後、特徴量タイプ(Numeric
またはCategorical
)を各特徴量名にマッピングします。
このユースケースの例は、エージェント.tar
ファイルの examples/java/PredictionStatsSparkUtilsExample/src/main/scala/com/datarobot/dr_mlops_spark/Main.scala
にあります。
大規模なワークロードに対する高度なエージェントメモリーチューニングの実行¶
監視エージェントのデフォルト設定は、平均的なワークロードでうまく機能するように調整されています。ただし、DataRobot MLOpsに転送するためにエージェントがグループ化するレコード数を増やすと、エージェントの総メモリー使用量がワークロードの増加に対応するために着実に増加します。 エージェントがユースケースのワークロードに確実に対応できるようにするには、エージェントの総メモリー使用量を見積もった後、エージェントのメモリー割り当て量を設定するか、最大レコードグループのサイズを設定します。
エージェントでのメモリー使用量の見積もり¶
監視エージェントのおおよそのメモリー使用量(バイト単位)を見積もるときは、報告される各特徴量が平均10 bytes
のメモリーを必要とすることを想定してください。 次に、報告された特徴量数(num_samples
で表示)とサンプル数(num_features
で表示)から生の予測データを含む各メッセージのメモリー使用量を推定できます。 各メッセージは約10 bytes × num_features × num_samples
のメモリーを使用します。
備考
報告された特徴量ごとの10バイトのメモリー見積もり量は、バランスの取れた特徴量の組み合わせを含むデータセットに最も適していると考えてください。 テキスト特徴量が大きくなる傾向があるため、平均量以上のテキスト特徴量を含むデータセットは、特徴量ごとにより多くのメモリーを使用する傾向があります。
一度に多数のレコードをグループ化する場合は、エージェントがagentMaxAggregatedRecords
設定で設定された上限に達するまでメッセージをグループ化することを考慮に入れてください。 さらに、その時点で、エージェントはhttpConcurrentRequest
設定で上限設定されたメモリーにメッセージを保持します。
上記の計算を組み合わせて、次の式でエージェントのメモリー使用量 (および必要なメモリー割り当て量)を見積もることができます。
memory_allocation = 10 bytes × num_features × num_samples × max_group_size × max_concurrency
変数は次のように定義されています。
-
num_features
:データセット内の特徴量(列)数。 -
num_samples
:MLOpsレポート関数の1回の呼び出しで報告される行数。 -
max_group_size
: エージェント設定ファイルのagentMaxAggregatedRecords
によって設定された各HTTPリクエストに集約されたレコード数。 -
max_concurrency
: エージェント設定ファイルのhttpConcurrentRequest
によって設定された同時HTTPリクエスト数。
上記のデータセットとエージェント設定情報を使用して必要なエージェントのメモリー割り当てを計算すると、エージェント設定をファインチューニングして、パフォーマンスとメモリー使用のバランスを最適化するのにこの情報が役に立ちます。
エージェントのメモリー割り当ての設定¶
ユースケースに対するエージェントのメモリー要件がわかったら、MLOPS_AGENT_JVM_OPT
環境変数を使用して、エージェントのJava仮想マシン(JVM)のメモリー割り当て量を増やすことができます。
MLOPS_AGENT_JVM_OPT=-Xmx2G
重要
コンテナまたはVMでエージェントを実行する場合は、-Xmx
設定よりも25%以上のメモリーでシステムを構成する必要があります。
グループの最大サイズの設定¶
または、ユースケースのエージェントのメモリー要件を減らすために、 エージェント設定ファイルのagentMaxAggregatedRecords
によって設定された、エージェントの最大グループサイズ制限を減らすことができます。
# Maximum number of records to group together before sending to DataRobot MLOps
agentMaxAggregatedRecords: 10
この設定を1
に下げると、エージェントによるレコードのグループ化が無効になります。
チャレンジャーの精度をレポート¶
DataRobotのデプロイに関連付けIDセットがない場合でも、エージェントが関連付けIDで予測を送信する可能性があるため、エージェントのAPIエンドポイントは、__DataRobot_Internal_Association_ID__
列をデプロイの関連付けIDとして認識します。 これにより、エージェントを使って予測データをレポートするモデルの精度追跡が有効になります。ただし、チャレンジャーモデルで予測が再生された場合、DataRobotでは__DataRobot_Internal_Association_ID__
列を使用できません。つまり、関連付けIDがないため、DataRobot MLOpsはこれらのモデルの精度を記録できません。 エージェントを使ってチャレンジャーモデルの精度を追跡するには、 デプロイの作成中または 精度設定で、デプロイの関連付けIDを__DataRobot_Internal_Association_ID__
に設定します。 この値で関連付けIDを設定すると、将来のすべてのチャレンジャー再生でチャレンジャーモデルの精度が記録されます。
指標のレポート¶
予測環境をDataRobotにネットワーク接続できない場合、代わりに監視エージェントのレポートをオフラインで使用できます。
-
予測環境で、
filesystem
スプーラータイプを使用するように、MLOpsライブラリを設定します。 MLOpsライブラリは、/disconnected/predictions_dir
などの設定されたディレクトリに指標を報告します。 -
DataRobotにネットワーク接続されているマシンで監視エージェントを実行します。
-
FILESYSTEMスプーラータイプを使用し、ローカルディレクトリから入力を受け取るようにエージェントを設定します。例:
/connected/predictions_dir
。 -
ディレクトリの内容を
/disconnected/predictions_dir
接続先の環境に移行します/connected/predictions_dir
。
スコアリングコードモデルのレポート¶
監視エージェントのレポートを使用して、ダウンロードしたスコアリングコードモデルの監視指標をDataRobotに送信することもできます。 このユースケースの例を、examples/java/CodeGenExample
のMLOpsエージェントtarballで参照してください。
Sparkの環境のモニタリング¶
監視エージェントの一般的なユースケースは、Spark環境でのスコアリングのモニタリングで、スコアリングはSparkで行われ、予測と特徴量をDataRobotにレポートしたいと考えています。 通常、Sparkは複数ノードセットアップを使用するため、共有の一貫性のあるファイルシステムはSparkのインストールでは一般的ではなく、エージェントのfileystem
スプーラーチャネルを使用することは困難です。
これを回避するには、RabbitMQまたはAWS SQSのようなネットワークベースのチャネルを使用します。 これらのチャネルは、複数のライターおよび単一(または複数)のリーダーと連携できます。
以下の例では、MLOps Spark Utilモジュールを使用して、Sparkシステム上でエージェント監視を設定する方法を説明し、Sparkフレームワークで行われたスコアリング結果を報告する方法を提供します。 MLOpsのJavaサンプルディレクトリexamples/java/SparkUtilsExample/
にあるMLOpsSparkUtilsモジュールのドキュメントを参照してください。
Sparkサンプルのソースコードは、次の3つのステップを実行します。
- スコアリングJARファイルを指定すると、データをスコアリングし、データフレームで結果を提供します。
- 特徴量のデータフレームと予測結果を単一のデータフレームに統合します。
mlops_spark_utils.MLOpsSparkUtils.reportPredictions
ヘルパーを呼び出し、マージされたデータフレームを使用して予測をレポートします。
関数がデータフレームを介してデータを取得する限り、任意のモデルで生成された予測をmlops_spark_utils.MLOpsSparkUtils.reportPredictions
を使用してレポートできます。
通信チャネルとしてRabbitMQを使用したこの例には、チャネル設定が含まれています。 Sparkは分散フレームワークであるため、DataRobotではワーカーが実行されているノードに関係なく、Sparkワーカーが監視データを同じチャネルに送信できるようにするために、RabbitMQやAWS SQSなどのネットワークベースのチャネルを必要とします。
Sparkの前提条件¶
次の手順は、Sparkの監視ユースケースを実行するために必要な前提条件の概要を示しています。
-
コンテナ内でスプーラー(この例ではRabbitMQ)を実行します。
- このコマンドは、RabbitMQの管理コンソールも実行します。
- ブラウザーからhttp://localhost:15672(ユーザー名 =
guest
、パスワード =guest
)でコンソールにアクセスできます。 -
コンソールで、以下の
./run_example.sh
スクリプトを実行したときのメッセージキューの動作を確認します。docker run -d -p 15672:15672 -p 5672:5672 --name rabbitmq-spark-example rabbitmq:3-management
-
監視エージェントを設定して開始します。
- エージェントtarballで提供されるクイックスタートガイドに従ってください。
- RabbitMQと通信するようにエージェントを設定します。
-
以下に一致するようにエージェントチャネル設定を編集します。
- type: "RABBITMQ_SPOOL" details: {name: "rabbit", queueUrl: "amqp://localhost:5672", queueName: "spark_example"}
-
mvnを使用する場合は、サンプルをテストする前にを実行して、
datarobot-mlops
JARをローカルのmvnリポジトリにインストールします。./examples/java/install_jar_into_maven.sh
このコマンドは、シェルスクリプトを実行して、使用しているSparkバージョン(
<version>
はエージェントバージョンを表す)に応じて、mlops-utils-for-spark_2-<version>.jar
またはmlops-utils-for-spark_3-<version>.jar
ファイルのいずれかをインストールします。 -
サンプルのJARファイルを作成し、
JAVA_HOME
環境変数を設定してから、make
を実行してコンパイルします。- Spark2/Java8の場合:
export JAVA_HOME=$(/usr/libexec/java_home -v 1.8)
- Spark3/Java11の場合:
export JAVA_HOME=$(/usr/libexec/java_home -v 11)
- Spark2/Java8の場合:
-
Sparkをローカルにインストールして実行します。
-
Hadoop 3.3+用に構築されたSpark 2(2.x.x)またはSpark 3(3.x.x)の最新バージョンをダウンロードします。 最新のSpark 3バージョンをダウンロードするには、 Apache Sparkのダウンロードページを参照してください。
備考
以下のコマンドおよびディレクトリの
<version>
プレースホルダーを、使用しているSparkおよびHadoopのバージョンに置き換えます。 -
tarball(
tar xvf ~/Downloads/spark-3.2.<version>-bin-hadoop<version>.tgz
)を解凍します。 -
spark-<version>-bin-hadoop<version>
ディレクトリで、Sparkクラスターを起動します。sbin/start-master.sh -i localhost` sbin/start-slave.sh -i localhost -c 8 -m 2G spark://localhost:7077
-
インストールが成功したことを確認します。
bin/spark-submit --class org.apache.spark.examples.SparkPi --master spark://localhost:7077 --num-executors 1 --driver-memory 512m --executor-memory 512m --executor-cores 1 examples/jars/spark-examples_*.jar 10
-
Sparkビンディレクトリを
$PATH
に追加します。env SPARK_BIN=/opt/ml/spark-3.2.1-bin-hadoop3.*/bin ./run_example.sh
備考
監視エージェントはSpark2もサポートしています。
-
Sparkのユースケース¶
上記の前提条件を満たした後、Sparkのサンプルを実行します。
-
モデルパッケージを作成し、デプロイを初期化します。
./create_deployment.sh
DataRobot UIを使用して、外部モデルパッケージを作成し、デプロイすることもできます。
-
デプロイの環境変数と、デプロイの作成時に返されたモデルを、コピーしてシェルに貼り付けることにより設定します。
export MLOPS_DEPLOYMENT_ID=<deployment_id> export MLOPS_MODEL_ID=<model_id>
-
予測を生成し、統計をDataRobotにレポートします。
run_example.sh
-
スプーラータイプ(Sparkジョブと監視エージェント間の通信チャネル)を変更する場合:
src/main/scala/com/datarobot/dr_mlops_spark/Main.scala
にあるScalaコードを編集します。-
次の行を変更して、必要なチャネル設定を含めます。
val channelConfig = "output_type=rabbitmq;rabbitmq_url=amqp://localhost;rabbitmq_queue_name=spark_example"
-
make
を実行してコードを再コンパイルします。
MLOps CLIを使用したモニタリング¶
MLOpsは、MLOpsアプリケーションと対話するためのコマンドラインインターフェイス(CLI)をサポートしています。 CLIは、デプロイとモデルパッケージの作成、データセットのアップロード、予測と実測に関する指標のレポートなど、ほとんどのMLOpsアクションに使用できます。
使用可能な操作と構文例のリストについては、MLOps CLIヘルプページを使用してください。
mlops-cli [-h]
監視エージェントとMLOps CLIの比較
監視エージェントと同様に、MLOps CLIも予測データをMLOpsサービスに送信できますが、その使用法は少し異なります。 MLOps CLIは、現在のスプールファイルの内容を使用してMLOpsサービスにHTTPリクエストを送信できるPythonアプリです。 内部で監視エージェントを実行したり、Javaプロセスを呼び出したりすることはありません。また継続的にポーリングしたり、新しいスプールデータを待機したりすることもありません。既存のスプールデータが消費されると、終了します。 一方、監視エージェントはJavaプロセスで、実行中はスプールファイル内の新しいデータを継続的にポーリングし、そのデータを最適化された形式でMLOpsサービスに送信します。