Skip to content

Apache Airflow用のDataRobotプロバイダー

DataRobot MLOpsApache Airflowの機能を組み合わせることで、モデルの再トレーニングと再デプロイに信頼性の高いソリューションが提供されます。 たとえば、スケジュール、モデルのパフォーマンス低下、または新しいデータがあるときにパイプラインをトリガーするセンサーを使用して、モデルを再トレーニングおよび再デプロイできます。 This quickstart guide on the DataRobot provider for Apache Airflow illustrates the setup and configuration process by implementing a basic Apache Airflow DAG (Directed Acyclic Graph) to orchestrate an end-to-end DataRobot AI pipeline. このパイプラインには、プロジェクトの作成、モデルのトレーニング、モデルのデプロイ、予測のスコアリング、ターゲットと特徴量ドリフトデータを返すことなどが含まれます。 In addition, this guide shows you how to import example DAG files from the airflow-provider-datarobot repository so that you can quickly implement a variety of DataRobot pipelines.

Apache Airflow用のDataRobotプロバイダーは、 GitHubのパブリックリポジトリで利用可能なソースコードから構築されたPythonパッケージであり、 PyPI(Pythonパッケージインデックス)で公開されています。 It is also listed in the Astronomer Registry. For more information on using and developing provider packages, see the Apache Airflow documentation. The integration uses the DataRobot Python API Client, which communicates with DataRobot instances via REST API. For more information, see the DataRobot Python package documentation.

必須コンポーネントのインストール

Apache Airflow用のDataRobotプロバイダーには、以下の依存関係がインストールされた環境が必要です。

DataRobotプロバイダーをインストールするには、次のコマンドを実行します。

pip install airflow-provider-datarobot 

開始する前に、ローカルのAirflowインスタンスを管理するための、 Astronomerコマンドラインインターフェイス(CLI)ツールをインストールします。

まず、 MacOS用のDocker Desktopをインストールします。

次に、以下のコマンドを実行します。

brew install astro 

まず、 Linux用のDocker Desktopをインストールします。

次に、以下のコマンドを実行します。

curl -sSL https://install.astronomer.io | sudo bash 

まず、 Windows用のDocker Desktopをインストールします。

次に、 Astro CLI READMEを参照してください。

次に、 pyenvまたは他のPythonバージョンマネージャーをインストールします。

ローカルAirflowプロジェクトの初期化

インストールの前提条件が完了したら、新しいディレクトリを作成し、 AstroCLIを使用してローカルのAirflowプロジェクトを初期化できます。

  1. 新しいディレクトリを作成し、そのディレクトリに移動します。

    mkdir airflow-provider-datarobot && cd airflow-provider-datarobot 
    
  2. 新しいディレクトリ内で次のコマンドを実行し、必要なファイルを含む新しいプロジェクトを初期化します。

    astro dev init 
    
  3. requirements.txtファイルに移動し、次のコンテンツを追加します。

    airflow-provider-datarobot 
    
  4. 次のコマンドを実行して、DockerコンテナでローカルのAirflowインスタンスを開始します。

    astro dev start 
    
  5. インストールが完了し、Webサーバーが起動すると(約1分後)、http://localhost:8080/でAirflowにアクセスできるようになります。

  6. Airflowにサインインします。 Airflow DAGページが表示されます。

サンプルDAGをAirflowに読み込む

DAGの例は、デフォルトでは****DAGページには表示されません。 Apache AirflowのサンプルDAG用のDataRobotプロバイダーを使用できるようにするには:

  1. DAGファイルを airflow-provider-datarobotリポジトリからダウンロードします。

  2. datarobot_pipeline_dag.pyAirflow DAG(またはdatarobot_provider/example_dagsディレクトリ全体)をプロジェクトにコピーします。

  3. 1~2分待ってページを更新します。

    サンプルDAGは、datarobot_pipeline DAGを含むDAGページに表示されます。

AirflowからDataRobotへの接続の作成

