Building the Ultimate Trading Data Pipeline — Part 5: Orchestrate data pipelines
In this article we will configure Apache Airflow to manage the orchestration of Spark data pipelines, building upon one of the pipelines discussed in earlier articles.
The focus will be on creating a DAG designed to execute two tasks on a scheduled basis:
- Table Schema Creation. If the daily historical data tables do not exist, this task will handle the creation of their schema.
- Spark Execution Task. Another task in the DAG will be responsible for submitting the daily-bars load job to Spark, leveraging the work covered in the preceding article.
Before continuing, if you need a good overview of what Airflow is and how it works, I recommend reading Apache Airflow in 10 minutes.
Set up Airflow with Docker Compose
We can set up Airflow in several ways. For our purposes we’re going to use a LocalExecutor, which runs tasks directly on the machine where the scheduler is deployed, with docker-compose.
You can see the different kinds of Airflow executors at: Executors in Apache-Airflow
Establish a network to enable mutual visibility between Airflow and Spark:
docker network create airflow-spark-network
This is intentional: in the previous article on creating the Spark containers, we already configured them to use this network.
Create a file named Dockerfile-airflow with the following content:
FROM apache/airflow:2.8.0-python3.11
USER root
RUN apt-get update \
&& apt-get install -y --no-install-recommends \
openjdk-17-jre-headless \
&& apt-get autoremove -yqq --purge \
&& apt-get clean \
&& rm -rf /var/lib/apt/lists/*
USER airflow
ENV JAVA_HOME=/usr/lib/jvm/java-17-openjdk-amd64
RUN pip install --no-cache-dir "apache-airflow==${AIRFLOW_VERSION}" apache-airflow-providers-apache-spark==2.1.3
COPY ./requirements.txt /
RUN pip install -r /requirements.txt
COPY ./lib/postgresql-42.2.6.jar /opt/airflow/lib/postgresql-42.2.6.jar
Essentially, we’re installing Java and the apache-airflow-providers-apache-spark library in the image, so we can connect Airflow with Spark.
Build the image:
docker build -f Dockerfile-airflow . -t trading-airflow
Then download the docker-compose-airflow.yaml file from https://gist.github.com/xescuder/c05305c92035202693818db272868bda and start the containers by executing the following command:
docker-compose -f docker-compose-airflow.yaml up -d
The docker-compose-airflow.yaml file contains two volume mappings with the project:
- ${AIRFLOW_DAGS}:/opt/airflow/dags: the DAG should be located in the dags directory.
- ../trading_data_pipeline/:/opt/airflow/trading_data_pipeline: the Spark ETLs and custom imported modules are located in the trading_data_pipeline folder.
Before launching the containers, you have the option to set and modify both the username and password. By default, they are both set to airflow.
Navigate to http://server:8080 in your web browser and log in using the credentials to gain access to the Airflow dashboard.
Set up the connection with Spark
Add a new connection to Spark from the Admin > Connections menu:
- Connection Id: spark-connection
- Connection Type: Spark
- Host: spark://spark
- Port: 7077
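As an alternative to the UI, the same connection can be created programmatically. This is only a minimal sketch, assuming it is executed where Airflow is installed (for example inside the Airflow container with docker exec):

from airflow import settings
from airflow.models import Connection

# Spark connection with the same values as described above.
conn = Connection(conn_id='spark-connection',
                  conn_type='spark',
                  host='spark://spark',
                  port=7077)

session = settings.Session()
# Only add the connection if it does not exist yet.
if not session.query(Connection).filter(Connection.conn_id == conn.conn_id).first():
    session.add(conn)
    session.commit()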
Set up a connection with Postgres
For the first task, we establish an Airflow connection to PostgreSQL. Navigate to the Airflow dashboard, access Admin > Connections, and select the plus sign at the upper left. Provide the connection parameters for the PostgreSQL database and then click on Save.
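If you prefer to verify the connection from code rather than the UI, here is a quick sketch using the Postgres provider’s hook; it assumes the connection id is trading-data-postgres-connection, the same id referenced later in the DAG:

from airflow.providers.postgres.hooks.postgres import PostgresHook

# Run a trivial query through the Airflow connection to confirm it works.
# The connection id below must match the one created in Admin > Connections.
hook = PostgresHook(postgres_conn_id='trading-data-postgres-connection')
print(hook.get_first('SELECT version();'))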
Create variables
We have the option to define variables to store values for use in our DAGs.
Navigate to Admin > Variables and create a new variable named PYSPARK_APP_HOME with the value /opt/airflow/trading_data_pipeline (this matches the Docker volume mapping of our source folder trading_data_pipeline). This variable specifies the directory containing our Spark code.
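Variables can also be set and read from code. A small sketch follows; the default_var fallback is only an illustration, not something the pipeline relies on:

from airflow.models import Variable

# Create the variable programmatically (equivalent to adding it in the UI).
Variable.set('PYSPARK_APP_HOME', '/opt/airflow/trading_data_pipeline')

# Read it back, with a fallback in case it has not been created yet.
pyspark_app_home = Variable.get('PYSPARK_APP_HOME',
                                default_var='/opt/airflow/trading_data_pipeline')
print(pyspark_app_home)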
Develop the DAG
We create a DAG named load_historical_data_dag, scheduled to run Monday to Friday at 22:00.
If you don't know the cron format, I recommend testing and playing with https://crontab.guru/ and its examples section.
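As a quick sanity check of the expression, the croniter package (a dependency that ships with Airflow) can print the next scheduled times; the start date below is only an example:

# Sanity-check the cron expression '0 22 * * 1-5' with croniter.
from datetime import datetime
from croniter import croniter

it = croniter('0 22 * * 1-5', datetime(2023, 1, 1))
for _ in range(3):
    print(it.get_next(datetime))  # prints the next three weekday runs at 22:00

Back to the DAG definition: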
from datetime import datetime

from airflow import DAG
from airflow.models import Variable
from airflow.providers.apache.spark.operators.spark_submit import SparkSubmitOperator
from airflow.providers.postgres.operators.postgres import PostgresOperator

pyspark_app_home = Variable.get("PYSPARK_APP_HOME")

with DAG('load_historical_data_dag',
         start_date=datetime(2023, 1, 1),
         schedule_interval='0 22 * * 1-5',
         description='Load daily data',
         ) as dag:
    # Create the daily bars tables if they do not exist yet
    sql_create = PostgresOperator(task_id='sql_create_',
                                  postgres_conn_id='trading-data-postgres-connection',
                                  sql='./sql/02-daily_bars_schema.sql')

    # Submit the PySpark job that loads the daily bars
    submit_job = SparkSubmitOperator(
        task_id='spark_load_daily_bars_task',
        application=f'{pyspark_app_home}/load_daily_bars_etl.py',
        conn_id='spark-connection',
        jars='./lib/postgresql-42.2.6.jar',
        packages='org.postgresql:postgresql:42.6.0',
        total_executor_cores=1,
        driver_memory='1g',
        verbose=True
    )

    sql_create >> submit_job
This DAG comprises two tasks executed sequentially, connected using the >> syntax (equivalent ways to declare this dependency are sketched after the following list):
- sql_create: creation of the needed tables (if they don't exist).
- submit_job: call to the Spark program that loads the stocks' daily data.
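The >> bitshift syntax is just one of several equivalent ways to wire tasks. Here is a minimal, self-contained sketch with placeholder tasks; the EmptyOperator DAG below is purely illustrative and not part of the pipeline:

from datetime import datetime

from airflow import DAG
from airflow.operators.empty import EmptyOperator

# Illustrative DAG only: placeholder tasks showing equivalent dependency syntax.
with DAG('dependency_syntax_example',
         start_date=datetime(2023, 1, 1),
         schedule_interval=None) as dag:
    first = EmptyOperator(task_id='first')
    second = EmptyOperator(task_id='second')

    first >> second                   # bitshift syntax used in this article
    # Equivalent alternatives:
    # first.set_downstream(second)
    # chain(first, second)            # from airflow.models.baseoperator import chain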
Table creation task (sql_create)
The PostgresOperator task allows us to execute a SQL script, located at dags/sql/02-daily_bars_schema.sql.
CREATE TABLE IF NOT EXISTS daily_bars (
    symbol text,
    date date NOT NULL,
    open double precision,
    high double precision,
    low double precision,
    close double precision,
    volume bigint,
    u_close double precision
);

SELECT create_hypertable('daily_bars', 'date', if_not_exists => TRUE);

CREATE INDEX IF NOT EXISTS ix_symbol_date ON daily_bars (symbol, date DESC);
Spark load daily bars pipeline execution (submit_job)
I suggest reading this article to gain insights into various methods for invoking the Spark code we’ve developed earlier. In our case, we’ll employ the approach using the SparkSubmitOperator.
The Airflow SparkSubmitOperator is an operator that allows us to run Spark jobs from within an Airflow DAG.
submit_job = SparkSubmitOperator(
    task_id='spark_load_daily_bars_task',
    application=f'{pyspark_app_home}/load_daily_bars_etl.py',
    conn_id='spark-connection',
    jars='./lib/postgresql-42.2.6.jar',
    packages='org.postgresql:postgresql:42.6.0',
    total_executor_cores=1,
    driver_memory='1g',
    verbose=True
)
- The application parameter specifies the path to the Spark application file to execute. We build it from the pyspark_app_home variable and the application file name, load_daily_bars_etl.py (our PySpark program).
- The conn_id parameter identifies the Spark connection established during setup.
- The jars parameter includes external dependencies needed by our Spark application; here it references the PostgreSQL JAR file.
- Within the packages parameter, we also declare the PostgreSQL driver.
Execution of the DAG
The DAG is already scheduled, but we can also trigger it manually at any time from the DAGs menu in the Airflow dashboard:
Click on the DAG name and then press the play button at the top right.
We’ll see the DAG running, and we can open each run id to check the status of its tasks.
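For quick debugging you can also run the whole DAG in a single process without the scheduler, using dag.test() (available since Airflow 2.5). A small sketch, appended to the DAG file and executed with python inside the Airflow container:

# Run all tasks of the DAG sequentially in the current process for debugging.
if __name__ == '__main__':
    dag.test()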
Note that this article does not provide personal investment advice and I am not a qualified licensed investment advisor. All information found here is for entertainment or educational purposes only and should not be construed as personal investment advice.