Building the Ultimate Trading Data Pipeline — Part 2: Data Pipeline for Populating Stocks Details
In this second part we’ll populate our database with stock details such as description, outstanding shares, and industry or sector classification.
Sector information is especially important if we want to apply a top-down investment strategy: determine the strength of the different sectors, then pick the strongest stocks within those sectors to maximize returns.
We’re going to follow this process:
- Extract. Find which symbols in the instruments table have not yet been loaded into the stocks table. For each of those symbols, fetch fundamentals information from the EODHD API.
- Transform. Adjust the gathered information to match the expected columns and values for the stocks table.
- Load. Store the generated DataFrame in the stocks table.
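The symbol selection in the Extract step is an anti-join between the two tables. As a minimal sketch, assuming an in-memory SQLite database as a stand-in for PostgreSQL, simplified two-column tables, and made-up sample rows, the selection behaves like this:

```python
import sqlite3

# In-memory SQLite stands in for PostgreSQL; tables and rows are simplified samples.
conn = sqlite3.connect(":memory:")
conn.executescript("""
    CREATE TABLE instruments (symbol TEXT PRIMARY KEY, type TEXT);
    CREATE TABLE stocks (symbol TEXT PRIMARY KEY);
    INSERT INTO instruments VALUES
        ('AAPL.US', 'STOCK'), ('AIZ.US', 'STOCK'), ('GSPC.INDX', 'INDEX');
    INSERT INTO stocks VALUES ('AAPL.US');
""")

# Stock instruments that have no row in stocks yet.
rows = conn.execute(
    "SELECT i.symbol FROM instruments i "
    "WHERE i.type = 'STOCK' AND i.symbol NOT IN (SELECT symbol FROM stocks)"
).fetchall()
pending_symbols = [r[0] for r in rows]
conn.close()

print(pending_symbols)
```

Because already-loaded symbols are filtered out, rerunning the pipeline should only request fundamentals for symbols that are still missing.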
Stock database schema preparation
Connect to your database using PgAdmin and create a new table, stocks, using the following script:
CREATE TABLE IF NOT EXISTS stocks
(
    symbol character varying(12) NOT NULL,
    description text COLLATE pg_catalog."default",
    address text COLLATE pg_catalog."default",
    web_url text COLLATE pg_catalog."default",
    logo_url text COLLATE pg_catalog."default",
    employees integer,
    sector text COLLATE pg_catalog."default",
    industry text COLLATE pg_catalog."default",
    country_iso text COLLATE pg_catalog."default",
    gic_sector text COLLATE pg_catalog."default",
    gic_group text COLLATE pg_catalog."default",
    gic_industry text COLLATE pg_catalog."default",
    gic_sub_industry text COLLATE pg_catalog."default",
    ipo_date date,
    shares_outstanding bigint,
    shares_float bigint,
    percent_insiders double precision,
    percent_institutions double precision,
    CONSTRAINT stocks_pkey PRIMARY KEY (symbol),
    CONSTRAINT stocks_fk FOREIGN KEY (symbol)
        REFERENCES instruments (symbol) MATCH SIMPLE
        ON UPDATE NO ACTION
        ON DELETE NO ACTION
        NOT VALID
);
Extract
For every symbol in the instruments table, we will parallelize calls to the EODHD API to fetch fundamental details, including identification, sector, industry, IPO date, shares outstanding, logo, and more.
We can see the expected information using the call https://eodhd.com/api/fundamentals/<symbol>?filter=General,SharesStats&api_token=<your-token>&fmt=json. Example:
{
"General": {
"Code": "AAPL",
"Type": "Common Stock",
"Name": "Apple Inc",
"Exchange": "NASDAQ",
"CurrencyCode": "USD",
"CurrencyName": "US Dollar",
"CurrencySymbol": "$",
"CountryName": "USA",
"CountryISO": "US",
"OpenFigi": "BBG000B9XRY4",
"ISIN": "US0378331005",
"LEI": "HWUPKR0MPOU8FGXBT394",
"PrimaryTicker": "AAPL.US",
"CUSIP": "037833100",
"CIK": "320193",
"EmployerIdNumber": "94-2404110",
"FiscalYearEnd": "September",
"IPODate": "1980-12-12",
"InternationalDomestic": "International/Domestic",
"Sector": "Technology",
"Industry": "Consumer Electronics",
"GicSector": "Information Technology",
"GicGroup": "Technology Hardware & Equipment",
"GicIndustry": "Technology Hardware, Storage & Peripherals",
"GicSubIndustry": "Technology Hardware, Storage & Peripherals",
"HomeCategory": "Domestic",
"IsDelisted": false,
"Description": "Apple Inc. designs, manufactures, ....",
"Address": "One Apple Park Way, Cupertino, CA, United States, 95014",
...
"Phone": "408 996 1010",
"WebURL": "https://www.apple.com",
"LogoURL": "/img/logos/US/aapl.png",
"FullTimeEmployees": 161000,
"UpdatedAt": "2023-12-05"
},
"SharesStats": {
"SharesOutstanding": 15552799744,
"SharesFloat": 15535488445,
"PercentInsiders": 0.07200000000000001,
"PercentInstitutions": 61.317,
...
}
}
Some fields have been omitted from the JSON snippet above for brevity.
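To make the shape of the response concrete, here is a small sketch in plain Python that merges the two sections into one flat record, which is roughly the row shape the pipeline aims for. The helper name flatten_fundamentals is hypothetical, and the payload is a trimmed copy of the response shown earlier.

```python
import json

# Trimmed copy of the AAPL response; only a few fields are kept.
payload = json.loads("""
{
  "General": {"Code": "AAPL", "Exchange": "NASDAQ", "IPODate": "1980-12-12"},
  "SharesStats": {"SharesOutstanding": 15552799744, "SharesFloat": 15535488445}
}
""")

def flatten_fundamentals(data):
    """Merge the General and SharesStats sections into one flat dict."""
    flat = {}
    for section in ("General", "SharesStats"):
        # A missing or null section contributes no fields.
        flat.update(data.get(section) or {})
    return flat

row = flatten_fundamentals(payload)
print(row["Code"], row["IPODate"], row["SharesOutstanding"])
```

Note that SharesOutstanding is larger than 2,147,483,647, the largest 32-bit signed integer, which is why the stocks table stores it as bigint.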
In the following code, we define the extraction process with a specific schema tailored to retrieve only the relevant information needed, bypassing the need to fetch all available data:
from pyspark.sql import Row
from pyspark.sql.functions import col, udf
from pyspark.sql.types import (DoubleType, IntegerType, LongType, StringType,
                               StructField, StructType)

# spark, config, get_postgres_connection, rest_api_get and SCHEMA_URL are
# expected to be defined elsewhere in the project.


def extract():
    # Only the fields we need; anything else in the response is ignored
    schema = StructType([
        StructField('General', StructType([
            StructField('Code', StringType(), True),
            StructField('Exchange', StringType(), True),
            StructField('IPODate', StringType(), True),
            StructField('CountryISO', StringType(), True),
            StructField('Type', StringType(), True),
            StructField('Sector', StringType(), True),
            StructField('Industry', StringType(), True),
            StructField('GicSector', StringType(), True),
            StructField('GicGroup', StringType(), True),
            StructField('GicIndustry', StringType(), True),
            StructField('GicSubIndustry', StringType(), True),
            StructField('Description', StringType(), True),
            StructField('Address', StringType(), True),
            StructField('WebURL', StringType(), True),
            StructField('LogoURL', StringType(), True),
            StructField('FullTimeEmployees', IntegerType(), True)])),
        StructField('SharesStats', StructType([
            # LongType: share counts such as 15552799744 do not fit in a 32-bit integer
            StructField('SharesOutstanding', LongType(), True),
            StructField('SharesFloat', LongType(), True),
            StructField('PercentInsiders', DoubleType(), True),
            StructField('PercentInstitutions', DoubleType(), True),
        ]))
    ])
    # Get the symbols that still have no row in stocks
    conn = get_postgres_connection()
    cursor = conn.cursor()
    cursor.execute(
        "SELECT i.symbol FROM instruments i WHERE i.type='STOCK' AND i.symbol NOT IN (SELECT symbol FROM stocks)")
    symbols = [r[0] for r in cursor.fetchall()]
    cursor.close()
    conn.close()
    # One URL per symbol
    urls = []
    for symbol in symbols:
        urls.append(
            Row(f"https://eodhd.com/api/fundamentals/{symbol}?filter=General,SharesStats&api_token={config.EOD_API_TOKEN}&fmt=json"))
    api_df = spark.createDataFrame(urls, SCHEMA_URL)
    udf_api_call = udf(rest_api_get, schema)
    df_data = api_df.withColumn('data', udf_api_call(col('url')))  # create a column data with the response of the REST API
    df_data = df_data.select('data.*', '*').drop('url', 'col', 'data')
    df_data = df_data.select('*', 'General.*', 'SharesStats.*').drop('General', 'SharesStats')
    return df_data
- First, we connect to the PostgreSQL database and retrieve the symbols from the instruments table that have not yet been inserted into the stocks table (SELECT i.symbol FROM instruments i WHERE i.type='STOCK' AND i.symbol NOT IN (SELECT symbol FROM stocks)).
- We build one URL per fetched symbol and load the list into a Spark DataFrame (api_df) with the schema SCHEMA_URL. Holding the URLs in a DataFrame lets Spark make multiple API calls in parallel.
- udf_api_call = udf(rest_api_get, schema): we wrap the rest_api_get function, which executes the REST API call, in a user-defined function (UDF) whose return type is schema.
- IPODate is declared as a string in the schema because we can't use pyspark.sql.types.DateType for it at this point; we convert it to a date in the transform step.
- df_data = api_df.withColumn('data', udf_api_call(col('url'))): we apply the UDF to the url column of api_df to fetch the data from the API and store the response in a new column, data.
- df_data.select('data.*', '*').drop('url', 'col', 'data'): we unpack data into its two nested fields, General and SharesStats.
- Finally, df_data.select('*', 'General.*', 'SharesStats.*').drop('General', 'SharesStats') expands those two structs in turn, so that each of their fields becomes a top-level column.
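The rest_api_get function wrapped by the UDF is not shown in this part. The following is a hypothetical minimal version, not the author's implementation: it assumes the standard library's urllib is enough, and it takes an injectable opener parameter (an assumption made here purely so the function can be checked offline).

```python
import io
import json
from urllib.request import urlopen


def rest_api_get(url, opener=urlopen, timeout=30):
    """Fetch url and return the decoded JSON body, or None if the call fails."""
    try:
        with opener(url, timeout=timeout) as response:
            return json.loads(response.read().decode("utf-8"))
    except (OSError, ValueError):
        # OSError covers network and HTTP errors; ValueError covers invalid JSON.
        return None


# Offline check with fake openers, so no network access or API token is needed.
def fake_ok(url, timeout):
    return io.BytesIO(b'{"General": {"Code": "AAPL"}}')


def fake_bad(url, timeout):
    return io.BytesIO(b"not json")


ok_result = rest_api_get("https://example.invalid/ok", opener=fake_ok)
bad_result = rest_api_get("https://example.invalid/bad", opener=fake_bad)
print(ok_result, bad_result)
```

Returning None on failure, instead of raising, is one possible design choice: a single failing symbol would then produce an empty row that can be filtered out later, where an exception would abort the whole job.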
Transform
In the transform step, we prepare the DataFrame with the information to be loaded into the database schema.
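from pyspark.sql.functions import col, concat, lit, to_date, when

# snake_case and rename_cols are helper functions expected to be defined elsewhere in the project.

EOD_HD_API_US_SPECIFIC_EXCHANGES = ['NYSE', 'NASDAQ', 'BATS', 'AMEX']
EOD_HD_API_US_EXCHANGE_GROUP = 'US'


def transform(df_data):
    # API field name (in snake_case) -> column name in the stocks table
    stocks_dict = {
        "code": "symbol",
        "exchange": "exchange_symbol",
        "full_time_employees": "employees",
    }
    df = df_data.toDF(*[snake_case(c) for c in df_data.columns])
    df = rename_cols(df, stocks_dict)
    df.show()  # debug output; can be removed
    # Keep common stocks only
    df = df.filter(df.type == 'Common Stock')
    # to_date already returns a date column, so no extra cast is needed
    df = df.withColumn('ipo_date', to_date(col('ipo_date'), 'yyyy-MM-dd'))
    # The API returns a relative logo path; prefix it with the host
    df = df.withColumn('logo_url', concat(lit('https://eodhistoricaldata.com'), col('logo_url')))
    # Map the specific US exchanges to the US group used in the instruments table
    df = df.withColumn('exchange_symbol', when(col('exchange_symbol').isin(EOD_HD_API_US_SPECIFIC_EXCHANGES),
                                               EOD_HD_API_US_EXCHANGE_GROUP).otherwise(col('exchange_symbol')))
    df = df.withColumn('symbol', concat(col('symbol'), lit('.'), col('exchange_symbol')))
    df = df.drop('exchange_symbol', 'type')
    df = df.na.fill(value=0.0, subset=['percent_insiders', 'percent_institutions'])
    return df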
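The transform relies on two helpers, snake_case and rename_cols, that are not listed in this part. Below is a hypothetical sketch of how the column names could be derived, in plain Python so it can run without Spark. The regular expressions and the renamed_columns function are assumptions made for illustration, not the author's code.

```python
import re


def snake_case(name):
    """Convert a CamelCase API field name to snake_case, keeping acronyms together."""
    # Split before a capitalized word: "IPODate" -> "IPO_Date".
    partial = re.sub(r"(.)([A-Z][a-z]+)", r"\1_\2", name)
    # Split between a lowercase letter or digit and an uppercase letter: "WebURL" -> "Web_URL".
    return re.sub(r"([a-z0-9])([A-Z])", r"\1_\2", partial).lower()


def renamed_columns(columns, mapping):
    """Return the column names with the entries found in mapping replaced."""
    return [mapping.get(c, c) for c in columns]


api_fields = ["Code", "Exchange", "IPODate", "WebURL", "FullTimeEmployees"]
snake = [snake_case(f) for f in api_fields]
final = renamed_columns(
    snake,
    {"code": "symbol", "exchange": "exchange_symbol", "full_time_employees": "employees"},
)
print(final)
```

With a Spark DataFrame, a list like this could be applied in one call with df.toDF(*final).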
An important observation: in the preceding article, when we fetched the symbols for the S&P 500, the API provided the exchange US for each symbol:
"General": {
"Code": "GSPC",
"Type": "INDEX",
"Name": "S&P 500 Index",
...
},
"Components": {
"0": {
"Code": "AIZ",
"Exchange": "US",
"Name": "Assurant Inc",
"Sector": "Financial Services",
"Industry": "Insurance - Specialty"
This occurs because EODHD consolidates the NYSE, NASDAQ, BATS, and AMEX markets under the exchange US in the index components API. If we request the fundamentals for one of these symbols, such as AIZ.US (https://eodhd.com/api/fundamentals/AIZ.US?filter=General,SharesStats&api_token=<your-token>):
{
"General": {
"Code": "AIZ",
"Type": "Common Stock",
"Name": "Assurant Inc",
"Exchange": "NYSE",
...
The response reports NYSE instead of US. To keep the symbols in the stocks table consistent with those in the instruments table, we replace the exchanges NYSE, NASDAQ, BATS, and AMEX with the exchange US:
df = df.withColumn('exchange_symbol', when(col('exchange_symbol').isin(EOD_HD_API_US_SPECIFIC_EXCHANGES), EOD_HD_API_US_EXCHANGE_GROUP).otherwise(col('exchange_symbol')))
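The same rule can be written as a plain Python function, which makes it easy to check on a few values before running it on the whole DataFrame. This is a sketch only: the function name instrument_symbol is hypothetical, and XETRA is used merely as an example of a non-US exchange code.

```python
US_SPECIFIC_EXCHANGES = {"NYSE", "NASDAQ", "BATS", "AMEX"}
US_EXCHANGE_GROUP = "US"


def instrument_symbol(code, exchange):
    """Build the <code>.<exchange> key, mapping specific US exchanges to the US group."""
    group = US_EXCHANGE_GROUP if exchange in US_SPECIFIC_EXCHANGES else exchange
    return f"{code}.{group}"


print(instrument_symbol("AIZ", "NYSE"))    # fundamentals report NYSE for AIZ
print(instrument_symbol("SAP", "XETRA"))   # other exchanges are left unchanged
```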
Load
In the load step, we append the information to the stocks table:
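def load(df):
    # count() is a Spark action, so the pipeline runs once here and again for the write;
    # caching df beforehand may avoid calling the API twice
    if df.count() > 0:
        # mode='append' adds the new rows and keeps the existing ones
        df.write.jdbc(config.get_jdbc_url(), 'stocks', mode='append', properties=JDBC_PROPERTIES)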
*Note that this article does not provide personal investment advice and I am not a qualified licensed investment advisor. All information found here is for entertainment or educational purposes only and should not be construed as personal investment advice.