avatarXavier Escudero

Summary

This article describes the process of creating a data pipeline to extract historical daily data from the EOD HD API and store it using TimescaleDB.

Abstract

The article begins by introducing TimescaleDB, an extension to PostgreSQL that allows for the use of regular PostgreSQL tables alongside specialist time-series tables. The author then provides instructions for installing TimescaleDB using Docker and creating a table and hypertable in the database. The article then moves on to the extraction phase of the data pipeline, which involves selecting the last loaded date per symbol and extracting data from the EOD HD API. The author provides code examples for each step of the process, including creating a view in Postgres to retrieve the last loaded date per symbol and using PySpark to extract data from the API. The article concludes with a brief discussion of the transformation and load phases of the data pipeline.

Bullet points

  • TimescaleDB is an extension to PostgreSQL that allows for the use of regular PostgreSQL tables alongside specialist time-series tables.
  • The author provides instructions for installing TimescaleDB using Docker and creating a table and hypertable in the database.
  • The extraction phase of the data pipeline involves selecting the last loaded date per symbol and extracting data from the EOD HD API.
  • The author provides code examples for each step of the process, including creating a view in Postgres to retrieve the last loaded date per symbol and using PySpark to extract data from the API.
  • The article concludes with a brief discussion of the transformation and load phases of the data pipeline.

Building the Ultimate Trading Data Pipeline — Part 3: Data pipeline for populating daily historical data

In this article we’re going to create a pipeline to extract historial daily data from EOD HD API and store it using Timescale.

TimescaleDB is an open-source database designed to make SQL scalable for time-series data. It is engineered up from PostgreSQL and packaged as a PostgreSQL extension, providing automatic partitioning across time and space (partitioning key), as well as full SQL support.

TimescaleDB is an extension to PostgreSQL and one of the biggest benefits is that we can use regular PostgreSQL tables (we can create all tables we did in the previous articles) alongside the specialist time-series tables.

Install TimeScaleDB

Prerequisites: Docker installed.

Use this docker-compose.yaml:

version: '3.1'
services:  
  timescaledb:  
    image: timescale/timescaledb-ha:pg14-latest  
    environment:  
      - POSTGRES_DB=trading-data-db  
      - POSTGRES_USER=<user>  
      - POSTGRES_PASSWORD=<password>
    ports:  
      - "5432:5432"  
    volumes:  
      - timescaledb-ha:/home/postgres/pgdata/data  
volumes:  
    timescaledb-ha:  
      driver: local # Define the driver and options under the volume name  
      driver_opts:  
        type: none  
        device: /data/db_data  
        o: bind

And start with: docker-compose up -d.

Create table and hypertable

Connect to the database using Pgadmin or any alternative tool such as DBeaver and execute:

CREATE TABLE IF NOT EXISTS daily_bars
(
    symbol character varying(12),
    date date NOT NULL,
    open double precision,
    high double precision,
    low double precision,
    close double precision,
    volume bigint,
    u_close double precision,    
)

SELECT create_hypertable('daily_bars', 'date')
CREATE index on daily_bars(symbol, date desc)

As we need to filter by date and by symbol, we need to create the index.

Extract

Selecting the last loaded date per symbol

We aim to incorporate new daily bars as they become available. Initially, we retrieve for each ticker/symbol the entire historical daily bars dataset spanning, for example, the past 50 years. Subsequently, at the conclusion of each trading session, we’ll append only the new daily bar data for the current day. Therefore, it is necessary to retrieve the most recent loaded date for each symbol.

We may try this code with Pyspark:

query = """
            SELECT s.symbol, (SELECT DATE(MAX(date)) FROM daily_bars db WHERE db.symbol = s.symbol) AS last_date
            FROM stocks s
            """
symbols_last_date_df = spark.read.jdbc(get_jdbc_url(), query, properties={
        "user": DB_USER,
        "password": DB_PASSWORD,
        "driver": "org.postgresql.Driver"
    })

However, when utilizing Pyspark’s read.jdbc, issues arise. The challenge lies in its limited compatibility with subselects. To address this, we create a view in Postgres:

