How to Build Powerful Airflow DAGs for Big Data Workflows in Python
Scale your Airflow pipelines to the cloud
Airflow DAGs for (Really!) Big Data
Apache Airflow is one of the most popular tools for orchestrating data engineering, machine learning, and DevOps workflows. But it has one important drawback. Out-of-the-box, Airflow will run your computations locally, which means you can only process datasets that fit within the resources of your machine.
To use Airflow for computations on larger-than-memory datasets, you can scale out the specific Airflow tasks containing heavy workloads to a Dask cluster. This blog will show you how to construct Airflow DAGs for larger-than-memory datasets with only minimal changes to your existing Python code.
We’ll walk through one example in detail. You can find the other Airflow DAG examples in this dedicated repository.
How to Scale Airflow ETL Tasks using Dask
Airflow workflows are defined using Tasks and DAGs and orchestrated by Executors. To delegate heavy workloads to Dask, we'll spin up a Coiled cluster within a Task that contains the heavy computation and bring back only the result, in this case a .value_counts() over a column of interest. Since this result will fit into memory easily, we can shut down the cluster immediately to limit cost and continue using the result in our next Tasks locally.
Disclaimer: I work at Coiled as a Data Science Evangelist. Coiled was founded by Matthew Rocklin, the initial author of Dask, an open-source Python library for distributed computing.
Define your Airflow ETL DAG
The DAG will contain the following 3 Tasks:
- Spin up a Coiled cluster, perform heavy computations over the entire dataset, then shut the cluster down;
- Use the result to calculate summary statistics and save these to a CSV file;
- Use the result to find the top 100 most active GitHub users and save these to a CSV file.
Let’s start by defining an Airflow DAG using the @dag decorator, passing it the default_args defined earlier in the script as well as a number of other arguments you can tweak.
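The snippets below also reference a handful of imports, a storage_directory for the output CSVs, and the default_args dictionary defined earlier in the full script. As a rough sketch (the specific values here are placeholders, not the blog's exact preamble), that section might look like this:

from datetime import datetime, timedelta

import coiled
import dask.dataframe as dd
from airflow.decorators import dag, task
from dask.distributed import Client

# Local directory for the output CSV files (placeholder path)
storage_directory = '/tmp/airflow-outputs/'

# Default arguments applied to every task in the DAG (values are illustrative)
default_args = {
    'owner': 'airflow',
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}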
# define DAG as a function with the @dag decorator
@dag(
    default_args=default_args,
    schedule_interval=None,
    start_date=datetime(2021, 1, 1),
    catchup=False,
    tags=['coiled-demo'],
)
def airflow_on_coiled():
Launch your Dask Cluster
Let’s define our first Task. This spins up a Coiled cluster named “airflow-task” consisting of 20 Dask workers, each running a specified Coiled software environment to ensure that they have all the right dependencies.
# define Airflow task that runs a computation on a Coiled cluster
@task()
def transform():
    # Create and connect to Coiled cluster
    cluster = coiled.Cluster(
        name="airflow-task",
        n_workers=20,
        software="rrpelgrim/airflow",
    )
    client = Client(cluster)
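Here, "rrpelgrim/airflow" is a Coiled software environment in the author's account. If you're following along with your own account, you can build a similar environment with coiled.create_software_environment; the package list below is an illustrative guess, not the exact environment used in the blog:

# One-off setup, run before triggering the DAG: build a software environment
# for the cluster workers (package list is illustrative)
import coiled

coiled.create_software_environment(
    name="airflow",  # reference it from coiled.Cluster as "<your-account>/airflow"
    pip=["dask[complete]", "coiled", "s3fs", "pyarrow"],
)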
We can then read our dataset stored on S3 into a Dask DataFrame and calculate the result we’re interested in. Here, we load in the GitHub Archive data for 2015 (subsetted to include only PushEvents) and calculate the number of PushEvents per user for the entire year using a call to .value_counts().
    # Read Parquet data from S3
    ddf = dd.read_parquet(
        's3://coiled-datasets/github-archive/github-archive-2015.parq/',
        storage_options={"anon": True, 'use_ssl': True},
        blocksize="16 MiB",
    )
    # Compute the number of entries (PushEvents) per user
    result = ddf.user.value_counts().compute()
Since we now have our result locally, we can shut the cluster down to limit our costs. Note that this is really just a formality because Coiled will shut down your cluster automatically after 20 minutes of inactivity.
    # Shutdown Coiled cluster
    cluster.close()
    return result
Going over to the Coiled Cloud dashboard, we can see that this computation cost us 5 cents. And no, that’s not a typo 😉 That means you could run this Airflow DAG example up to 200 times a month for free using the Coiled Free Tier.
Use Result Locally
We’ve leveraged cloud resources to get the result we’re interested in, and now we can proceed with our following Tasks locally. Because these Tasks run on your own machine, reading and writing to local disk is straightforward.
We’ll generate summary statistics over the result pandas Series and save those to a CSV file:
@task()
def summarize(series):
    # Get summary statistics
    sum_stats = series.describe()
    # Save to CSV
    sum_stats.to_csv(
        f'{storage_directory}usercounts_summary_statistics.csv'
    )
    return sum_stats
And get the usernames and number of PushEvents for the top 100 most active users:
@task()
def get_top_users(series):
    # Get top 100 most active users
    top_100 = series.head(100)
    # Store user + number of events to CSV
    top_100.to_csv(f'{storage_directory}top_100_users.csv')
    return top_100
Last but not least, we’ll specify the order in which we want the Airflow Tasks to run and actually call the DAG function to trigger the workflow.
# Call task functions in order (these calls live inside the airflow_on_coiled function body)
series = transform()
sum_stats = summarize(series)
top_100 = get_top_users(series)

# Call the DAG function to instantiate the workflow
airflow_on_coiled()
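Putting the pieces together, the overall shape of the DAG file is roughly as follows (task bodies elided). This is one reasonable way to assemble the snippets above, with the task definitions and the calls that wire them together nested inside the @dag-decorated function:

@dag(
    default_args=default_args,
    schedule_interval=None,
    start_date=datetime(2021, 1, 1),
    catchup=False,
    tags=['coiled-demo'],
)
def airflow_on_coiled():

    @task()
    def transform():
        ...  # Coiled cluster + .value_counts() computation shown above

    @task()
    def summarize(series):
        ...  # summary statistics CSV

    @task()
    def get_top_users(series):
        ...  # top-100 users CSV

    # Wire the tasks together
    series = transform()
    sum_stats = summarize(series)
    top_100 = get_top_users(series)

# Instantiate the DAG so Airflow can discover it
airflow_on_coiled()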
Great job, you’re all set! You can now add your Airflow DAG Python script to your dags folder (by default: ~/airflow/dags) and run or schedule it as needed.
Important: Airflow disables pickling by default. You will have to enable it in order to run Dask tasks. You can do this by editing your airflow.cfg file or by setting the corresponding environment variable with export AIRFLOW__CORE__ENABLE_XCOM_PICKLING=True. Do this before launching your Airflow webserver.
If you're working on an Apple M1 machine, you may want to check out this blog on installing PyData libraries using conda. Specifically, make sure that neither the blosc nor the python-blosc library is installed in your local or cluster software environments.
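For reference, the pickling setting mentioned above can also be made directly in airflow.cfg, under the [core] section:

[core]
# Allow Airflow to pass pickled objects (such as the pandas Series result) between tasks via XCom
enable_xcom_pickling = True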
More Airflow DAG Examples
In the dedicated airflow-with-coiled repository, you will find two more Airflow DAG examples using Dask. The examples include common Airflow ETL operations.
Note that:
- The JSON-to-Parquet conversion DAG example requires you to connect Airflow to Amazon S3. You can find the instructions for doing so in the Airflow docs here. You will also need to pass your AWS secrets to the to_parquet() call using the storage_options keyword argument (see the sketch after this list).
- The XGBoost DAG example works with only a ~250MB subset of the >20GB ARCOS dataset. To run it on the entire dataset, check out this tutorial.
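For the first point, here's a hedged sketch of what passing credentials through storage_options can look like; the destination bucket is a placeholder, and how you retrieve the credentials (plain variables here) depends on how you store them in Airflow:

# Hypothetical sketch: write a Dask DataFrame to your own S3 bucket with explicit credentials
ddf.to_parquet(
    "s3://your-bucket/github-archive-2015.parq/",  # placeholder destination
    storage_options={
        "key": aws_access_key_id,         # e.g. pulled from your Airflow S3 connection
        "secret": aws_secret_access_key,
    },
)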
Run all Airflow ETL Tasks with the DaskExecutor
The Airflow DAG examples in the repo above launch Coiled clusters from within an Airflow task. You can also opt for a different architecture and run all of the tasks in an Airflow DAG on a Coiled cluster. You can then use Coiled’s adaptive scaling capabilities to scale the number of workers up and down depending on the workload.
To do this, switch from Airflow’s default SequentialExecutor to the DaskExecutor. Using any Airflow executor other than the default SequentialExecutor also requires setting up a dedicated database backend where Airflow can store the metadata related to your workflow. Once that's done, point the DaskExecutor to a Coiled cluster that is already running.
You can do this by making the following changes in your airflow.cfg file, by default stored in ~/airflow/ (a sketch of the relevant sections follows this list):
- Set executor = DaskExecutor.
- Set cluster_address = <cluster_IP_address/cluster_port>. You can access this address using cluster.scheduler_address.
- Set the cluster’s TLS settings: tls_cert, tls_key, and tls_ca. You can access these using client.security.tls_key and client.security.tls_cert. Note that the value for tls_ca is the same as tls_cert.
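Put together, the relevant airflow.cfg sections look roughly like this; the address is whatever cluster.scheduler_address reports, and the TLS paths point at files you've written the PEM contents from client.security into (all values below are placeholders):

[core]
executor = DaskExecutor

[dask]
# Scheduler address reported by cluster.scheduler_address, e.g. tls://<scheduler-ip>:8786
cluster_address = <cluster_IP_address/cluster_port>
# TLS credentials from client.security; tls_ca takes the same value as tls_cert
tls_ca = /path/to/dask-cert.pem
tls_cert = /path/to/dask-cert.pem
tls_key = /path/to/dask-key.pem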
You can then run the entire Airflow DAG on Coiled. Including a cluster.adapt(minimum=1, maximum=20) call in the script that spins up your Coiled cluster will ensure that the cluster adaptively scales between a set minimum and maximum number of workers (in this case, between 1 and 20) depending on the workload.
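As a rough sketch, a standalone script that keeps such a cluster running (so the DaskExecutor has something to connect to) could look like the following; the cluster name and worker counts are illustrative:

# Hypothetical helper script: start a long-running, adaptively scaled Coiled cluster
import coiled
from dask.distributed import Client

cluster = coiled.Cluster(
    name="airflow-executor",          # placeholder name
    n_workers=1,
    software="rrpelgrim/airflow",
)
cluster.adapt(minimum=1, maximum=20)  # scale between 1 and 20 workers with the workload

client = Client(cluster)
print(cluster.scheduler_address)      # value for cluster_address in airflow.cfg
print(client.security.tls_cert)       # PEM contents for tls_cert / tls_ca
print(client.security.tls_key)        # PEM contents for tls_key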
Get in touch
Follow me here and on Twitter for more content like this.
Originally published at https://coiled.io on January 7, 2022.