A PRACTICAL GUIDE: SPARK 3.2.0
A New Era of Spark and Pandas Unification
PySpark and Pandas
Most data scientists and machine learning engineers start with Pandas and NumPy before moving on to other libraries. There is little debate about Pandas being the standard data-processing library. Pandas has many benefits, but one key bottleneck is that its API does not adapt well to distributed processing. Solutions like Modin and Dask address this problem to some extent.
When it comes to distributed processing frameworks, Spark is the de-facto choice for professionals and large data-processing hubs. Recently, the Databricks team open-sourced a library called Koalas that implements the Pandas API on a Spark backend. The library is under active development and covers more than 80% of the Pandas API. To read more about using Koalas, refer to my earlier article Spark-ifying Pandas: Databrick's Koalas with Google Colab.
With the release of Spark 3.2.0, Koalas has been integrated into PySpark as the pyspark.pandas submodule. This seamless integration of pandas with Spark is one of the key upgrades of the release. Some of the key features of this release are:
- Introducing pandas API on Apache Spark to unify small data API and big data API (learn more here).
- Completing the ANSI SQL compatibility mode to simplify migration of SQL workloads.
- Productionizing adaptive query execution to speed up Spark SQL at runtime.
- Introducing RocksDB state store to make state processing more scalable.
We will focus on the pandas API on Spark.
In this tutorial, I will walk you through performing exploratory data analysis using pyspark.pandas and exercising some basic pandas functionality. There are many benefits to using pandas on Spark when dealing with large datasets; some of the key points are listed below (a quick taste of the API follows the list).
- Big data processing made easy
- Under the hood heavy lifting
- Integration with PySpark is seamless
- Using the default plotting APIs
- Building ML Models
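As a quick taste of this unification (a minimal sketch, assuming a Spark 3.2.0 environment is already available, which we set up in the next section), familiar pandas syntax runs on the Spark engine under the hood:
# A minimal sketch: pandas-style syntax, executed by Spark under the hood
import pyspark.pandas as ps
psdf = ps.DataFrame({'x': [1, 2, 3], 'y': [10, 20, 30]})
psdf.describe()  # the familiar pandas call, computed by Spark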
The objective of this tutorial is to leverage the Spark backend for pandas. A working Google Colaboratory notebook is embedded at the end.
Setting up Spark 3.2.0 in the Google Colaboratory
As a first step, I configure the Google Colab runtime with a Spark installation. For details, readers may refer to my article Getting Started Spark 3.0.0 in Google Colab on Medium.
We will install the programs below.
- Java 8
- spark-3.2.0
- Hadoop 3.2
- Findspark
You can install the latest version of Spark (3.2.0 at the time of writing) using the set of commands below.
# Run below commands
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q http://apache.osuosl.org/spark/spark-3.2.0/spark-3.2.0-bin-hadoop3.2.tgz
!tar xf spark-3.2.0-bin-hadoop3.2.tgz
!pip install -q findspark
Use the ls command to see the downloaded files.
!ls
ccpp.csv
Getting_Started_Koalas_2021-10-29_10-34-58_820424.log
Getting_Started_Koalas_2021-10-29_10-39-16_103833.log
sample_data
spark-3.2.0-bin-hadoop3.2
spark-3.2.0-bin-hadoop3.2.tgz
Environment Variables
After installing Spark and Java, set the environment variables pointing to where Spark and Java are installed.
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.2.0-bin-hadoop3.2"
Spark Installation Test
Let us test the Spark installation in our Google Colab environment.
import findspark
findspark.init()
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[*]").getOrCreate()
# Test the spark
df = spark.createDataFrame([{"hello": "world"} for x in range(1000)])
df.show(3, False)
+-----+
|hello|
+-----+
|world|
|world|
|world|
+-----+
only showing top 3 rows
Install loguru to log the events.
# Install loguru for logging
!pip install loguru
Collecting loguru
Downloading loguru-0.5.3-py3-none-any.whl (57 kB)
     |████████████████████████████████| 57 kB 4.8 MB/s
Installing collected packages: loguru
Successfully installed loguru-0.5.3
Importing the essential libraries
# Import the libraries
import pandas as pd
import numpy as np
import pyspark.pandas as ps
from pyspark.sql import SparkSession
import seaborn as sns
# Import logger
from loguru import logger
logger.add("Getting_Started_Koalas_{time}.log")
WARNING:root:'PYARROW_IGNORE_TIMEZONE' environment variable was not set. It is required to set this environment variable to '1' in both driver and executor sides if you use pyarrow>=2.0.0. pandas-on-Spark will set it for you but it does not work if there is a Spark context already launched.
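As the warning itself suggests, it can be avoided by setting the PYARROW_IGNORE_TIMEZONE environment variable to '1' before the Spark session is launched. A minimal sketch, placed alongside the other os.environ settings from the installation step:
# Set before creating the SparkSession so both driver and executors see it
import os
os.environ["PYARROW_IGNORE_TIMEZONE"] = "1"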
Install the Compatible Version of plotly
For plotting, pyspark.pandas requires plotly version >= 4.8. For details on the requirements, follow this link: https://koalas.readthedocs.io/en/latest/getting_started/install.html#dependencies
import plotly
logger.info(f'plotly_version{plotly.__version__}')
2021-10-29 11:29:31.122 | INFO | __main__:<module>:2 - plotly_version5.3.1
! pip uninstall plotly
Found existing installation: plotly 4.4.1
Uninstalling plotly-4.4.1:
Would remove:
/usr/local/etc/jupyter/nbconfig/notebook.d/plotlywidget.json
/usr/local/lib/python3.7/dist-packages/_plotly_future_/*
/usr/local/lib/python3.7/dist-packages/_plotly_utils/*
/usr/local/lib/python3.7/dist-packages/plotly-4.4.1.dist-info/*
/usr/local/lib/python3.7/dist-packages/plotly/*
/usr/local/lib/python3.7/dist-packages/plotlywidget/*
/usr/local/share/jupyter/nbextensions/plotlywidget/extension.js
/usr/local/share/jupyter/nbextensions/plotlywidget/index.js
Proceed (y/n)? y
Successfully uninstalled plotly-4.4.1
!pip install plotly
Collecting plotly
Downloading plotly-5.3.1-py2.py3-none-any.whl (23.9 MB)
     |████████████████████████████████| 23.9 MB 1.8 MB/s
Collecting tenacity>=6.2.0
Downloading tenacity-8.0.1-py3-none-any.whl (24 kB)
Requirement already satisfied: six in /usr/local/lib/python3.7/dist-packages (from plotly) (1.15.0)
Installing collected packages: tenacity, plotly
Successfully installed plotly-5.3.1 tenacity-8.0.1
Hands-On: PySpark Pandas in Action
In this section, we get hands-on with pyspark.pandas and perform the basic data-handling operations we would normally do in pandas.
I will use the Combined Cycle Power Plant data set to predict the net hourly electrical energy output (PE). I have uploaded the data to my GitHub so that readers can reproduce the results.
Basic features
In this section, let us learn the basic features of pyspark.pandas.
psdf = ps.DataFrame(
{'a': [1, 2, 3, 4, 5, 6],
'b': [100, 200, 300, 400, 500, 600],
'c': ["one", "two", "three", "four", "five", "six"]},
index=[10, 20, 30, 40, 50, 60])
psdf
psdf.head()
To/From Pandas
Users can easily convert pandas dataframes to and from pyspark.pandas dataframes. Below are some examples.
# Converting to/from pandas dataframes
pddf = psdf.to_pandas()
# Let us find out the type of dataframes
type(pddf), type(psdf)
(pandas.core.frame.DataFrame, pyspark.pandas.frame.DataFrame)
psdf_ = ps.from_pandas(pddf)
type(psdf_)
pyspark.pandas.frame.DataFrame
To/From Spark dataframes
Sometimes we have to convert Spark dataframes to pyspark.pandas dataframes and vice versa; this is now straightforward.
dates = pd.date_range('20130101', periods=6)
pdf = pd.DataFrame(np.random.randn(6, 4), index=dates, columns=list('ABCD'))
pdf
# Create Spark Dataframe using Pandas dataframe
sdf = spark.createDataFrame(pdf)
type(sdf)
pyspark.sql.dataframe.DataFrame
sdf.show()
+--------------------+--------------------+-------------------+--------------------+
| A| B| C| D|
+--------------------+--------------------+-------------------+--------------------+
|-0.06088523641087523| 0.23338445585443923| 0.5999170439315439|-0.20984570996986368|
| 0.22791307776510422| 0.7763417712895295|0.03037620161121167| 0.7774577062233787|
| 0.29336208280885484|-0.08371353732918399| -0.649808816600661| -0.6308517359448679|
| 0.5478211374585771| 0.1611948786816446|-0.7429478444712448| 0.3734789112693188|
| 0.5225691421190196|-0.33997894700945974|0.16459448070835278| -0.5744045570930776|
| -0.5061501459749616| -1.6594845990596399|0.44475073046054053| 0.8386075096162613|
+--------------------+--------------------+-------------------+--------------------+
psdf = sdf.to_pandas_on_spark()
type(psdf)
pyspark.pandas.frame.DataFrame
Groupby made Simple
Similar to pandas, pyspark.pandas supports the standard groupby functionality.
psdf = ps.DataFrame({'A': ['foo', 'bar', 'foo', 'bar',
'foo', 'bar', 'foo', 'foo'],
'B': ['one', 'one', 'two', 'three',
'two', 'two', 'one', 'three'],
'C': np.random.randn(8),
'D': np.random.randn(8)})
psdf.groupby('A').sum()
psdf.groupby(['A', 'B']).sum()
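One caveat: since the computation is distributed, the row order of a groupby result is not guaranteed to match pandas. Sorting on the index makes the output deterministic (a minimal sketch):
# Sort on the group keys for a stable, pandas-like ordering
psdf.groupby('A').sum().sort_index()
psdf.groupby(['A', 'B']).sum().sort_index()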
To practice more, see below link
Exploratory Data Analysis using pyspark.pandas
As a first step, I want to explore the given data, its distribution, and its dependencies using the pyspark.pandas API. I will include a simple example to demonstrate the idea; users can extend it to the problem at hand.
Download the data and save it locally
# Downloading the clustering dataset
!wget -q 'https://raw.githubusercontent.com/amjadraza/blogs-data/master/spark_ml/ccpp.csv'
Read the data using the pyspark.pandas.read_csv method. To read more about the API, follow the Koalas official documentation.
# Read the data
psdf_ccpp = ps.read_csv("ccpp.csv")
Let us check the type of the dataframe.
# Check the type of psdf
type(psdf_ccpp)
pyspark.pandas.frame.DataFrame
psdf_ccpp.columns
Index(['AT', 'V', 'AP', 'RH', 'PE'], dtype='object')
psdf_ccpp.head()
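Other familiar pandas attributes and checks work directly on the pandas-on-Spark dataframe as well (a minimal sketch):
# Quick structural checks, mirroring the pandas API
psdf_ccpp.shape            # (number of rows, number of columns)
psdf_ccpp.dtypes           # data type of each column
psdf_ccpp.isnull().sum()   # missing values per column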
Just like normal pandas, pyspark.pandas provides plotting features to help understand the variables. In the example below, I plot the original data and a smoothed version of it, demonstrating the plot and rolling window methods.
# Just Converting to display the simple plot to be published on Medium
pd_ccpp = psdf_ccpp.to_pandas()
Let us try plotting
# Plotting the variables
pd_ccpp['AT'].plot(figsize = (12,6))
pd_ccpp['AT'].rolling(window=200).mean().plot()
<matplotlib.axes._subplots.AxesSubplot at 0x7f9b6aae8750>
Plotting all columns using a moving average of 20 data points.
pd_ccpp.rolling(window=20).mean().plot(figsize=(12,8))
# Try below
# psdf_ccpp.rolling(window=20).plot()
<matplotlib.axes._subplots.AxesSubplot at 0x7f9b6a6a1590>
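For readers running the notebook interactively, the conversion to pandas is optional: pyspark.pandas ships its own plot accessor, backed by plotly by default (which is why plotly >= 4.8 is a dependency). A hedged sketch, assuming plotly is installed as above:
# Plot directly from the pandas-on-Spark dataframe (plotly backend by default)
psdf_ccpp['AT'].plot.line()
# The smoothed series can be plotted the same way
psdf_ccpp['AT'].rolling(window=200).mean().plot.line()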
The command below demonstrates the use of the describe method, just as in the Pandas API.
psdf_ccpp.describe()
Feature Engineering
Feature engineering usually goes hand in hand with EDA: users prepare features with the objective of making them as predictive and well-distributed as possible. In this article, I demonstrate the use of Koalas to perform feature engineering.
To understand the relationships between different variables, seaborn's pairplot function is widely used. Use the command below to plot the pairplots of the variables in the dataset. Since seaborn does not support the Koalas dataframe, users have to convert it into a pandas dataframe before calling pairplot.
sns.pairplot(psdf_ccpp.to_pandas())
<seaborn.axisgrid.PairGrid at 0x7f9b77fd9050>
Looking at the above figure, we can see many outliers, and for some variables the relationship with the target is not clear. The simplest way to suppress the outliers is to compute a moving average, which I demonstrate using Koalas.
sns.pairplot(psdf_ccpp.rolling(window=20).mean().to_pandas())
<seaborn.axisgrid.PairGrid at 0x7f9b6bf81390>
It looks like the 20-point moving average has a better relationship with the target variable, hence using the 20-point moving-average features makes more sense.
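To back this visual impression with a number, one can compare each feature's correlation with the target before and after smoothing (a minimal sketch; the exact values depend on the data):
# Correlation of each feature with the target PE, raw vs. 20-point moving average
print(psdf_ccpp.to_pandas().corr()['PE'])
print(psdf_ccpp.rolling(window=20).mean().to_pandas().corr()['PE'])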
Model Building using PySpark
Once the EDA and feature engineering are done, it is time to build the predictive model. One of the benefits of using the Koalas dataframe is that users can create a Spark dataframe from it seamlessly. In the section below, I demonstrate the use of the PySpark API to build and train a gradient-boosted trees (GBT) regressor.
# Create the moving average features
psdf_features = psdf_ccpp.rolling(window=20, min_periods=1).mean()
# Convert the Koalas DataFrame into Spark DataFrame
sdf = psdf_features.to_spark()
sdf.show(5,False)
+------------------+------+------------------+-----------------+-----------------+
|AT |V |AP |RH |PE |
+------------------+------+------------------+-----------------+-----------------+
|14.96 |41.76 |1024.07 |73.17 |463.26 |
|20.07 |52.36 |1022.055 |66.125 |453.815 |
|15.083333333333334|48.04 |1018.7566666666667|74.79666666666667|465.3966666666667|
|16.5275 |50.36 |1016.6275 |75.2575 |460.6675 |
|15.386000000000001|47.788|1015.1479999999999|79.53 |463.314 |
+------------------+------+------------------+-----------------+-----------------+
only showing top 5 rows
Now, build the model using the PySpark ML API. For more details on building models using PySpark, refer to my article Machine Learning With Spark.
from pyspark.ml.regression import LinearRegression
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.regression import GBTRegressor
Prepare the features in a format compatible with PySpark models.
# Create the feature column using VectorAssembler class
vectorAssembler = VectorAssembler(inputCols =["AT", "V", "AP", "RH"], outputCol = "features")
vpp_sdf = vectorAssembler.transform(sdf)
vpp_sdf.show(2, False)
+-----+-----+--------+------+-------+-----------------------------+
|AT |V |AP |RH |PE |features |
+-----+-----+--------+------+-------+-----------------------------+
|14.96|41.76|1024.07 |73.17 |463.26 |[14.96,41.76,1024.07,73.17] |
|20.07|52.36|1022.055|66.125|453.815|[20.07,52.36,1022.055,66.125]|
+-----+-----+--------+------+-------+-----------------------------+
only showing top 2 rows
Create the train and test splits
# Define train and test data split
splits = vpp_sdf.randomSplit([0.7,0.3])
train_df = splits[0]
test_df = splits[1]
Build and train the model
# Define the GBT Model
gbt = GBTRegressor(featuresCol="features", labelCol="PE")
gbt_model = gbt.fit(train_df)
gbt_predictions = gbt_model.transform(test_df)
Evaluate the model performance
# Evaluate the GBT Model
gbt_evaluator = RegressionEvaluator(labelCol="PE", predictionCol="prediction", metricName="rmse")
gbt_rmse = gbt_evaluator.evaluate(gbt_predictions)
print("The RMSE of GBT Tree regression Model is {}".format(gbt_rmse))
The RMSE of GBT Tree regression Model is 1.0993809096065044
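RMSE is only one view of the error. The same RegressionEvaluator class supports other metrics through its metricName parameter, for example "mae" and "r2" (a minimal sketch):
# Report additional regression metrics on the same predictions
gbt_mae = RegressionEvaluator(labelCol="PE", predictionCol="prediction", metricName="mae").evaluate(gbt_predictions)
gbt_r2 = RegressionEvaluator(labelCol="PE", predictionCol="prediction", metricName="r2").evaluate(gbt_predictions)
print("The MAE is {} and the R2 is {}".format(gbt_mae, gbt_r2))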
Converting the predictions back to a pyspark.pandas DataFrame
psdf_predictions = ps.DataFrame(gbt_predictions)
psdf_predictions.head()
Let us plot the actual values and the predictions from the model.
psdf_predictions.to_pandas()[['PE', 'prediction']].plot(figsize=(12,8))
# Try
#psdf_predictions[['PE', 'prediction']].plot()
<matplotlib.axes._subplots.AxesSubplot at 0x7f9b6a3bdb10>
A Working Google Colab
Below is a working Google Colab notebook to recreate the tutorial. Give it a try and develop machine learning algorithms on top of the presented use of Koalas as a Pandas replacement where possible.