Integration¶
- Reverse Proxy
- Azure: Microsoft Azure
- AWS: Amazon Web Services
- Databricks
- GCP: Google Cloud Platform
Reverse Proxy¶
Airflow can be set up behind a reverse proxy, with the flexibility to expose its endpoint under whatever URL you need.
For example, you can configure your reverse proxy to serve Airflow at:
https://lab.mycompany.com/myorg/airflow/
To do so, you need to set the following setting in your airflow.cfg:
base_url = http://my_host/myorg/airflow
Additionally if you use Celery Executor, you can get Flower in /myorg/flower with:
flower_url_prefix = /myorg/flower
Your reverse proxy (e.g. nginx) should be configured as follows:
pass the URL and HTTP headers as-is to the Airflow webserver, without any rewrite, for example:
    server {
        listen 80;
        server_name lab.mycompany.com;

        location /myorg/airflow/ {
            proxy_pass http://localhost:8080;
            proxy_set_header Host $host;
            proxy_redirect off;
            proxy_http_version 1.1;
            proxy_set_header Upgrade $http_upgrade;
            proxy_set_header Connection "upgrade";
        }
    }
rewrite the URL for the Flower endpoint:
    server {
        listen 80;
        server_name lab.mycompany.com;

        location /myorg/flower/ {
            rewrite ^/myorg/flower/(.*)$ /$1 break;  # remove prefix from http header
            proxy_pass http://localhost:5555;
            proxy_set_header Host $host;
            proxy_redirect off;
            proxy_http_version 1.1;
            proxy_set_header Upgrade $http_upgrade;
            proxy_set_header Connection "upgrade";
        }
    }
Azure: Microsoft Azure¶
Airflow has limited support for Microsoft Azure: interfaces exist only for Azure Blob Storage. Note that the Hook, Sensor and Operator are in the contrib section.
Azure Blob Storage¶
All classes communicate via the Windows Azure Storage Blob protocol. Make sure that an Airflow connection of type wasb exists. Authorization can be done by supplying a login (=Storage account name) and password (=KEY), or a login and SAS token in the extra field (see connection wasb_default for an example). A minimal usage sketch follows the class list below.
- WasbBlobSensor: Checks if a blob is present on Azure Blob storage.
- WasbPrefixSensor: Checks if blobs matching a prefix are present on Azure Blob storage.
- FileToWasbOperator: Uploads a local file to a container as a blob.
- WasbHook: Interface with Azure Blob Storage.
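As a quick orientation, the sketch below shows how these classes might be wired together in a DAG: a FileToWasbOperator uploads a local file and a WasbBlobSensor waits for the blob to appear. The file, container and blob names are placeholders, and the exact argument names are assumptions to confirm against the class signatures in your Airflow version.

    from datetime import datetime

    from airflow import DAG
    from airflow.contrib.operators.file_to_wasb import FileToWasbOperator
    from airflow.contrib.sensors.wasb_sensor import WasbBlobSensor

    dag = DAG('wasb_example', start_date=datetime(2018, 1, 1), schedule_interval=None)

    # Upload a local file to the 'reports' container via the 'wasb_default' connection.
    upload = FileToWasbOperator(
        task_id='upload_report',
        file_path='/tmp/report.csv',   # hypothetical local file
        container_name='reports',      # hypothetical container
        blob_name='report.csv',
        wasb_conn_id='wasb_default',
        dag=dag)

    # Wait until the blob is visible in the container.
    wait = WasbBlobSensor(
        task_id='wait_for_report',
        container_name='reports',
        blob_name='report.csv',
        wasb_conn_id='wasb_default',
        dag=dag)

    upload >> wait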
WasbBlobSensor¶
WasbPrefixSensor¶
FileToWasbOperator¶
WasbHook¶
Logging¶
Airflow can be configured to read and write task logs in Azure Blob Storage. Follow the steps below to enable Azure Blob Storage logging.
1. Airflow's logging system requires a custom .py file to be located in the PYTHONPATH, so that it's importable from Airflow. Start by creating a directory to store the config file. $AIRFLOW_HOME/config is recommended.
2. Create empty files called $AIRFLOW_HOME/config/log_config.py and $AIRFLOW_HOME/config/__init__.py.
3. Copy the contents of airflow/config_templates/airflow_local_settings.py into the log_config.py file that was just created in the step above.
4. Customize the following portions of the template:
    # wasb buckets should start with "wasb" just to help Airflow select the correct handler
    REMOTE_BASE_LOG_FOLDER = 'wasb-<whatever you want here>'

    # Rename DEFAULT_LOGGING_CONFIG to LOGGING_CONFIG
    LOGGING_CONFIG = ...
5. Make sure an Azure Blob Storage (Wasb) connection hook has been defined in Airflow. The hook should have read and write access to the Azure Blob Storage bucket defined above in REMOTE_BASE_LOG_FOLDER.
6. Update $AIRFLOW_HOME/airflow.cfg to contain:

    remote_logging = True
    logging_config_class = log_config.LOGGING_CONFIG
    remote_log_conn_id = <name of the Azure Blob Storage connection>
7. Restart the Airflow webserver and scheduler, and trigger (or wait for) a new task execution.
8. Verify that logs are showing up for newly executed tasks in the bucket you've defined.
AWS: Amazon Web Services¶
Airflow has extensive support for Amazon Web Services. Note, however, that the Hooks, Sensors and Operators are in the contrib section.
AWS EMR¶
- EmrAddStepsOperator : Adds steps to an existing EMR JobFlow.
- EmrCreateJobFlowOperator : Creates an EMR JobFlow, reading the config from the EMR connection.
- EmrTerminateJobFlowOperator : Terminates an EMR JobFlow.
- EmrHook : Interact with AWS EMR.
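The sketch below shows how the operators listed above might be chained: create a job flow, add steps to it by pulling the job flow id from XCom, then terminate it. The job-flow overrides and step definition are placeholders, and the argument names are assumptions to verify against your Airflow version.

    from datetime import datetime

    from airflow import DAG
    from airflow.contrib.operators.emr_create_job_flow_operator import EmrCreateJobFlowOperator
    from airflow.contrib.operators.emr_add_steps_operator import EmrAddStepsOperator
    from airflow.contrib.operators.emr_terminate_job_flow_operator import EmrTerminateJobFlowOperator

    dag = DAG('emr_example', start_date=datetime(2018, 1, 1), schedule_interval=None)

    # Create the job flow; the cluster config comes from the 'emr_default' connection,
    # optionally overridden here.
    create_job_flow = EmrCreateJobFlowOperator(
        task_id='create_job_flow',
        aws_conn_id='aws_default',
        emr_conn_id='emr_default',
        job_flow_overrides={'Name': 'airflow-example-cluster'},  # hypothetical override
        dag=dag)

    # Add a step to the job flow created above; the job flow id is pulled from XCom.
    add_steps = EmrAddStepsOperator(
        task_id='add_steps',
        job_flow_id="{{ task_instance.xcom_pull('create_job_flow', key='return_value') }}",
        aws_conn_id='aws_default',
        steps=[{
            'Name': 'example-step',
            'ActionOnFailure': 'CONTINUE',
            'HadoopJarStep': {'Jar': 'command-runner.jar', 'Args': ['echo', 'hello']},
        }],
        dag=dag)

    # Tear the cluster down once the steps have been submitted.
    terminate = EmrTerminateJobFlowOperator(
        task_id='terminate_job_flow',
        job_flow_id="{{ task_instance.xcom_pull('create_job_flow', key='return_value') }}",
        aws_conn_id='aws_default',
        dag=dag)

    create_job_flow >> add_steps >> terminate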
EmrAddStepsOperator¶
EmrCreateJobFlowOperator¶
EmrTerminateJobFlowOperator¶
EmrHook¶
AWS S3¶
- S3FileTransformOperator : Copies data from a source S3 location to a temporary location on the local filesystem, runs a transformation script on it, and uploads the result to a destination S3 location.
- S3ToHiveTransfer : Moves data from S3 to Hive. The operator downloads a file from S3, stores the file locally before loading it into a Hive table.
- S3Hook : Interact with AWS S3.
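For simple interactions it is often enough to call S3Hook directly from a PythonOperator, as in the sketch below. The import path and method names reflect the boto3-based hook; treat them, along with the file, key and bucket names, as assumptions to confirm against your installed version.

    from datetime import datetime

    from airflow import DAG
    from airflow.hooks.S3_hook import S3Hook
    from airflow.operators.python_operator import PythonOperator

    def upload_report(**context):
        # Push a local file to S3 using the 'aws_default' connection.
        hook = S3Hook(aws_conn_id='aws_default')
        hook.load_file(
            filename='/tmp/report.csv',        # hypothetical local file
            key='reports/report.csv',          # hypothetical key
            bucket_name='my-example-bucket',   # hypothetical bucket
            replace=True)

    dag = DAG('s3_example', start_date=datetime(2018, 1, 1), schedule_interval=None)

    upload = PythonOperator(
        task_id='upload_report',
        python_callable=upload_report,
        provide_context=True,
        dag=dag)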
S3FileTransformOperator¶
S3ToHiveTransfer¶
S3Hook¶
AWS RedShift¶
- AwsRedshiftClusterSensor : Waits for a Redshift cluster to reach a specific status.
- RedshiftHook : Interact with AWS Redshift, using the boto3 library.
- RedshiftToS3Transfer : Executes an unload command to S3 as a CSV with headers.
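A short sketch of the sensor listed above in use: wait until a cluster reports the 'available' status before running downstream work. The module path, argument names and cluster identifier are assumptions based on the contrib layout; verify them against your version.

    from datetime import datetime

    from airflow import DAG
    from airflow.contrib.sensors.aws_redshift_cluster_sensor import AwsRedshiftClusterSensor

    dag = DAG('redshift_example', start_date=datetime(2018, 1, 1), schedule_interval=None)

    # Block until the cluster is available, polling via the 'aws_default' connection.
    wait_for_cluster = AwsRedshiftClusterSensor(
        task_id='wait_for_cluster',
        cluster_identifier='my-redshift-cluster',  # hypothetical cluster id
        target_status='available',
        aws_conn_id='aws_default',
        dag=dag)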
AwsRedshiftClusterSensor¶
RedshiftHook¶
RedshiftToS3Transfer¶
Databricks¶
Databricks has contributed an Airflow operator which enables submitting runs to the Databricks platform. Internally, the operator talks to the api/2.0/jobs/runs/submit endpoint.
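For instance, a run submission might look roughly like the sketch below, where the new_cluster and notebook_task dictionaries are passed through to the runs/submit payload. The cluster spec and notebook path are placeholder values.

    from datetime import datetime

    from airflow import DAG
    from airflow.contrib.operators.databricks_operator import DatabricksSubmitRunOperator

    dag = DAG('databricks_example', start_date=datetime(2018, 1, 1), schedule_interval=None)

    # Submit a one-off notebook run; the dictionaries mirror the runs/submit payload.
    notebook_run = DatabricksSubmitRunOperator(
        task_id='notebook_run',
        databricks_conn_id='databricks_default',
        new_cluster={
            'spark_version': '2.1.0-db3-scala2.11',  # example values, adjust as needed
            'node_type_id': 'r3.xlarge',
            'num_workers': 8,
        },
        notebook_task={'notebook_path': '/Users/someone@example.com/PrepareData'},  # hypothetical path
        dag=dag)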
DatabricksSubmitRunOperator¶
GCP: Google Cloud Platform¶
Airflow has extensive support for the Google Cloud Platform. Note, however, that most Hooks and Operators are in the contrib section, which means they have beta status: they can have breaking changes between minor releases.
Logging¶
Airflow can be configured to read and write task logs in Google cloud storage. Follow the steps below to enable Google cloud storage logging.
1. Airflow's logging system requires a custom .py file to be located in the PYTHONPATH, so that it's importable from Airflow. Start by creating a directory to store the config file. $AIRFLOW_HOME/config is recommended.
2. Create empty files called $AIRFLOW_HOME/config/log_config.py and $AIRFLOW_HOME/config/__init__.py.
3. Copy the contents of airflow/config_templates/airflow_local_settings.py into the log_config.py file that was just created in the step above.
4. Customize the following portions of the template:
    # Add this variable to the top of the file. Note the trailing slash.
    GCS_LOG_FOLDER = 'gs://<bucket where logs should be persisted>/'

    # Rename DEFAULT_LOGGING_CONFIG to LOGGING_CONFIG
    LOGGING_CONFIG = ...

    # Add a GCSTaskHandler to the 'handlers' block of the LOGGING_CONFIG variable
    'gcs.task': {
        'class': 'airflow.utils.log.gcs_task_handler.GCSTaskHandler',
        'formatter': 'airflow.task',
        'base_log_folder': os.path.expanduser(BASE_LOG_FOLDER),
        'gcs_log_folder': GCS_LOG_FOLDER,
        'filename_template': FILENAME_TEMPLATE,
    },

    # Update the airflow.task and airflow.task_runner blocks to be 'gcs.task' instead of 'file.task'.
    'loggers': {
        'airflow.task': {
            'handlers': ['gcs.task'],
            ...
        },
        'airflow.task_runner': {
            'handlers': ['gcs.task'],
            ...
        },
        'airflow': {
            'handlers': ['console'],
            ...
        },
    }
5. Make sure a Google Cloud Platform connection hook has been defined in Airflow. The hook should have read and write access to the Google Cloud Storage bucket defined above in GCS_LOG_FOLDER.
6. Update $AIRFLOW_HOME/airflow.cfg to contain:

    task_log_reader = gcs.task
    logging_config_class = log_config.LOGGING_CONFIG
    remote_log_conn_id = <name of the Google cloud platform hook>
7. Restart the Airflow webserver and scheduler, and trigger (or wait for) a new task execution.
8. Verify that logs are showing up for newly executed tasks in the bucket you've defined.
9. Verify that the Google Cloud Storage viewer is working in the UI. Pull up a newly executed task, and verify that you see something like:
    *** Reading remote log from gs://<bucket where logs should be persisted>/example_bash_operator/run_this_last/2017-10-03T00:00:00/16.log.
    [2017-10-03 21:57:50,056] {cli.py:377} INFO - Running on host chrisr-00532
    [2017-10-03 21:57:50,093] {base_task_runner.py:115} INFO - Running: ['bash', '-c', u'airflow run example_bash_operator run_this_last 2017-10-03T00:00:00 --job_id 47 --raw -sd DAGS_FOLDER/example_dags/example_bash_operator.py']
    [2017-10-03 21:57:51,264] {base_task_runner.py:98} INFO - Subtask: [2017-10-03 21:57:51,263] {__init__.py:45} INFO - Using executor SequentialExecutor
    [2017-10-03 21:57:51,306] {base_task_runner.py:98} INFO - Subtask: [2017-10-03 21:57:51,306] {models.py:186} INFO - Filling up the DagBag from /airflow/dags/example_dags/example_bash_operator.py
Note the top line that says it’s reading from the remote log file.
Please be aware that if you were persisting logs to Google Cloud Storage using the old-style airflow.cfg configuration method, the old logs will no longer be visible in the Airflow UI, though they will still exist in Google Cloud Storage. This is a backwards incompatible change. If you are unhappy with it, you can change the FILENAME_TEMPLATE to reflect the old-style log filename format.
BigQuery¶
BigQuery Operators¶
- BigQueryCheckOperator : Performs checks against a SQL query that will return a single row with different values.
- BigQueryValueCheckOperator : Performs a simple value check using SQL code.
- BigQueryIntervalCheckOperator : Checks that the values of metrics given as SQL expressions are within a certain tolerance of the ones from days_back before.
- BigQueryCreateEmptyTableOperator : Creates a new, empty table in the specified BigQuery dataset optionally with schema.
- BigQueryCreateExternalTableOperator : Creates a new, external table in the dataset with the data in Google Cloud Storage.
- BigQueryOperator : Executes BigQuery SQL queries in a specific BigQuery database.
- BigQueryToBigQueryOperator : Copy a BigQuery table to another BigQuery table.
- BigQueryToCloudStorageOperator : Transfers a BigQuery table to a Google Cloud Storage bucket.
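As an illustration, a query-and-persist step with BigQueryOperator might look like the sketch below. The table identifiers are placeholders, and the bql parameter name is an assumption for the Airflow version this page documents (later releases rename it to sql).

    from datetime import datetime

    from airflow import DAG
    from airflow.contrib.operators.bigquery_operator import BigQueryOperator

    dag = DAG('bigquery_example', start_date=datetime(2018, 1, 1), schedule_interval=None)

    # Run a query and write the result into a destination table, overwriting it on each run.
    aggregate = BigQueryOperator(
        task_id='aggregate_events',
        bql='SELECT event_date, COUNT(*) AS events '
            'FROM [my-project:my_dataset.events] '   # hypothetical source table
            'GROUP BY event_date',
        destination_dataset_table='my-project.my_dataset.daily_event_counts',  # hypothetical table
        write_disposition='WRITE_TRUNCATE',
        bigquery_conn_id='bigquery_default',
        dag=dag)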
BigQueryCheckOperator¶
BigQueryValueCheckOperator¶
BigQueryIntervalCheckOperator¶
BigQueryGetDataOperator¶
BigQueryCreateEmptyTableOperator¶
BigQueryCreateExternalTableOperator¶
BigQueryOperator¶
BigQueryTableDeleteOperator¶
BigQueryToBigQueryOperator¶
BigQueryToCloudStorageOperator¶
BigQueryHook¶
Cloud DataFlow¶
DataFlow Operators¶
- DataFlowJavaOperator : Launches Cloud Dataflow jobs written in Java.
- DataflowTemplateOperator : Launches a templated Cloud DataFlow batch job.
- DataFlowPythonOperator : Launches Cloud Dataflow jobs written in Python.
DataFlowJavaOperator¶
    from datetime import datetime, timedelta

    from airflow import DAG
    from airflow.contrib.operators.dataflow_operator import DataFlowJavaOperator

    default_args = {
        'owner': 'airflow',
        'depends_on_past': False,
        'start_date': datetime(2016, 8, 1),
        'email': ['alex@vanboxel.be'],
        'email_on_failure': False,
        'email_on_retry': False,
        'retries': 1,
        'retry_delay': timedelta(minutes=30),
        'dataflow_default_options': {
            'project': 'my-gcp-project',
            'zone': 'us-central1-f',
            'stagingLocation': 'gs://bucket/tmp/dataflow/staging/',
        }
    }

    dag = DAG('test-dag', default_args=default_args)

    task = DataFlowJavaOperator(
        gcp_conn_id='gcp_default',
        task_id='normalize-cal',
        jar='{{var.value.gcp_dataflow_base}}pipeline-ingress-cal-normalize-1.0.jar',
        options={
            'autoscalingAlgorithm': 'BASIC',
            'maxNumWorkers': '50',
            'start': '{{ds}}',
            'partitionType': 'DAY'
        },
        dag=dag)
DataflowTemplateOperator¶
DataFlowPythonOperator¶
DataFlowHook¶
Cloud DataProc¶
DataProc Operators¶
- DataprocClusterCreateOperator : Create a new cluster on Google Cloud Dataproc.
- DataprocClusterDeleteOperator : Delete a cluster on Google Cloud Dataproc.
- DataProcPigOperator : Start a Pig query Job on a Cloud DataProc cluster.
- DataProcHiveOperator : Start a Hive query Job on a Cloud DataProc cluster.
- DataProcSparkSqlOperator : Start a Spark SQL query Job on a Cloud DataProc cluster.
- DataProcSparkOperator : Start a Spark Job on a Cloud DataProc cluster.
- DataProcHadoopOperator : Start a Hadoop Job on a Cloud DataProc cluster.
- DataProcPySparkOperator : Start a PySpark Job on a Cloud DataProc cluster.
- DataprocWorkflowTemplateInstantiateOperator : Instantiate a WorkflowTemplate on Google Cloud Dataproc.
- DataprocWorkflowTemplateInstantiateInlineOperator : Instantiate a WorkflowTemplate Inline on Google Cloud Dataproc.
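The typical pattern with the operators listed above is to create an ephemeral cluster, run one or more jobs on it, then delete it, as in the sketch below. The project, cluster sizing, zone and PySpark file are placeholders, and the exact parameter names should be checked against your version.

    from datetime import datetime

    from airflow import DAG
    from airflow.contrib.operators.dataproc_operator import (
        DataprocClusterCreateOperator,
        DataProcPySparkOperator,
        DataprocClusterDeleteOperator,
    )

    dag = DAG('dataproc_example', start_date=datetime(2018, 1, 1), schedule_interval=None)

    # Spin up a small ephemeral cluster.
    create_cluster = DataprocClusterCreateOperator(
        task_id='create_cluster',
        project_id='my-gcp-project',              # hypothetical project
        cluster_name='ephemeral-cluster',
        num_workers=2,
        zone='us-central1-f',
        dag=dag)

    # Run a PySpark job on the cluster created above.
    pyspark_job = DataProcPySparkOperator(
        task_id='pyspark_job',
        main='gs://my-bucket/jobs/wordcount.py',  # hypothetical job file
        cluster_name='ephemeral-cluster',
        dag=dag)

    # Tear the cluster down even if the job failed.
    delete_cluster = DataprocClusterDeleteOperator(
        task_id='delete_cluster',
        project_id='my-gcp-project',
        cluster_name='ephemeral-cluster',
        trigger_rule='all_done',
        dag=dag)

    create_cluster >> pyspark_job >> delete_cluster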
DataprocClusterCreateOperator¶
DataprocClusterDeleteOperator¶
DataProcPigOperator¶
DataProcHiveOperator¶
DataProcSparkSqlOperator¶
DataProcSparkOperator¶
DataProcHadoopOperator¶
DataProcPySparkOperator¶
DataprocWorkflowTemplateInstantiateOperator¶
DataprocWorkflowTemplateInstantiateInlineOperator¶
Cloud Datastore¶
Datastore Operators¶
- DatastoreExportOperator : Export entities from Google Cloud Datastore to Cloud Storage.
- DatastoreImportOperator : Import entities from Cloud Storage to Google Cloud Datastore.
DatastoreExportOperator¶
DatastoreImportOperator¶
DatastoreHook¶
Cloud ML Engine¶
Cloud ML Engine Operators¶
- MLEngineBatchPredictionOperator : Start a Cloud ML Engine batch prediction job.
- MLEngineModelOperator : Manages a Cloud ML Engine model.
- MLEngineTrainingOperator : Start a Cloud ML Engine training job.
- MLEngineVersionOperator : Manages a Cloud ML Engine model version.
MLEngineBatchPredictionOperator¶
MLEngineModelOperator¶
MLEngineTrainingOperator¶
MLEngineVersionOperator¶
Cloud Storage¶
Storage Operators¶
- FileToGoogleCloudStorageOperator : Uploads a file to Google Cloud Storage.
- GoogleCloudStorageCopyOperator : Copies objects from one bucket to another (optionally from a specific directory and, if required, into a different destination directory), filtered by a 'delimiter' such as a file extension (e.g. .json).
- GoogleCloudStorageCreateBucketOperator : Creates a new cloud storage bucket.
- GoogleCloudStorageListOperator : Lists all objects in a bucket whose names match the given string prefix and delimiter.
- GoogleCloudStorageDownloadOperator : Downloads a file from Google Cloud Storage.
- GoogleCloudStorageToBigQueryOperator : Loads files from Google cloud storage into BigQuery.
- GoogleCloudStorageToGoogleCloudStorageOperator : Copies a single object from a bucket to another, with renaming if requested.
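As a closing example, the sketch below uploads a local file to a bucket with the FileToGoogleCloudStorageOperator listed above. The bucket, paths and connection id are placeholders, and the argument names are assumptions to confirm against the class signature in your version.

    from datetime import datetime

    from airflow import DAG
    from airflow.contrib.operators.file_to_gcs import FileToGoogleCloudStorageOperator

    dag = DAG('gcs_example', start_date=datetime(2018, 1, 1), schedule_interval=None)

    # Upload a local file into the given bucket using the default GCP connection.
    upload = FileToGoogleCloudStorageOperator(
        task_id='upload_report',
        src='/tmp/report.csv',        # hypothetical local file
        dst='reports/report.csv',     # hypothetical object name
        bucket='my-example-bucket',   # hypothetical bucket
        google_cloud_storage_conn_id='google_cloud_default',
        dag=dag)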