Xavier Escudero

Building the Ultimate Trading Data Pipeline — Part 2: Data Pipeline for Populating Stocks Details

In this second part we’ll populate our database with stock details such as description, shares outstanding, and industry or sector classification.

Sector information is especially important if we want to apply a top-down investment strategy: first determine the strength of the different sectors, then pick the strongest stocks within those sectors to maximize returns.

We’ll follow this process:

  1. Extract. We find which symbols in the instruments table have not yet been loaded into the stocks table. For each of those symbols we fetch fundamentals information from the EODHD API.
  2. Transform. We adjust the gathered information to match the columns and values expected by the stocks table.
  3. Load. We store the resulting DataFrame in the stocks table.
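
The symbol selection in step 1 can be tried out in isolation. The following is a minimal sketch that uses an in-memory SQLite database as a stand-in for PostgreSQL; the sample rows are made up for illustration, and only the shape of the query mirrors the pipeline.

```python
import sqlite3

# In-memory SQLite stands in for PostgreSQL here; the rows are illustrative only.
conn = sqlite3.connect(":memory:")
conn.executescript("""
    CREATE TABLE instruments (symbol TEXT PRIMARY KEY, type TEXT);
    CREATE TABLE stocks (symbol TEXT PRIMARY KEY);
    INSERT INTO instruments VALUES
        ('AAPL.US', 'STOCK'), ('AIZ.US', 'STOCK'), ('GSPC.INDX', 'INDEX');
    INSERT INTO stocks VALUES ('AAPL.US');
""")

# Stocks present in instruments but not yet loaded into stocks
pending = [row[0] for row in conn.execute(
    "SELECT i.symbol FROM instruments i "
    "WHERE i.type = 'STOCK' AND i.symbol NOT IN (SELECT symbol FROM stocks) "
    "ORDER BY i.symbol"
)]
print(pending)  # ['AIZ.US']
```

Because already-loaded symbols are excluded, re-running the pipeline only requests fundamentals for symbols that are still missing.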

Stock database schema preparation

Connect to your database using pgAdmin and create a new table stocks with the following script:

CREATE TABLE IF NOT EXISTS stocks
(
    symbol character varying(12) NOT NULL,
    description text COLLATE pg_catalog."default",
    address text COLLATE pg_catalog."default",
    web_url text COLLATE pg_catalog."default",
    logo_url text COLLATE pg_catalog."default",
    employees integer,
    sector text COLLATE pg_catalog."default",
    industry text COLLATE pg_catalog."default",
    country_iso text COLLATE pg_catalog."default",
    gic_sector text COLLATE pg_catalog."default",
    gic_group text COLLATE pg_catalog."default",
    gic_industry text COLLATE pg_catalog."default",
    gic_sub_industry text COLLATE pg_catalog."default",
    ipo_date date,
    shares_outstanding bigint,
    shares_float bigint,
    percent_insiders double precision,
    percent_institutions double precision,
    CONSTRAINT stocks_pkey PRIMARY KEY (symbol),
    CONSTRAINT stocks_fk FOREIGN KEY (symbol)
        REFERENCES instruments (symbol) MATCH SIMPLE
        ON UPDATE NO ACTION
        ON DELETE NO ACTION
        NOT VALID
)

Extract

For every symbol in the instruments table that is not yet in stocks, we parallelize calls to the EODHD API to fetch fundamental details, including identification, sector, industry, IPO date, shares outstanding, logo, and more.

We can inspect the expected response with a call such as https://eodhd.com/api/fundamentals/<symbol>?filter=General,SharesStats&api_token=<your-token>&fmt=json. Example for Apple:

{
    "General": {
        "Code": "AAPL",
        "Type": "Common Stock",
        "Name": "Apple Inc",
        "Exchange": "NASDAQ",
        "CurrencyCode": "USD",
        "CurrencyName": "US Dollar",
        "CurrencySymbol": "$",
        "CountryName": "USA",
        "CountryISO": "US",
        "OpenFigi": "BBG000B9XRY4",
        "ISIN": "US0378331005",
        "LEI": "HWUPKR0MPOU8FGXBT394",
        "PrimaryTicker": "AAPL.US",
        "CUSIP": "037833100",
        "CIK": "320193",
        "EmployerIdNumber": "94-2404110",
        "FiscalYearEnd": "September",
        "IPODate": "1980-12-12",
        "InternationalDomestic": "International/Domestic",
        "Sector": "Technology",
        "Industry": "Consumer Electronics",
        "GicSector": "Information Technology",
        "GicGroup": "Technology Hardware & Equipment",
        "GicIndustry": "Technology Hardware, Storage & Peripherals",
        "GicSubIndustry": "Technology Hardware, Storage & Peripherals",
        "HomeCategory": "Domestic",
        "IsDelisted": false,
        "Description": "Apple Inc. designs, manufactures, ....",
        "Address": "One Apple Park Way, Cupertino, CA, United States, 95014",
        ...
        "Phone": "408 996 1010",
        "WebURL": "https://www.apple.com",
        "LogoURL": "/img/logos/US/aapl.png",
        "FullTimeEmployees": 161000,
        "UpdatedAt": "2023-12-05"
    },
    "SharesStats": {
        "SharesOutstanding": 15552799744,
        "SharesFloat": 15535488445,
        "PercentInsiders": 0.07200000000000001,
        "PercentInstitutions": 61.317,
        ...
    }
}

Some fields have been omitted from the JSON snippet for brevity.
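
As a small illustration of how such a URL can be assembled, here is a sketch of a builder function. The name build_fundamentals_url and the placeholder token are assumptions made for this example; only the URL pattern comes from the call shown above.

```python
from urllib.parse import urlencode

BASE_URL = "https://eodhd.com/api/fundamentals"


def build_fundamentals_url(symbol, api_token, sections=("General", "SharesStats")):
    """Build the fundamentals URL for one symbol, restricted to the given sections."""
    query = urlencode(
        {"filter": ",".join(sections), "api_token": api_token, "fmt": "json"},
        safe=",",  # keep the comma between sections unescaped
    )
    return f"{BASE_URL}/{symbol}?{query}"


# "YOUR_TOKEN" is a placeholder, not a working token
print(build_fundamentals_url("AAPL.US", "YOUR_TOKEN"))
```

Restricting the filter to the sections we need keeps each response small, which matters when the call is repeated for hundreds of symbols.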

In the following code we define the extraction process. The filter parameter in the URL limits the response to the General and SharesStats sections, and the schema keeps only the fields we need from them:

from pyspark.sql import Row
from pyspark.sql.functions import col, udf
from pyspark.sql.types import (DoubleType, IntegerType, LongType, StringType,
                               StructField, StructType)

# spark, config, get_postgres_connection, rest_api_get and SCHEMA_URL are
# assumed to be defined elsewhere in the project


def extract():
    # Only the fields listed in this schema are kept from the API response
    schema = StructType([
        StructField('General', StructType([
            StructField('Code', StringType(), True),
            StructField('Exchange', StringType(), True),
            StructField('IPODate', StringType(), True),  # converted to a date in transform
            StructField('CountryISO', StringType(), True),
            StructField('Type', StringType(), True),
            StructField('Sector', StringType(), True),
            StructField('Industry', StringType(), True),
            StructField('GicSector', StringType(), True),
            StructField('GicGroup', StringType(), True),
            StructField('GicIndustry', StringType(), True),
            StructField('GicSubIndustry', StringType(), True),
            StructField('Description', StringType(), True),
            StructField('Address', StringType(), True),
            StructField('WebURL', StringType(), True),
            StructField('LogoURL', StringType(), True),
            StructField('FullTimeEmployees', IntegerType(), True)])),
        StructField('SharesStats', StructType([
            # Share counts can exceed the 32-bit integer range (AAPL: 15552799744),
            # so they need LongType, matching the bigint columns in the stocks table
            StructField('SharesOutstanding', LongType(), True),
            StructField('SharesFloat', LongType(), True),
            StructField('PercentInsiders', DoubleType(), True),
            StructField('PercentInstitutions', DoubleType(), True),
        ]))
    ])

    # Get the symbols that are in instruments but not yet in stocks
    conn = get_postgres_connection()
    cursor = conn.cursor()
    cursor.execute(
        "SELECT i.symbol FROM instruments i WHERE i.type='STOCK' AND i.symbol NOT IN (SELECT symbol FROM stocks)")
    symbols = [r[0] for r in cursor.fetchall()]

    # One URL per symbol, one row per URL
    urls = []
    for symbol in symbols:
        urls.append(
            Row(f"https://eodhd.com/api/fundamentals/{symbol}?filter=General,SharesStats&api_token={config.EOD_API_TOKEN}&fmt=json"))
    api_df = spark.createDataFrame(urls, SCHEMA_URL)

    udf_api_call = udf(rest_api_get, schema)
    df_data = api_df.withColumn('data', udf_api_call(col('url')))  # column data holds the REST API response
    # Unpack data into General and SharesStats, then flatten both structs into top-level columns
    df_data = df_data.select('data.*', '*').drop('url', 'col', 'data')
    df_data = df_data.select('*', 'General.*', 'SharesStats.*').drop('General', 'SharesStats')
    return df_data

  • First, we connect to the PostgreSQL database and retrieve the symbols in the instruments table that have not yet been inserted into the stocks table (SELECT i.symbol FROM instruments i WHERE i.type='STOCK' AND i.symbol NOT IN (SELECT symbol FROM stocks)).
  • We build one URL per fetched symbol and load the list into a Spark DataFrame (api_df) with the schema SCHEMA_URL. Because the URLs are rows of a DataFrame, Spark can distribute the API calls and run them in parallel.
  • udf_api_call = udf(rest_api_get, schema): we wrap the function rest_api_get, which performs the REST API call, in a user-defined function (UDF) whose return type is the schema defined above.
  • IPODate is declared as StringType because the API returns it as a string (for example "1980-12-12") and we can't map it directly to pyspark.sql.types.DateType at this point; we convert it to a date in the transform step.
  • df_data = api_df.withColumn('data', udf_api_call(col('url'))): we apply the UDF to the url column of api_df and add a new column, data, with the fetched information.
  • With df_data.select('data.*', '*').drop('url', 'col', 'data') we unpack the data column into its two nested fields, General and SharesStats.
  • Finally, we flatten one more level with df_data.select('*', 'General.*', 'SharesStats.*').drop('General', 'SharesStats'), so that every field becomes a top-level column.
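
The rest_api_get function itself is not shown in this part. A minimal sketch of what such a helper could look like follows; this implementation is an assumption, not the original code. It uses only the standard library and takes an optional fetch argument (added here purely so the example can be exercised without network access).

```python
import json
from urllib.request import urlopen


def _http_get(url, timeout=10):
    """Fetch a URL and return the response body as text."""
    with urlopen(url, timeout=timeout) as response:
        return response.read().decode("utf-8")


def rest_api_get(url, fetch=_http_get):
    """Return the parsed JSON for url, or None if the call or the parsing fails."""
    try:
        return json.loads(fetch(url))
    except (OSError, ValueError):
        # Network errors are OSError subclasses; invalid JSON raises ValueError
        return None


# Usage with a fake fetcher instead of a real HTTP call
fake = lambda url: '{"General": {"Code": "AAPL"}, "SharesStats": {"SharesOutstanding": 15552799744}}'
data = rest_api_get("https://example.invalid", fetch=fake)
print(data["General"]["Code"])  # AAPL
```

Returning None instead of raising means a single failed request produces a row of nulls rather than aborting the whole Spark job; under this sketch, such rows would later be dropped by the filter on type in the transform step.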

Transform

In the transform step we prepare the DataFrame so that its columns and values match the database schema.

from pyspark.sql.functions import col, concat, lit, to_date, when

# snake_case and rename_cols are helper functions assumed to be defined elsewhere in the project

EOD_HD_API_US_SPECIFIC_EXCHANGES = ['NYSE', 'NASDAQ', 'BATS', 'AMEX']
EOD_HD_API_US_EXCHANGE_GROUP = 'US'

def transform(df_data):
    # API field names (after snake_case) -> column names of the stocks table
    stocks_dict = {
        "code": "symbol",
        "exchange": "exchange_symbol",
        "full_time_employees": "employees",
    }
    df = df_data.toDF(*[snake_case(c) for c in df_data.columns])
    df = rename_cols(df, stocks_dict)
    df.show()  # preview of the renamed columns
    df = df.filter(df.type == 'Common Stock')
    df = df.withColumn('ipo_date', to_date(df['ipo_date'], 'yyyy-MM-dd'))
    # The API returns a relative logo path, so we prefix the host
    df = df.withColumn('logo_url', concat(lit('https://eodhistoricaldata.com'), col('logo_url')))
    # Map the specific U.S. exchanges to the generic 'US' group used in instruments
    df = df.withColumn(
        'exchange_symbol',
        when(col('exchange_symbol').isin(EOD_HD_API_US_SPECIFIC_EXCHANGES), EOD_HD_API_US_EXCHANGE_GROUP)
        .otherwise(col('exchange_symbol')))
    df = df.withColumn('symbol', concat(col('symbol'), lit('.'), col('exchange_symbol')))
    df = df.drop('exchange_symbol', 'type')
    # Missing ownership percentages are stored as 0.0 instead of NULL
    df = df.na.fill(value=0.0, subset=['percent_insiders', 'percent_institutions'])
    return df
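
The helpers snake_case and rename_cols are used above but not listed. The following is one possible sketch of them; these implementations are assumptions, not the original code. rename_cols only relies on the DataFrame method withColumnRenamed, so the snippet itself needs no Spark import.

```python
import re


def snake_case(name):
    """Convert CamelCase API field names (e.g. 'IPODate') to snake_case."""
    step = re.sub(r"(.)([A-Z][a-z]+)", r"\1_\2", name)
    return re.sub(r"([a-z0-9])([A-Z])", r"\1_\2", step).lower()


def rename_cols(df, mapping):
    """Rename DataFrame columns according to {old_name: new_name}."""
    for old, new in mapping.items():
        df = df.withColumnRenamed(old, new)
    return df


print([snake_case(c) for c in ["Code", "IPODate", "WebURL", "FullTimeEmployees", "GicSubIndustry"]])
# ['code', 'ipo_date', 'web_url', 'full_time_employees', 'gic_sub_industry']
```

The two-step regular expression keeps acronyms such as IPO and URL together, which is what lets the converted names line up with columns like ipo_date and web_url in the stocks table.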

An important observation: in the preceding article, when we fetched the symbols for the S&P 500, the API returned the exchange US for each symbol:

"General": {
    "Code": "GSPC",
    "Type": "INDEX",
    "Name": "S&P 500 Index",
    ...
},
"Components": {
    "0": {
        "Code": "AIZ",
        "Exchange": "US",
        "Name": "Assurant Inc",
        "Sector": "Financial Services",
        "Industry": "Insurance - Specialty"

This happens because EODHD groups the NYSE, NASDAQ, BATS, and AMEX markets under the exchange US in the index components API. If we request the fundamentals for one of these symbols, such as AIZ.US (https://eodhd.com/api/fundamentals/AIZ.US?filter=General,SharesStats&api_token=<your-token>):

{
    "General": {
        "Code": "AIZ",
        "Type": "Common Stock",
        "Name": "Assurant Inc",
        "Exchange": "NYSE",
        ...

Here we get NYSE instead of US. To keep the symbols in the stocks table consistent with those in the instruments table, we replace every exchange listed as NYSE, NASDAQ, BATS, or AMEX with the exchange US:

df = df.withColumn(
    'exchange_symbol',
    when(col('exchange_symbol').isin(EOD_HD_API_US_SPECIFIC_EXCHANGES), EOD_HD_API_US_EXCHANGE_GROUP)
    .otherwise(col('exchange_symbol')))
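
The same rule can be written in plain Python to see its effect on individual symbols. This is only an illustration of the mapping, not part of the pipeline; the function name to_instrument_symbol and the non-U.S. example (XYZ on LSE) are hypothetical.

```python
US_SPECIFIC_EXCHANGES = {"NYSE", "NASDAQ", "BATS", "AMEX"}
US_EXCHANGE_GROUP = "US"


def to_instrument_symbol(code, exchange):
    """Build '<code>.<exchange>', mapping specific U.S. exchanges to the 'US' group."""
    group = US_EXCHANGE_GROUP if exchange in US_SPECIFIC_EXCHANGES else exchange
    return f"{code}.{group}"


print(to_instrument_symbol("AIZ", "NYSE"))     # AIZ.US
print(to_instrument_symbol("AAPL", "NASDAQ"))  # AAPL.US
print(to_instrument_symbol("XYZ", "LSE"))      # XYZ.LSE (hypothetical non-U.S. symbol)
```

Without this mapping, the fundamentals for AIZ would produce AIZ.NYSE, which has no matching row in instruments and would not line up with the foreign key from stocks to instruments.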

Load

In the load step, we append the transformed rows to the stocks table:

def load(df):
    if df.count() > 0:
        # mode='append' adds the new rows to the existing table without truncating it
        df.write.jdbc(config.get_jdbc_url(), 'stocks', mode='append', properties=JDBC_PROPERTIES)
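
The load step relies on a JDBC URL and a properties dictionary that are not listed here. As a rough sketch, they could look like the following; the host, database name, and credentials are placeholders, and this get_jdbc_url takes explicit arguments only to keep the example self-contained, unlike the config.get_jdbc_url() used above.

```python
def get_jdbc_url(host, port, database):
    """Build a PostgreSQL JDBC URL."""
    return f"jdbc:postgresql://{host}:{port}/{database}"


# Placeholder credentials; in practice these would come from configuration
JDBC_PROPERTIES = {
    "user": "trading_user",
    "password": "change-me",
    "driver": "org.postgresql.Driver",
}

print(get_jdbc_url("localhost", 5432, "trading"))
# jdbc:postgresql://localhost:5432/trading
```

For Spark to use this driver, the PostgreSQL JDBC jar also has to be available on the Spark classpath.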

*Note that this article does not provide personal investment advice and I am not a qualified licensed investment advisor. All information found here is for entertainment or educational purposes only and should not be construed as personal investment advice.
