MA Raza, Ph.D.

Summary

This article discusses the integration of the Pandas API into PySpark with the release of Spark 3.2.0, which facilitates easier data processing and machine learning on large datasets through a familiar Pandas-like interface.

Abstract

The article "A PRACTICAL GUIDE: SPARK 3.2.0" highlights a significant advancement in the unification of Pandas and PySpark, introducing the Pandas API on Apache Spark. This integration, known as pyspark.pandas, aims to streamline the transition for data scientists and machine learning engineers from small to big data processing. The new Koalas library, which is under active development, covers over 80% of the Pandas API and is now integrated directly into PySpark. The article guides readers through setting up Spark 3.2.0 in Google Colab, using pyspark.pandas for exploratory data analysis, and building machine learning models with Spark's distributed computing capabilities. It emphasizes the benefits of using Pandas with Spark, such as easy big data processing, heavy lifting under the hood, seamless integration, and the ability to use default plotting APIs and build ML models. The tutorial also covers the installation and testing of Spark, the conversion between Pandas and PySpark data frames, and the use of Spark data frames with the new API. The author demonstrates feature engineering and model building using Gradient Boosting Machines (GBM) and evaluates the model's accuracy, concluding with the ease of switching to Spark for Pandas users and the advantages of distributed computations.

Opinions

  • The author believes that the integration of the Pandas API into PySpark is a key upgrade in Spark 3.2.0, simplifying the use of Spark for data scientists familiar with Pandas.
  • There is an opinion that the Koalas library, which facilitates the use of Pandas API with Spark, is a significant development that enhances the capabilities of PySpark users.
  • The article suggests that using pyspark.pandas for exploratory data analysis and feature engineering is beneficial due to the seamless integration with PySpark and the ability to handle large datasets efficiently.
  • The author conveys that the new features in Spark 3.2.0, such as the Pandas API integration, ANSI SQL compatibility, adaptive query execution, and RocksDB state store, are steps towards making Spark more user-friendly and powerful for big data processing.
  • The tutorial implies that the combination of PySpark's distributed computing power with the familiarity of Pandas API can lead to more productive and efficient data processing and machine learning workflows.

A PRACTICAL GUIDE: SPARK 3.2.0

A new Era of SPARK and PANDAS Unification

Pyspark and Pandas

Image by Author

Most data scientists and machine learning engineers start with Pandas and NumPy before moving to other libraries. There is little debate that Pandas is the standard data processing library. Pandas has many benefits, but one key bottleneck of the Pandas API is that it does not adapt well to distributed processing. Solutions like Modin and Dask solve this problem to some extent.

When it comes to distributed processing frameworks, Spark is the de-facto choice for professionals and large data processing hubs. Recently, Databricks' team open-sourced a library called Koalas to implement the Pandas API with a Spark backend. This library is under active development and covers more than 80% of the Pandas API. To read more about using Koalas, refer to my earlier article Spark-ifying Pandas: Databricks' Koalas with Google Colab.

With the release of Spark 3.2.0, Koalas is integrated into PySpark as the submodule pyspark.pandas. The seamless integration of pandas with Spark is one of the key upgrades in this release. Some of the key features of Spark 3.2.0 are listed below (a short configuration sketch follows the list).

  • Introducing the pandas API on Apache Spark to unify the small data API and big data API (learn more here).
  • Completing the ANSI SQL compatibility mode to simplify migration of SQL workloads.
  • Productionizing adaptive query execution to speed up Spark SQL at runtime.
  • Introducing the RocksDB state store to make state processing more scalable.
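
As a quick illustration (my own sketch, not from the release notes), some of these features can be toggled through standard Spark SQL configuration keys; the snippet assumes a SparkSession named spark, which we create later in this tutorial.

# A minimal sketch: toggling Spark 3.2.0 features via SQL configs
# (assumes an existing SparkSession named `spark`)
spark.conf.set("spark.sql.ansi.enabled", "true")      # ANSI SQL compatibility mode
spark.conf.set("spark.sql.adaptive.enabled", "true")  # adaptive query execution (enabled by default in 3.2)
# RocksDB-backed state store for Structured Streaming state
spark.conf.set(
    "spark.sql.streaming.stateStore.providerClass",
    "org.apache.spark.sql.execution.streaming.state.RocksDBStateStoreProvider",
)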

We will focus on PANDAS on SPARK

In this tutorial, I will walk you through performing exploratory data analysis using pyspark.pandas and exercising some basic pandas functionality. There are many benefits to using pandas with Spark when dealing with large datasets. Some of the key points are listed below, followed by a tiny sketch of the drop-in feel.

  1. Big data processing made easy
  2. Under the hood heavy lifting
  3. Integration with PySpark is seamless
  4. Using the default plotting APIs
  5. Building ML Models
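
To give a feel for these points, here is a tiny sketch of my own (it assumes Spark is already installed, which we do in the next section): a pandas-on-Spark DataFrame behaves like a pandas one while Spark does the heavy lifting underneath.

# A minimal sketch of the drop-in feel (assumes pyspark.pandas is importable)
import pyspark.pandas as ps

psdf = ps.range(10)        # a distributed DataFrame with a single 'id' column
print(psdf['id'].mean())   # 4.5, computed by Spark under the hood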

The objective of this tutorial is to leverage the Spark backend for pandas. The working Google Colab notebook is embedded at the end.

Setting up Spark 3.2.0 in the Google Colaboratory

As a first step, I configure the Google Colab runtime with a Spark installation. For details, readers may read my article Getting Started Spark 3.0.0 in Google Colab on Medium.

We will install the programs below. You can install the latest version of Spark (3.2.0 at the time of writing) using the following set of commands.

# Run below commands
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q http://apache.osuosl.org/spark/spark-3.2.0/spark-3.2.0-bin-hadoop3.2.tgz
!tar xf spark-3.2.0-bin-hadoop3.2.tgz
!pip install -q findspark

Use the ls command to see the downloaded files

!ls
ccpp.csv
Getting_Started_Koalas_2021-10-29_10-34-58_820424.log
Getting_Started_Koalas_2021-10-29_10-39-16_103833.log
sample_data
spark-3.2.0-bin-hadoop3.2
spark-3.2.0-bin-hadoop3.2.tgz

Environment Variable

After installing Spark and Java, set the environment variables pointing to where Spark and Java are installed.

import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.2.0-bin-hadoop3.2"

Spark Installation test

Let us test the Spark installation in our Google Colab environment.

import findspark
findspark.init()

from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local[*]").getOrCreate()
# Test the spark 
df = spark.createDataFrame([{"hello": "world"} for x in range(1000)])

df.show(3, False)
+-----+
|hello|
+-----+
|world|
|world|
|world|
+-----+
only showing top 3 rows

Install loguru to log the events

# Install loguru for logging
!pip install loguru
Collecting loguru
  Downloading loguru-0.5.3-py3-none-any.whl (57 kB)
     |████████████████████████████████| 57 kB 4.8 MB/s 
[?25hInstalling collected packages: loguru
Successfully installed loguru-0.5.3

Importing the essential libraries

# Import the libraries
import pandas as pd
import numpy as np
import pyspark.pandas as ps
from pyspark.sql import SparkSession
import seaborn as sns
# Import logger
from loguru import logger
logger.add("Getting_Started_Koalas_{time}.log")
WARNING:root:'PYARROW_IGNORE_TIMEZONE' environment variable was not set. It is required to set this environment variable to '1' in both driver and executor sides if you use pyarrow>=2.0.0. pandas-on-Spark will set it for you but it does not work if there is a Spark context already launched.
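
The warning above can be silenced by setting the environment variable it mentions before any Spark context is launched; a small sketch (the placement is my suggestion, not part of the original notebook):

# Set before the SparkSession is created so both driver and executors pick it up
import os
os.environ["PYARROW_IGNORE_TIMEZONE"] = "1"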

Install the Compatible Version of plotly

Plotting requires plotly version >= 4.8. For details on the requirements, follow the link below:

https://koalas.readthedocs.io/en/latest/getting_started/install.html#dependencies

import plotly 
logger.info(f'plotly_version{plotly.__version__}')
2021-10-29 11:29:31.122 | INFO     | __main__:<module>:2 - plotly_version5.3.1
! pip uninstall plotly
Found existing installation: plotly 4.4.1
Uninstalling plotly-4.4.1:
  Would remove:
    /usr/local/etc/jupyter/nbconfig/notebook.d/plotlywidget.json
    /usr/local/lib/python3.7/dist-packages/_plotly_future_/*
    /usr/local/lib/python3.7/dist-packages/_plotly_utils/*
    /usr/local/lib/python3.7/dist-packages/plotly-4.4.1.dist-info/*
    /usr/local/lib/python3.7/dist-packages/plotly/*
    /usr/local/lib/python3.7/dist-packages/plotlywidget/*
    /usr/local/share/jupyter/nbextensions/plotlywidget/extension.js
    /usr/local/share/jupyter/nbextensions/plotlywidget/index.js
Proceed (y/n)? y
  Successfully uninstalled plotly-4.4.1
!pip install plotly
Collecting plotly
  Downloading plotly-5.3.1-py2.py3-none-any.whl (23.9 MB)
     |████████████████████████████████| 23.9 MB 1.8 MB/s 
[?25hCollecting tenacity>=6.2.0
  Downloading tenacity-8.0.1-py3-none-any.whl (24 kB)
Requirement already satisfied: six in /usr/local/lib/python3.7/dist-packages (from plotly) (1.15.0)
Installing collected packages: tenacity, plotly
Successfully installed plotly-5.3.1 tenacity-8.0.1

Hands-On: PySpark Pandas in Action

In this section, we get hands-on with pyspark.pandas, performing the basic data manipulations we are used to in pandas.

I will use the Combined Cycle Power Plant data set to predict the net hourly electrical energy output (PE). I have uploaded the data to my GitHub so that users can reproduce the results.

Basic features

In this section, let us learn basic features of pyspark.pandas.

psdf = ps.DataFrame(
    {'a': [1, 2, 3, 4, 5, 6],
     'b': [100, 200, 300, 400, 500, 600],
     'c': ["one", "two", "three", "four", "five", "six"]},
    index=[10, 20, 30, 40, 50, 60])
psdf
psdf.head()
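
A few more pandas-style operations work the same way on this DataFrame; the snippet below is a small sketch I added for illustration, continuing from the psdf created above.

# More familiar pandas operations on a pandas-on-Spark DataFrame (sketch)
psdf.dtypes                                 # column dtypes, as in pandas
psdf.sort_values(by='b', ascending=False)   # distributed sort
psdf[psdf['a'] > 3]                         # boolean filtering
psdf['d'] = psdf['a'] * psdf['b']           # derived column from columns of the same frame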

To/From Pandas

Users can easily convert pandas data frames into pyspark.pandas data frames and back. Below are some examples.

# Converting to/from pandas dataframes
pddf = psdf.to_pandas()
# Let us find out the type of dataframes
type(pddf), type(psdf)
(pandas.core.frame.DataFrame, pyspark.pandas.frame.DataFrame)
psdf_ = ps.from_pandas(pddf)
type(psdf_)
pyspark.pandas.frame.DataFrame

To/From Spark dataframes

Sometimes we have to convert Spark dataframes to pyspark.pandas dataframes and vice versa. This is now pretty straightforward.

dates = pd.date_range('20130101', periods=6)
pdf = pd.DataFrame(np.random.randn(6, 4), index=dates, columns=list('ABCD'))
pdf
# Create Spark Dataframe using Pandas dataframe
sdf = spark.createDataFrame(pdf)
type(sdf)
pyspark.sql.dataframe.DataFrame
sdf.show()
+--------------------+--------------------+-------------------+--------------------+
|                   A|                   B|                  C|                   D|
+--------------------+--------------------+-------------------+--------------------+
|-0.06088523641087523| 0.23338445585443923| 0.5999170439315439|-0.20984570996986368|
| 0.22791307776510422|  0.7763417712895295|0.03037620161121167|  0.7774577062233787|
| 0.29336208280885484|-0.08371353732918399| -0.649808816600661| -0.6308517359448679|
|  0.5478211374585771|  0.1611948786816446|-0.7429478444712448|  0.3734789112693188|
|  0.5225691421190196|-0.33997894700945974|0.16459448070835278| -0.5744045570930776|
| -0.5061501459749616| -1.6594845990596399|0.44475073046054053|  0.8386075096162613|
+--------------------+--------------------+-------------------+--------------------+
psdf = sdf.to_pandas_on_spark()
type(psdf)
pyspark.pandas.frame.DataFrame

Groupby made Simple

Similar to pandas, pyspark.pandas can use the standard groupby functionalities.

psdf = ps.DataFrame({'A': ['foo', 'bar', 'foo', 'bar',
                          'foo', 'bar', 'foo', 'foo'],
                    'B': ['one', 'one', 'two', 'three',
                          'two', 'two', 'one', 'three'],
                    'C': np.random.randn(8),
                    'D': np.random.randn(8)})
psdf.groupby('A').sum()
psdf.groupby(['A', 'B']).sum()
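
Multiple aggregations per group also follow the pandas pattern; a quick sketch I added (not part of the original notebook):

# Different aggregations per column, mirroring pandas GroupBy.agg (sketch)
psdf.groupby('A').agg({'C': 'mean', 'D': 'sum'})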

To practice more, see the link below:

https://hub.mybinder.turing.ac.uk/user/apache-spark-yahgfg9j/notebooks/python/docs/source/getting_started/quickstart_ps.ipynb

Exploratory Data Analysis using pyspark.pandas

As a first step, I want to explore the given data, its distribution, and its dependencies using the pyspark.pandas API. I include a simple example to demonstrate the idea; users can extend it to the problem at hand.

Download the data and save it locally

# Downloading the clustering dataset
!wget -q 'https://raw.githubusercontent.com/amjadraza/blogs-data/master/spark_ml/ccpp.csv'

Read the data using the pyspark.pandas.read_csv method. To read more about the API, follow the Koalas official documentation.

# Read the data
psdf_ccpp = ps.read_csv("ccpp.csv")

Let us check the type of dataframes

# Check the type of psdf
type(psdf_ccpp)
pyspark.pandas.frame.DataFrame
psdf_ccpp.columns
Index(['AT', 'V', 'AP', 'RH', 'PE'], dtype='object')
psdf_ccpp.head()
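
Before plotting, a couple of quick sanity checks work exactly as in pandas; this is a small sketch I added, not part of the original walk-through.

# Quick checks on the pandas-on-Spark DataFrame (sketch)
psdf_ccpp.shape            # (rows, columns); the row count runs as a Spark job
psdf_ccpp.isnull().sum()   # per-column missing-value counts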

Just like normal pandas, pyspark.pandas has features to plot data and understand the variables. In the example below, I plot the original data and smoothed versions of it. This demonstrates the use of the plot and rolling window methods.

# Converting to pandas only to display a simple plot for publishing on Medium
pd_ccpp = psdf_ccpp.to_pandas()

Let us try plotting

# Plotting the variables
pd_ccpp['AT'].plot(figsize = (12,6))
pd_ccpp['AT'].rolling(window=200).mean().plot()
<matplotlib.axes._subplots.AxesSubplot at 0x7f9b6aae8750>

Plotting all columns using a moving average of 20 data points.

pd_ccpp.rolling(window=20).mean().plot(figsize=(12,8))

# Try below
# psdf_ccpp.rolling(window=20).plot()
<matplotlib.axes._subplots.AxesSubplot at 0x7f9b6a6a1590>
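
The commented-out line above hints that pyspark.pandas also has its own plotting API (plotly is the default backend). Here is a hedged sketch of what that can look like, added for illustration:

# Native pandas-on-Spark plotting (sketch; returns a plotly figure with the default backend)
fig = psdf_ccpp['AT'].plot.hist(bins=50)
fig.show()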

The command below demonstrates the use of the describe method, just as in the Pandas API.

psdf_ccpp.describe()

Feature Engineering

The feature engineering step usually goes hand in hand with EDA, and users prepare features with the objective of making them as predictive and well distributed as possible. In this article, I demonstrate the use of Koalas to perform feature engineering.

To understand the relationship between different variables, seaborn's pairplot function is widely used. Use the command below to plot the pairplots of the variables in the dataset. Since seaborn does not support the Koalas dataframe, users have to convert it into a pandas dataframe before calling pairplot.

sns.pairplot(psdf_ccpp.to_pandas())
<seaborn.axisgrid.PairGrid at 0x7f9b77fd9050>

Looking at the above figure, we can see many outliers, and for some variables the relationship with the target is not clear. To remove the outliers, the simplest solution is to compute a moving average, which I demonstrate using Koalas.

sns.pairplot(psdf_ccpp.rolling(window=20).mean().to_pandas())
<seaborn.axisgrid.PairGrid at 0x7f9b6bf81390>

It looks like the 20-point moving average has a better relationship with the target variable, hence using the 20-point moving-average features makes more sense.

Model Building using PySpark

Once the EDA and feature engineering are done, it is time to build the predictive model. One of the benefits of using the Koalas dataframe is that users can create a Spark dataframe seamlessly. In the section below, I demonstrate the use of the PySpark API to build and train a Gradient Boosted Trees (GBT) regression model.

# Create the moving average features
psdf_features = psdf_ccpp.rolling(window=20, min_periods=1).mean()
# Convert the Koalas DataFrame into Spark DataFrame
sdf = psdf_features.to_spark()
sdf.show(5,False)
+------------------+------+------------------+-----------------+-----------------+
|AT                |V     |AP                |RH               |PE               |
+------------------+------+------------------+-----------------+-----------------+
|14.96             |41.76 |1024.07           |73.17            |463.26           |
|20.07             |52.36 |1022.055          |66.125           |453.815          |
|15.083333333333334|48.04 |1018.7566666666667|74.79666666666667|465.3966666666667|
|16.5275           |50.36 |1016.6275         |75.2575          |460.6675         |
|15.386000000000001|47.788|1015.1479999999999|79.53            |463.314          |
+------------------+------+------------------+-----------------+-----------------+
only showing top 5 rows

Now, build the model using the PySpark API. For more details on building models with PySpark, refer to my article Machine Learning With Spark.

from pyspark.ml.regression import LinearRegression
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.regression import GBTRegressor

Prepare the features in a format compatible with PySpark models

# Create the feature column using VectorAssembler class
vectorAssembler = VectorAssembler(inputCols =["AT", "V", "AP", "RH"], outputCol = "features")
vpp_sdf = vectorAssembler.transform(sdf)
vpp_sdf.show(2, False)
+-----+-----+--------+------+-------+-----------------------------+
|AT   |V    |AP      |RH    |PE     |features                     |
+-----+-----+--------+------+-------+-----------------------------+
|14.96|41.76|1024.07 |73.17 |463.26 |[14.96,41.76,1024.07,73.17]  |
|20.07|52.36|1022.055|66.125|453.815|[20.07,52.36,1022.055,66.125]|
+-----+-----+--------+------+-------+-----------------------------+
only showing top 2 rows

Create the train and test splits

# Define train and test data split
splits = vpp_sdf.randomSplit([0.7,0.3])
train_df = splits[0]
test_df = splits[1]

Build and train the model

# Define the GBT Model
gbt = GBTRegressor(featuresCol="features", labelCol="PE")
gbt_model = gbt.fit(train_df)
gbt_predictions = gbt_model.transform(test_df)

Evaluate the model accuracy

# Evaluate the GBT Model
gbt_evaluator = RegressionEvaluator(labelCol="PE", predictionCol="prediction", metricName="rmse")
gbt_rmse = gbt_evaluator.evaluate(gbt_predictions)
print("The RMSE of GBT Tree regression Model is {}".format(gbt_rmse))
The RMSE of GBT Tree regression Model is 1.0993809096065044
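
The same evaluator class can report other regression metrics as well; the lines below are a small sketch I added ("r2" is a standard metric name for RegressionEvaluator).

# R-squared with the same evaluator setup (sketch)
gbt_r2 = RegressionEvaluator(labelCol="PE", predictionCol="prediction", metricName="r2").evaluate(gbt_predictions)
print("The R2 of the GBT regression model is {}".format(gbt_r2))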

Converting predictions back to a pyspark.pandas DataFrame

psdf_predictions = ps.DataFrame(gbt_predictions)
psdf_predictions.head()

Let us plot the actual values and the predictions from the model.

psdf_predictions.to_pandas()[['PE', 'prediction']].plot(figsize=(12,8))
# Try
#psdf_predictions[['PE', 'prediction']].plot()
<matplotlib.axes._subplots.AxesSubplot at 0x7f9b6a3bdb10>

A working Google Colab

Below is the working Google Colab notebook to recreate the tutorial. Give it a try and develop machine learning algorithms on top of the presented use of Koalas as a Pandas replacement where possible.

Conclusions

In this tutorial, I have demonstrated the use of pandas, pyspark.pandas, and Spark to perform exploratory data analysis, feature engineering, and ML model building. For Pandas users, switching to Spark is now straightforward, with the added benefit of using the Spark backend for distributed computations. Below are the key points discussed.

  • pandas vs pyspark.pandas
  • Using pyspark.pandas to perform EDA
  • Feature engineering using pyspark.pandas
  • PySpark integration with pandas

References and Further Reading

  1. https://spark.apache.org/docs/latest/ml-features.html
  2. https://koalas.readthedocs.io/en/latest/?badge=latest
  3. https://towardsdatascience.com/machine-learning-with-spark-f1dbc1363986
  4. https://readmedium.com/getting-started-spark3-0-0-with-google-colab-9796d350d78
  5. https://readmedium.com/spark-ifying-pandas-databricks-koalas-with-google-colab-93028890db5
  6. https://databricks.com/blog/2021/10/04/pandas-api-on-upcoming-apache-spark-3-2.html
  7. https://databricks.com/blog/2021/10/19/introducing-apache-spark-3-2.html
  8. https://hub.mybinder.turing.ac.uk/user/apache-spark-yahgfg9j/notebooks/python/docs/source/getting_started/quickstart_ps.ipynb