Building the Ultimate Trading Data Pipeline — Part 3: Data pipeline for populating daily historical data
![](https://cdn-images-1.readmedium.com/v2/resize:fit:800/1*tB8AHfkZ91NiNk14EtNNsQ.jpeg)
In this article we’re going to create a pipeline to extract historical daily data from the EOD HD API and store it in TimescaleDB.
TimescaleDB is an open-source database designed to make SQL scalable for time-series data. It is built on PostgreSQL and packaged as a PostgreSQL extension, providing automatic partitioning across time and space (via a partitioning key), as well as full SQL support.
Because it runs inside PostgreSQL, one of its biggest benefits is that regular PostgreSQL tables (such as all the tables we created in the previous articles) can live alongside the specialised time-series tables (hypertables).
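To make this concrete, here is an illustrative query (using the `daily_bars` hypertable we create below and the `stocks` table from the previous articles) that joins a regular table with a hypertable and uses TimescaleDB's `time_bucket` helper:

```sql
-- Plain SQL join between a regular table (stocks) and a hypertable (daily_bars),
-- plus the TimescaleDB time_bucket() function for weekly aggregation.
SELECT s.symbol,
       time_bucket('7 days', db.date) AS week,
       avg(db.close)                  AS avg_close
FROM stocks s
JOIN daily_bars db ON db.symbol = s.symbol
GROUP BY s.symbol, week
ORDER BY s.symbol, week;
```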
Install TimescaleDB
Prerequisites: Docker installed.
Use this `docker-compose.yaml`:
```yaml
version: '3.1'

services:
  timescaledb:
    image: timescale/timescaledb-ha:pg14-latest
    environment:
      - POSTGRES_DB=trading-data-db
      - POSTGRES_USER=<user>
      - POSTGRES_PASSWORD=<password>
    ports:
      - "5432:5432"
    volumes:
      - timescaledb-ha:/home/postgres/pgdata/data

volumes:
  timescaledb-ha:
    driver: local # Define the driver and options under the volume name
    driver_opts:
      type: none
      device: /data/db_data
      o: bind
```
And start it with `docker-compose up -d`. Note that the bind mount expects the `/data/db_data` directory to already exist on the host.
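You can verify that the container is up and the TimescaleDB extension is available with `docker-compose exec timescaledb psql -U <user> -d trading-data-db -c "\dx"`, which should list `timescaledb` among the installed extensions.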
Create table and hypertable
Connect to the database using pgAdmin or an alternative tool such as DBeaver and execute:
```sql
CREATE TABLE IF NOT EXISTS daily_bars
(
    symbol  character varying(12),
    date    date NOT NULL,
    open    double precision,
    high    double precision,
    low     double precision,
    close   double precision,
    volume  bigint,
    u_close double precision
);

SELECT create_hypertable('daily_bars', 'date');

CREATE INDEX ON daily_bars (symbol, date DESC);
```
We create this composite index because our queries will filter by symbol and read the most recent dates first.
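For example, a query like the following, which fetches the most recent bars for a single symbol, can be served by this index instead of scanning the whole hypertable:

```sql
SELECT date, open, high, low, close, volume
FROM daily_bars
WHERE symbol = 'AAPL.US'
ORDER BY date DESC
LIMIT 30;
```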
Extract
Selecting the last loaded date per symbol
We aim to incorporate new daily bars as they become available. Initially, we retrieve for each ticker/symbol the entire historical daily bars dataset spanning, for example, the past 50 years. Subsequently, at the conclusion of each trading session, we’ll append only the new daily bar data for the current day. Therefore, it is necessary to retrieve the most recent loaded date for each symbol.
We might first try this code with PySpark:
query = """
SELECT s.symbol, (SELECT DATE(MAX(date)) FROM daily_bars db WHERE db.symbol = s.symbol) AS last_date
FROM stocks s
"""
symbols_last_date_df = spark.read.jdbc(get_jdbc_url(), query, properties={
"user": DB_USER,
"password": DB_PASSWORD,
"driver": "org.postgresql.Driver"
})
However, this approach runs into problems with PySpark's `read.jdbc`, which has limited compatibility with sub-selects passed as the query. To address this, we create a view in Postgres:
```sql
CREATE VIEW symbols_last_date_view AS
SELECT s.symbol, MAX(db.date) AS last_date
FROM stocks s
LEFT JOIN daily_bars db ON s.symbol = db.symbol
GROUP BY s.symbol;
```
We can now obtain a DataFrame with each symbol and its last loaded date using the following code; the view is wrapped in parentheses and aliased so that Spark can use it as a table in the SQL it generates:
```python
def fetch_symbols_last_date():
    query = "(SELECT * FROM symbols_last_date_view) as subquery"
    return spark.read.jdbc(url=config.get_jdbc_url(), table=query, properties=JDBC_PROPERTIES)
```
Extracting from EOD HD API
We’ll use the End-Of-Day Historical Stock Market Data API. For example, to obtain the daily data for the symbol `AAPL.US` from `2017-01-05` to `2023-12-11`, we can open the URL: https://eodhd.com/api/eod/AAPL.US?from=2017-01-05&to=2023-12-11&period=d&api_token=token&fmt=json
![](https://cdn-images-1.readmedium.com/v2/resize:fit:800/1*Y5PXWXpPDQzojpLYqURgmg.png)
We observe that the symbol is absent from the returned JSON. Since we intend to merge the data fetched in parallel for all symbols, we need to wrap the actual API call so that the symbol we queried is added to each record.
You may also notice that when the fractional part of `open`, `high`, `low` or `close` is zero, the EOD HD API returns only the integer part (e.g. `192` instead of `192.0`). That is a problem for the schema definition, as PySpark would then erroneously store `0` instead of the real number.
![](https://cdn-images-1.readmedium.com/v2/resize:fit:800/1*Aea--scDaYnfcKTMAIWMFQ.png)
For these reasons, we wrap the call to ensure the data is retrieved accurately:
```python
from datetime import datetime


def float_with_decimals(value):
    # The API drops the ".0" on whole numbers, so coerce ints back to floats.
    if isinstance(value, int):
        return round(float(value), 4)
    elif isinstance(value, float):
        return round(value, 4)
    else:
        return 0.0


def _rest_api_get(symbol, url):
    # rest_api_get is a thin HTTP helper (see the sketch below).
    json_array = rest_api_get(url)
    if json_array is not None:
        # Add the symbol to every record and normalise dates and prices.
        return map(lambda x: dict(x, **{'symbol': symbol,
                                        'date': datetime.strptime(x['date'], '%Y-%m-%d').date(),
                                        'open': float_with_decimals(x['open']),
                                        'high': float_with_decimals(x['high']),
                                        'low': float_with_decimals(x['low']),
                                        'close': float_with_decimals(x['close']),
                                        'adjusted_close': float_with_decimals(x['adjusted_close'])
                                        }), json_array)
    else:
        return [{'symbol': symbol, 'date': None, 'open': 0.0, 'high': 0.0, 'low': 0.0,
                 'close': 0.0, 'adjusted_close': 0.0, 'volume': 0}]
```
We also convert the date string into a Python `date` object.
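The `rest_api_get` helper used above is not shown in this article; it is assumed to be a thin wrapper around the HTTP call, defined elsewhere in the series. A minimal sketch, using `requests`, could look like this:

```python
import requests


def rest_api_get(url):
    # Hypothetical helper: fetch a URL and return the parsed JSON array,
    # or None if the request fails or the payload is empty.
    try:
        response = requests.get(url, timeout=30)
        response.raise_for_status()
        payload = response.json()
        return payload if payload else None
    except (requests.RequestException, ValueError):
        return None
```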
Get next business day from last date
As we need to fetch the data pending since the last loaded date, we have to start from the next business day after that date. We create the following supporting functions:
```python
from datetime import date, timedelta

import holidays
from dateutil.relativedelta import relativedelta


def next_business_day(previous_date):
    # Move forward one day at a time until we land on a weekday.
    next_day = previous_date + timedelta(days=1)
    while next_day.weekday() in holidays.WEEKEND:
        next_day += timedelta(days=1)
    return next_day


def get_next_date(from_date):
    if from_date is None:
        # No previous data for the symbol: start the initial load 50 years back.
        return date.today() - relativedelta(years=50)
    else:
        return next_business_day(from_date)
```
When the last date is `NULL`, this is the initial load of the daily historical data. In that case we start from a date a number of years back from today (50 years in the example above).
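As a quick, illustrative sanity check of these helpers (only weekends are skipped here; if the next day is an exchange holiday the API simply returns no bar for it):

```python
from datetime import date

# Friday 2023-12-08 -> the next business day is Monday 2023-12-11.
print(next_business_day(date(2023, 12, 8)))   # 2023-12-11

# No previous data: start the initial load roughly 50 years back from today.
print(get_next_date(None))

# Incremental load: continue from the day after the last loaded bar.
print(get_next_date(date(2023, 12, 8)))       # 2023-12-11
```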
Following the same procedure used with PySpark in earlier articles, we’ll begin by defining the schema:
```python
schema = T.ArrayType(T.StructType([
    T.StructField("symbol", T.StringType(), False),
    T.StructField("date", T.DateType(), False),
    T.StructField("open", T.DoubleType(), False),
    T.StructField("high", T.DoubleType(), False),
    T.StructField("low", T.DoubleType(), False),
    T.StructField("close", T.DoubleType(), False),
    T.StructField("adjusted_close", T.DoubleType(), False),
    T.StructField("volume", T.IntegerType(), False)
]))
```
Then we iterate over the symbols and their last dates to build the final DataFrame. (The snippet below wraps this logic in an `extract()` function so that it mirrors the `transform` and `load` steps later on.)
```python
def extract():
    urls = []
    df = fetch_symbols_last_date()
    symbols_with_date = df.collect()

    for row in symbols_with_date:
        start_date = get_next_date(row['last_date'])
        if start_date > date.today():
            continue
        symbol = row['symbol']
        url = f"https://eodhd.com/api/eod/{symbol}?from={start_date.strftime('%Y-%m-%d')}&period=d&api_token={config.EOD_API_TOKEN}&fmt=json"
        urls.append(Row(symbol, url))

    schema_api = StructType([StructField('symbol', StringType()), StructField('url', StringType())])
    api_df = spark.createDataFrame(urls, schema_api)
    api_df.show()

    udf_api_call = udf(_rest_api_get, schema)
    df_data = api_df.withColumn('data', udf_api_call(col('symbol'), col('url')))
    df_data = df_data.select(df_data.url, explode_outer(df_data.data)).select("col.*", "*").drop('url', 'col')
    df_data.show()
    return df_data
```
- First we retrieve the DataFrame that contains each symbol and its last loaded date (`fetch_symbols_last_date()`).
- We collect the DataFrame into a list of rows, which allows us to iterate over the data on the driver.
- For each row we calculate the next date after the last loaded date. We skip the current iteration if the calculated start date is in the future.
- We construct the URL for the EOD HD API using the symbol and the start date.
- We define the schema (`schema_api`) for the DataFrame that will store symbols and their corresponding URLs, and we create that DataFrame (`api_df`) from the `urls` list using this schema.
![](https://cdn-images-1.readmedium.com/v2/resize:fit:800/1*y2iIDvRlqm3JuBBVS6Gwyg.png)
- Then we define a User-Defined Function (UDF) from the `_rest_api_get` function (`udf(_rest_api_get, schema)`) and apply it to `api_df` to fetch the daily historical data for each symbol and URL combination, creating a new column named `data`.
![](https://cdn-images-1.readmedium.com/v2/resize:fit:800/1*lTf3h8skq3Gst_6jTK7avg.png)
- Finally we explode the `data` column and drop the `url` column and the `col` column generated by the explode.
![](https://cdn-images-1.readmedium.com/v2/resize:fit:800/1*oODhdSf6pJ6vcjWnDhhu7Q.png)
Transform
```python
def transform(df):
    # Drop rows for symbols that returned no data.
    df = df.filter(df.symbol.isNotNull() & df.date.isNotNull() & df.close.isNotNull())
    # Keep the unadjusted close as u_close and use the adjusted close as close.
    df = df.withColumnRenamed('close', 'u_close').withColumnRenamed('adjusted_close', 'close')
    # Adjust open/high/low with the same factor (close / u_close).
    df = df.withColumn('open', expr('open * close / u_close')) \
           .withColumn('high', expr('high * close / u_close')) \
           .withColumn('low', expr('low * close / u_close'))
    df = df.withColumn('open', round(df.open, 4)) \
           .withColumn('high', round(df.high, 4)) \
           .withColumn('low', round(df.low, 4)) \
           .withColumn('close', round(df.close, 4))
    return df
```
In the `transform` function, we perform the following operations:

- Remove all rows containing null information, as certain symbols may lack data.
- Rename the `close` column to `u_close` (unadjusted) and use the `adjusted_close` value from the EOD HD API as the new `close`.
- Adjust the prices (`open`, `high` and `low`) with the ratio between the adjusted and unadjusted close (see the worked example below).
- Round all values to 4 decimal places.
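As an illustration of the adjustment (with made-up numbers): if a bar has an unadjusted close `u_close = 100` and an adjusted close `close = 25` (for example after a 4-for-1 split), then an unadjusted open of 102 becomes `102 * 25 / 100 = 25.5`, and `high` and `low` are scaled by the same factor.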
Load
```python
def load(df):
    df.write.option("isolationLevel", "NONE") \
        .option("batchsize", 10000) \
        .jdbc(url=jdbc_url, table='daily_bars', mode='append', properties=JDBC_PROPERTIES)
```
We optimize the inserts by setting the isolation level to `NONE` and writing in batches of `10000` records.
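Putting the steps together, a daily job could be wired up roughly like this (a sketch only, assuming the extraction logic above is kept in the `extract()` function shown earlier):

```python
def run_daily_bars_pipeline():
    # Extract: one API call per symbol, starting from its next pending business day.
    df = extract()
    # Transform: drop empty rows, switch to adjusted prices, round to 4 decimals.
    df = transform(df)
    # Load: append the new bars into the daily_bars hypertable.
    load(df)


if __name__ == '__main__':
    run_daily_bars_pipeline()
```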
Executing the entire process is very likely to exhaust the PostgreSQL server's allocated shared memory, especially during the initial load, when more than half a million rows are inserted into the database.
![](https://cdn-images-1.readmedium.com/v2/resize:fit:800/1*13JKoQ-hCrW5xUjJKrDgPg.png)
Increase max_locks_per_transaction
Run the following commands in a terminal on the Docker host (the data directory is bind-mounted at `/data/db_data`):

```bash
cd /data/db_data
nano postgresql.conf
```
Use `nano` or your preferred editor. Change `max_locks_per_transaction` from `64` to `128` and restart the container:

```bash
docker-compose restart
```
Check from pgAdmin that the value has changed:

```sql
SHOW max_locks_per_transaction;
```
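Alternatively, if you prefer not to edit `postgresql.conf` by hand, the same setting can be changed from a SQL client; it still requires a restart of the container to take effect:

```sql
ALTER SYSTEM SET max_locks_per_transaction = 128;
```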
Final code
Here is the completed code: