Xavier Escudero

Summary

The article discusses building a data pipeline for populating stock indices composition using Apache Spark and PySpark, with a focus on establishing the foundational schema and data pipeline.

Abstract

The article begins with an introduction to the data elements required to comprehensively capture and analyze stock data, including the stock symbol, exchange and index, company information, historical data, splits and dividends history, news and events, financial ratios, and analyst recommendations. The author then describes the process of establishing the foundational schema and data pipeline for loading indices composition, using a parent table called "instruments" and a "type" column to distinguish between different instrument categories. The article also includes an ERD diagram created with pgModeler and a script for creating the needed tables using PgAdmin. The author then discusses the data pipeline, which harnesses the capabilities of Apache Spark and PySpark to orchestrate ETL operations and uses indices composition data obtained from the EOD HD API to populate the database. The article also includes instructions for preparing Spark, creating a Google Sheet with the indices to download, and extracting data from the EOD HD API.

Opinions

  • The author believes that using Apache Spark and PySpark can provide the flexibility and parallelism needed to efficiently process large amounts of data for stock analysis.
  • The author emphasizes the importance of establishing a foundational schema and data pipeline for loading indices composition as a starting point for generating additional information as needed.
  • The author provides detailed instructions and code examples for implementing the data pipeline, suggesting a high level of expertise and confidence in the proposed solution.
  • The author acknowledges the complexity of the process and provides resources for further learning and support.
  • The author expresses gratitude to the authors and contributors whose research has shaped the article and encourages readers to engage with the community and continue learning.

Building the Ultimate Trading Data Pipeline — Part 1: Data Pipeline for Populating Stocks Indices Composition

To comprehensively capture and analyze stock data, we may need different data elements:

  • Stock symbol/ticker. This is the primary key of our stocks, a unique identifier.
  • Exchange and index where the stock is included.
  • Company information. Details about the related company, including name, sector, industry, description, logo, etc.
  • Historical data. Past stock prices, for trend and technical analysis.
  • Splits and dividends history
  • News and events that can affect the price of the stock.
  • Financial ratios (EPS, P/E, …) and SEC filings
  • Analyst recommendations. Recommendations and target prices from analysts.

In this first article, we’ll establish the foundational schema and data pipeline to load indices composition. In subsequent articles we’ll delve into generating additional information as the need arises.

Database schema

To introduce the foundational structure, we start by establishing a parent table called instruments, designed for seamless extension across instrument types beyond stocks: indexes, cryptocurrencies, ETFs, and more. The type column is an enumeration distinguishing between instrument categories such as STOCK or INDEX.

We use the indexes_components table to establish relationships between instruments that are part of an index, as represented in the indexes table.

ERD Diagram created with pgModeler

Connect to your database using PgAdmin and create the needed tables using the following script:

CREATE TYPE tradinginstrumenttypeenum AS ENUM (
    'INDEX',
    'STOCK'
);

CREATE TABLE IF NOT EXISTS instruments
(
    symbol character varying(12),
    name character varying(200) NOT NULL,
    type tradinginstrumenttypeenum NOT NULL,
    isin character varying(12),
    CONSTRAINT instruments_pkey PRIMARY KEY (symbol)
);
CREATE TABLE IF NOT EXISTS indexes
(
    symbol character varying(12) NOT NULL,
    CONSTRAINT indexes_pkey PRIMARY KEY (symbol),
    CONSTRAINT instrument_fk FOREIGN KEY (symbol)
        REFERENCES instruments (symbol) MATCH SIMPLE
        ON UPDATE NO ACTION
        ON DELETE NO ACTION
        NOT VALID
);
CREATE TABLE IF NOT EXISTS indexes_components
(
    index_symbol character varying(12) NOT NULL,
    component_symbol character varying(12) NOT NULL,
    CONSTRAINT indexes_components_pkey PRIMARY KEY (index_symbol, component_symbol),
    CONSTRAINT index_fk FOREIGN KEY (index_symbol)
        REFERENCES indexes (symbol) MATCH SIMPLE
        ON UPDATE NO ACTION
        ON DELETE NO ACTION
        NOT VALID,
    CONSTRAINT instrument_fk FOREIGN KEY (component_symbol)
        REFERENCES instruments (symbol) MATCH SIMPLE
        ON UPDATE NO ACTION
        ON DELETE NO ACTION
        NOT VALID
);
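Alternatively, if you prefer to run the DDL from Python instead of PgAdmin, here is a minimal sketch using psycopg2 (installed in the Prepare Spark step below). The schema.sql file name and the connection parameters are placeholders, not values from this article:

import psycopg2

# Hypothetical: the DDL script above saved to schema.sql; connection values are placeholders
with open("schema.sql") as f:
    ddl = f.read()

conn = psycopg2.connect(host="localhost", dbname="trading", user="postgres", password="postgres")
try:
    with conn.cursor() as cur:
        cur.execute(ddl)  # psycopg2 accepts multiple semicolon-separated statements
    conn.commit()
finally:
    conn.close()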

NOTE: In the orchestration article of this series we’ll execute the creation of these tables automatically using Airflow.

Data pipeline

Harnessing the capabilities of Apache Spark and PySpark, we orchestrate ETL (Extract, Transform, Load) operations that use indices composition data obtained from the EOD HD API to populate our database.

Prepare Spark

Install the following modules:

poetry add psycopg2
poetry add pyspark
poetry add findspark

Create a lib directory within the project and download the JDBC Postgres Driver into this directory.

You can obtain the current version available at the time of writing this article via the following link: https://jdbc.postgresql.org/download/postgresql-42.2.6.jar.
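If you prefer to script this step, here is a minimal sketch that downloads the driver into ./lib (it assumes the lib directory lives next to your script and uses the same driver version as above):

import pathlib
import urllib.request

# Download the PostgreSQL JDBC driver into ./lib (version as referenced above)
JAR_URL = "https://jdbc.postgresql.org/download/postgresql-42.2.6.jar"
lib_dir = pathlib.Path("lib")
lib_dir.mkdir(exist_ok=True)
urllib.request.urlretrieve(JAR_URL, lib_dir / "postgresql-42.2.6.jar")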

With Spark, the entry point for executing operations is a SparkSession. We create it using the builder pattern, builder.getOrCreate(), which returns an existing SparkSession or, if there is none, creates a new one based on the options.

from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("Trading data pipeline") \
    .config("spark.jars", "./lib/postgresql-42.2.6.jar") \
    .getOrCreate()

In the config method, we specify the relative path to the previously downloaded PostgreSQL JAR file.

At the end of the code we should stop the session:

spark.stop()
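For orientation, the whole script can be structured around the session as in this minimal sketch (it assumes the spark session created above, and the extract, transform and load functions developed in the rest of the article):

try:
    # ETL entry point; extract, transform and load are defined later in the article
    load(transform(extract()))
finally:
    # Always release the session, even if the pipeline fails
    spark.stop()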

Create Google Sheet with Indices to download

We’ll control which indices to extract with a Google Drive Sheet for tutorial purposes, but you could manage this directly in the database. The first column of the sheet contains the index code used by the EOD HD API.

If the sheet were protected we would have to follow a more involved process; since the information is not really private, we’ll simply share the link publicly. Copy the generated link and replace the final part, edit?usp=sharing, with gviz/tq?tqx=out:csv.

e.g.: https://docs.google.com/spreadsheets/d/1PKhKeLR7DQCEqTjicEPkDKPHjEJB7POhb2W_T_OEopg/gviz/tq?tqx=out:csv
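As a quick check that the public link works, here is a minimal sketch that reads it with pandas; it also defines the INDICES_CSV_DRIVE_LINK constant used later in the extract step (the symbol column name is an assumption and must match your sheet's header):

import pandas as pd

# Public CSV export of the example sheet above
INDICES_CSV_DRIVE_LINK = (
    "https://docs.google.com/spreadsheets/d/"
    "1PKhKeLR7DQCEqTjicEPkDKPHjEJB7POhb2W_T_OEopg/gviz/tq?tqx=out:csv"
)
df = pd.read_csv(INDICES_CSV_DRIVE_LINK)
print(df['symbol'].tolist())  # e.g. ['GSPC.INDX', ...] depending on the sheet contents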

Extract

We’re going to extract the composition of each index using the EOD HD API, with a URL of the form https://eodhd.com/api/fundamentals/{INDEX_CODE}.INDX?api_token={API_TOKEN}&fmt=json, which returns a JSON response like this:

{
    "General": {
        "Code": "GSPC",
        "Type": "INDEX",
        "Name": "S&P 500 Index",
        "Exchange": "INDX",
        "CurrencyCode": "USD",
        "CurrencyName": "US Dollar",
        "CurrencySymbol": "$",
        "CountryName": "USA",
        "CountryISO": "US",
        "OpenFigi": null
    },
    "Components": {
        "0": {
            "Code": "AIZ",
            "Exchange": "US",
            "Name": "Assurant Inc",
            "Sector": "Financial Services",
            "Industry": "Insurance - Specialty"
        },
        "1": {
            "Code": "MNST",
            "Exchange": "US",
            "Name": "Monster Beverage Corp",
            "Sector": "Consumer Defensive",
            "Industry": "Beverages - Non-Alcoholic"
        },
        ...
    }
}

Using Apache Spark, we have the flexibility to select specific values from the JSON returned in the REST API response by applying a schema. In this instance, our focus is on extracting Code, Exchange, and Name (we'll incorporate Sector and Industry in the upcoming articles within the component details data pipeline).

In order to take advantage of the parallelism that Apache Spark offers, each REST API call is encapsulated in a UDF (user-defined function) that is bound to a DataFrame. Each row in the DataFrame represents a single call to the REST API. Once an action is executed on the DataFrame, the result of each individual REST API call is appended to its row as a structured data type.

import re
import requests

import pandas as pd

from pyspark.sql.functions import col, lit, udf, explode_outer, concat
from pyspark.sql.types import StructType, StructField, StringType, Row, MapType

# config is the project settings module (EOD_API_TOKEN, DB credentials, JDBC URL);
# INDICES_CSV_DRIVE_LINK is the public CSV link of the Google Sheet created above.

SCHEMA_URL = StructType([StructField('url', StringType())])


def rest_api_get(url):
    res = requests.get(url)
    if res is not None and res.status_code == 200:
        return res.json()
    return None


def extract():
    schema = StructType([
        StructField('General', StructType([
            StructField('Code', StringType(), True)
        ])),
        StructField('Components', MapType(
            StringType(),
            StructType([
                StructField('Code', StringType(), True),
                StructField('Exchange', StringType(), True),
                StructField('Name', StringType(), True),
                StructField('Sector', StringType(), True),
                StructField('Industry', StringType(), True),
            ]),
        ))
    ])
    df = pd.read_csv(INDICES_CSV_DRIVE_LINK)
    indices_symbols = df['symbol'].tolist()
    urls = []
    for index_symbol in indices_symbols:
        urls.append(
            Row(f'https://eodhd.com/api/fundamentals/{index_symbol}?api_token={config.EOD_API_TOKEN}&fmt=json'))
    api_df = spark.createDataFrame(urls, SCHEMA_URL)
    udf_api_call = udf(rest_api_get, schema)
    df_data = api_df.withColumn('data', udf_api_call(col('url')))
    df_data = df_data.select(col('data.General.Code').alias('index_symbol'), explode_outer('data.Components')).drop('data', 'key')
    df_data = df_data.select('value.*', '*').drop('value')
    return df_data

df = pd.read_csv(INDICES_CSV_DRIVE_LINK): We read the CSV file of indexes into a pandas DataFrame.

indices_symbols = df['symbol'].tolist(): Extracts the symbol column from the DataFrame into a list.

urls = []: We construct a list of URLs to obtain the list of components for each index from the EOD HD API.

api_df = spark.createDataFrame(urls, SCHEMA_URL): We convert the list of URLs into a Spark DataFrame (api_df) with the specified schema (SCHEMA_URL). Building the DataFrame from PySpark Rows lets Spark issue the API calls through multiple parallel connections.

udf_api_call = udf(rest_api_get, schema): We define a user-defined function (UDF) to make a REST API call (rest_api_get) using the specified schema (schema). The schema specifies how to map the collected JSON information to our structure. We don't need to specify all the information returned by JSON, only the information we're interested in.

df_data = api_df.withColumn('data', udf_api_call(col('url'))): Applies the UDF to the url column in api_df to fetch data from the API and adds a new column data with the fetched information.

df_data.select(col('data.General.Code').alias('index_symbol'), explode_outer('data.Components')): We unpack the data column to create one column for the index code and one row for each of the Components.

df_data.select('value.*', '*'): Finally, we unpack the value column so that each of its fields becomes a column.
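To make the unpacking more concrete, here is a minimal, self-contained sketch (with made-up data) of how explode_outer turns a map column into key/value rows:

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, explode_outer

spark = SparkSession.builder.getOrCreate()
# Toy DataFrame: one row per index, with its components stored in a map column
toy_df = spark.createDataFrame(
    [("GSPC", {"0": "AIZ", "1": "MNST"})],
    "index_code string, components map<string,string>",
)
# explode_outer produces one row per map entry, in columns named key and value
toy_df.select(col("index_code"), explode_outer("components")).show()
# Expected output (row order may vary):
# +----------+---+-----+
# |index_code|key|value|
# +----------+---+-----+
# |      GSPC|  0|  AIZ|
# |      GSPC|  1| MNST|
# +----------+---+-----+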

Transform

The transform function is where we adapt the column names of the loaded DataFrame to the names in our database schema. In our example, the database schema uses snake_case column names, while the columns obtained from the EOD HD API are in PascalCase. We rename the column names with df.toDF(*[snake_case(c) for c in df.columns]).

We also rename the columns that do not match the database names exactly, using a dictionary and PySpark's withColumnRenamed function.

As a single symbol may be traded on multiple exchanges, we introduce a new symbol column to act as our primary key, combining the Code of the symbol with its corresponding exchange symbol (using concat). Finally, we drop the columns we no longer need.

def transform(ex_comp_df):  
    def snake_case(text):  
        s1 = re.sub('(.)([A-Z][a-z]+)', r'\1_\2', text)  
        return re.sub('([a-z0-9])([A-Z])', r'\1_\2', s1).lower()
    def rename_cols(df, mapping_dict):  
        for key in mapping_dict.keys():  
            df = df.withColumnRenamed(key, mapping_dict.get(key))  
        return df
    cols_dict = {  
        "code": "symbol",  
        "exchange": "exchange_symbol",  
    }  
    df = ex_comp_df.toDF(*[snake_case(c) for c in ex_comp_df.columns])  
    df = rename_cols(df, cols_dict)  
    df = df.withColumn('type', lit('STOCK'))  
    df = df.withColumn('symbol', concat(col('symbol'), lit('.'), col('exchange_symbol')))  
    df = df.withColumn('index_symbol', concat(col('index_symbol'), lit('.INDX')))
    df = df.drop(col('exchange_symbol'))
    return df
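As a quick illustration of the renaming, the snake_case helper (repeated here standalone so it can be run on its own) converts the PascalCase names returned by the API into snake_case:

import re

def snake_case(text):
    # Same helper as inside transform() above
    s1 = re.sub('(.)([A-Z][a-z]+)', r'\1_\2', text)
    return re.sub('([a-z0-9])([A-Z])', r'\1_\2', s1).lower()

print(snake_case('Code'))          # code
print(snake_case('CurrencyCode'))  # currency_code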

At this point the DataFrame has been transformed into a structure with the columns index_symbol, symbol, name, sector, industry, and type.

Load

In the load step we partition the DataFrame into two distinct DataFrames, each containing the columns that are intended for loading into the instruments and indexes_components tables, respectively:

def load(df):
    JDBC_PROPERTIES = {
        "user": config.DB_USER,
        "password": config.DB_PASSWORD,
        "driver": "org.postgresql.Driver",
        "stringtype": "unspecified"  # Added so we can use database type enums
    }

    indexes_components__db_df = spark.read.jdbc(url=config.get_jdbc_url(), table='indexes_components', properties=JDBC_PROPERTIES)

    new_df = df.withColumnRenamed('symbol', 'component_symbol').join(indexes_components__db_df, on=['index_symbol', 'component_symbol'], how='left_anti')

    instruments_df = new_df.select('component_symbol', 'name', 'type').withColumnRenamed('component_symbol', 'symbol')
    indexes_components_df = new_df.select('index_symbol', 'component_symbol')
    # mode='append' adds the new rows to the existing tables created earlier
    instruments_df.write.jdbc(config.get_jdbc_url(), 'instruments', mode='append', properties=JDBC_PROPERTIES)
    indexes_components_df.write.jdbc(config.get_jdbc_url(), 'indexes_components', mode='append', properties=JDBC_PROPERTIES)
  • Using spark.read.jdbc, we fetch the current data from the indexes_components table into a PySpark DataFrame.
  • Subsequently, we perform a join between the DataFrame obtained from the transformation process and the DataFrame retrieved from the database, so that we extract only the new rows that need to be appended. With how='left_anti', only the rows from the left DataFrame that have no matching entry in the right DataFrame are returned (see the sketch after this list).
  • Next, we designate which columns will populate the instruments table and which will populate the indexes_components table, by creating two separate DataFrames: instruments_df and indexes_components_df.
  • Lastly, we use PySpark's write.jdbc function to persist both DataFrames into their respective tables.
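Here is a minimal, self-contained sketch (with made-up rows) of the left_anti join semantics used above:

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
# Rows produced by the transform step (made-up data)
new_rows = spark.createDataFrame(
    [("GSPC.INDX", "AIZ.US"), ("GSPC.INDX", "MNST.US")],
    ["index_symbol", "component_symbol"],
)
# Rows already stored in indexes_components (made-up data)
existing = spark.createDataFrame(
    [("GSPC.INDX", "AIZ.US")],
    ["index_symbol", "component_symbol"],
)
# left_anti keeps only the rows of new_rows that have no match in existing
new_rows.join(existing, on=["index_symbol", "component_symbol"], how="left_anti").show()
# Expected: a single row (GSPC.INDX, MNST.US)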

The primary block of code encapsulates the ETL process by invoking the functions mentioned earlier:

load(transform(extract()))

This article has been dense in order to establish a foundational understanding. The good news? Future articles in the series will build on this foundation, making things a breeze!

I extend my heartfelt gratitude to the authors and contributors whose research has significantly shaped this article.

👏 Did you like the story? Give 1 to 50 claps to show your support! Your claps really helps me out and motivates me to keep creating valuable content. Thank you for your support! 👏

Thank you for being part of our community! Before you go:

  • If you liked the story feel free to clap 👏 and follow the author.
  • Learn How To Develop Your Trading Bots 👉 here.
  • Join our Premium Discord Server👉 here.

*Note that this article does not provide personal investment advice and I am not a qualified licensed investment advisor. All information found here is for entertainment or educational purposes only and should not be construed as personal investment advice.