CREATE VIEW symbols_last_date_view AS  
SELECT s.symbol, MAX(db.date) AS last_date  
FROM stocks s  
LEFT JOIN daily_bars db ON s.symbol = db.symbol  
GROUP BY s.symbol;

We can obtain now a DataFrame with each symbol and its last loaded date using the following code:

def fetch_symbols_last_date():  
    query = "(SELECT * FROM symbols_last_date_view) as subquery"  
    return spark.read.jdbc(url=config.get_jdbc_url(), table=query, properties=JDBC_PROPERTIES)

Extracting from EOD HD API

We’ll use the End-Of-Day Historical Stock Market Data API. For example, to obtain the daily data for symbol AAPL.US from date 2023-12-11 we may open the url: https://eodhd.com/api/eod/AAPL.US?from=2017-01-05&to=2023-12-11&period=d&api_token=token&fmt=json

EOD Historical Data Example

We observe that the symbol is absent from the returned JSON. To address this, and considering that we intend to merge all parallelized data, we need to encapsulate the actual API call to include the symbol we utilized.

You may realize that when the fractional part of open, high, low or close is zero, the EOD HD API returns only the integer part (without .0). That's a problem for the schema definition as PySpark is going to set a 0 erroneously, instead the real number.

Due to these reasons, we encapsulate the call to ensure the accurate retrieval of data:

def float_with_decimals(value):  
    if isinstance(value, int):  
        return round(float(value), 4)  
    elif isinstance(value, float):  
        return round(value, 4)  
    else:  
        return 0.0
    
def _rest_api_get(symbol, url):  
    json_array = rest_api_get(url)  
    if json_array is not None:  
        return map(lambda x: dict(x, **{'symbol': symbol,  
                                        'date': datetime.strptime(x['date'], '%Y-%m-%d').date(),  
                                        'open': float_with_decimals(x['open']),  
                                        'high': float_with_decimals(x['high']),  
                                        'low': float_with_decimals(x['low']),  
                                        'close': float_with_decimals(x['close']),  
                                        'adjusted_close': float_with_decimals(x['adjusted_close'])  
                                        }), rest_api_get(url))  
    else:  
        return [{'symbol': symbol, 'date': None, 'open': 0.0, 'high': 0.0, 'low': 0.0, 'close': 0.0, 'volume': 0}]  

Additionally, we convert the date string to a date format.

Get next business day from last date

As we need to obtain the pending data since the last date, we need to start from the next business day of last date. We create the next supporting functions:

import holidays

def next_business_day(previous_date):  
    next_day = previous_date + timedelta(days=1)  
    while next_day.weekday() in holidays.WEEKEND:  
        next_day += timedelta(days=1)  
    return next_day
def get_next_date(from_date):  
    if from_date is None:  
        return date.today() - relativedelta(years=50)  
    else:  
        return next_business_day(from_date)

In the case where the last date is NULL, it indicates the initial load of daily historical data. In such instances, we need to retrieve the date relative to today from a few years ago (e.g., 50 years in the above example).

Following the same procedure employed with Pyspark in earlier articles, we’ll begin by defining the schema:

schema = T.ArrayType(T.StructType([  
    T.StructField("symbol", T.StringType(), False),  
    T.StructField("date", T.DateType(), False),  
    T.StructField("open", T.DoubleType(), False),  
    T.StructField("high", T.DoubleType(), False),  
    T.StructField("low", T.DoubleType(), False),  
    T.StructField("close", T.DoubleType(), False),  
    T.StructField("adjusted_close", T.DoubleType(), False),  
    T.StructField("volume", T.IntegerType(), False)  
]))

And then we iterate the symbols and last date to get the final Dataframe:

urls = []

df = fetch_symbols_last_date()  
symbols_with_date = df.collect() 
 
