Xavier Escudero

Summary

The article outlines the process of setting up an Apache Spark cluster for executing a daily historical data pipeline, detailing the configuration and execution of Spark applications within a Dockerized environment.

Abstract

The article is the fourth installment in a series focused on building a trading data pipeline. It transitions from running Apache Spark locally to setting up a Spark cluster using Docker and Docker Compose. The author provides a step-by-step guide to create a Dockerfile-spark for installing necessary Python libraries and a docker-compose-spark.yaml file to define the Spark master and worker services. The cluster setup is designed to be scalable, allowing for the addition of more workers as needed. The article also covers how to modify the SparkSession configuration to connect with the Spark cluster, manage executor memory, and pass environment variables for database connections. It concludes with instructions on executing the data pipeline and monitoring its progress through the Spark UI dashboard.

Opinions

  • The author expresses a preference for directly installing required libraries inside the Docker container after exploring various options for Python package management.
  • The author acknowledges the inspiration and insights drawn from other reference materials and community contributions, which have informed the approach and content of the article.
  • A deliberate choice was made to focus on the cluster setup and execution rather than revisiting the detailed explanation of extracting, transforming, and loading other types of data, to avoid redundancy and introduce variety in the series.
  • The author emphasizes the educational nature of the content, stating that it is intended for informational or educational purposes and not as personal investment advice.

Building the Ultimate Trading Data Pipeline — Part 4: Executing daily historical data pipeline in a cluster

In previous articles we executed Spark in a local environment for simplicity. In this article we'll set up Spark in a cluster.

Spark applications run as independent sets of processes on a cluster, coordinated by the SparkContext object in our main program (called the driver program). The SparkContext can connect to several types of cluster managers, which allocate resources across applications; in this article we'll use the standalone cluster manager. Once connected, Spark:

  1. Acquires executors on nodes in the cluster, which are processes that run computations and store data for your application.
  2. Sends our application code to the executors.
  3. Finally, sends tasks to the executors to run.
https://spark.apache.org/docs/latest/cluster-overview.html
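
To make these steps concrete, here is a minimal smoke-test job (not part of the pipeline itself) that connects a driver to a standalone master and lets the executors run a trivial computation; it assumes the master we set up below is reachable at spark://<ip-server>:7077.

from pyspark.sql import SparkSession

# Connect the driver to the standalone cluster manager
spark = SparkSession.builder \
    .master("spark://<ip-server>:7077") \
    .appName("Cluster smoke test") \
    .getOrCreate()

# The driver ships this computation to the executors, which run the tasks and return the results
result = spark.sparkContext.parallelize(range(100), 4).sum()
print(result)  # 4950

spark.stop()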

Setup of Spark Cluster (Standalone)

Create a file named Dockerfile-spark with this content:

FROM bitnami/spark:3.5  
USER root

COPY ./requirements.txt /  
RUN pip install -r /requirements.txt  

We define the external libraries that need to be installed in the container in a requirements.txt file:

holidays==0.39  
python-dotenv==1.0.0  
requests==2.31.0

I’ve spent hours exploring various options outlined in the Python Package Management documentation, but unfortunately, none of them proved successful. In the end, I opted for a more straightforward approach by installing the required libraries directly inside the container.

Build an image from our Dockerfile-spark using:

docker build -f Dockerfile-spark . -t trading-spark

Create a file named docker-compose-spark.yaml with this content:

version: '2'
services:
  spark-master:
    image: trading-spark:latest
    environment:
      - SPARK_MODE=master
      - SPARK_LOCAL_IP=spark-master
      - SPARK_WORKLOAD=master
    ports:
      - '9090:8080'
      - '7077:7077'
    networks:
      - airflow-spark-network
  spark-worker:
    image: trading-spark:latest
    environment:
      - SPARK_MODE=worker
      - SPARK_MASTER_URL=spark://spark-master:7077
      - SPARK_WORKER_MEMORY=1G
      - SPARK_WORKER_CORES=2
      - SPARK_RPC_AUTHENTICATION_ENABLED=no
      - SPARK_RPC_ENCRYPTION_ENABLED=no
      - SPARK_LOCAL_STORAGE_ENCRYPTION_ENABLED=no
      - SPARK_SSL_ENABLED=no
      - SPARK_USER=spark
    networks:
      - airflow-spark-network
networks:
  airflow-spark-network:
    external: true

By default we get an Apache Spark cluster with 1 master and 1 worker. If we want, for example, 3 workers, we can start the cluster with:

docker-compose -f docker-compose-spark.yaml up --scale spark-worker=3

Open <ip>:9090 in a web browser and you'll see the Spark master dashboard.
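
To check that the libraries baked into the trading-spark image are importable on the workers (and not only on the driver), a quick sanity test like the following can be run against the cluster; this is a minimal sketch and not part of the pipeline code.

from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .master("spark://<ip-server>:7077") \
    .appName("Dependency check") \
    .getOrCreate()

def report_versions(_):
    # These imports happen on the executor, so they confirm the image contains the packages
    import holidays
    import requests
    return [(holidays.__version__, requests.__version__)]

print(spark.sparkContext.parallelize([0], 1).mapPartitions(report_versions).collect())
spark.stop()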

Updating code to connect with Spark cluster

Starting with the code covered in the article Building the Ultimate Trading Data Pipeline — Part 3: Data pipeline for populating daily historical data, we make a slight adjustment to the creation of the SparkSession to establish a connection with our cluster:

from pyspark import SparkConf
from pyspark.sql import SparkSession

# Give each executor 2 GB of memory
conf = SparkConf().set("spark.executor.memory", "2g")

spark = SparkSession.builder \
    .config(conf=conf) \
    .master("spark://<ip-server>:7077") \
    .appName("Daily data pipeline") \
    .config("spark.executorEnv.DB_USER", "postgres") \
    .config("spark.executorEnv.DB_PASSWORD", "<db-password>") \
    .config("spark.executorEnv.DB_HOST", "<db_host>") \
    .config("spark.executorEnv.DB_NAME", "trading-data-db") \
    .config("spark.executorEnv.DB_PORT", "5432") \
    .config("spark.executorEnv.EOD_API_TOKEN", "<eod-api-token>") \
    .config("spark.jars.packages", "org.postgresql:postgresql:42.6.0") \
    .getOrCreate()

# Ship our helper modules to the executors
context = spark.sparkContext
context.addPyFile("./config.py")
context.addPyFile("./pipeline_utils.py")
context.addPyFile("./date_utils.py")

# extract, transform and load come from the Part 3 pipeline code
load(transform(extract(spark)))
spark.stop()
  • We allocate 2 gigabytes of memory to each executor via SparkConf, passed to the builder with .config(conf=conf).
  • The Spark master URL is set with .master("spark://<ip-server>:7077") to establish the connection with our standalone Spark cluster.
  • Environment variables required by our code are passed to the executors using spark.executorEnv.<ENV-NAME> (see the sketch after this list).
  • The PostgreSQL JDBC driver is loaded through spark.jars.packages.
  • Finally, we make all necessary modules and dependencies available on the executors using addPyFile.
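
The values passed through spark.executorEnv.<ENV-NAME> appear as ordinary environment variables inside the executor processes. As an illustration only (the helper name and connection details below are assumptions, not the actual Part 3 code), executor-side code could read them like this:

import os

def build_jdbc_options():
    # os.environ on an executor contains the values set with spark.executorEnv.<ENV-NAME>
    host = os.environ["DB_HOST"]
    port = os.environ["DB_PORT"]
    name = os.environ["DB_NAME"]
    return {
        "url": f"jdbc:postgresql://{host}:{port}/{name}",
        "user": os.environ["DB_USER"],
        "password": os.environ["DB_PASSWORD"],
        "driver": "org.postgresql.Driver",
    }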

Execution

If we start our load_daily_bars.py program we can see the execution status in the Spark UI dashboard.

Select the application and then navigate to its Application Detail UI.

The status of each task can be observed in the details.

The same approach can be applied to all the data pipelines discussed in our earlier articles.

To introduce variety and avoid redundancy in the articles, I have chosen not to delve into the detailed explanation of extracting, transforming, and loading other sources of information such as intraday data, news, fundamental data, and more.

In the process of crafting this article, I've drawn inspiration and insights from various references.


*Note that this article does not provide personal investment advice and I am not a qualified licensed investment advisor. All information found here is for entertainment or educational purposes only and should not be construed as personal investment advice.
