Apache Airflow用のDataRobotプロバイダー¶
DataRobot MLOpsと Apache Airflowの機能を組み合わせることで、モデルの再トレーニングと再デプロイに信頼性の高いソリューションが提供されます。 たとえば、スケジュール、モデルのパフォーマンス低下、または新しいデータがあるときにパイプラインをトリガーするセンサーを使用して、モデルを再トレーニングおよび再デプロイできます。 Apache Airflow用のDataRobotプロバイダーに関するこのクイックスタートガイドでは、基本的な Apache Airflow DAG(有向非巡回グラフ)を実装して、エンドツーエンドのDataRobot AIパイプラインを調整することによるセットアップと設定のプロセスを説明します。 このパイプラインには、プロジェクトの作成、モデルのトレーニング、モデルのデプロイ、予測のスコアリング、ターゲットと特徴量ドリフトデータを返すことなどが含まれます。 さらに、このガイドでは、さまざまなDataRobotパイプラインを迅速に実装できるように、airflow-provider-datarobot
リポジトリから サンプルDAGファイルをインポートする方法も説明します。
Apache Airflow用のDataRobotプロバイダーは、 GitHubのパブリックリポジトリで利用可能なソースコードから構築されたPythonパッケージであり、 PyPi(Pythonパッケージインデックス)で公開されています。 これは、 Astronomerレジストリにも一覧表示されています。 プロバイダーパッケージの使用と開発の詳細については、 Apache Airflowのドキュメントを参照してください。 この連携では、REST APIを介してDataRobotインスタンスと通信する DataRobot Python APIクライアントが使用されます。 詳細については、 DataRobot Pythonパッケージのドキュメントを参照してください。
必須コンポーネントのインストール¶
Apache Airflow用のDataRobotプロバイダーには、以下の依存関係がインストールされた環境が必要です。
-
Apache Airflow >= 2.3
-
Python APIクライアント >= 3.2.0b1
DataRobotプロバイダーをインストールするには、次のコマンドを実行します。
pip install airflow-provider-datarobot
開始する前に、ローカルのAirflowインスタンスを管理するための、 Astronomerコマンドラインインターフェイス(CLI)ツールをインストールします。
まず、 Linux用のDocker Desktopをインストールします。
次に、以下のコマンドを実行します。
curl -sSL https://install.astronomer.io | sudo bash
まず、 Windows用のDocker Desktopをインストールします。
次に、 Astro CLI READMEを参照してください。
次に、 pyenvまたは他のPythonバージョンマネージャーをインストールします。
ローカルAirflowプロジェクトの初期化¶
インストールの前提条件が完了したら、新しいディレクトリを作成し、 AstroCLIを使用してローカルのAirflowプロジェクトを初期化できます。
-
新しいディレクトリを作成し、そのディレクトリに移動します。
mkdir airflow-provider-datarobot && cd airflow-provider-datarobot
-
新しいディレクトリ内で次のコマンドを実行し、必要なファイルを含む新しいプロジェクトを初期化します。
astro dev init
-
requirements.txt
ファイルに移動し、次のコンテンツを追加します。airflow-provider-datarobot
-
次のコマンドを実行して、DockerコンテナでローカルのAirflowインスタンスを開始します。
astro dev start
-
インストールが完了し、Webサーバーが起動すると(約1分後)、
http://localhost:8080/
でAirflowにアクセスできるようになります。 -
Airflowにサインインします。 Airflow DAGページが表示されます。
サンプルDAGをAirflowに読み込む¶
DAGの例は、デフォルトでは****DAGページには表示されません。 Apache AirflowのサンプルDAG用のDataRobotプロバイダーを使用できるようにするには:
-
DAGファイルを airflow-provider-datarobotリポジトリからダウンロードします。
-
datarobot_pipeline_dag.py
Airflow DAG(またはdatarobot_provider/example_dags
ディレクトリ全体)をプロジェクトにコピーします。 -
1~2分待ってページを更新します。
サンプルDAGは、datarobot_pipeline DAGを含むDAGページに表示されます。
AirflowからDataRobotへの接続の作成¶
次のステップでは、AirflowからDataRobotへの接続を作成します。
-
管理 > 接続をクリックして、 Airflow接続を追加します。
-
リスト接続ページで、+ 新しいレコードを追加をクリックします。
-
接続を追加ダイアログボックスで、次のフィールドを設定します。
フィールド 説明 接続ID datarobot_default
(すべての演算子でこの名前をデフォルトで使用)接続タイプ DataRobot APIキー DataRobot APIトークン(開発者ツールでAPIキーを配置または作成) DataRobotエンドポイントのURL デフォルトでは https://app.jp.datarobot.com/api/v2
-
テストをクリックして、AirflowとDataRobotの間にテスト接続を確立します。
-
接続テストが成功したら、保存をクリックします。
DataRobotパイプラインDAGの設定¶
datarobot_pipeline Airflow DAGには、DataRobotパイプラインのステップを自動化する演算子とセンサーが含まれています。 各演算子は特定のジョブを開始し、各センサーは所定のアクションが完了するまで待機します。
オペレーター | ジョブ |
---|---|
CreateProjectOperator | DataRobotプロジェクトを作成し、そのIDを返します |
TrainModelsOperator | DataRobotのオートパイロットを起動して、モデルをトレーニングします |
DeployModelOperator | 指定モデルをデプロイし、デプロイIDを返します |
DeployRecommendedModelOperator | 推奨モデルをデプロイし、デプロイIDを返します |
ScorePredictionsOperator | デプロイに対して予測をスコアリングし、バッチ予測ジョブIDを返します |
AutopilotCompleteSensor | オートパイロットが完了したかどうかを感知します |
ScoringCompleteSensor | バッチスコアリングが完了したかどうかを感知します |
GetTargetDriftOperator | デプロイからターゲットドリフトを返します |
GetFeatureDriftOperator | デプロイから特徴量ドリフトを返します |
DataRobotパイプラインの各演算子には、特定のパラメーターが必要です。 これらのパラメーターを設定JSONファイルで定義し、DAGの実行時にJSONを指定します。
{
"training_data": "local-path-to-training-data-or-s3-presigned-url-",
"project_name": "Project created from Airflow",
"autopilot_settings": {
"target": "readmitted",
"mode": "quick",
"max_wait": 3600
},
"deployment_label": "Deployment created from Airflow",
"score_settings": {}
}
autopilot_settings
のパラメーターは Project.set_target()
メソッドに直接渡されます。このメソッドで使用可能なパラメーターは、設定JSONファイルを通じて設定することができます。
training_data
およびscore_settings
の値は、入力/出力タイプによって異なります。 score_settings
のパラメーターは BatchPredictionJob.score()
メソッドに直接渡されます。このメソッドで使用可能なパラメーターは、設定JSONファイルを通じて設定することができます。
たとえば、以下のローカルファイル入出力およびAmazon AWS S3入出力JSON設定のサンプルを参照してください。
training_data
の定義
ローカルファイル入力の場合、training_data
へのローカルパスを指定する必要があります。
1 2 3 4 5 6 7 8 9 10 11 |
|
score_settings
の定義
スコアリングintake_settings
およびoutput_settings
では、type
を定義し、ローカルpath
を入出力データの場所に指定します。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 |
|
備考
Astro CLIツールを使用してAirflowを実行する場合、ローカル入力ファイルをinclude/
ディレクトリに配置できます。 この場所は、Dockerコンテナ内のAirflowアプリケーションからアクセスできます。
training_data
の定義
Amazon AWS S3入力の場合、S3のトレーニングデータファイル用の事前署名済みURLを生成できます。
-
S3バケットでCSVファイルをクリックします。
-
画面の右上にあるオブジェクトアクションをクリックし、事前署名済みURLで共有をクリックします。
-
有効期限の間隔を設定し、事前署名済みURLを作成をクリックします。 URLはクリップボードに保存されます。
-
URLをJSON設定ファイルに
training_data
値として貼り付けます。
1 2 3 4 5 6 7 8 9 10 11 12 |
|
datarobot_aws_credentials
とscore_settings
の定義
Amazon AWS S3でのスコアリングデータの場合、DataRobot AWSの資格情報をAirflowに追加できます。
-
管理 > 接続をクリックして、 Airflow接続を追加します。
-
リスト接続ページで、+ 新しいレコードを追加をクリックします。
-
接続タイプリストで、DataRobot AWSの資格情報をクリックします。
-
接続IDを定義し、Amazon AWS S3の資格情報を入力します。
-
テストをクリックして、AirflowとAmazon AWS S3の間にテスト接続を確立します。
-
接続テストが成功したら、保存をクリックします。
リスト接続ページに戻って、接続IDをコピーする必要があります。
DAGの実行時に、接続ID/接続ID値(この例ではconnection-id
で表示)を、datarobot_aws_credentials
フィールドに追加できるようになりました。
スコアリングintake_settings
およびoutput_settings
では、type
を定義し、AWS S3入出力データの場所にurl
を指定します。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 |
|
備考
このパイプラインはデプロイを作成するので、デプロイ作成ステップの出力により、スコアリングに必要なdeployment_id
が提供されます。
DataRobotパイプラインDAGの実行¶
上記の設定手順を完了すると、組み立てた設定JSONを使用して、AirflowでDataRobotプロバイダーDAGを実行できます。
-
Airflow DAGページで、実行するDAGパイプラインを見つけます。
-
そのDAGの実行アイコンをクリックし、設定でDAGをトリガーをクリックします。
-
DAG confパラメーターページで、DAGに必要なJSON設定データを入力します。 この例では、前の手順で組み立てたJSONになります。
-
トリガー時にDAGの一時停止を解除を選択し、トリガーをクリックします。 DAGが実行を開始します。
備考
DockerコンテナでAirflowを実行している間(Astro CLIツールを使用する場合など)、コンテナ内で予測ファイルが作成されることが予想されます。 ホストマシンで予測を使用できるようにするには、include/
ディレクトリ内の出力場所を指定します。