3 Ways for Multiple Time Series Forecasting Using Prophet in Python
Train and predict multiple time series using for-loop, multi-processing, and PySpark
Multiple time series forecasting refers to training many time series models and making predictions with each of them. For example, if we would like to predict the sales quantity of 10 products in 5 stores, there will be 50 store-product combinations, and each combination is a time series. With a multiple time series setup, we can train and predict all 50 time series models in one run. Another example is predicting multiple stock prices at the same time.
In this tutorial, we will predict the stock prices of five tech companies using Prophet. Three ways of running multiple time series forecasting will be demonstrated. You will learn:
- How to run multiple time series forecasting using a for-loop
- How to set up multi-processing and utilize all the cores on a computer to run multiple time series models
- How to set up PySpark to run multiple time series forecasting in parallel
If you are not familiar with Prophet, please check out my previous tutorials Time Series Forecasting Of Bitcoin Prices Using Prophet and Multivariate Time Series Forecasting with Seasonality and Holiday Effect Using Prophet in Python.
Resources for this post:
- Video tutorial for this post on YouTube
- Python code is at the end of the post. Click here for the Colab notebook
- More video tutorials on time series
- More blog posts on time series
Let’s get started!
Step 1: Install and Import Libraries
In the first step, we will install and import libraries.
Three packages are installed:
- yfinance is the Python package for pulling stock data from Yahoo Finance.
- prophet is the package for the time series model.
- pyspark is for setting up the Spark environment.
# Install libraries
!pip install yfinance prophet pyspark
After installing the three Python packages, we import the libraries needed for this tutorial.
- pandas and numpy are for data processing.
- yfinance is for pulling stock price data from Yahoo Finance.
- prophet is for building the time series model.
- seaborn and matplotlib are for visualization.
- Pool and cpu_count are for multi-processing.
- pyspark.sql.types, pandas_udf, and PandasUDFType are for Spark parallel processing.
- tqdm is for generating a progress bar to show the completed percentage.
- time is for tracking the time used for modeling and prediction.
# Data processing
import pandas as pd
import numpy as np
# Get time series data
import yfinance as yf
# Prophet model for time series forecast
from prophet import Prophet
# Visualization
import seaborn as sns
import matplotlib.pyplot as plt
# Multi-processing
from multiprocessing import Pool, cpu_count
# Spark
from pyspark.sql.types import *
from pyspark.sql.functions import pandas_udf, PandasUDFType
# Progress bar
from tqdm import tqdm
# Tracking time
from time import time
Step 2: Pull Data
The second step pulls stock data from the Yahoo Finance API. We will pull 2 years of daily data from the beginning of 2020 to the end of 2021.
- start_date = '2020-01-02' means the earliest date for the stock data is January 2nd, 2020. It does not start with January 1st because January 1st is a holiday, and there is no stock data on holidays and weekends.
- end_date = '2022-01-01' means that the last date for the stock data is December 31st, 2021. yfinance excludes the end date, so we need to add one day to the last day of the data.
# Data start date
start_date = '2020-01-02'
# Data end date
end_date = '2022-01-01' # yfinance excludes the end date, so we need to add one day to the last day of data
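Because yfinance treats the end date as exclusive, the end date can also be derived from the last day we want instead of being typed by hand. The sketch below uses only the standard library; last_day is an illustrative variable name that does not appear in the original code.

```python
from datetime import date, timedelta

# Last day we want included in the download
last_day = date(2021, 12, 31)

# yfinance excludes the end date, so shift it forward by one day
end_date = (last_day + timedelta(days=1)).isoformat()

print(end_date)
```

The resulting string has the same 'YYYY-MM-DD' form as the hard-coded value above.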
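We will download the closing prices for five tickers. FB is for Facebook (Meta), GOOG is for Google, ORCL is for Oracle, MSFT is for Microsoft, and AMZN is for Amazon. Note that Meta's ticker changed from FB to META in June 2022, so a download run today may need 'META' in place of 'FB'.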
The goal of the time series model is to predict the closing price of all five stocks.
# Download data
ticker_list = ['FB', 'GOOG', 'ORCL', 'MSFT', 'AMZN']
data = yf.download(ticker_list, start=start_date, end=end_date)[['Close']]
# Drop the top level column name
data.columns = data.columns.droplevel()
# Take a look at the data
data.head()
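To see what dropping the top column level does, here is a minimal sketch on a toy dataframe. It assumes the field name (such as Close) sits on the top column level, as in the code above. The tickers AAA and BBB and the prices are made up for illustration.

```python
import pandas as pd

# Toy frame with two column levels, similar in shape to a multi-ticker download
columns = pd.MultiIndex.from_product([['Close', 'Open'], ['AAA', 'BBB']])
raw = pd.DataFrame([[10.0, 20.0, 9.5, 19.5],
                    [11.0, 21.0, 10.5, 20.5]], columns=columns)

# Keep only the 'Close' block; the columns still carry two levels
close = raw[['Close']]

# Drop the top level so that only the ticker names remain
close.columns = close.columns.droplevel()

print(list(close.columns))
```

After the top level is dropped, each column can be selected by its ticker name alone.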

From the visualization of the stock prices, we can see that all five stocks rose in price over the two years, and Amazon and Google have the highest prices.
# Visualize data using seaborn
sns.set(rc={'figure.figsize':(12,8)})
sns.lineplot(x=data.index, y=data['FB'])
sns.lineplot(x=data.index, y=data['AMZN'])
sns.lineplot(x=data.index, y=data['GOOG'])
sns.lineplot(x=data.index, y=data['MSFT'])
sns.lineplot(x=data.index, y=data['ORCL'])
plt.legend(labels = ['Facebook', 'Amazon', 'Google', 'Microsoft', 'Oracle'])

There are 505 data points for each ticker, and there are no missing values.
# Data information
data.info()
Output
<class 'pandas.core.frame.DataFrame'>
DatetimeIndex: 505 entries, 2020-01-02 to 2021-12-31
Data columns (total 5 columns):
# Column Non-Null Count Dtype
--- ------ -------------- -----
0 AMZN 505 non-null float64
1 FB 505 non-null float64
2 GOOG 505 non-null float64
3 MSFT 505 non-null float64
4 ORCL 505 non-null float64
dtypes: float64(5)
memory usage: 23.7 KB
Step 3: Data Processing
Step 3 reshapes the dataset into the format needed for multiple time series modeling.
First, the dataset is transformed from the wide format to the long format using the pandas .melt function.
Prophet requires at least two columns as inputs: a ds column and a y column.
- The ds column has the time information. The column Date is renamed to ds.
- The y column has the time series values. In this example, because we are predicting the closing stock price, y represents the closing price.
- There is no pre-defined name for the individual time series in Prophet, so we simply name that column ticker.
# Move Date from the index into a regular column
data = data.reset_index()
# Change data from the wide format to the long format
df = pd.melt(data, id_vars='Date', value_vars=['AMZN', 'FB', 'GOOG', 'MSFT', 'ORCL'])
df.columns = ['ds', 'ticker', 'y']
df.head()

After transforming the dataset from the wide format to the long format, we have 2,525 records.
# Check the dataset information
df.info()
Output
<class 'pandas.core.frame.DataFrame'>
RangeIndex: 2525 entries, 0 to 2524
Data columns (total 3 columns):
# Column Non-Null Count Dtype
--- ------ -------------- -----
0 ds 2525 non-null datetime64[ns]
1 ticker 2525 non-null object
2 y 2525 non-null float64
dtypes: datetime64[ns](1), float64(1), object(1)
memory usage: 59.3+ KB
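The wide-to-long reshape is easier to follow on a tiny example. The sketch below applies the same melt and rename steps to a made-up frame with two tickers (AAA and BBB) and two dates.

```python
import pandas as pd

# Wide format: one row per date, one column per ticker (values are made up)
wide = pd.DataFrame({
    'Date': pd.to_datetime(['2020-01-02', '2020-01-03']),
    'AAA': [1.0, 2.0],
    'BBB': [10.0, 20.0],
})

# Long format: one row per (date, ticker) pair
long_df = pd.melt(wide, id_vars='Date', value_vars=['AAA', 'BBB'])
long_df.columns = ['ds', 'ticker', 'y']

print(long_df)
```

Two dates times two tickers give four rows, in the same way that 505 dates times five tickers give 2,525 rows in the real dataset.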
Next, we group the pandas dataframe by the column ticker and save the result in a new groupby object called groups_by_ticker. Using .groups.keys(), we can confirm that there are five groups, one group for each ticker.
# Group the data by ticker
groups_by_ticker = df.groupby('ticker')
# Check the groups in the dataframe
groups_by_ticker.groups.keys()
Output
dict_keys(['AMZN', 'FB', 'GOOG', 'MSFT', 'ORCL'])
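A grouped object can be iterated directly or queried for one key at a time. The following minimal sketch uses a made-up long-format frame; the names toy, sizes, and bbb are illustrative only.

```python
import pandas as pd

toy = pd.DataFrame({
    'ds': pd.to_datetime(['2020-01-02', '2020-01-03', '2020-01-02']),
    'ticker': ['AAA', 'AAA', 'BBB'],
    'y': [1.0, 2.0, 10.0],
})
groups = toy.groupby('ticker')

# Iterating yields (key, sub-dataframe) pairs, one per ticker
sizes = {ticker: len(group) for ticker, group in groups}

# get_group returns the sub-dataframe for a single key
bbb = groups.get_group('BBB')

print(sizes)
```

Each sub-dataframe keeps all the original columns, including ticker, which is what lets a per-group function read the ticker name back from its input.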
Step 4: Define Function
In step 4, the function for training and forecasting each group is defined.
- The input data is the time series data for an individual group.
- Prophet() initiates the time series model with the default hyperparameters, and we give the model the name m. I will create another tutorial for Prophet time series model hyperparameter tuning. Please subscribe to the YouTube channel or Medium email so you will be notified when the video is published.
- m.fit(group) fits the Prophet model on the individual time series data, which is the stock price data for a ticker.
- make_future_dataframe creates a new dataframe called future for the forecasting. periods=15 means that we will forecast 15 days beyond the last date in the data. To use a different frequency, we can specify the freq option. For example, periods=15, freq='MS' means that we are forecasting for the next 15 months.
- After predicting on the future dataframe, Prophet produces a long list of outputs. We only keep ds, yhat, yhat_lower, and yhat_upper. yhat is the predicted value. yhat_lower and yhat_upper are the lower and upper bounds of the uncertainty interval.
- A new column called ticker is created in the forecast dataframe to indicate the ticker name for the predictions.
The output of the function has 5 columns: ds, ticker, yhat, yhat_upper, and yhat_lower.
def train_and_forecast(group):
    # Initiate the model
    m = Prophet()
    # Fit the model
    m.fit(group)
    # Make predictions
    future = m.make_future_dataframe(periods=15)
    forecast = m.predict(future)[['ds', 'yhat', 'yhat_lower', 'yhat_upper']]
    forecast['ticker'] = group['ticker'].iloc[0]
    # Return the forecasted results
    return forecast[['ds', 'ticker', 'yhat', 'yhat_upper', 'yhat_lower']]
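The effect of the periods and freq options can be previewed with plain pandas. This sketch assumes that the future dates Prophet generates follow the same calendar logic as pandas.date_range, and it starts from the day after the last observation in this tutorial's data (2021-12-31). The variable names are illustrative.

```python
import pandas as pd

last_observed = pd.Timestamp('2021-12-31')
start = last_observed + pd.Timedelta(days=1)

# 15 daily steps: these are calendar days, so weekends are included
daily = pd.date_range(start=start, periods=15, freq='D')

# 15 month-start steps
monthly = pd.date_range(start=start, periods=15, freq='MS')

# Count Saturdays and Sundays among the daily steps
weekend_days = int((daily.dayofweek >= 5).sum())

print(daily[-1].date(), monthly[-1].date(), weekend_days)
```

Because the stock market is closed on weekends, some of the 15 daily forecast rows fall on days with no trading. Depending on the use case, those rows may need to be filtered out after forecasting.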
Step 5: Multiple Time Series Forecast Using For-Loop
In step 5, we will forecast multiple time series using a for-loop.
- The time used for the forecast is calculated by recording the time at the beginning and at the end, then taking the difference between the two.
- An empty dataframe is created to save the forecast results.
- For each ticker, we first get the time series data, then apply the training and forecasting function to that individual time series, and finally concatenate the forecast results together.
# Start time
start_time = time()
# Create an empty dataframe
for_loop_forecast = pd.DataFrame()
# Loop through each ticker
for ticker in ticker_list:
    # Get the data for the ticker
    group = groups_by_ticker.get_group(ticker)
    # Make forecast
    forecast = train_and_forecast(group)
    # Add the forecast results to the dataframe
    for_loop_forecast = pd.concat((for_loop_forecast, forecast))
print('The time used for the for-loop forecast is ', time()-start_time)
# Take a look at the data
for_loop_forecast.head()
It took 28 seconds to run the five time series models using a for-loop, and the output has five columns.
The time used for the for-loop forecast is 27.879573822021484
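A common variation of the loop collects the per-ticker results in a list and concatenates them once at the end, which avoids copying the growing dataframe on every iteration. The sketch below is not the Prophet workflow itself: naive_forecast is a hypothetical stand-in that repeats the last observed value, used only so the example runs quickly on made-up data. It also times the run with perf_counter, which is an alternative to time().

```python
from time import perf_counter

import pandas as pd


def naive_forecast(group, periods=15):
    # Hypothetical stand-in for train_and_forecast: repeats the last observed
    # value instead of fitting Prophet, but returns the same five columns
    group = group.sort_values('ds')
    future = pd.date_range(group['ds'].iloc[-1] + pd.Timedelta(days=1),
                           periods=periods, freq='D')
    last_value = group['y'].iloc[-1]
    return pd.DataFrame({
        'ds': future,
        'ticker': group['ticker'].iloc[0],
        'yhat': last_value,
        'yhat_upper': last_value,
        'yhat_lower': last_value,
    })


# Made-up long-format data for two tickers
toy = pd.DataFrame({
    'ds': pd.to_datetime(['2021-12-30', '2021-12-31'] * 2),
    'ticker': ['AAA', 'AAA', 'BBB', 'BBB'],
    'y': [1.0, 2.0, 10.0, 20.0],
})

start = perf_counter()
# Collect one forecast per ticker, then concatenate once at the end
forecasts = [naive_forecast(group) for _, group in toy.groupby('ticker')]
all_forecasts = pd.concat(forecasts, ignore_index=True)
elapsed = perf_counter() - start

print(all_forecasts.shape, f'{elapsed:.4f} seconds')
```

With only five tickers the difference is negligible, but the single concatenation scales better as the number of time series grows.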

Step 6: Multiple Time Series Forecast Using Multi-Processing
In step 6, we will use the Python multiprocessing package to run the time series forecasts in parallel.
- First, the time series data for each ticker is saved in a list.
- Second, a pool of worker processes is created, with the number of workers equal to the number of CPUs. The Pool object from the multiprocessing Python package executes a function across input data in parallel. cpu_count() returns the number of CPUs in the system.
- imap is a lazier version of the pool's map method. It returns an iterator, so the results need to be converted to a list. We use imap to apply the train_and_forecast function to each element in the list called series, where each element is a dataframe for an individual ticker. tqdm shows the progress bar of the training.
- The prediction output is a list of forecast results, one dataframe for each ticker.
- .close() tells the pool that no more tasks will be submitted, so the worker processes can exit once the submitted jobs are done. Failure to do this can lead to the process hanging on finalization.
- .join() waits for the worker processes to exit. It must be called after .close().
- Finally, the results for all the tickers are concatenated into one single dataframe.
# Start time
start_time = time()
# Get time series data for each ticker and save in a list
series = [groups_by_ticker.get_group(ticker) for ticker in ticker_list]
# Create a pool process with the number of worker processes being the number of CPUs
p = Pool(cpu_count())
# Make predictions for each ticker and save the results to a list
predictions = list(tqdm(p.imap(train_and_forecast, series), total=len(series)))
# Tell the pool that no more tasks will be submitted
p.close()
# Wait for the worker processes to finish and exit
p.join()
# Concatenate results
multiprocess_forecast = pd.concat(predictions)
# Get the time used for the forecast
print('\nThe time used for the multi-processing forecast is ', time()-start_time)
This process takes around 10 seconds.
0%| | 0/5 [00:00<?, ?it/s]INFO:prophet:Disabling yearly seasonality. Run prophet with yearly_seasonality=True to override this.
INFO:prophet:Disabling daily seasonality. Run prophet with daily_seasonality=True to override this.
INFO:prophet:Disabling yearly seasonality. Run prophet with yearly_seasonality=True to override this.
INFO:prophet:Disabling daily seasonality. Run prophet with daily_seasonality=True to override this.
20%|██ | 1/5 [00:03<00:15, 3.80s/it]INFO:prophet:Disabling yearly seasonality. Run prophet with yearly_seasonality=True to override this.
INFO:prophet:Disabling daily seasonality. Run prophet with daily_seasonality=True to override this.
INFO:prophet:Disabling yearly seasonality. Run prophet with yearly_seasonality=True to override this.
INFO:prophet:Disabling daily seasonality. Run prophet with daily_seasonality=True to override this.
INFO:prophet:Disabling yearly seasonality. Run prophet with yearly_seasonality=True to override this.
INFO:prophet:Disabling daily seasonality. Run prophet with daily_seasonality=True to override this.
100%|██████████| 5/5 [00:09<00:00, 1.91s/it]
The time used for the multi-processing forecast is 9.68030071258545
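The pool can also be managed with a with-block, which cleans up the worker processes automatically. The sketch below uses only the standard library and a hypothetical stand-in function, summarize, in place of train_and_forecast, so it runs without Prophet. The `if __name__ == '__main__'` guard is included because some platforms start worker processes by launching a fresh interpreter that re-imports the script.

```python
from multiprocessing import Pool, cpu_count


def summarize(item):
    # Hypothetical stand-in for train_and_forecast: returns the ticker
    # and the mean of its values
    ticker, values = item
    return ticker, sum(values) / len(values)


if __name__ == '__main__':
    # Made-up input: one (ticker, values) pair per time series
    series = [('AAA', [1.0, 2.0, 3.0]), ('BBB', [10.0, 20.0])]

    # Leaving the with-block terminates the pool, so the results are
    # collected into a list before the block ends
    with Pool(min(cpu_count(), len(series))) as pool:
        results = list(pool.imap(summarize, series))

    print(results)
```

imap returns the results in the same order as the inputs, so each result can be matched back to its ticker by position.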
Step 7: Multiple Time Series Forecast Using Spark
In step 7, we will use Spark to forecast multiple time series in parallel. The workers in a Spark cluster can train and forecast a subset of models in parallel.
First, a Spark session called spark is created. We can type the Spark session name to check information such as the Spark version. Version 3.2.1 is used in this example.
# Import SparkSession
from pyspark.sql import SparkSession
# Create a Spark Session
spark = SparkSession.builder.master("local[*]").getOrCreate()
# Check Spark Session Information
spark

Next, the pandas dataframe is converted to a Spark dataframe and grouped by ticker.
- spark.createDataFrame takes a pandas dataframe and converts it into a Spark dataframe.
- applyInPandas maps each group using a pandas UDF (User Defined Function) and returns a dataframe.
- schema is a StructType describing the schema of the returned dataframe.
# Convert the pandas dataframe into a spark dataframe
sdf = spark.createDataFrame(df)
# Define the result schema
result_schema = StructType([
    StructField('ds', DateType()),
    StructField('ticker', StringType()),
    StructField('yhat', FloatType()),
    StructField('yhat_upper', FloatType()),
    StructField('yhat_lower', FloatType())
])
# Start time
start_time = time()
# Train and forecast by ticker
spark_forecast = sdf.groupBy('ticker').applyInPandas(train_and_forecast, schema=result_schema)
# Take a look at the results
spark_forecast.show(5)
# Processing time
print('The time used for the Spark forecast is ', time()-start_time)
Spark used 12 seconds for the forecast.
+----------+------+---------+----------+----------+
| ds|ticker| yhat|yhat_upper|yhat_lower|
+----------+------+---------+----------+----------+
|2020-01-02| AMZN|1874.2633| 2038.9186| 1731.7717|
|2020-01-03| AMZN|1867.2312| 2000.8362| 1707.6724|
|2020-01-06| AMZN|1868.9219| 2024.8553| 1721.1112|
|2020-01-07| AMZN|1881.8456| 2028.5168| 1736.3002|
|2020-01-08| AMZN|1886.3406| 2047.3295| 1743.9379|
+----------+------+---------+----------+----------+
only showing top 5 rows
The time used for the Spark forecast is 11.66998553276062
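applyInPandas expects the dataframe returned by the function to line up with the declared schema, and a mismatch only surfaces once the Spark job runs. A pandas-only pre-flight check can catch a missing or misspelled column earlier. The helper below, check_output_columns, is a hypothetical convenience function and not part of PySpark. The sample frame is made up and deliberately lacks one column.

```python
import pandas as pd

# Column names declared in the result schema, in order
expected_columns = ['ds', 'ticker', 'yhat', 'yhat_upper', 'yhat_lower']


def check_output_columns(result, expected):
    # Hypothetical helper: report missing and unexpected column names
    missing = [c for c in expected if c not in result.columns]
    extra = [c for c in result.columns if c not in expected]
    return missing, extra


# Made-up function output that forgot the yhat_upper column
sample = pd.DataFrame({
    'ds': pd.to_datetime(['2022-01-01']),
    'ticker': ['AAA'],
    'yhat': [1.0],
    'yhat_lower': [0.5],
})

missing, extra = check_output_columns(sample, expected_columns)
print(missing, extra)
```

Running the check on the output of the forecasting function for a single ticker is a quick test before handing the function to Spark.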
Another way of doing multiple time series forecasting is to use pandas_udf as a decorator and apply the function to the grouped Spark dataframe. However, applyInPandas is preferred over this API because it will be deprecated in future releases.
Nevertheless, I provide the code for using the pandas_udf decorator for your reference.
@pandas_udf(result_schema, PandasUDFType.GROUPED_MAP)
def train_and_forecast(group):
    # Initiate the model
    m = Prophet()
    # Fit the model
    m.fit(group)
    # Make predictions
    future = m.make_future_dataframe(periods=15)
    forecast = m.predict(future)[['ds', 'yhat', 'yhat_lower', 'yhat_upper']]
    forecast['ticker'] = group['ticker'].iloc[0]
    # Return the forecasted results
    return forecast[['ds', 'ticker', 'yhat', 'yhat_upper', 'yhat_lower']]
# Start time
start_time = time()
# Train and forecast by ticker
spark_forecast = sdf.groupBy('ticker').apply(train_and_forecast)
# Take a look at the results
spark_forecast.show(5)
# Processing time
print('The time used for the Spark forecast is ', time()-start_time)
Output
/usr/local/lib/python3.7/dist-packages/pyspark/sql/pandas/group_ops.py:84: UserWarning: It is preferred to use 'applyInPandas' over this API. This API will be deprecated in the future releases. See SPARK-28264 for more details.
"more details.", UserWarning)
+----------+------+---------+----------+----------+
| ds|ticker| yhat|yhat_upper|yhat_lower|
+----------+------+---------+----------+----------+
|2020-01-02| AMZN|1874.2633| 2038.9186| 1731.7717|
|2020-01-03| AMZN|1867.2312| 2000.8362| 1707.6724|
|2020-01-06| AMZN|1868.9219| 2024.8553| 1721.1112|
|2020-01-07| AMZN|1881.8456| 2028.5168| 1736.3002|
|2020-01-08| AMZN|1886.3406| 2047.3295| 1743.9379|
+----------+------+---------+----------+----------+
only showing top 5 rows
The time used for the Spark forecast is 11.952160358428955
Step 8: Which Method to Use?
Now that you know how to forecast multiple time series using a for-loop, multi-processing, and Spark, which method should you use for your project?
The general guideline is:
- When the number of models is small, there is not a big difference in processing time, so any one of the three methods is good to use.
- When the number of models is medium, use multi-processing or Spark to utilize multiple CPUs in parallel.
- When the number of models is large, Spark is preferred.
More tutorials are available on GrabNGoInfo YouTube Channel and GrabNGoInfo.com.