for row in symbols_with_date:  
    start_date = get_next_date(row['last_date'])  
    if start_date > date.today():  
        continue  
    symbol = row['symbol']  
    url = f"https://eodhd.com/api/eod/{symbol}?from={start_date.strftime('%Y-%m-%d')}&period=d&api_token={config.EOD_API_TOKEN}&fmt=json"  
    urls.append(Row(symbol, url))  

schema_api = StructType([StructField('symbol', StringType()), StructField('url', StringType())])  
api_df = spark.createDataFrame(urls, schema_api)  
api_df.show()  
udf_api_call = udf(_rest_api_get, schema)  
df_data = api_df.withColumn('data', udf_api_call(col('symbol'), col('url')))  
df_data = df_data.select(df_data.url, explode_outer(df_data.data)).select("col.*", "*").drop('url', 'col')  
df_data.show()  
return df_data
  • First we retrieve the Dataframe that contains information about symbols and their last loaded dates (fetch_symbols_last_date()).
  • We collect the DataFrame into a list of rows. This allows us to iterate over the data.
  • For each row we calculate the next date after the last loaded date (next date). We skip the current iteration if the calculated start date is in the future.
  • We construct the url for the EOD HD API using the symbol and start date.
  • We define the schema for the DataFrame (api_df) that will store symbols and their corresponding URLs (schema_api) and we create the DataFrame (api_df) from the urls list using this schema.
  • Then we define the User-Defined Function (UDF) using the _rest_api_get function (udf(_rest_api_get, schema)) and we apply this UDF to the api_dfto fetch the daily historical data for each symbol and URL combination, creating a new column named data'.
  • Finally we explode the data column and remove the urlcolumn and the generated colcolumn in the exploding.
Final exploded data to transform and load

Transform

def transform(df):  
    df = df.filter(df.symbol.isNotNull() & df.date.isNotNull() & df.close.isNotNull())
    df = df.withColumnRenamed('close', 'u_close').withColumnRenamed('adjusted_close', 'close')  
    df = df.withColumn('open', expr('open * close / u_close')). \  
        withColumn('high', expr('high * close / u_close')). \  
        withColumn('low', expr('low * close / u_close'))  

    df = df.withColumn('open', round(df.open, 4)).withColumn('high', round(df.high, 4)). \  
        withColumn('low', round(df.low, 4)). \  
        withColumn('close', round(df.close, 4))  
    return df

In the transform function, we perform the following operations:

  1. Remove all rows containing null information, as certain symbols may lack data.
  2. Rename the close column to u_close (unadjusted) and set adjusted_close using data from the EOD HD API as the new close.
  3. Adjust prices (open, high, and low).
  4. Round all values to 4 decimal places.

Load

def load(df):  
    df.write.option("isolationLevel", "NONE").\
        option("batchsize", 10000).jdbc(url=jdbc_url, table='daily_bars', mode='append', properties=JDBC_PROPERTIES)

We’re optimizing the insertions setting the isolation level to NONEand writing in batches of 10000records.

Executing the entire process is highly likely to result in the PostgreSQL server depleting its allocated shared memory resources, especially when we are loading more than half a million rows into the database for the first time.

Writing JDBC — Out of shared memory

Increase max_locks_per_transaction

Run the following command in the terminal of the Docker container host:

cd /data/db_data
nano postgresql.conf

Use nanoor your preferred editor. Change max_locks_per_transactionfrom 64 to 128 and restart docker-compose:

docker-compose restart

Check from PgAdmin that the value has changed:

SHOW max_locks_per_transaction;

Final code

Here is the completed code:

👏 Did you like the story? Give 1 to 50 claps to show your support! Your claps really helps me out and motivates me to keep creating valuable content. Thank you for your support! 👏

Thank you for being part of our community! Before you go:

  • If you liked the story feel free to clap 👏 and follow the author.
  • Learn How To Develop Your Trading Bots 👉 here.
  • Join our Premium Discord Server👉 here.

*Note that this article does not provide personal investment advice and I am not a qualified licensed investment advisor. All information found here is for entertainment or educational purposes only and should not be construed as personal investment advice.

Python
Trading
Eod Stock Market Api
Pyspark
Recommended from ReadMedium