The combined capabilities of DataRobot MLOps and Apache Airflow provide a reliable solution for retraining and redeploying your models. For example, you can retrain and redeploy your models on a schedule, when model performance degrades, or with a sensor that triggers the pipeline when new data arrives. This quickstart guide for the DataRobot provider for Apache Airflow illustrates the setup and configuration process by implementing a basic Apache Airflow DAG (Directed Acyclic Graph) to orchestrate an end-to-end DataRobot AI pipeline. This pipeline includes creating a project, training models, deploying a model, scoring predictions, and returning target and feature drift data. In addition, this guide shows you how to import example DAG files from the airflow-provider-datarobot repository so that you can quickly implement a variety of DataRobot pipelines.
Run the following command within the new directory to initialize a project with the required files:
```bash
astro dev init
```
Navigate to the requirements.txt file and add the following content:
```
airflow-provider-datarobot
```
Run the following command to start a local Airflow instance in a Docker container:
```bash
astro dev start
```
Once the installation is complete and the web server starts (after approximately one minute), you should be able to access Airflow at http://localhost:8080/.
Sign in to Airflow. The Airflow DAGs page appears.
The datarobot_pipeline Airflow DAG contains operators and sensors that automate the DataRobot pipeline steps. Each operator initiates a specific job, and each sensor waits for a predetermined action to complete:
| Operator | Job |
|----------|-----|
| `CreateProjectOperator` | Creates a DataRobot project and returns its ID |
| `TrainModelsOperator` | Triggers DataRobot Autopilot to train models |
| `DeployModelOperator` | Deploys a specified model and returns the deployment ID |
| `DeployRecommendedModelOperator` | Deploys a recommended model and returns the deployment ID |
| `ScorePredictionsOperator` | Scores predictions against the deployment and returns a batch prediction job ID |
| `AutopilotCompleteSensor` | Senses if Autopilot completed |
| `ScoringCompleteSensor` | Senses if batch scoring completed |
| `GetTargetDriftOperator` | Returns the target drift from a deployment |
| `GetFeatureDriftOperator` | Returns the feature drift from a deployment |
Note
This example pipeline doesn't use every available operator or sensor; for more information, see the Operators and Sensors documentation in the project README.
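For reference, wiring these operators and sensors together in a DAG looks roughly like the sketch below (the drift operators are omitted for brevity). It is modeled on the example pipeline rather than copied from it: the import paths, task IDs, and parameter names (`project_id`, `deployment_id`, `job_id`) are assumptions, so consult the example DAG files in the airflow-provider-datarobot repository for the exact modules and signatures.

```python
from datetime import datetime

from airflow import DAG

# Import paths below are assumptions; check the provider repository for the exact modules.
from datarobot_provider.operators.datarobot import (
    CreateProjectOperator,
    TrainModelsOperator,
    DeployRecommendedModelOperator,
    ScorePredictionsOperator,
)
from datarobot_provider.sensors.datarobot import (
    AutopilotCompleteSensor,
    ScoringCompleteSensor,
)

with DAG(
    dag_id="datarobot_pipeline_sketch",
    start_date=datetime(2023, 1, 1),
    schedule=None,  # trigger manually with a configuration JSON
    catchup=False,
) as dag:
    # Create a project from the training data defined in the DAG run configuration.
    create_project = CreateProjectOperator(task_id="create_project")

    # Start Autopilot, then wait for it to finish.
    train_models = TrainModelsOperator(
        task_id="train_models",
        project_id=create_project.output,  # XCom: project ID from the previous task
    )
    autopilot_complete = AutopilotCompleteSensor(
        task_id="check_autopilot_complete",
        project_id=create_project.output,
    )

    # Deploy the recommended model, score predictions, then wait for scoring to finish.
    deploy_model = DeployRecommendedModelOperator(
        task_id="deploy_recommended_model",
        project_id=create_project.output,
    )
    score_predictions = ScorePredictionsOperator(
        task_id="score_predictions",
        deployment_id=deploy_model.output,  # XCom: deployment ID from the previous task
    )
    scoring_complete = ScoringCompleteSensor(
        task_id="check_scoring_complete",
        job_id=score_predictions.output,  # XCom: batch prediction job ID
    )

    (
        create_project
        >> train_models
        >> autopilot_complete
        >> deploy_model
        >> score_predictions
        >> scoring_complete
    )
```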
Each operator in the DataRobot pipeline requires specific parameters. You define these parameters in a configuration JSON file and provide the JSON when running the DAG.
```json
{
    "training_data": "local-path-to-training-data-or-s3-presigned-url",
    "project_name": "Project created from Airflow",
    "autopilot_settings": {
        "target": "readmitted",
        "mode": "quick",
        "max_wait": 3600
    },
    "deployment_label": "Deployment created from Airflow",
    "score_settings": {}
}
```
The parameters from autopilot_settings are passed directly into the Project.set_target() method; you can set any parameter available in this method through the configuration JSON file.
The values of training_data and score_settings depend on the intake/output type. The parameters from score_settings are passed directly into the BatchPredictionJob.score() method; you can set any parameter available in this method through the configuration JSON file.
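As a rough illustration (not the provider's actual implementation), this pass-through behaves like calling the DataRobot Python SDK with the corresponding JSON sections unpacked as keyword arguments, so any keyword those methods accept can appear in the configuration file. The project and deployment IDs below are placeholders:

```python
import datarobot as dr

# Illustration only: the configuration sections map onto SDK keyword arguments.
config = {
    "autopilot_settings": {"target": "readmitted", "mode": "quick", "max_wait": 3600},
    "score_settings": {
        "intake_settings": {"type": "localFile", "file": "include/Diabetes_scoring_data.csv"},
        "output_settings": {"type": "localFile", "path": "include/Diabetes_predictions.csv"},
    },
}

project = dr.Project.get("project-id")               # placeholder project ID
project.set_target(**config["autopilot_settings"])   # e.g. target, mode, worker_count, max_wait

dr.BatchPredictionJob.score(
    "deployment-id",                                  # placeholder deployment ID
    **config["score_settings"],                       # e.g. intake_settings, output_settings
)
```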
For example, see the local file intake/output and Amazon AWS S3 intake/output JSON configuration samples below:
Define training_data
For local file intake, you should provide the local path to the training_data:
```json
{
    "training_data": "include/Diabetes10k.csv",
    "project_name": "Project created from Airflow",
    "autopilot_settings": {
        "target": "readmitted",
        "mode": "quick",
        "max_wait": 3600
    },
    "deployment_label": "Deployment created from Airflow",
    "score_settings": {}
}
```
Define score_settings
For the scoring intake_settings and output_settings, define the type and provide the local path to the intake and output data locations:
```json
{
    "training_data": "include/Diabetes10k.csv",
    "project_name": "Project created from Airflow",
    "autopilot_settings": {
        "target": "readmitted",
        "mode": "quick",
        "max_wait": 3600
    },
    "deployment_label": "Deployment created from Airflow",
    "score_settings": {
        "intake_settings": {
            "type": "localFile",
            "file": "include/Diabetes_scoring_data.csv"
        },
        "output_settings": {
            "type": "localFile",
            "path": "include/Diabetes_predictions.csv"
        }
    }
}
```
Note
When using the Astro CLI tool to run Airflow, you can place local input files in the include/ directory. This location is accessible to the Airflow application inside the Docker container.
Define training_data
For Amazon AWS S3 intake, you can generate a pre-signed URL for the training data file on S3:
In the S3 bucket, click the CSV file.
Click Object Actions at the top-right corner of the screen and click Share with a pre-signed URL.
Set the expiration time interval and click Create presigned URL. The URL is saved to your clipboard.
Paste the URL in the JSON configuration file as the training_data value:
```json
{
    "training_data": "s3-presigned-url",
    "project_name": "Project created from Airflow",
    "autopilot_settings": {
        "target": "readmitted",
        "mode": "quick",
        "max_wait": 3600
    },
    "deployment_label": "Deployment created from Airflow",
    "datarobot_aws_credentials": "connection-id",
    "score_settings": {}
}
```
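If you prefer to script this step instead of using the S3 console, a pre-signed URL can also be generated with boto3. This is a sketch with placeholder bucket and key names, assuming your AWS credentials are already configured locally:

```python
import boto3

# Generate a pre-signed GET URL for the training data object (bucket and key are placeholders).
s3 = boto3.client("s3")
presigned_url = s3.generate_presigned_url(
    "get_object",
    Params={"Bucket": "my-training-data-bucket", "Key": "path/to/Diabetes10k.csv"},
    ExpiresIn=3600,  # expiration in seconds
)
print(presigned_url)  # use this value as training_data in the configuration JSON
```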
Define datarobot_aws_credentials and score_settings
For scoring data on Amazon AWS S3, you can add your DataRobot AWS credentials to Airflow:
On the List Connection page, click + Add a new record.
In the Connection Type list, click DataRobot AWS Credentials.
Define a Connection Id and enter your Amazon AWS S3 credentials.
Click Test to establish a test connection between Airflow and Amazon AWS S3.
When the connection test is successful, click Save.
You return to the List Connection page, where you can copy the Conn Id.
You can now add the Connection Id / Conn Id value (represented by connection-id in this example) to the datarobot_aws_credentials field when you run the DAG.
For the scoring intake_settings and output_settings, define the type and provide the url for the AWS S3 intake and output data locations:
```json
{
    "training_data": "s3-presigned-url",
    "project_name": "Project created from Airflow",
    "autopilot_settings": {
        "target": "readmitted",
        "mode": "quick",
        "max_wait": 3600
    },
    "deployment_label": "Deployment created from Airflow",
    "datarobot_aws_credentials": "connection-id",
    "score_settings": {
        "intake_settings": {
            "type": "s3",
            "url": "s3://path/to/scoring-data/Diabetes10k.csv"
        },
        "output_settings": {
            "type": "s3",
            "url": "s3://path/to/results-dir/Diabetes10k_predictions.csv"
        }
    }
}
```
Note
Because this pipeline creates a deployment, the output of the deployment creation step provides the deployment_id required for scoring.
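In practice, this means a downstream scoring task can read the deployment ID from XCom rather than having it hard-coded in score_settings. A minimal sketch, where the import path, task ID, and parameter name are assumptions:

```python
# Import path is an assumption; check the provider repository for the exact module.
from datarobot_provider.operators.datarobot import ScorePredictionsOperator

# Sketch only: inside a DAG, pull the deployment ID produced by the deployment
# task from XCom and pass it to the scoring operator via a Jinja template.
score_predictions = ScorePredictionsOperator(
    task_id="score_predictions",
    deployment_id="{{ ti.xcom_pull(task_ids='deploy_recommended_model') }}",
)
```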
After completing the setup steps above, you can run a DataRobot provider DAG in Airflow using the configuration JSON you assembled:
On the Airflow DAGs page, locate the DAG pipeline you want to run.
Click the run icon for that DAG and click Trigger DAG w/ config.
On the DAG conf parameters page, enter the JSON configuration data required by the DAG; in this example, that is the JSON you assembled in the previous step.
Select Unpause DAG when triggered, and then click Trigger. The DAG starts running.
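Alternatively, you can submit the same configuration JSON programmatically. The sketch below uses the Airflow REST API against a local instance; the URL, credentials, and use of the basic-auth API backend are assumptions for a local setup, and the DAG must be unpaused for the run to execute:

```python
import requests

# Sketch only: trigger the datarobot_pipeline DAG through the Airflow REST API
# with the same configuration JSON used in the UI.
conf = {
    "training_data": "include/Diabetes10k.csv",
    "project_name": "Project created from Airflow",
    "autopilot_settings": {"target": "readmitted", "mode": "quick", "max_wait": 3600},
    "deployment_label": "Deployment created from Airflow",
    "score_settings": {},
}

response = requests.post(
    "http://localhost:8080/api/v1/dags/datarobot_pipeline/dagRuns",
    auth=("admin", "admin"),  # placeholder credentials for a local instance
    json={"conf": conf},
)
response.raise_for_status()
print(response.json())
```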
Note
When Airflow runs in a Docker container (for example, via the Astro CLI tool), the predictions file is created inside the container. To make the predictions available on the host machine, set the output location to a path in the include/ directory.