Richard Pelgrim

Summary

The web content provides a comprehensive guide on scaling Apache Airflow Directed Acyclic Graphs (DAGs) for big data workflows using Dask, with practical examples and cost-effective cloud resource management through Coiled.

Abstract

The article titled "How to Build Powerful Airflow DAGs for Big Data Workflows in Python" discusses the challenges of processing large datasets with Apache Airflow, which is traditionally limited to local computations. It introduces a solution to scale out heavy Airflow tasks to a Dask cluster, allowing for the handling of larger-than-memory datasets. The author, a Data Science Evangelist at Coiled, demonstrates how to construct Airflow DAGs that leverage cloud resources for computation and then seamlessly integrate the results back into local workflows. The guide includes detailed code examples for spinning up a Coiled Dask cluster, performing heavy computations, and shutting down the cluster to manage costs. It also covers the use of the resulting data in subsequent tasks, such as generating summary statistics and identifying the most active GitHub users. The article emphasizes the affordability and efficiency of this approach, with a computation example costing only 5 cents, and provides additional resources for common Dask mistakes and further Airflow DAG examples.

Opinions

  • The author advocates for the use of Coiled's cloud resources to enhance Airflow's capabilities for big data workflows, highlighting the ease of scaling and cost-effectiveness.
  • There is an emphasis on the practicality of using Dask clusters within Airflow DAGs to handle large datasets that cannot be processed locally due to memory constraints.
  • The author suggests that using Coiled for cloud-based Dask clusters is a user-friendly solution, as it automates cluster management and scaling, and provides a free tier for new users.
  • The article promotes the idea that integrating Dask with Airflow can be done with minimal changes to existing Python code, making it an accessible option for data engineers and scientists.
  • By providing real-world examples and addressing common issues, such as the need to enable pickling in Airflow, the author aims to lower the barrier to entry for using Dask within Airflow DAGs.

How to Build Powerful Airflow DAGs for Big Data Workflows in Python

Scale your Airflow pipelines to the cloud

Image by Solen Feyissa via Unsplash

Airflow DAGs for (Really!) Big Data

Apache Airflow is one of the most popular tools for orchestrating data engineering, machine learning, and DevOps workflows. But it has one important drawback: out of the box, Airflow runs your computations locally, which means you can only process datasets that fit within the resources of your machine.

To use Airflow for computations on larger-than-memory datasets, you can scale out the specific Airflow tasks containing heavy workloads to a Dask cluster. This blog will show you how to construct Airflow DAGs for larger-than-memory datasets with only minimal changes to your existing Python code.

We’ll walk through one example in detail. You can find the other Airflow DAG examples in this dedicated repository.

How to Scale Airflow ETL Tasks using Dask

Airflow workflows are defined using Tasks and DAGs and orchestrated by Executors. To delegate the heavy lifting to Dask, we’ll spin up a Coiled cluster within the Task that contains the heavy computation and bring back only the result, in this case a .value_counts() over a column of interest. Since this result fits easily into memory, we can shut the cluster down immediately to limit costs and continue using the result locally in the next Tasks.

Disclaimer: I work at Coiled as a Data Science Evangelist. Coiled was founded by Matthew Rocklin, the initial author of Dask, an open-source Python library for distributed computing.

Define your Airflow ETL DAG

The DAG will contain the following 3 Tasks:

  1. Spin up a Coiled cluster, perform heavy computations over the entire dataset, then shut the cluster down;
  2. Use the result to calculate summary statistics and save these to a CSV file;
  3. Use the result to find the top 100 most active GitHub users and save these to a CSV file.

Let’s start by defining an Airflow DAG using the @dag decorator, passing it the default_args defined earlier in the script as well as a number of other arguments you can tweak.
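
For reference, here is a minimal sketch of the imports and earlier definitions the snippets below assume; the default_args values and the storage_directory path are illustrative placeholders, not taken from the original script.

# Imports used throughout the snippets below
from datetime import datetime, timedelta

import coiled
import dask.dataframe as dd
from dask.distributed import Client

from airflow.decorators import dag, task

# Placeholder default_args passed to the @dag decorator
default_args = {
    'owner': 'airflow',
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

# Placeholder local directory where the result CSV files are written
storage_directory = '/tmp/'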

# define DAG as a function with the @dag decorator 
@dag( 
    default_args=default_args, 
    schedule_interval=None, 
    start_date=datetime(2021, 1, 1), 
    catchup=False, 
    tags=['coiled-demo'], 
) 
def airflow_on_coiled():

Launch your Dask Cluster

Let’s define our first Task.

This spins up a Coiled cluster named “airflow-task” consisting of 20 Dask workers, each running a specified Coiled software environment to ensure that they have all the right dependencies.

# define Airflow task that runs a computation on a Coiled cluster 
@task() 
def transform(): 
    # Create and connect to Coiled cluster
    cluster = coiled.Cluster( 
        name="airflow-task", 
        n_workers=20, 
        software="rrpelgrim/airflow", 
    ) 
    client = Client(cluster)

We can then read our dataset stored on S3 into a Dask DataFrame and calculate the result we’re interested in. Here, we load in the GitHub Archive data for 2015 (subsetted to include only PushEvents) and calculate the number of PushEvents per user for the entire year using a call to .value_counts().

# Read the Parquet dataset from S3 into a Dask DataFrame
ddf = dd.read_parquet(
    's3://coiled-datasets/github-archive/github-archive-2015.parq/',
    storage_options={"anon": True, 'use_ssl': True},
)
# Compute the number of PushEvents per user for the whole year
result = ddf.user.value_counts().compute()

Since we now have our result locally, we can shut the cluster down to limit our costs. Note that this is really just a formality because Coiled will shut down your cluster automatically after 20 minutes of inactivity.

# Shutdown Coiled cluster 
cluster.close() 
return result

Going over to the Coiled Cloud dashboard, we can see that this computation cost us 5 cents. And no, that’s not a typo 😉 That means you could run this Airflow DAG example up to 200 times a month for free on the Coiled Free Tier.

Use Result Locally

We’ve leveraged cloud resources to get the result we’re interested in, and now we can proceed with the following Tasks locally. Because the Coiled client runs in your own local Python session, the result is already on your machine, and reading and writing to local disk is straightforward.

We’ll generate summary statistics over the result pandas Series and save those to a CSV file:

@task() 
def summarize(series): 
    # Get summary statistics 
    sum_stats = series.describe() 
    # Save to CSV   
    sum_stats.to_csv(
         f'{storage_directory}usercounts_summary_statistics.csv'
    ) 
    
    return sum_stats

And get the usernames and number of PushEvents for the top 100 most active users:

@task() 
def get_top_users(series): 
    # Get top 100 most active users 
    top_100 = series.head(100) 
    # Store user + number of events to CSV 
    top_100.to_csv(f'{storage_directory}top_100_users.csv') 
    return top_100

Last but not least, we’ll specify the order in which we want the Airflow Tasks to run and then call the DAG function so that Airflow registers the workflow.

# Call task functions in order (inside the airflow_on_coiled DAG function)
series = transform()
sum_stats = summarize(series)
top_100 = get_top_users(series)
# Instantiate the DAG by calling the @dag-decorated function (at module level)
airflow_on_coiled()

Great job, you’re all set! You can now add your Airflow DAG Python script to your dags folder (by default: ~/airflow/dags) and run or schedule it as needed.

Important: Airflow disables pickling by default. You will have to enable it in order to run Dask tasks. You can do this by editing your airflow.cfg file or by setting the corresponding environment variable with export AIRFLOW__CORE__ENABLE_XCOM_PICKLING=True. Do this before launching your Airflow webserver. If you’re working on an Apple M1 machine, you may want to check out this blog on installing PyData libraries using conda. Specifically, make sure that neither the blosc nor the python-blosc library is installed in your local or cluster software environments.
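
For reference, the equivalent airflow.cfg entry lives under the [core] section; a minimal sketch of the relevant lines:

[core]
enable_xcom_pickling = True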

More Airflow DAG Examples

In the dedicated airflow-with-coiled repository, you will find two more Airflow DAG examples using Dask. The examples cover common Airflow ETL operations.

Note that:

  • The JSON-to-Parquet conversion DAG example requires you to connect Airflow to Amazon S3. You can find the instructions for doing so in the Airflow docs here. You will also need to pass your AWS secrets to the to_parquet() call using the storage_options keyword argument, as sketched just after this list.
  • The XGBoost DAG example works with only a ~250MB subset of the >20GB ARCOS dataset. To run it on the entire dataset, check out this tutorial.
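
A minimal sketch of what passing those secrets might look like, assuming ddf is the Dask DataFrame produced by the conversion step; the bucket path and credential values are placeholders:

# Write the converted data to S3, passing AWS credentials via storage_options (placeholder values)
ddf.to_parquet(
    's3://your-bucket/github-archive.parq',
    storage_options={
        'key': 'YOUR_AWS_ACCESS_KEY_ID',
        'secret': 'YOUR_AWS_SECRET_ACCESS_KEY',
    },
)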

Run all Airflow ETL Tasks with the DaskExecutor

The Airflow DAG examples in the repo above launch Coiled clusters from within an Airflow task. You can also opt for a different architecture and run all of the tasks in an Airflow DAG on a Coiled cluster. You can then use Coiled’s adaptive scaling capabilities to scale the number of workers up and down depending on the workload.

To do this, switch from using Airflow’s default SequentialExecutor to the DaskExecutor. Using any Airflow executor other than the default SequentialExecutor also requires setting up a dedicated database backend where Airflow can store the metadata related to your workflow. Once that’s done, point the DaskExecutor to a Coiled cluster that is already running.

You can do this by making the following changes in your airflow.cfg file, by default stored in ~/airflow/; a sketch of the resulting configuration follows the list.

  1. Set executor = DaskExecutor
  2. Set cluster_address = <cluster_IP_address>:<cluster_port>. You can access this address using cluster.scheduler_address
  3. Set the cluster’s TLS settings: tls_cert, tls_key, and tls_ca. You can access these using client.security.tls_key and client.security.tls_cert. Note that the value for tls_ca is the same as tls_cert.
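
Put together, the relevant airflow.cfg entries might look like the following sketch; the scheduler address and certificate paths are placeholders you would copy from your running cluster:

[core]
executor = DaskExecutor

[dask]
# Address of the running Coiled cluster's scheduler (placeholder)
cluster_address = tls://<scheduler_ip>:8786
# TLS credentials copied from client.security (placeholder paths; tls_ca matches tls_cert)
tls_ca = /path/to/dask_cert.pem
tls_cert = /path/to/dask_cert.pem
tls_key = /path/to/dask_key.pem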

You can then run the entire Airflow DAG on Coiled.

Including a cluster.adapt(minimum=1, maximum=20) in the script that spins up your Coiled cluster will ensure that the cluster adaptively scales between a set minimum and maximum number of workers (in this case between 1 and 20) depending on the workload.
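
A minimal sketch, assuming a hypothetical cluster name and software environment:

import coiled

# Launch a Coiled cluster and let it scale between 1 and 20 workers with the workload
cluster = coiled.Cluster(
    name="airflow-dask-executor",          # placeholder cluster name
    n_workers=1,
    software="your-software-environment",  # placeholder Coiled software environment
)
cluster.adapt(minimum=1, maximum=20)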

Get in touch

Follow me here and on Twitter for more content like this.

Originally published at https://coiled.io on January 7, 2022.
