Building the Ultimate Trading Data Pipeline — Part 2: Data Pipeline for Populating Stocks Details

In this second part we'll populate our database with stock details such as description, outstanding shares, and industry or sector classification.

The information about sectors is especially important if we want to apply a top-down investment strategy: first determining the strength of the different sectors, then picking the strongest stocks within those sectors to maximize returns.

We'll follow this process:

  1. Extract. We find the symbols in the instruments table that have not yet been loaded into the stocks table. For each symbol, we fetch fundamentals information from the EODHD API.
  2. Transform. We adjust the gathered information to match the columns and values expected by the stocks table.
  3. Load. We store the resulting DataFrame in the stocks table.
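
The selection in the Extract step can be tried in isolation. The sketch below is only an illustration under stated assumptions: it uses Python's built-in sqlite3 with an in-memory database instead of PostgreSQL, two simplified tables that hold only the columns the query needs, and a few illustrative rows. The helper name pending_symbols is hypothetical, and the ORDER BY is added here only to make the output deterministic.

```python
import sqlite3


def pending_symbols(conn):
    """Return stock symbols present in instruments but not yet in stocks."""
    cursor = conn.cursor()
    cursor.execute(
        "SELECT i.symbol FROM instruments i "
        "WHERE i.type = 'STOCK' "
        "AND i.symbol NOT IN (SELECT symbol FROM stocks) "
        "ORDER BY i.symbol"
    )
    return [row[0] for row in cursor.fetchall()]


# Simplified stand-ins for the real tables (assumption: reduced column set).
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE instruments (symbol TEXT PRIMARY KEY, type TEXT)")
conn.execute("CREATE TABLE stocks (symbol TEXT PRIMARY KEY)")
conn.executemany(
    "INSERT INTO instruments VALUES (?, ?)",
    [("AAPL.US", "STOCK"), ("AIZ.US", "STOCK"), ("GSPC.INDX", "INDEX")],
)
conn.execute("INSERT INTO stocks VALUES ('AAPL.US')")

print(pending_symbols(conn))  # ['AIZ.US']
```

Because already loaded symbols are excluded, running the pipeline again only processes what is still missing.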

Stock database schema preparation

Connect to your database using pgAdmin and create a new table stocks using the following script:

    CREATE TABLE IF NOT EXISTS stocks (
        symbol character varying(12) NOT NULL,
        description text COLLATE pg_catalog."default",
        address text COLLATE pg_catalog."default",
        web_url text COLLATE pg_catalog."default",
        logo_url text COLLATE pg_catalog."default",
        employees integer,
        sector text COLLATE pg_catalog."default",
        industry text COLLATE pg_catalog."default",
        country_iso text COLLATE pg_catalog."default",
        gic_sector text COLLATE pg_catalog."default",
        gic_group text COLLATE pg_catalog."default",
        gic_industry text COLLATE pg_catalog."default",
        gic_sub_industry text COLLATE pg_catalog."default",
        ipo_date date,
        shares_outstanding bigint,
        shares_float bigint,
        percent_insiders double precision,
        percent_institutions double precision,
        CONSTRAINT stocks_pkey PRIMARY KEY (symbol),
        CONSTRAINT stocks_fk FOREIGN KEY (symbol)
            REFERENCES instruments (symbol) MATCH SIMPLE
            NOT VALID
    );


For every symbol/code stored in the instruments table, we will parallelize calls to the EODHD API to fetch fundamental details, including identification, sector, industry, IPO date, shares outstanding, logo, and more.

We can inspect the expected information with the call https://eodhd.com/api/fundamentals/<symbol>?filter=General,SharesStats&api_token=<your-token>&fmt=json. Example:

    "General": {
        "Code": "AAPL",
        "Type": "Common Stock",
        "Name": "Apple Inc",
        "Exchange": "NASDAQ",
        "CurrencyCode": "USD",
        "CurrencyName": "US Dollar",
        "CurrencySymbol": "$",
        "CountryName": "USA",
        "CountryISO": "US",
        "OpenFigi": "BBG000B9XRY4",
        "ISIN": "US0378331005",
        "LEI": "HWUPKR0MPOU8FGXBT394",
        "PrimaryTicker": "AAPL.US",
        "CUSIP": "037833100",
        "CIK": "320193",
        "EmployerIdNumber": "94-2404110",
        "FiscalYearEnd": "September",
        "IPODate": "1980-12-12",
        "InternationalDomestic": "International/Domestic",
        "Sector": "Technology",
        "Industry": "Consumer Electronics",
        "GicSector": "Information Technology",
        "GicGroup": "Technology Hardware & Equipment",
        "GicIndustry": "Technology Hardware, Storage & Peripherals",
        "GicSubIndustry": "Technology Hardware, Storage & Peripherals",
        "HomeCategory": "Domestic",
        "IsDelisted": false,
        "Description": "Apple Inc. designs, manufactures, ....",
        "Address": "One Apple Park Way, Cupertino, CA, United States, 95014",
        "Phone": "408 996 1010",
        "WebURL": "https://www.apple.com",
        "LogoURL": "/img/logos/US/aapl.png",
        "FullTimeEmployees": 161000,
        "UpdatedAt": "2023-12-05"
    },
    "SharesStats": {
        "SharesOutstanding": 15552799744,
        "SharesFloat": 15535488445,
        "PercentInsiders": 0.07200000000000001,
        "PercentInstitutions": 61.317
    }

Some fields have been omitted from the JSON snippet for brevity.
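
Only a handful of these fields end up in the stocks table. As a rough illustration of keeping just the fields of interest, here is a small sketch in plain Python (not Spark). The FIELDS mapping and the pick_fields helper are hypothetical names used only for this example, the field list is a shortened subset, and the payload is a trimmed version of the response shown above. Keys that are missing from the payload are returned as None.

```python
import json

# Sections and keys to keep (a shortened subset of the fields shown above).
FIELDS = {
    "General": ["Code", "Exchange", "IPODate", "Sector", "Industry"],
    "SharesStats": ["SharesOutstanding", "SharesFloat"],
}


def pick_fields(payload, fields=FIELDS):
    """Keep only the listed keys of each section; missing keys become None."""
    result = {}
    for section, keys in fields.items():
        source = payload.get(section) or {}
        result[section] = {key: source.get(key) for key in keys}
    return result


raw = json.loads("""
{
  "General": {"Code": "AAPL", "Exchange": "NASDAQ", "IPODate": "1980-12-12",
              "Sector": "Technology", "Phone": "408 996 1010"},
  "SharesStats": {"SharesOutstanding": 15552799744}
}
""")

picked = pick_fields(raw)
print(picked["General"]["Code"])             # AAPL
print(picked["General"]["Industry"])         # None
print(picked["SharesStats"]["SharesFloat"])  # None
print("Phone" in picked["General"])          # False
```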

In the following code, we define the extraction process with a schema that lists only the fields we need, so the rest of the response is ignored:

from pyspark.sql.functions import col, udf
from pyspark.sql.types import (DoubleType, IntegerType, LongType,
                               StringType, StructField, StructType)

# spark, get_postgres_connection, rest_api_get and SCHEMA_URL are assumed to be
# defined elsewhere in the project; they are not shown in this part.


def extract():
    # Only the fields listed in this schema are parsed from the API response
    schema = StructType([
        StructField('General', StructType([
            StructField('Code', StringType(), True),
            StructField('Exchange', StringType(), True),
            StructField('IPODate', StringType(), True),
            StructField('CountryISO', StringType(), True),
            StructField('Type', StringType(), True),
            StructField('Sector', StringType(), True),
            StructField('Industry', StringType(), True),
            StructField('GicSector', StringType(), True),
            StructField('GicGroup', StringType(), True),
            StructField('GicIndustry', StringType(), True),
            StructField('GicSubIndustry', StringType(), True),
            StructField('Description', StringType(), True),
            StructField('Address', StringType(), True),
            StructField('WebURL', StringType(), True),
            StructField('LogoURL', StringType(), True),
            StructField('FullTimeEmployees', IntegerType(), True)])),
        StructField('SharesStats', StructType([
            # Share counts can exceed the 32-bit integer range, so use LongType
            StructField('SharesOutstanding', LongType(), True),
            StructField('SharesFloat', LongType(), True),
            StructField('PercentInsiders', DoubleType(), True),
            StructField('PercentInstitutions', DoubleType(), True)]))])

    # Get data to extract
    conn = get_postgres_connection()
    cursor = conn.cursor()
    cursor.execute(
        "SELECT i.symbol FROM instruments i WHERE i.type='STOCK' AND i.symbol NOT IN (SELECT symbol FROM stocks)")
    symbols = [r[0] for r in cursor.fetchall()]
    urls = []
    for symbol in symbols:
        # Assumption: the loop body is missing in the source. Here the URL follows the
        # fundamentals endpoint shown in this article; API_TOKEN is a placeholder name.
        urls.append((f"https://eodhd.com/api/fundamentals/{symbol}"
                     f"?filter=General,SharesStats&api_token={API_TOKEN}&fmt=json",))
    api_df = spark.createDataFrame(urls, SCHEMA_URL)
    udf_api_call = udf(rest_api_get, schema)
    df_data = api_df.withColumn('data', udf_api_call(col('url')))  # create a column data with response of rest api
    # Unpack data into General and SharesStats, then flatten both structs
    df_data = df_data.select('data.*', '*').drop('url', 'col', 'data')
    df_data = df_data.select('*', 'General.*', 'SharesStats.*').drop('General', 'SharesStats')
    return df_data

  • First, we establish a connection to the PostgreSQL database to retrieve the symbols from the instruments table that have not yet been inserted into the stocks table. (SELECT i.symbol FROM instruments i WHERE i.type='STOCK' AND i.symbol NOT IN (SELECT symbol FROM stocks))
  • We build a list of URLs, one for each fetched symbol, and load it into a Spark DataFrame (api_df) with the specified schema (SCHEMA_URL). This list allows us to open multiple parallel connections to the API.
  • udf_api_call = udf(rest_api_get, schema): Here, we wrap the function rest_api_get, which executes the REST API call, in a user-defined function (UDF) named udf_api_call that returns data in the provided schema (schema).
  • IPODate is declared as a string rather than as pyspark.sql.types.DateType, because the API returns the date as text. We'll convert it to a date in the transform step.
  • df_data = api_df.withColumn('data', udf_api_call(col('url'))): We apply the UDF to the url column in api_df to fetch data from the API and add a new column data (with the fetched information).
  • With df_data.select('data.*', '*').drop('url', 'col', 'data') we unpack the data column into its two nested fields, General and SharesStats.
  • Finally, we unpack again using df_data.select('*', 'General.*', 'SharesStats.*').drop('General', 'SharesStats'), so that every field of General and SharesStats becomes a top-level column.
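
The effect of the two select(...).drop(...) calls can be pictured on a single record with plain Python dictionaries. This is only an analogy, under the assumption that one row looks like the nested dictionary below; flatten_record is a hypothetical helper and not part of PySpark, and the URL is shortened for readability.

```python
def flatten_record(row):
    """Mimic the two unpacking steps on one record represented as nested dicts."""
    # Step 1: promote the contents of 'data' and drop 'url' and 'data'.
    step1 = {k: v for k, v in row.items() if k not in ("url", "data")}
    step1.update(row["data"])
    # Step 2: promote the fields of 'General' and 'SharesStats' to the top level.
    flat = {k: v for k, v in step1.items() if k not in ("General", "SharesStats")}
    flat.update(step1["General"])
    flat.update(step1["SharesStats"])
    return flat


row = {
    "url": "https://eodhd.com/api/fundamentals/AAPL.US",
    "data": {
        "General": {"Code": "AAPL", "Exchange": "NASDAQ"},
        "SharesStats": {"SharesOutstanding": 15552799744},
    },
}

print(flatten_record(row))
# {'Code': 'AAPL', 'Exchange': 'NASDAQ', 'SharesOutstanding': 15552799744}
```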


In the transform step, we prepare the DataFrame so that its columns and values match the database schema.


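from pyspark.sql.functions import col, concat, lit, to_date, when
from pyspark.sql.types import DateType


def transform(df_data):
    # API column names (after snake_case) mapped to the column names of the stocks table
    stocks_dict = {
        "code": "symbol",
        "exchange": "exchange_symbol",
        "full_time_employees": "employees",
    }
    df = df_data.toDF(*[snake_case(c) for c in df_data.columns])
    df = rename_cols(df, stocks_dict)
    # Keep only common stocks
    df = df.filter(df.type == 'Common Stock')
    df = df.withColumn('ipo_date', to_date(df['ipo_date'], 'yyyy-MM-dd').cast(DateType()))
    # LogoURL is a relative path, so prefix it with the host
    df = df.withColumn('logo_url', concat(lit('https://eodhistoricaldata.com'), col('logo_url')))
    # Map NYSE, NASDAQ, BATS and AMEX to the US exchange group used in instruments
    df = df.withColumn('exchange_symbol', when(col('exchange_symbol').isin(EOD_HD_API_US_SPECIFIC_EXCHANGES),
                                               EOD_HD_API_US_EXCHANGE_GROUP).otherwise(col('exchange_symbol')))
    df = df.withColumn('symbol', concat(col('symbol'), lit('.'), col('exchange_symbol')))
    df = df.drop('exchange_symbol', 'type')
    df = df.na.fill(value=0.0, subset=['percent_insiders', 'percent_institutions'])
    return df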
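
The helpers snake_case and rename_cols are not shown in this part. A minimal sketch of what the renaming might look like follows. It works on a plain list of column names rather than on a Spark DataFrame, and both the snake_case implementation and the rename_col_names helper are assumptions made for illustration, not the project's actual code.

```python
import re

# Insert "_" between a lowercase letter or digit and an uppercase letter
# ("WebURL" -> "Web_URL"), and between an acronym and the next capitalized
# word ("IPODate" -> "IPO_Date").
_BOUNDARY = re.compile(r"(?<=[a-z0-9])(?=[A-Z])|(?<=[A-Z])(?=[A-Z][a-z])")


def snake_case(name):
    """Convert an API field name such as 'FullTimeEmployees' to snake_case."""
    return _BOUNDARY.sub("_", name).lower()


def rename_col_names(names, mapping):
    """Replace the names found in mapping and leave the others untouched."""
    return [mapping.get(name, name) for name in names]


api_columns = ["Code", "Exchange", "IPODate", "WebURL", "FullTimeEmployees", "GicSubIndustry"]
stocks_dict = {"code": "symbol", "exchange": "exchange_symbol", "full_time_employees": "employees"}

columns = rename_col_names([snake_case(c) for c in api_columns], stocks_dict)
print(columns)
# ['symbol', 'exchange_symbol', 'ipo_date', 'web_url', 'employees', 'gic_sub_industry']
```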

Note that in the previous article, when we fetched the symbols for the S&P 500, the API reported the exchange US for each symbol:

"General": {
    "Code": "GSPC",
    "Type": "INDEX",
    "Name": "S&P 500 Index",
    ...
},
"Components": {
    "0": {
        "Code": "AIZ",
        "Exchange": "US",
        "Name": "Assurant Inc",
        "Sector": "Financial Services",
        "Industry": "Insurance - Specialty"
    },
    ...
}

This occurs because EODHD consolidates the markets of NYSE, NASDAQ, BATS, and AMEX under the exchange US in the index components API. If we request the fundamentals for one of these symbols, such as AIZ.US (https://eodhd.com/api/fundamentals/AIZ.US?filter=General,SharesStats&api_token=<your-token>), the response looks like this:

    "General": {
        "Code": "AIZ",
        "Type": "Common Stock",
        "Name": "Assurant Inc",
        "Exchange": "NYSE",
        ...
    }

Here we get NYSE instead of US. To keep the symbols in the stocks table consistent with those in the instruments table, we replace every exchange listed as NYSE, NASDAQ, BATS, or AMEX with the exchange US:

df = df.withColumn('exchange_symbol',
                   when(col('exchange_symbol').isin(EOD_HD_API_US_SPECIFIC_EXCHANGES),
                        EOD_HD_API_US_EXCHANGE_GROUP).otherwise(col('exchange_symbol')))
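
The same rule can be expressed on plain Python values, which makes it easy to check by hand. The constant values below are assumptions inferred from the exchanges named above; the real definitions of EOD_HD_API_US_SPECIFIC_EXCHANGES and EOD_HD_API_US_EXCHANGE_GROUP are not shown in this part. The build_symbol helper is hypothetical.

```python
# Assumed values, based on the exchanges mentioned in the text.
EOD_HD_API_US_SPECIFIC_EXCHANGES = ["NYSE", "NASDAQ", "BATS", "AMEX"]
EOD_HD_API_US_EXCHANGE_GROUP = "US"


def build_symbol(code, exchange):
    """Build '<code>.<exchange>', mapping US-specific exchanges to the US group."""
    if exchange in EOD_HD_API_US_SPECIFIC_EXCHANGES:
        exchange = EOD_HD_API_US_EXCHANGE_GROUP
    return f"{code}.{exchange}"


print(build_symbol("AIZ", "NYSE"))     # AIZ.US
print(build_symbol("AAPL", "NASDAQ"))  # AAPL.US
print(build_symbol("SAP", "XETRA"))    # SAP.XETRA
```

Exchanges outside the US group are left as they are, which matches the otherwise(...) branch of the Spark expression.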


In the load step, we append the information to the stocks table:

def load(df):
    # Skip the write when there is nothing new to load
    if df.count() > 0:
        # mode='append' adds the new rows and keeps the rows already in the table
        df.write.jdbc(config.get_jdbc_url(), 'stocks', mode='append', properties=JDBC_PROPERTIES)