次のステップでは、AirflowからDataRobotへの接続を作成します。

  1. 管理 > 接続をクリックして、 Airflow接続を追加します。

  2. リスト接続ページで、+ 新しいレコードを追加をクリックします。

  3. 接続を追加ダイアログボックスで、次のフィールドを設定します。

    フィールド 説明
    接続ID datarobot_default(すべての演算子でこの名前をデフォルトで使用)
    接続タイプ DataRobot
    APIキー A DataRobot API token (locate or create an API key in API keys and tools)
    DataRobotエンドポイントのURL デフォルトではhttps://app.datarobot.com/api/v2
  4. テストをクリックして、AirflowとDataRobotの間にテスト接続を確立します。

  5. 接続テストが成功したら、保存をクリックします。

DataRobotパイプラインDAGの設定

datarobot_pipeline Airflow DAGには、DataRobotパイプラインのステップを自動化する演算子とセンサーが含まれています。 各演算子は特定のジョブを開始し、各センサーは所定のアクションが完了するまで待機します。

オペレーター ジョブ
CreateProjectOperator DataRobotプロジェクトを作成し、そのIDを返します
TrainModelsOperator DataRobotのオートパイロットを起動して、モデルをトレーニングします
DeployModelOperator 指定モデルをデプロイし、デプロイIDを返します
DeployRecommendedModelOperator 推奨モデルをデプロイし、デプロイIDを返します
ScorePredictionsOperator デプロイに対して予測をスコアリングし、バッチ予測ジョブIDを返します
AutopilotCompleteSensor オートパイロットが完了したかどうかを感知します
ScoringCompleteSensor バッチスコアリングが完了したかどうかを感知します
GetTargetDriftOperator デプロイからターゲットドリフトを返します
GetFeatureDriftOperator デプロイから特徴量ドリフトを返します

備考

このパイプラインの例では、使用可能なすべての演算子またはセンサーが使用されるわけではありません。詳細については、プロジェクトのREADME 演算子センサーのドキュメントを参照してください。

DataRobotパイプラインの各演算子には、特定のパラメーターが必要です。 これらのパラメーターを設定JSONファイルで定義し、DAGの実行時にJSONを指定します。

{
    "training_data": "local-path-to-training-data-or-s3-presigned-url-",
    "project_name": "Project created from Airflow",
    "autopilot_settings": {
        "target": "readmitted",
        "mode": "quick",
        "max_wait": 3600
    },
    "deployment_label": "Deployment created from Airflow",
    "score_settings": {}
} 

autopilot_settingsのパラメーターは Project.set_target()メソッドに直接渡されます。このメソッドで使用可能なパラメーターは、設定JSONファイルを通じて設定することができます。

training_dataおよびscore_settingsの値は、入力/出力タイプによって異なります。 The parameters from score_settings are passed directly into the BatchPredictionJob.score() method; you can set any parameter available in this method through the configuration JSON file.

たとえば、以下のローカルファイル入出力およびAmazon AWS S3入出力JSON設定のサンプルを参照してください。

training_dataの定義

ローカルファイル入力の場合、training_dataへのローカルパスを指定する必要があります。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
{
    "training_data": "include/Diabetes10k.csv",
    "project_name": "Project created from Airflow",
    "autopilot_settings": {
        "target": "readmitted",
        "mode": "quick",
        "max_wait": 3600
    },
    "deployment_label": "Deployment created from Airflow",
    "score_settings": {}
} 

score_settingsの定義

スコアリングintake_settingsおよびoutput_settingsでは、typeを定義し、ローカルpathを入出力データの場所に指定します。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
{
    "training_data": "include/Diabetes10k.csv",
    "project_name": "Project created from Airflow",
    "autopilot_settings": {
        "target": "readmitted",
        "mode": "quick",
        "max_wait": 3600
    },
    "deployment_label": "Deployment created from Airflow",
    "score_settings": {
        "intake_settings": {
            "type": "localFile",
            "file": "include/Diabetes_scoring_data.csv"
        },
        "output_settings": {
            "type": "localFile",
            "path": "include/Diabetes_predictions.csv"
        }
    }
} 

