avatarJulian Sara Joseph

Summary

The website content provides a guide on using Apache Airflow for creating data engineering pipelines in Google Cloud Platform (GCP) with Google Cloud Composer 2, including steps for setting up an environment, writing Directed Acyclic Graphs (DAGs), and executing tasks for data workflows.

Abstract

The article introduces Apache Airflow as a powerful tool for orchestrating data engineering pipelines in GCP. It outlines the process of using Google Cloud Composer 2 to manage Airflow environments and emphasizes the importance of DAGs in defining the sequence of tasks. The author recommends the docker-desktop installation method for beginners and provides a step-by-step approach to enable the Composer API, create an environment, and configure necessary permissions. The post also includes examples of writing DAGs with tasks, using operators like python_operator and bash_operator, and illustrates how to transfer data from Google Cloud Storage to BigQuery. The article concludes with troubleshooting tips, such as checking logs for failed runs, and encourages readers to experiment with additional tasks like data cleanup.

Opinions

  • The author endorses using the docker-desktop installation method for those new to Apache Airflow due to its ease of setup.
  • Cloud Composer is highly recommended as it is a fully managed service built on Apache Airflow, simplifying workflow orchestration on GCP.
  • It is suggested to choose the latest version of Cloud Composer unless there are specific reasons to do otherwise.
  • The author emphasizes the flexibility of DAGs, noting that they are essential for defining tasks but can be written in various ways to suit different workflows.
  • The use of BashOperator for executing shell commands within Airflow workflows is presented as a practical approach for tasks like data transfer between GCS and BigQuery.
  • The article promotes the idea of continuous improvement in data pipelines, suggesting readers to add more tasks and refine their DAGs over time.
  • A cost-effective AI service, ZAI.chat, is recommended as an alternative to ChatGPT Plus (GPT-4), highlighting the author's view on value-for-money tools in the data engineering space.

How to use Airflow for Data Engineering pipelines in GCP

So starting this year, I picked up Apache Airflow to understand how to create workflows for automagically creating clusters or models in the cloud. Well that was my reason but doesn’t mean it can’t do more. In this post I will try to share the DAGs I wrote which may help you on your data engineering journey.

A DAG is a Directed Acyclic Graph which collects all the tasks you would want to run together, in the specific order they should be run.

We are going to be using Google Cloud Composer 2 but you can run these on your local installation of airflow too. (If you are just getting started — I’d highly recommend using docker-desktop installation method since it is super easy)

Cloud Composer is a fully managed workflow orchestration service built on Apache Airflow. Enable this API via the GUI or using the following:

gcloud services enable composer.googleapis.com

(This may take some time)

Next you can create an environment and choose Composer 2.

You will be prompted to Grant required permissions to Cloud Composer service account by a simple checkbox and “Grant” button embedded within the form the next page. After that you can provide a name for your environment and choose a location.

The composer versions, as of writing this blog were the following, I’ll recommend choosing the latest, unless you have reasons to otherwise.

Click the Create button which would be enabled if you have completed all the required field and Granted the permissions to the service account. Now would be the perfect time to go have a cup of tea while your environment of choice gets created 🍵

Once it is ready you will be able to see this:

Let’s now prepare the DAGs that would go in the DAGs bucket. Here’s a sample from the docs.

There are more ways to write a DAG, but whichever way you choose to, remember that DAGs are nothing without Tasks to run.

Let’s start with a simple one already mentioned in the github repo for Composer. (I removed some lines to make it easier to follow)

In this the first variable is the default_dag_args which can include not only the start date(see why it is important here), but also commonly used vars like project_Id, region etc.

the schedule_interval is how often you would like your task to be re-run. This is now auto aligned as the start_date as the moment to start looking. It can be changed to @hourly, @daily etc.

There are two tasks in this example above: hello_python and goodbye_bash which use the python_operator and bash_operator respectively.

Now let me show another BashOperator usecase to transfer data from GCS to BigQuery. This is a simple bq mk command used to create a dataset in a project where this is running and an bq loadcommand to load the table with a csv file kept in a GCS bucket. The schema is autodetected from the CSV. More details can be found here.

Save the above code and upload this file to the DAGs folder.

Navigate to the airflow dashboard we can see:

Clicking the individual DAG leads to the page where we can see the tree with tasks we created and whether or not, each one was successful or not.

We may not always get it right in the first go, due to some or the other errors. To see the logs, all you need to do is click on the failed run and check the logs.

We can always continue to add more tasks and order them as we like for e.g. the ideal next step would be to write a bash command such as bq rm for cleanup! Try it out as an exercise and let me know how it goes!

Hint — It’s Graph should look like this:

Airflow
Google Cloud Platform
Data
Analytics
Recommended from ReadMedium