Integration

Reverse Proxy

Airflow can be set up behind a reverse proxy, and you can choose its endpoint freely.

For example, you can configure your reverse proxy to serve Airflow at:

https://lab.mycompany.com/myorg/airflow/

To do so, set the following in your airflow.cfg:

base_url = http://my_host/myorg/airflow

Additionally, if you use the Celery Executor, you can serve Flower at /myorg/flower with:

flower_url_prefix = /myorg/flower
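
As a rough illustration of how these two settings relate to the proxy configuration, the sketch below parses a hypothetical airflow.cfg fragment and derives the path prefixes the proxy has to route. The section names ([webserver] and [celery]) and the helper name proxy_locations are assumptions made for this example, not something stated above.

```python
import configparser
from urllib.parse import urlparse

# Hypothetical airflow.cfg fragment; the section names are an assumption.
CFG_TEXT = """
[webserver]
base_url = http://my_host/myorg/airflow

[celery]
flower_url_prefix = /myorg/flower
"""


def proxy_locations(cfg_text):
    """Return the path prefixes a reverse proxy would need to route."""
    parser = configparser.ConfigParser()
    parser.read_string(cfg_text)
    # Only the path component of base_url matters for the proxy location.
    airflow_path = urlparse(parser.get("webserver", "base_url")).path
    flower_path = parser.get("celery", "flower_url_prefix")
    return {
        "airflow": airflow_path.rstrip("/") + "/",
        "flower": flower_path.rstrip("/") + "/",
    }


locations = proxy_locations(CFG_TEXT)
print(locations)
```

The two values correspond to the location blocks used in the nginx examples in this section.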

Your reverse proxy (e.g. nginx) should be configured as follows:

  • pass the URL and HTTP headers as is to the Airflow webserver, without any rewrite, for example:

    server {
      listen 80;
      server_name lab.mycompany.com;
    
      location /myorg/airflow/ {
          proxy_pass http://localhost:8080;
          proxy_set_header Host $host;
          proxy_redirect off;
          proxy_http_version 1.1;
          proxy_set_header Upgrade $http_upgrade;
          proxy_set_header Connection "upgrade";
      }
    }
    
  • rewrite the URL for the Flower endpoint:

    server {
        listen 80;
        server_name lab.mycompany.com;
    
        location /myorg/flower/ {
            rewrite ^/myorg/flower/(.*)$ /$1 break;  # strip the prefix from the request path
            proxy_pass http://localhost:5555;
            proxy_set_header Host $host;
            proxy_redirect off;
            proxy_http_version 1.1;
            proxy_set_header Upgrade $http_upgrade;
            proxy_set_header Connection "upgrade";
        }
    }
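
To see what the Flower rewrite rule does to a request path, the following sketch applies the same regular expression in Python. It only mimics the path substitution; it is not how nginx evaluates the rule internally, and the function name upstream_path is made up for this example.

```python
import re

# Same pattern as the nginx rule: rewrite ^/myorg/flower/(.*)$ /$1 break;
FLOWER_PREFIX = re.compile(r"^/myorg/flower/(.*)$")


def upstream_path(request_path):
    """Return the path the Flower upstream would see for a request path."""
    # Paths that do not match the pattern are returned unchanged.
    return FLOWER_PREFIX.sub(r"/\1", request_path)


print(upstream_path("/myorg/flower/dashboard"))
print(upstream_path("/myorg/flower/"))
```

The Airflow webserver location has no such rule, so its requests reach the upstream with the /myorg/airflow/ prefix intact, which is why base_url must include that prefix.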
    

Azure: Microsoft Azure

Airflow has limited support for Microsoft Azure: interfaces exist only for Azure Blob Storage. Note that the Hook, Sensor and Operator are in the contrib section.

Azure Blob Storage

All classes communicate via the Windows Azure Storage Blob (WASB) protocol. Make sure that an Airflow connection of type wasb exists. Authorization can be done by supplying a login (=Storage account name) and password (=KEY), or a login and SAS token in the extra field (see connection wasb_default for an example).
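
The two authorization styles can be sketched as plain connection fields. This is only an illustration: the helper wasb_connection_fields is hypothetical, and the extra-field key "sas_token" is an assumption; check the wasb_default connection in your installation for the exact form.

```python
import json


def wasb_connection_fields(account_name, account_key=None, sas_token=None):
    """Build the fields of a wasb connection for one authorization style."""
    if (account_key is None) == (sas_token is None):
        raise ValueError("supply exactly one of account_key or sas_token")
    fields = {"conn_type": "wasb", "login": account_name}
    if account_key is not None:
        # Style 1: login is the storage account name, password is the key.
        fields["password"] = account_key
    else:
        # Style 2: login plus a SAS token in the extra field.
        # The key name "sas_token" is an assumption for this sketch.
        fields["extra"] = json.dumps({"sas_token": sas_token})
    return fields


key_style = wasb_connection_fields("mystorageaccount", account_key="KEY")
sas_style = wasb_connection_fields("mystorageaccount", sas_token="TOKEN")
print(key_style)
print(sas_style)
```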

WasbBlobSensor

WasbPrefixSensor

FileToWasbOperator

WasbHook

Logging

Airflow can be configured to read and write task logs in Azure Blob Storage. Follow the steps below to enable Azure Blob Storage logging.

  1. Airflow’s logging system requires a custom .py file to be located in the PYTHONPATH, so that it’s importable from Airflow. Start by creating a directory to store the config file. $AIRFLOW_HOME/config is recommended.

  2. Create empty files called $AIRFLOW_HOME/config/log_config.py and $AIRFLOW_HOME/config/__init__.py.

  3. Copy the contents of airflow/config_templates/airflow_local_settings.py into the log_config.py file that was just created in the step above.

  4. Customize the following portions of the template:

    # wasb buckets should start with "wasb" just to help Airflow select correct handler
    REMOTE_BASE_LOG_FOLDER = 'wasb-<whatever you want here>'
    
    # Rename DEFAULT_LOGGING_CONFIG to LOGGING_CONFIG
    LOGGING_CONFIG = ...
    
  5. Make sure an Azure Blob Storage (Wasb) connection hook has been defined in Airflow. The hook should have read and write access to the Azure Blob Storage bucket defined above in REMOTE_BASE_LOG_FOLDER.

  6. Update $AIRFLOW_HOME/airflow.cfg to contain:

    remote_logging = True
    logging_config_class = log_config.LOGGING_CONFIG
    remote_log_conn_id = <name of the Azure Blob Storage connection>
    
  7. Restart the Airflow webserver and scheduler, and trigger (or wait for) a new task execution.

  8. Verify that logs are showing up for newly executed tasks in the bucket you’ve defined.
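
The comment in step 4 says the folder name should start with "wasb" so that Airflow can select the correct handler. A simplified sketch of that kind of prefix-based selection is shown below. It is not Airflow's actual code; the function name and return values are hypothetical, and the "s3://" prefix is included only for comparison.

```python
def select_remote_handler(remote_base_log_folder):
    """Pick a remote log handler name from the folder prefix (sketch only)."""
    if remote_base_log_folder.startswith("s3://"):
        return "s3"
    if remote_base_log_folder.startswith("gs://"):
        return "gcs"
    if remote_base_log_folder.startswith("wasb"):
        return "wasb"
    # No recognised prefix: no remote handler would be selected.
    return None


print(select_remote_handler("wasb-airflow-logs"))
print(select_remote_handler("/var/log/airflow"))
```

Under this scheme a folder such as "airflow-logs" would not be recognised as Azure Blob Storage, which is why the "wasb" prefix matters.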

AWS: Amazon Web Services

Airflow has extensive support for Amazon Web Services. But note that the Hooks, Sensors and Operators are in the contrib section.

AWS EMR

EmrAddStepsOperator

EmrCreateJobFlowOperator

EmrTerminateJobFlowOperator

EmrHook

AWS S3

  • S3FileTransformOperator : Copies data from a source S3 location to a temporary location on the local filesystem.
  • S3ToHiveTransfer : Moves data from S3 to Hive. The operator downloads a file from S3 and stores it locally before loading it into a Hive table.
  • S3Hook : Interact with AWS S3.

S3FileTransformOperator

S3ToHiveTransfer

S3Hook

AWS EC2 Container Service

  • ECSOperator : Execute a task on AWS EC2 Container Service.

ECSOperator

AWS Batch Service

AWSBatchOperator

AWS RedShift

AwsRedshiftClusterSensor

RedshiftHook

RedshiftToS3Transfer

Databricks

Databricks has contributed an Airflow operator which enables submitting runs to the Databricks platform. Internally the operator talks to the api/2.0/jobs/runs/submit endpoint.
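
For orientation, a request body for the api/2.0/jobs/runs/submit endpoint might look like the sketch below. The helper name and all values are illustrative placeholders, and only a small subset of the fields the endpoint accepts is shown; consult the Databricks REST API documentation for the full schema.

```python
import json


def build_submit_run_payload(run_name, notebook_path, spark_version,
                             node_type_id, num_workers):
    """Assemble a minimal runs/submit request body (illustrative subset)."""
    return {
        "run_name": run_name,
        "new_cluster": {
            "spark_version": spark_version,
            "node_type_id": node_type_id,
            "num_workers": num_workers,
        },
        "notebook_task": {"notebook_path": notebook_path},
    }


# All values below are placeholders, not recommendations.
payload = build_submit_run_payload(
    run_name="example-run",
    notebook_path="/Users/someone@example.com/PrepareData",
    spark_version="<spark version>",
    node_type_id="<node type>",
    num_workers=2,
)
print(json.dumps(payload, indent=2))
```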

DatabricksSubmitRunOperator

GCP: Google Cloud Platform

Airflow has extensive support for the Google Cloud Platform. Note, however, that most Hooks and Operators are in the contrib section, which means they have beta status and can have breaking changes between minor releases.

Logging

Airflow can be configured to read and write task logs in Google Cloud Storage. Follow the steps below to enable Google Cloud Storage logging.

  1. Airflow’s logging system requires a custom .py file to be located in the PYTHONPATH, so that it’s importable from Airflow. Start by creating a directory to store the config file. $AIRFLOW_HOME/config is recommended.

  2. Create empty files called $AIRFLOW_HOME/config/log_config.py and $AIRFLOW_HOME/config/__init__.py.

  3. Copy the contents of airflow/config_templates/airflow_local_settings.py into the log_config.py file that was just created in the step above.

  4. Customize the following portions of the template:

    # Add this variable to the top of the file. Note the trailing slash.
    GCS_LOG_FOLDER = 'gs://<bucket where logs should be persisted>/'
    
    # Rename DEFAULT_LOGGING_CONFIG to LOGGING_CONFIG
    LOGGING_CONFIG = ...
    
    # Add a GCSTaskHandler to the 'handlers' block of the LOGGING_CONFIG variable
    'gcs.task': {
        'class': 'airflow.utils.log.gcs_task_handler.GCSTaskHandler',
        'formatter': 'airflow.task',
        'base_log_folder': os.path.expanduser(BASE_LOG_FOLDER),
        'gcs_log_folder': GCS_LOG_FOLDER,
        'filename_template': FILENAME_TEMPLATE,
    },
    
    # Update the airflow.task and airflow.task_runner blocks to be 'gcs.task' instead of 'file.task'.
    'loggers': {
        'airflow.task': {
            'handlers': ['gcs.task'],
            ...
        },
        'airflow.task_runner': {
            'handlers': ['gcs.task'],
            ...
        },
        'airflow': {
            'handlers': ['console'],
            ...
        },
    }
    
  5. Make sure a Google Cloud Platform connection hook has been defined in Airflow. The hook should have read and write access to the Google Cloud Storage bucket defined above in GCS_LOG_FOLDER.

  6. Update $AIRFLOW_HOME/airflow.cfg to contain:

    task_log_reader = gcs.task
    logging_config_class = log_config.LOGGING_CONFIG
    remote_log_conn_id = <name of the Google cloud platform hook>
    
  7. Restart the Airflow webserver and scheduler, and trigger (or wait for) a new task execution.

  8. Verify that logs are showing up for newly executed tasks in the bucket you’ve defined.

  9. Verify that the Google Cloud Storage viewer is working in the UI. Pull up a newly executed task, and verify that you see something like:

    *** Reading remote log from gs://<bucket where logs should be persisted>/example_bash_operator/run_this_last/2017-10-03T00:00:00/16.log.
    [2017-10-03 21:57:50,056] {cli.py:377} INFO - Running on host chrisr-00532
    [2017-10-03 21:57:50,093] {base_task_runner.py:115} INFO - Running: ['bash', '-c', u'airflow run example_bash_operator run_this_last 2017-10-03T00:00:00 --job_id 47 --raw -sd DAGS_FOLDER/example_dags/example_bash_operator.py']
    [2017-10-03 21:57:51,264] {base_task_runner.py:98} INFO - Subtask: [2017-10-03 21:57:51,263] {__init__.py:45} INFO - Using executor SequentialExecutor
    [2017-10-03 21:57:51,306] {base_task_runner.py:98} INFO - Subtask: [2017-10-03 21:57:51,306] {models.py:186} INFO - Filling up the DagBag from /airflow/dags/example_dags/example_bash_operator.py
    

Note the top line that says it’s reading from the remote log file.

Please be aware that if you were persisting logs to Google Cloud Storage using the old-style airflow.cfg configuration method, the old logs will no longer be visible in the Airflow UI, though they’ll still exist in Google Cloud Storage. This is a backwards-incompatible change. If you are unhappy with it, you can change the FILENAME_TEMPLATE to reflect the old-style log filename format.
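
Step 4 above amounts to adding one handler entry and re-pointing two loggers. The sketch below performs that edit on a plain dictionary so it can be run without Airflow. TEMPLATE is a made-up stand-in for the copied template (its keys and values are assumptions), and use_gcs_task_handler is a hypothetical helper, not part of Airflow.

```python
import copy

# Made-up stand-in for the copied template; the real file has more entries.
TEMPLATE = {
    "handlers": {
        "console": {"class": "logging.StreamHandler"},
        "file.task": {
            "class": "airflow.utils.log.file_task_handler.FileTaskHandler",
            "formatter": "airflow.task",
            "base_log_folder": "/airflow/logs",
            "filename_template": "<filename template>",
        },
    },
    "loggers": {
        "airflow.task": {"handlers": ["file.task"]},
        "airflow.task_runner": {"handlers": ["file.task"]},
        "airflow": {"handlers": ["console"]},
    },
}


def use_gcs_task_handler(logging_config, gcs_log_folder):
    """Return a copy of the config with task logs routed to 'gcs.task'."""
    config = copy.deepcopy(logging_config)
    file_task = config["handlers"]["file.task"]
    # Add the GCS handler, reusing the local handler's formatter and paths.
    config["handlers"]["gcs.task"] = {
        "class": "airflow.utils.log.gcs_task_handler.GCSTaskHandler",
        "formatter": file_task["formatter"],
        "base_log_folder": file_task["base_log_folder"],
        "gcs_log_folder": gcs_log_folder,
        "filename_template": file_task["filename_template"],
    }
    # Re-point only the two task loggers; 'airflow' keeps the console handler.
    for name in ("airflow.task", "airflow.task_runner"):
        config["loggers"][name]["handlers"] = ["gcs.task"]
    return config


LOGGING_CONFIG = use_gcs_task_handler(TEMPLATE, "gs://my-log-bucket/")
print(LOGGING_CONFIG["loggers"]["airflow.task"])
```

Editing the copied file by hand, as the steps describe, gives the same result; the function form just makes the three changes explicit.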

BigQuery

BigQuery Operators

BigQueryCheckOperator
BigQueryValueCheckOperator
BigQueryIntervalCheckOperator
BigQueryGetDataOperator
BigQueryCreateEmptyTableOperator
BigQueryCreateExternalTableOperator
BigQueryOperator
BigQueryTableDeleteOperator
BigQueryToBigQueryOperator
BigQueryToCloudStorageOperator

BigQueryHook

Cloud DataFlow

DataFlow Operators

DataFlowJavaOperator

    from datetime import datetime, timedelta

    from airflow import DAG
    from airflow.contrib.operators.dataflow_operator import DataFlowJavaOperator

    default_args = {
        'owner': 'airflow',
        'depends_on_past': False,
        'start_date': datetime(2016, 8, 1),
        'email': ['alex@vanboxel.be'],
        'email_on_failure': False,
        'email_on_retry': False,
        'retries': 1,
        'retry_delay': timedelta(minutes=30),
        # Options shared by every Dataflow task in this DAG
        'dataflow_default_options': {
            'project': 'my-gcp-project',
            'zone': 'us-central1-f',
            'stagingLocation': 'gs://bucket/tmp/dataflow/staging/',
        }
    }

    dag = DAG('test-dag', default_args=default_args)

    task = DataFlowJavaOperator(
        gcp_conn_id='gcp_default',
        task_id='normalize-cal',
        jar='{{var.value.gcp_dataflow_base}}pipeline-ingress-cal-normalize-1.0.jar',
        # Options specific to this task
        options={
            'autoscalingAlgorithm': 'BASIC',
            'maxNumWorkers': '50',
            'start': '{{ds}}',
            'partitionType': 'DAY'
        },
        dag=dag)

DataflowTemplateOperator
DataFlowPythonOperator
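
In the DataFlowJavaOperator example above, options are supplied in two places: dataflow_default_options in default_args and options on the task. As far as we understand the operator, the two dictionaries are combined, with task-level options taking precedence. The sketch below shows that merge on plain dictionaries; treat the precedence as an assumption to verify against your Airflow version, and note that merge_dataflow_options is a hypothetical helper.

```python
import copy


def merge_dataflow_options(default_options, task_options):
    """Combine DAG-level defaults with task-level options (sketch only)."""
    merged = copy.copy(default_options)
    # Task-level keys override defaults with the same name.
    merged.update(task_options)
    return merged


defaults = {
    "project": "my-gcp-project",
    "zone": "us-central1-f",
    "stagingLocation": "gs://bucket/tmp/dataflow/staging/",
}
task_options = {
    "autoscalingAlgorithm": "BASIC",
    "maxNumWorkers": "50",
    "partitionType": "DAY",
}

merged = merge_dataflow_options(defaults, task_options)
print(sorted(merged))
```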

DataFlowHook

Cloud DataProc

DataProc Operators

DataprocClusterCreateOperator
DataprocClusterDeleteOperator
DataProcPigOperator
DataProcHiveOperator
DataProcSparkSqlOperator
DataProcSparkOperator
DataProcHadoopOperator
DataProcPySparkOperator
DataprocWorkflowTemplateInstantiateOperator
DataprocWorkflowTemplateInstantiateInlineOperator

Cloud Datastore

Datastore Operators

DatastoreExportOperator
DatastoreImportOperator

DatastoreHook

Cloud ML Engine

Cloud ML Engine Operators

MLEngineBatchPredictionOperator
MLEngineModelOperator
MLEngineTrainingOperator
MLEngineVersionOperator

Cloud ML Engine Hook

MLEngineHook

Cloud Storage

Storage Operators

FileToGoogleCloudStorageOperator
GoogleCloudStorageCopyOperator
GoogleCloudStorageCreateBucketOperator
GoogleCloudStorageDownloadOperator
GoogleCloudStorageListOperator
GoogleCloudStorageToBigQueryOperator
GoogleCloudStorageToGoogleCloudStorageOperator

GoogleCloudStorageHook