備考

Astro CLIツールを使用してAirflowを実行する場合、ローカル入力ファイルをinclude/ディレクトリに配置できます。 この場所は、Dockerコンテナ内のAirflowアプリケーションからアクセスできます。

training_dataの定義

Amazon AWS S3入力の場合、S3のトレーニングデータファイル用の事前署名済みURLを生成できます。

  1. S3バケットでCSVファイルをクリックします。

  2. 画面の右上にあるオブジェクトアクションをクリックし、事前署名済みURLで共有をクリックします。

  3. 有効期限の間隔を設定し、事前署名済みURLを作成をクリックします。 URLはクリップボードに保存されます。

  4. URLをJSON設定ファイルにtraining_data値として貼り付けます。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
{
    "training_data": "s3-presigned-url",
    "project_name": "Project created from Airflow",
    "autopilot_settings": {
        "target": "readmitted",
        "mode": "quick",
        "max_wait": 3600
    },
    "deployment_label": "Deployment created from Airflow",
    "datarobot_aws_credentials": "connection-id",
    "score_settings": {}
} 

datarobot_aws_credentialsscore_settingsの定義

Amazon AWS S3でのスコアリングデータの場合、DataRobot AWSの資格情報をAirflowに追加できます。

  1. 管理 > 接続をクリックして、 Airflow接続を追加します。

  2. リスト接続ページで、+ 新しいレコードを追加をクリックします。

  3. 接続タイプリストで、DataRobot AWSの資格情報をクリックします。

  4. 接続IDを定義し、Amazon AWS S3の資格情報を入力します。

  5. テストをクリックして、AirflowとAmazon AWS S3の間にテスト接続を確立します。

  6. 接続テストが成功したら、保存をクリックします。

    リスト接続ページに戻って、接続IDをコピーする必要があります。

DAGの実行時に、接続ID/接続ID値(この例ではconnection-idで表示)を、datarobot_aws_credentialsフィールドに追加できるようになりました。

スコアリングintake_settingsおよびoutput_settingsでは、typeを定義し、AWS S3入出力データの場所にurlを指定します。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
{
    "training_data": "s3-presigned-url",
    "project_name": "Project created from Airflow",
    "autopilot_settings": {
        "target": "readmitted",
        "mode": "quick",
        "max_wait": 3600
    },
    "deployment_label": "Deployment created from Airflow",
    "datarobot_aws_credentials": "connection-id",
    "score_settings": {
        "intake_settings": {
            "type": "s3",
            "url": "s3://path/to/scoring-data/Diabetes10k.csv",
        },
        "output_settings": {
            "type": "s3",
            "url": "s3://path/to/results-dir/Diabetes10k_predictions.csv",
        }
    }
} 

備考

このパイプラインはデプロイを作成するので、デプロイ作成ステップの出力により、スコアリングに必要なdeployment_idが提供されます。

DataRobotパイプラインDAGの実行

上記の設定手順を完了すると、組み立てた設定JSONを使用して、AirflowでDataRobotプロバイダーDAGを実行できます。

  1. Airflow DAGページで、実行するDAGパイプラインを見つけます。

  2. そのDAGの実行アイコンをクリックし、設定でDAGをトリガーをクリックします。

  3. DAG confパラメーターページで、DAGに必要なJSON設定データを入力します。 この例では、前の手順で組み立てたJSONになります。

  4. トリガー時にDAGの一時停止を解除を選択し、トリガーをクリックします。 DAGが実行を開始します。

備考

DockerコンテナでAirflowを実行している間(Astro CLIツールを使用する場合など)、コンテナ内で予測ファイルが作成されることが予想されます。 ホストマシンで予測を使用できるようにするには、include/ディレクトリ内の出力場所を指定します。