Building the Ultimate Trading Data Pipeline — Part 4: Executing daily historical data pipeline in a cluster
In the previous articles we executed Spark in a local environment for simplicity. In this article we'll set up Spark in a cluster.
Spark applications run as independent sets of processes on a cluster, coordinated by the SparkContext object in our main program (called the driver program). The SparkContext can connect to several types of cluster managers, which allocate resources across applications; in this article we'll use the standalone cluster manager. Once connected, Spark:
- Acquires executors on nodes in the cluster, which are processes that run computations and store data for your application.
- Sends our application code to the executors.
- Finally, sends tasks to the executors to run.
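For reference, the same connection can also be made when handing a script to the cluster with spark-submit (the master address below is a placeholder); in this article, however, we'll point the SparkSession at the master directly from the driver code.
spark-submit --master spark://<ip-server>:7077 load_daily_bars.py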
Setup of Spark Cluster (Standalone)
Create a file named Dockerfile-spark with this content:
FROM bitnami/spark:3.5
USER root
COPY ./requirements.txt /
RUN pip install -r /requirements.txt
We're defining the external libraries we need to install in the container in a requirements.txt file:
holidays==0.39
python-dotenv==1.0.
requests==2.31.0
I spent hours exploring the various options outlined in the Python Package Management documentation, but unfortunately none of them proved successful. In the end, I opted for a more straightforward approach: installing the required libraries directly inside the container image.
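As a quick, hedged illustration of what these packages are for: holidays presumably lets the pipeline skip non-trading days, python-dotenv loads local configuration such as the EOD API token, and requests performs the HTTP calls to the data provider. The helper below is a hypothetical sketch, not the exact code from the earlier articles.
import datetime
import os
import holidays                 # public/market holiday calendars
from dotenv import load_dotenv  # reads variables from a local .env file

load_dotenv()  # e.g. EOD_API_TOKEN=... when running the driver locally
EOD_API_TOKEN = os.getenv("EOD_API_TOKEN")

def is_trading_day(day: datetime.date) -> bool:
    # Hypothetical helper: skip weekends and US holidays.
    return day.weekday() < 5 and day not in holidays.US()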
Build an image from our Dockerfile-spark using:
docker build -f Dockerfile-spark . -t trading-spark
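To check that the extra Python libraries were baked into the image, you can run a quick, throwaway container (the Bitnami-based image normally lets you override the startup command like this):
docker run --rm trading-spark pip show holidays requests python-dotenv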
Create a file named docker-compose-spark.yaml with this content:
version: '2'
services:
  spark-master:
    image: trading-spark:latest
    environment:
      - SPARK_MODE=master
      - SPARK_LOCAL_IP=spark-master
      - SPARK_WORKLOAD=master
    ports:
      - '9090:8080'
      - '7077:7077'
    networks:
      - airflow-spark-network
  spark-worker:
    image: trading-spark:latest
    environment:
      - SPARK_MODE=worker
      - SPARK_MASTER_URL=spark://spark-master:7077
      - SPARK_WORKER_MEMORY=1G
      - SPARK_WORKER_CORES=2
      - SPARK_RPC_AUTHENTICATION_ENABLED=no
      - SPARK_RPC_ENCRYPTION_ENABLED=no
      - SPARK_LOCAL_STORAGE_ENCRYPTION_ENABLED=no
      - SPARK_SSL_ENABLED=no
      - SPARK_USER=spark
    networks:
      - airflow-spark-network
networks:
  airflow-spark-network:
    external: true
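The airflow-spark-network is declared as external, so Docker Compose expects it to exist already (presumably the network shared with the Airflow containers from the earlier articles). If it doesn't, create it before starting the cluster:
docker network create airflow-spark-network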
By default, we get an Apache Spark cluster with 1 master and 1 worker. If we want, for example, 3 workers, we can start the cluster with:
docker-compose -f docker-compose-spark.yaml up --scale spark-worker=3
Open <ip>:9090 in a web browser and you'll see the Spark dashboard.
Updating the code to connect to the Spark cluster
Starting with the code covered in the article Building the Ultimate Trading Data Pipeline — Part 3: Data pipeline for populating daily historical data, we make a slight adjustment to the creation of the SparkSession to establish a connection with our cluster:
from pyspark import SparkConf
from pyspark.sql import SparkSession

# extract, transform and load are the pipeline functions built in Part 3

conf = SparkConf().set("spark.executor.memory", "2g")

spark = SparkSession.builder \
    .config(conf=conf) \
    .master("spark://<ip-server>:7077") \
    .appName("Daily data pipeline") \
    .config("spark.executorEnv.DB_USER", "postgres") \
    .config("spark.executorEnv.DB_PASSWORD", "<db-password>") \
    .config("spark.executorEnv.DB_HOST", "<db_host>") \
    .config("spark.executorEnv.DB_NAME", "trading-data-db") \
    .config("spark.executorEnv.DB_PORT", "5432") \
    .config("spark.executorEnv.EOD_API_TOKEN", "<eod-api-token>") \
    .config("spark.jars.packages", "org.postgresql:postgresql:42.6.0") \
    .getOrCreate()

# Ship our local modules to the executors so the pipeline code can import them there
context = spark.sparkContext
context.addPyFile("./config.py")
context.addPyFile("./pipeline_utils.py")
context.addPyFile("./date_utils.py")

load(transform(extract(spark)))

spark.stop()
- We designate 2 gigabytes of memory for each executor.
- The Spark master URL is configured with .master("spark://<ip-server>:7077") to establish a connection with our standalone Spark cluster.
- Environment variables required by our code are passed to the executors using spark.executorEnv.<ENV-NAME> (see the sketch after this list).
- The PostgreSQL JDBC driver is loaded through spark.jars.packages.
- Finally, we ensure all necessary modules and dependencies are available on the executors using addPyFile.
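To see how these settings come together at runtime, here is a hedged sketch of an extract step that runs on the executors; the symbol list, function structure, and the exact EOD endpoint are assumptions for illustration, not the code from Part 3.
import os
import requests
from pyspark.sql import SparkSession

def fetch_symbol_bars(symbol: str) -> list:
    # Runs inside a task on a worker: values set via spark.executorEnv.* are
    # visible here as ordinary environment variables.
    token = os.environ["EOD_API_TOKEN"]
    url = f"https://eodhd.com/api/eod/{symbol}"  # endpoint path is an assumption
    resp = requests.get(url, params={"api_token": token, "fmt": "json"}, timeout=30)
    resp.raise_for_status()
    return resp.json()  # a list of daily bars, one dict per day

def extract(spark: SparkSession):
    # Hypothetical watchlist; the real pipeline reads its symbols elsewhere.
    symbols = ["AAPL.US", "MSFT.US", "GOOG.US"]
    # parallelize() distributes the symbols, so each fetch happens on an executor
    # that has both the environment variables and the shipped .py modules.
    return spark.sparkContext.parallelize(symbols).flatMap(fetch_symbol_bars)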
Execution
If we start our load_daily_bars.py program, we can see the execution status in the Spark UI dashboard:
Select the application, then navigate to the Application Detail UI.
The status of each task can be observed in the details:
The same approach can be applied to all the data pipelines discussed in our earlier articles.
To introduce variety and avoid redundancy across the articles, I've chosen not to go into a detailed explanation of extracting, transforming, and loading other sources of information such as intraday data, news, and fundamental data.
While crafting this article, I drew inspiration and insights from the following references:
- Airflow and Spark: Running Spark Jobs on Airflow (Docker-based Solution)
- Using Apache Spark Docker containers to run pyspark programs using spark-submit
- Writing data to databases using JDBC in Apache Spark
- Apache Spark — local setup for efficient development and debugging
*Note that this article does not provide personal investment advice and I am not a qualified licensed investment advisor. All information found here is for entertainment or educational purposes only and should not be construed as personal investment advice.