Xavier Escudero

Summary

This article outlines the process of configuring Apache Airflow to orchestrate Spark data pipelines, focusing on the creation of a DAG for managing the execution of two tasks: schema creation for daily historical data tables and the Spark job for loading daily stock data.

Abstract

The article delves into the technical setup required to manage data pipelines using Apache Airflow, specifically for financial trading data. It provides a step-by-step guide on setting up Airflow with Docker-compose, configuring connections to both Spark and PostgreSQL, and developing a DAG named load_historical_data_dag. This DAG is scheduled to run on weekdays and includes two main tasks: the sql_create task for creating table schemas in PostgreSQL if they do not exist, and the submit_job task for executing a Spark program to load daily stock data. The article also emphasizes the importance of defining variables in Airflow for storing values used in DAGs and explains how to manually trigger the DAG from the Airflow dashboard. The ultimate goal is to automate the process of loading and managing daily trading data efficiently.

Opinions

  • The author recommends using the LocalExecutor for running tasks directly on the machine where the scheduler is deployed, which is suitable for local development and testing.
  • For those unfamiliar with Apache Airflow, the author suggests reading an introductory lecture to gain a better understanding before proceeding with the setup.
  • The author advocates for the use of the SparkSubmitOperator in Airflow to run Spark jobs, highlighting its effectiveness in integrating Spark applications within Airflow workflows.
  • The article encourages the use of crontab.guru for learning and testing cron schedule expressions, which are used to define the execution schedule of DAGs in Airflow.
  • The author provides a subjective opinion on the importance of community support, inviting readers to clap and follow for more content, and offers additional resources for developing trading bots and joining a premium Discord server.
  • A disclaimer is provided stating that the article's content is for educational or entertainment purposes and should not be taken as personal investment advice.

Building the Ultimate Trading Data Pipeline — Part 5: Orchestrate data pipelines

In this article we will configure Apache Airflow to manage the orchestration of Spark data pipelines, building on one of the pipelines discussed in earlier articles.

The focus will be on creating a DAG designed to execute two tasks on a scheduled basis:

  1. Table Schema Creation. If the daily historical data tables do not exist, this task will handle the creation of their schema.
  2. Spark Execution Task. Another task in the DAG will submit the Spark job that loads the daily bars, leveraging the work covered in the preceding article.

Before continuing, if you need a good overview of what Airflow is and how it works, I recommend the lecture Apache Airflow in 10 minutes.

Setup Airflow with Docker-compose

We can set up Airflow in several ways. For our purposes we’re going to use a LocalExecutor, which runs tasks directly on the machine where the scheduler is deployed, with docker-compose.

You can see the different kinds of Airflow executors at: Executors in Apache-Airflow

Establish a network to enable mutual visibility between Airflow and Spark:

docker network create airflow-spark-network

In the previous article on creating the Spark containers, we intentionally configured them to use this same network.

Generate a file titled Dockerfile-airflow containing the following content:

FROM apache/airflow:2.8.0-python3.11  
USER root  
RUN apt-get update \  
  && apt-get install -y --no-install-recommends \  
         openjdk-17-jre-headless \  
  && apt-get autoremove -yqq --purge \  
  && apt-get clean \  
  && rm -rf /var/lib/apt/lists/*  
USER airflow  
ENV JAVA_HOME=/usr/lib/jvm/java-17-openjdk-amd64  
RUN pip install --no-cache-dir "apache-airflow==${AIRFLOW_VERSION}" apache-airflow-providers-apache-spark==2.1.3  

COPY ./requirements.txt /  
RUN pip install -r /requirements.txt  
COPY ./lib/postgresql-42.2.6.jar /opt/airflow/lib/postgresql-42.2.6.jar

Mainly we’re installing Java and the library apache-airflow-providers-apache-spark in the image, so we can connect Airflow with Spark.

Build the image:

docker build -f Dockerfile-airflow . -t trading-airflow

Next, download the file docker-compose-airflow.yaml from: https://gist.github.com/xescuder/c05305c92035202693818db272868bda . Start the containers by executing the following command:

docker-compose -f docker-compose-airflow.yaml up -d

The docker-compose-airflow.yaml file contains volume mappings with the project:

  • ${AIRFLOW_DAGS}:/opt/airflow/dags. The DAG should be located in the directory dags.
  • ./trading_data_pipeline/:/opt/airflow/trading_data_pipeline. The Spark ETLs and custom imported modules are located in the folder trading_data_pipeline.

Before launching the container, you have the option to set and modify both the username and password. By default, they are both set to airflow.

Navigate to http://server:8080 in your web browser and log in using the credentials to gain access to the Airflow dashboard.

Setup the connection with Spark

Add a new connection to Spark from the Admin > Connections menu:

  • Connection Id: spark-connection
  • Connection Type: Spark
  • Host: spark://spark
  • Port: Use the port 7077.
Airflow connection with Spark
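
To double-check that Airflow resolves this connection, you can optionally run a quick lookup from a Python shell inside the Airflow container. This is just a sanity-check sketch, not part of the original setup:

from airflow.hooks.base import BaseHook

# Look up the connection created in Admin > Connections and print its endpoint.
conn = BaseHook.get_connection('spark-connection')
print(conn.host, conn.port)  # expected: spark://spark 7077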

Setup a connection with Postgres

For the first task, we establish an Airflow connection to PostgreSQL. Navigate to the Airflow dashboard, access Admin > Connections, and select the plus sign at the upper left. Provide the connection parameters for the PostgreSQL database and then click on Save.

Postgres Connection Definition
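
As with the Spark connection, you can optionally verify the PostgreSQL connection from code. Below is a minimal sketch (not from the original article) using the PostgresHook with the connection id that the DAG will use later:

from airflow.providers.postgres.hooks.postgres import PostgresHook

# Resolve the connection defined in Admin > Connections and run a trivial query.
hook = PostgresHook(postgres_conn_id='trading-data-postgres-connection')
print(hook.get_first('SELECT version();'))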

Create variables

We have the option to define variables to store values for use in our DAGs.

Navigate to Admin > Variables and establish a new variable named PYSPARK_APP_HOME with the value /opt/airflow/trading_data_pipeline (connected to the Docker volume configuration for utilizing our source folder trading_data_pipeline). This variable specifies the directory containing our Spark code.
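
If you prefer not to use the UI, variables can also be created and read programmatically. A minimal sketch, assuming it runs inside the Airflow environment (the path is the same one configured above):

from airflow.models import Variable

# Create (or overwrite) the variable, equivalent to Admin > Variables in the UI.
Variable.set('PYSPARK_APP_HOME', '/opt/airflow/trading_data_pipeline')

# Read it back; default_var avoids an error if the variable has not been set yet.
pyspark_app_home = Variable.get('PYSPARK_APP_HOME',
                                default_var='/opt/airflow/trading_data_pipeline')
print(pyspark_app_home)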

Develop the DAG

We create a DAG named load_historical_data_dag scheduled to run every Monday to Friday at 22:00. If you don't know how to use the cron format, I recommend testing and playing with https://crontab.guru/ and its examples section.

from datetime import datetime

from airflow import DAG
from airflow.models import Variable
from airflow.providers.apache.spark.operators.spark_submit import SparkSubmitOperator
from airflow.providers.postgres.operators.postgres import PostgresOperator

pyspark_app_home = Variable.get("PYSPARK_APP_HOME")

with DAG('load_historical_data_dag',
         start_date=datetime(2023, 1, 1),
         schedule_interval='0 22 * * 1-5',
         description='Load daily data',
         ) as dag:
    sql_create = PostgresOperator(task_id='sql_create',
                                  postgres_conn_id='trading-data-postgres-connection',
                                  sql='./sql/02-daily_bars_schema.sql')
    submit_job = SparkSubmitOperator(
        task_id='spark_load_daily_bars_task',
        application=f'{pyspark_app_home}/load_daily_bars_etl.py',
        conn_id='spark-connection',
        jars='./lib/postgresql-42.2.6.jar',
        packages='org.postgresql:postgresql:42.6.0',
        total_executor_cores=1,
        driver_memory='1g',
        verbose=True
    )
    sql_create >> submit_job

This DAG comprises two tasks executed sequentially, connected using the >> syntax.

  • sql_create: Creates the needed tables (if they don't exist)
  • submit_job: Calls the Spark program to load daily stock data

Tables creation task (sql_create)

The PostgresOperator task allows us to execute a SQL script, located at dags/sql/02-daily_bars_schema.sql. Note that the create_hypertable call requires TimescaleDB.

CREATE TABLE IF NOT EXISTS daily_bars (  
    symbol text,  
    date date NOT NULL,    
    open double precision,    
    high double precision,  
    low double precision,  
    close double precision,    
    volume bigint,  
    u_close double precision);


SELECT create_hypertable('daily_bars', 'date', if_not_exists => TRUE);  
CREATE INDEX IF NOT EXISTS ix_symbol_date on daily_bars(symbol, date desc);

Spark load daily bars pipeline execution (submit_job)

I suggest reading this article to gain insights into various methods for invoking the Spark code we’ve developed earlier. In our case, we’ll employ the approach using the SparkSubmitOperator.

The Airflow SparkSubmitOperator is an operator that allows you to run Spark jobs from within an Airflow DAG.

submit_job = SparkSubmitOperator(
    task_id='spark_load_daily_bars_task',
    application=f'{pyspark_app_home}/load_daily_bars_etl.py',
    conn_id='spark-connection',
    jars='./lib/postgresql-42.2.6.jar',
    packages='org.postgresql:postgresql:42.6.0',
    total_executor_cores=1,
    driver_memory='1g',
    verbose=True
)
  • The application parameter specifies the path to the Spark application file to execute. We build its value from the pyspark_app_home variable and the application file name, load_daily_bars_etl.py (our PySpark program).
  • The conn_id parameter identifies the Spark connection established during setup.
  • The jars parameter includes external dependencies necessary for our Spark application. Here, it specifically references the PostgreSQL JAR file.
  • Within the packages parameter, we also declare the PostgreSQL driver, so the Spark application can write to PostgreSQL over JDBC (see the sketch after this list).
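
To illustrate why the driver is declared, here is a hypothetical sketch of what a JDBC write from the PySpark side could look like. The host, database and credentials are placeholders, not values from this series; the real load_daily_bars_etl.py was built in the preceding article:

from datetime import date
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName('load_daily_bars_etl').getOrCreate()

# Illustrative rows standing in for the daily bars downloaded by the real ETL.
df = spark.createDataFrame(
    [('AAPL', date(2023, 12, 1), 190.3, 191.1, 189.2, 190.9, 45000000, 190.9)],
    ['symbol', 'date', 'open', 'high', 'low', 'close', 'volume', 'u_close'],
)

# The org.postgresql.Driver class comes from the jars/packages declared above.
(df.write.format('jdbc')
    .option('url', 'jdbc:postgresql://postgres:5432/trading')  # placeholder host/db
    .option('dbtable', 'daily_bars')
    .option('driver', 'org.postgresql.Driver')
    .option('user', 'postgres')        # placeholder credentials
    .option('password', 'postgres')
    .mode('append')
    .save())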

Execution of DAG

The DAG is already scheduled, but we can also trigger it manually at any time from the DAGs menu of the Airflow dashboard:

Click on the DAG name, and then at the top right press the play icon button.

We’ll see the DAG running. We can go to each run id and see the status:
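
As an alternative to the UI, the DAG can also be triggered through Airflow's stable REST API. A minimal sketch, assuming the basic auth API backend is enabled and the default airflow/airflow credentials are still in place:

import requests

# POST a new DAG run for load_historical_data_dag via the Airflow 2 REST API.
response = requests.post(
    'http://server:8080/api/v1/dags/load_historical_data_dag/dagRuns',
    auth=('airflow', 'airflow'),
    json={'conf': {}},
)
print(response.status_code, response.json())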

👏 Did you like the story? Give 1 to 50 claps to show your support! Your claps really help me out and motivate me to keep creating valuable content. Thank you for your support! 👏

Thank you for being part of our community! Before you go:

  • If you liked the story feel free to clap 👏 and follow the author.
  • Learn How To Develop Your Trading Bots 👉 here.
  • Join our Premium Discord Server👉 here.

*Note that this article does not provide personal investment advice and I am not a qualified licensed investment advisor. All information found here is for entertainment or educational purposes only and should not be construed as personal investment advice.
