Building the Ultimate Trading Data Pipeline — Part 1: Data Pipeline for Populating Stocks Indices Composition
To capture and analyze stock data comprehensively, we need several data elements:
- Stock symbol/ticker. A unique identifier, and the primary key of our stocks.
- Exchange and index where the stock is listed or included.
- Company information. Details about the related company: name, sector, industry, description, logo, etc.
- Historical data. Past stock prices, for trend and technical analysis.
- Splits and dividends history.
- News and events that can affect the price of the stock.
- Financial ratios (EPS, P/E, …) and SEC filings.
- Analyst recommendations. Recommendations and target prices from analysts.
In this first article, we’ll establish the foundational schema and the data pipeline that loads index compositions. In subsequent articles we’ll delve into generating additional information as the need arises.
Database schema
Introducing the foundational structure, we start by creating a parent table called instruments, designed to be extended to other instrument types beyond stocks: indexes, cryptocurrencies, ETFs, and more. The type column is an enumeration that distinguishes instrument categories such as STOCK or INDEX.
We use the indexes_components table to establish relationships between instruments that are part of an index, as represented in the indexes table.
Connect to your database using pgAdmin and create the needed tables with the following script:
CREATE TYPE tradinginstrumenttypeenum AS ENUM (
    'INDEX',
    'STOCK'
);

CREATE TABLE IF NOT EXISTS instruments
(
    symbol character varying(12),
    name character varying(200) NOT NULL,
    type tradinginstrumenttypeenum NOT NULL,
    isin character varying(12),
    CONSTRAINT instruments_pkey PRIMARY KEY (symbol)
);

CREATE TABLE IF NOT EXISTS indexes
(
    symbol character varying(12) NOT NULL,
    CONSTRAINT indexes_pkey PRIMARY KEY (symbol),
    CONSTRAINT instrument_fk FOREIGN KEY (symbol)
        REFERENCES instruments (symbol) MATCH SIMPLE
        ON UPDATE NO ACTION
        ON DELETE NO ACTION
        NOT VALID
);

CREATE TABLE IF NOT EXISTS indexes_components
(
    index_symbol character varying(12) NOT NULL,
    component_symbol character varying(12) NOT NULL,
    CONSTRAINT indexes_components_pkey PRIMARY KEY (index_symbol, component_symbol),
    CONSTRAINT index_fk FOREIGN KEY (index_symbol)
        REFERENCES indexes (symbol) MATCH SIMPLE
        ON UPDATE NO ACTION
        ON DELETE NO ACTION
        NOT VALID,
    CONSTRAINT instrument_fk FOREIGN KEY (component_symbol)
        REFERENCES instruments (symbol) MATCH SIMPLE
        ON UPDATE NO ACTION
        ON DELETE NO ACTION
        NOT VALID
);
NOTE: In the orchestration article of this series, we’ll execute the table creation automatically using Airflow.
Data pipeline
Harnessing Apache Spark and PySpark, our ETL (Extract-Transform-Load) process fetches index composition data from the EOD HD API and uses it to populate our database.
Prepare Spark
Install the following modules:
poetry add psycopg2
poetry add pyspark
poetry add findspark
Create a lib directory within the project and download the PostgreSQL JDBC driver into this directory. You can obtain the version available at the time of writing this article via the following link: https://jdbc.postgresql.org/download/postgresql-42.2.6.jar.
With Spark, the entry point for executing operations is a SparkSession. We create it with the builder pattern: builder.getOrCreate() returns an existing SparkSession or, if there is none, creates a new one based on the configured options.
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("Trading data pipeline") \
    .config("spark.jars", "./lib/postgresql-42.2.6.jar") \
    .getOrCreate()
In the config method, we specify the relative path to the previously downloaded PostgreSQL JAR file.
At the end of the code we should stop the session:
spark.stop()
Create a Google Sheet with the indices to download
For tutorial purposes, we’ll control which indices to extract from a Google Sheet, but you could manage this directly in the database. The first column of the sheet contains the index code used by the EOD HD API.
If the sheet is protected we’d need a more involved authentication process; since the information is not really private, we’ll share the link publicly. Copy the generated link and replace the final part, edit?usp=sharing, with gviz/tq?tqx=out:csv.
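For illustration, reading that CSV export with pandas might look like this (a sketch: the sheet ID is a placeholder, and the first column is assumed to be named symbol, as used later in extract()):
import pandas as pd

# Placeholder sheet ID for illustration only; use the ID from your own shared link.
# Shared link:      https://docs.google.com/spreadsheets/d/<SHEET_ID>/edit?usp=sharing
# CSV export link:  https://docs.google.com/spreadsheets/d/<SHEET_ID>/gviz/tq?tqx=out:csv
INDICES_CSV_DRIVE_LINK = "https://docs.google.com/spreadsheets/d/<SHEET_ID>/gviz/tq?tqx=out:csv"

df = pd.read_csv(INDICES_CSV_DRIVE_LINK)
print(df['symbol'].tolist())  # e.g. ['GSPC.INDX', ...]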
Extract
We’re going to extract the composition of each index using the EOD HD fundamentals API (https://eodhd.com/api/fundamentals/). The response looks like this:
{
  "General": {
    "Code": "GSPC",
    "Type": "INDEX",
    "Name": "S&P 500 Index",
    "Exchange": "INDX",
    "CurrencyCode": "USD",
    "CurrencyName": "US Dollar",
    "CurrencySymbol": "$",
    "CountryName": "USA",
    "CountryISO": "US",
    "OpenFigi": null
  },
  "Components": {
    "0": {
      "Code": "AIZ",
      "Exchange": "US",
      "Name": "Assurant Inc",
      "Sector": "Financial Services",
      "Industry": "Insurance - Specialty"
    },
    "1": {
      "Code": "MNST",
      "Exchange": "US",
      "Name": "Monster Beverage Corp",
      "Sector": "Consumer Defensive",
      "Industry": "Beverages - Non-Alcoholic"
    },
    ...
  }
}
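If you want to inspect the raw response outside the pipeline, a standalone request could look like this (a sketch; the index code GSPC.INDX and the token placeholder are assumptions, substitute your own values):
import requests

# Illustrative one-off call to the fundamentals endpoint.
url = "https://eodhd.com/api/fundamentals/GSPC.INDX"
params = {"api_token": "YOUR_EOD_API_TOKEN", "fmt": "json"}

response = requests.get(url, params=params, timeout=30)
response.raise_for_status()
payload = response.json()

print(payload["General"]["Code"])  # "GSPC" in the sample above
print(len(payload["Components"]))  # number of components returned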
Using Apache Spark, we have the flexibility to select specific values from the JSON returned in the REST API response by applying a schema. In this instance, our focus is on extracting Code, Exchange, and Name (we'll incorporate Sector and Industry in the upcoming articles within the component details data pipeline).
In order to take advantage of the parallelism that Apache Spark offers, each REST API call will be encapsulated by a UDF, which is bound to a DataFrame. Each row in the DataFrame will represent a single call to the REST API. Once an action is executed on the DataFrame, the result from each individual REST API call will be appended to each row as a Structured data type.
import re
import requests

import pandas as pd

from pyspark.sql.functions import col, lit, udf, explode_outer, concat
from pyspark.sql.types import StructType, StructField, StringType, Row, MapType

# config (EOD HD API token, DB credentials), INDICES_CSV_DRIVE_LINK and the spark
# session created above are defined elsewhere in the script.

SCHEMA_URL = StructType([StructField('url', StringType())])


def rest_api_get(url):
    # Call the REST API and return the parsed JSON, or None on failure.
    res = requests.get(url)
    if res is not None and res.status_code == 200:
        return res.json()
    return None


def extract():
    # Schema describing only the parts of the JSON response we care about.
    schema = StructType([
        StructField('General', StructType([
            StructField('Code', StringType(), True)
        ])),
        StructField('Components', MapType(
            StringType(),
            StructType([
                StructField('Code', StringType(), True),
                StructField('Exchange', StringType(), True),
                StructField('Name', StringType(), True),
                StructField('Sector', StringType(), True),
                StructField('Industry', StringType(), True),
            ]),
        ))
    ])

    # Read the list of index symbols from the Google Sheet CSV export.
    df = pd.read_csv(INDICES_CSV_DRIVE_LINK)
    indices_symbols = df['symbol'].tolist()

    # Build one API URL per index; each Row becomes a row of the Spark DataFrame.
    urls = []
    for index_symbol in indices_symbols:
        urls.append(
            Row(f'https://eodhd.com/api/fundamentals/{index_symbol}?api_token={config.EOD_API_TOKEN}&fmt=json'))
    api_df = spark.createDataFrame(urls, SCHEMA_URL)

    # Wrap the REST call in a UDF and apply it to every URL.
    udf_api_call = udf(rest_api_get, schema)
    df_data = api_df.withColumn('data', udf_api_call(col('url')))

    # One row per index component, tagged with the index code.
    df_data = df_data.select(col('data.General.Code').alias('index_symbol'), explode_outer('data.Components')).drop('data', 'key')
    df_data = df_data.select('value.*', '*').drop('value')
    return df_data
▹ df = pd.read_csv(INDICES_CSV_DRIVE_LINK): We read the CSV file of indexes into a pandas DataFrame.
▹ indices_symbols = df['symbol'].tolist(): Extracts the symbol column from the DataFrame into a list.
▹ urls = []: We build a list of URLs to obtain the list of components of each index from the EOD HD API.
▹ api_df = spark.createDataFrame(urls, SCHEMA_URL): We convert the list of URLs into a Spark DataFrame (api_df) with the specified schema (SCHEMA_URL). Using PySpark Rows allows Spark to open multiple parallel connections to the API.
▹ udf_api_call = udf(rest_api_get, schema): We define a user-defined function (UDF) that makes a REST API call (rest_api_get) using the specified schema (schema). The schema specifies how to map the returned JSON to our structure; we don't need to declare every field the JSON contains, only the ones we're interested in.
▹ df_data = api_df.withColumn('data', udf_api_call(col('url'))): Applies the UDF to the url column of api_df to fetch data from the API and adds a new column data with the fetched information.
▹ df_data.select(col('data.General.Code').alias('index_symbol'), explode_outer('data.Components')): We unpack the data column to create one column with the index code and one row for each of the Components.
▹ df_data.select('value.*', '*'): Finally, we unpack the value column so that all its fields become columns.
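For the S&P 500 sample shown earlier, one extracted row would contain values like: index_symbol=GSPC, Code=AIZ, Exchange=US, Name=Assurant Inc, Sector=Financial Services, Industry=Insurance - Specialty.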
Transform
In the transform function we adapt the column names of the loaded DataFrame to the names used in our database schema. Our database columns are snake_case, while the columns obtained from the EOD HD API are PascalCase, so we rename them with df.toDF(*[snake_case(c) for c in df.columns]) (e.g. Code becomes code, and a multi-word name like CurrencyCode would become currency_code).
We also rename the columns whose names are not exactly the same as in the database, using a dictionary and PySpark's withColumnRenamed function.
As a single symbol may be traded on multiple exchanges, we introduce a new symbol column to act as our primary key, concatenating the Code of the symbol with its corresponding exchange symbol (using concat). Finally, we drop the columns we won't need anymore.
def transform(ex_comp_df):
    def snake_case(text):
        # Convert PascalCase column names to snake_case.
        s1 = re.sub('(.)([A-Z][a-z]+)', r'\1_\2', text)
        return re.sub('([a-z0-9])([A-Z])', r'\1_\2', s1).lower()

    def rename_cols(df, mapping_dict):
        for key in mapping_dict.keys():
            df = df.withColumnRenamed(key, mapping_dict.get(key))
        return df

    cols_dict = {
        "code": "symbol",
        "exchange": "exchange_symbol",
    }

    df = ex_comp_df.toDF(*[snake_case(c) for c in ex_comp_df.columns])
    df = rename_cols(df, cols_dict)
    df = df.withColumn('type', lit('STOCK'))
    # Build the primary key as <code>.<exchange>, and suffix index symbols with .INDX.
    df = df.withColumn('symbol', concat(col('symbol'), lit('.'), col('exchange_symbol')))
    df = df.withColumn('index_symbol', concat(col('index_symbol'), lit('.INDX')))
    df = df.drop(col('exchange_symbol'))
    return df
After this step, the DataFrame has a structure like the one shown below:
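symbol=AIZ.US, name=Assurant Inc, sector=Financial Services, industry=Insurance - Specialty, index_symbol=GSPC.INDX, type=STOCK
(illustrative row, based on the Assurant component from the sample response above)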
Load
In the load step we partition the DataFrame into two distinct DataFrames, each containing the columns intended for loading into the instruments and indexes_components tables, respectively:
def load(df):
    JDBC_PROPERTIES = {
        "user": config.DB_USER,
        "password": config.DB_PASSWORD,
        "driver": "org.postgresql.Driver",
        "stringtype": "unspecified"  # Added so we can use database type enums
    }

    # Fetch the current index components so we only insert new rows.
    indexes_components__db_df = spark.read.jdbc(url=config.get_jdbc_url(), table='indexes_components', properties=JDBC_PROPERTIES)
    new_df = df.withColumnRenamed('symbol', 'component_symbol').join(indexes_components__db_df, on=['index_symbol', 'component_symbol'], how='left_anti')

    # Split the columns destined for each table.
    instruments_df = new_df.select('component_symbol', 'name', 'type').withColumnRenamed('component_symbol', 'symbol')
    indexes_components_df = new_df.select('index_symbol', 'component_symbol')

    # Append to the existing tables (the default write mode would fail because the tables already exist).
    instruments_df.write.jdbc(config.get_jdbc_url(), 'instruments', mode='append', properties=JDBC_PROPERTIES)
    indexes_components_df.write.jdbc(config.get_jdbc_url(), 'indexes_components', mode='append', properties=JDBC_PROPERTIES)
▹ Using spark.read.jdbc, we fetch the current data from the indexes_components table into a PySpark DataFrame.
▹ Subsequently, we perform a join between the DataFrame obtained from the transformation process and the DataFrame retrieved from the database. This allows us to extract only the new rows that need to be appended to the database. By using a left_anti join, we keep only the rows from the left DataFrame that have no matching entry in the right DataFrame.
▹ Next, we designate which columns from the DataFrame will populate the instruments table and which will populate the indexes_components table. This is achieved by creating two separate DataFrames: instruments_df and indexes_components_df.
▹ Lastly, we use PySpark's write.jdbc function to append both DataFrames to their respective tables.
The primary block of code encapsulates the ETL process by invoking the functions mentioned earlier:
load(transform(extract()))
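Putting everything together, the driver script ends up roughly like this (a sketch assuming the imports, configuration module, and functions shown throughout the article; the findspark.init() call is optional and simply locates the local Spark installation, since findspark was installed earlier):
import findspark
findspark.init()  # locate the local Spark installation before importing pyspark

from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("Trading data pipeline") \
    .config("spark.jars", "./lib/postgresql-42.2.6.jar") \
    .getOrCreate()

# Run the full ETL: fetch index compositions, adapt them to our schema, persist them to PostgreSQL.
load(transform(extract()))

spark.stop()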
This article has been dense, but it establishes a foundational understanding. The good news? Future articles in the series will build on this foundation, making things a breeze!
Note that this article does not provide personal investment advice and I am not a qualified licensed investment advisor. All information found here is for entertainment or educational purposes only and should not be construed as personal investment advice.