ANGE KOUAME

Summary

The web content provides a comparative analysis of different types of User-Defined Functions (UDFs) in Apache Spark, including standard UDFs, pandas UDFs, and Arrow-optimized UDFs, and evaluates their performance and use cases.

Abstract

The article presents an in-depth comparative overview of the UDF types available in Apache Spark, focusing on their implementation, performance, and practical considerations. It explains the purpose of UDFs, which are designed to extend Spark's capabilities for custom data manipulation using languages like Python, R, Java, and Scala. The author discusses the limitations of standard UDFs, such as expensive serialization and memory management issues, and introduces pandas UDFs as an improvement with faster serialization via Apache Arrow. Furthermore, the article highlights the latest advancement with Arrow-optimized UDFs, which offer better serialization, consistent type handling, and enhanced performance for complex data types. A performance comparison using a dataset with 40 million rows demonstrates the efficiency gains of pandas UDFs and Arrow-optimized UDFs over standard UDFs. The conclusion suggests that while native Spark functions should be prioritized, Arrow-optimized UDFs are recommended for cases where UDFs are necessary, due to their improved performance and broader data type support.

Opinions

  • The author suggests that pandas UDFs and Arrow-optimized UDFs offer significant performance improvements over standard UDFs, with the latter being slightly more efficient.
  • There is an emphasis on the importance of understanding both Pandas and Spark APIs to effectively leverage pandas UDFs.
  • The author recommends using Spark's native functions whenever possible, as UDFs are not optimized by Spark's Catalyst optimizer.
  • The article conveys that the introduction of Arrow-optimized UDFs addresses previous limitations and extends beyond the pandas API, making them a superior choice for UDF implementation in Spark.
  • The author encourages readers to adopt Arrow-optimized UDFs, especially with runtime 14 and above on Databricks, for improved data processing tasks.

Apache Spark: A comparative overview of UDF, pandas UDF and Arrow-optimized UDF

Image by DALLE-3 and enriched by the author with text

What does UDF mean and why do UDFs exist?

UDF stands for User Defined Function. Those functions can be written in Python, R, Java and Scala, enabling more personalized and complex data manipulation.

UDFs are designed to make the Spark framework more accessible and flexible for a wider range of users. This encourages broader adoption by providing the flexibility needed for various data processing tasks.

1. Standard UDF

UDFs in Apache Spark can be written using various programming languages such as Python, Scala, Java, and R. Each language offers a distinct implementation approach with differences in performance and functionality.

When executed, UDFs are serialized on the driver node and shipped to the executor nodes, regardless of the programming language used.

1.1 Java and Scala UDF

Java and Scala UDFs run within the Java Virtual Machine (JVM), so they incur only a minor performance penalty. The penalty exists because Spark treats them as opaque functions: they do not benefit from advanced optimizations such as Whole-Stage Code Generation, which are designed to enhance the performance of built-in expressions.

1.2 Python UDF

How a Python UDF is processed by Spark on a cluster (driver + 3 executors). Image by the author

1- The Python UDF is serialized and sent to each executor [1]

2- Unlike Java and Scala UDFs, the function is not executed within the JVM. Instead, a Python worker process is started on each executor; the data is serialized with pickle and sent to the Python function, and the results are then serialized back and returned to the JVM. [1]
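To make the cost of step 2 more concrete, here is a toy model of the per-value round trip, using only Python's standard pickle module. It is a simplified sketch and not Spark's actual worker protocol: the helper run_in_python_worker and the sample values are hypothetical and exist only for illustration.

```python
import pickle

def str_enriched(value):
    # The user-defined logic that runs inside the Python worker
    return f"{value}_hello"

def run_in_python_worker(serialized_rows):
    """Mimic the worker side: unpickle each value, apply the function, pickle the result."""
    return [pickle.dumps(str_enriched(pickle.loads(raw))) for raw in serialized_rows]

# "JVM side": every value is serialized before it crosses the process boundary
rows = ["hi", "welcome"]
sent = [pickle.dumps(value) for value in rows]

# Results come back serialized and have to be decoded again
received = [pickle.loads(raw) for raw in run_in_python_worker(sent)]
```

Every value is encoded and decoded twice, once in each direction, which is the overhead that the pandas and Arrow-based variants discussed later try to reduce.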

1.3 Notes and considerations on classic UDFs

  • Starting a Python process on each executor is expensive, and so is serializing the data to Python and deserializing the results.
  • Once the data has been serialized and handed to the Python UDF, Spark no longer manages its memory, which can lead to serious OOM (Out Of Memory) issues.
  • If the column used as input for the UDF contains nulls, you should handle them in your UDF to avoid unexpected results.
  • It is good practice to always specify the return type of your UDF to avoid type mismatches.
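from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

def str_enriched(col_):
    # Append a fixed suffix to the incoming string value
    return f"{col_}_hello"

# Declare the return type explicitly to avoid type mismatches
udf_str_enriched = udf(str_enriched, StringType())
# Assumes the welc_str column contains only non-null strings
df = df.withColumn("new_word", udf_str_enriched("welc_str"))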
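The example above assumes a column without nulls. As a minimal sketch of the null-handling advice, the variant below returns None for null input, because formatting None in an f-string would otherwise yield the literal string "None_hello". The names str_enriched_null_safe and add_enriched_column are hypothetical helpers, not part of the original article.

```python
from typing import Optional

def str_enriched_null_safe(value: Optional[str]) -> Optional[str]:
    """Return None for null input instead of producing the string 'None_hello'."""
    if value is None:
        return None
    return f"{value}_hello"

def add_enriched_column(df, source_col: str = "welc_str"):
    """Wrap the function as a UDF with an explicit return type and apply it."""
    # Imported lazily so the plain function above can be tested without Spark
    from pyspark.sql.functions import udf
    from pyspark.sql.types import StringType

    enriched = udf(str_enriched_null_safe, StringType())
    return df.withColumn("new_word", enriched(source_col))
```

Keeping the logic in a plain Python function makes it easy to unit test before it is registered as a UDF.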

2. Pandas UDF

Pandas UDFs were introduced in Spark 2.3 to speed up the serialization/deserialization part of the UDF process by exchanging data in vectorized batches with Apache Arrow (up to a 100x speed increase). Apart from this, they share the same characteristics as traditional UDFs.

Let’s dive into some of the available variants:

  • pandas Series to pandas Series
  • Iterator of pandas Series to Iterator of pandas Series
  • Iterator of multiple pandas Series to Iterator of pandas Series
  • pandas Series to scalar (a single value)

2.1 Series to series

This type of UDF operates on a Series and returns a Series of the same length.

import pandas as pd
from pyspark.sql.functions import pandas_udf
from pyspark.sql.types import DoubleType

# UDF to apply state sales tax to the price column
@pandas_udf(DoubleType())
def apply_sales_tax(price_series: pd.Series) -> pd.Series:
    sales_tax_rate = 0.07  # 7% sales tax
    return price_series * (1 + sales_tax_rate)

# Usage in Spark DataFrame
df = spark.createDataFrame([(1, 19.99), (2, 15.99), (3, 23.99)], ["id", "price"])
df_with_tax = df.withColumn("price_with_tax", apply_sales_tax(df["price"]))

2.2 Iterator of Series to Iterator of Series

The Iterator[Series] to Iterator[Series] Pandas UDF is particularly useful when state or context must be set up once and retained across the batches handled by the same task.

from typing import Iterator
import pandas as pd
from pyspark.sql.functions import pandas_udf

@pandas_udf('long')
def pandas_cumulative_sum(iterator: Iterator[pd.Series]) -> Iterator[pd.Series]:
    # Compute the cumulative sum within each batch (a pandas Series) yielded by the iterator
    return (series.cumsum() for series in iterator)

# Create a DataFrame with a sequence of numbers
df = spark.range(10)

# Apply the cumulative sum UDF to the DataFrame
df_with_cumsum = df.withColumn("cumulative_sum", pandas_cumulative_sum("id"))

df_with_cumsum.show()

In this example, we define a pandas_cumulative_sum UDF that computes the cumulative sum of the id column. The resulting DataFrame has a new column cumulative_sum holding the cumulative sum of id, computed within each batch: the running total restarts whenever a new batch begins.
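Because the iterator yields one pandas Series per batch, a plain cumsum() starts again from zero on every batch. If the running total should continue across the batches of a task, the state has to be carried explicitly. The sketch below shows one way to do it, assuming a non-null integer column; running_total is a hypothetical name, and the function is written as a plain generator so it can be checked without a cluster.

```python
from typing import Iterator

import pandas as pd

def running_total(batches: Iterator[pd.Series]) -> Iterator[pd.Series]:
    """Cumulative sum that continues across batches instead of restarting."""
    carried = 0  # sum of all values seen in the previous batches
    for batch in batches:
        # Offset this batch's cumulative sum by everything seen so far
        yield batch.cumsum() + carried
        carried += int(batch.sum())
```

It could then be wrapped with pandas_udf("long")(running_total) and applied like the UDF above. Even then, each task receives its own iterator, so the total would still restart at every partition.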

2.3 Iterator of Multiple Series to Iterator of Series

Here, we process multiple columns of the Spark DataFrame at once. The UDF operates on these Series element-wise and yields a single Series as output.

from pyspark.sql import SparkSession
from pyspark.sql.functions import pandas_udf
from pyspark.sql.types import DoubleType
from typing import Iterator, Tuple
import pandas as pd


# Pandas UDF definition
@pandas_udf("double")
def calculate_total_sales(
        iterator: Iterator[Tuple[pd.Series, pd.Series]]) -> Iterator[pd.Series]:
    return (quantity * price for quantity, price in iterator)

# Create a dataframe for testing purpose
df = spark.createDataFrame([
    (1, 4, 10.0),  
    (2, 2, 20.0),  
    (3, 1, 30.0),  
    (4, 5, 40.0),  
], ["item_id", "quantity", "price_per_item"])

# Apply the UDF to compute the total_sales column
df_with_total_sales = df.withColumn("total_sales", calculate_total_sales(df["quantity"], df["price_per_item"]))

df_with_total_sales.show()

The UDF defined as calculate_total_sales is applied to each pair of Series in the iterator. Within the UDF, a generator expression (quantity * price for quantity, price in iterator) is used to perform an element-wise multiplication of the two Series.

2.4 Series to Scalar (a single value)

An aggregation of a pandas series to a scalar is performed here.

import pandas as pd
from pyspark.sql.functions import pandas_udf
from pyspark.sql import Window

df = spark.createDataFrame(
    [(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)], ("id", "v"))

@pandas_udf("double")
def pandas_mean(v: pd.Series) -> float:
    # Aggregate the whole Series into a single value (its mean)
    return v.mean()

df.select(pandas_mean(df['v'])).show()

2.5 Notes and considerations on pandas-udf

  • Data transfer from the JVM to the Python process is sped up by sending batches through Apache Arrow. Nonetheless, serialization and deserialization still introduce some overhead.
  • The UDF incorporated in the query is still a black box in the query’s physical plan and is not subject to optimization by Spark’s Catalyst optimizer.
  • A good understanding of both the Pandas API and the Spark API is required to leverage this functionality.
  • The data types supported by Pandas UDFs might be limited, especially for complex types like arrays or maps.
  • Arrow's efficient data representation helps reduce the footprint of the data transferred between the JVM and the function.

3. Arrow optimized UDF

Arrow-optimized UDFs were introduced in Spark 3.5 to tackle the limitations of both Pandas UDFs and standard UDFs by offering:

  1. Better serialization/deserialization: Apache Arrow is used for data serialization/deserialization, as with Pandas UDFs. However, the function is no longer restricted to the pandas API.
  2. Consistent Type Handling: Apache Arrow provides a consistent and rich set of data types that are well-integrated with both Python and the JVM ecosystem.
  3. Type Coercion and Conversion: Arrow’s type system helps in better managing type coercion and conversion. In traditional Pandas UDFs, mismatches between the defined types in UDFs and the actual returned data types could lead to errors or unexpected behavior. Arrow’s robust type system reduces such issues by providing clear and well-defined type conversions.
  4. Enhanced Performance for Complex Types: Arrow’s efficient representation of complex data types (like structs, lists, and maps) can lead to performance improvements especially for operations that involve these types. This is because Arrow can handle these types natively in its columnar format efficiently.

Arrow-optimized Python UDFs can be enabled by setting the spark.sql.execution.pythonUDF.arrow.enabled configuration to true at the notebook/cluster level, or by passing useArrow=True to the udf decorator.

from pyspark.sql.functions import udf

# useArrow=True enables Arrow optimization for this UDF only
@udf(returnType="double", useArrow=True)
def plus_one(col_):
    return col_ + 1

df = spark.createDataFrame(
    [(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)], ("id", "v"))

df = df.withColumn("v_plus_one", plus_one("v"))
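The decorator form above opts in a single UDF. For the session-wide alternative, a minimal sketch could look like this, assuming an existing SparkSession is passed in. The helper name enable_arrow_python_udfs is hypothetical; only the configuration key comes from the text above.

```python
ARROW_UDF_CONF = "spark.sql.execution.pythonUDF.arrow.enabled"

def enable_arrow_python_udfs(spark) -> None:
    """Turn on Arrow optimization for the Python UDFs of this Spark session."""
    # Runtime SQL configuration values are set as strings
    spark.conf.set(ARROW_UDF_CONF, "true")
```

With the setting enabled, UDFs declared with a plain @udf decorator should use Arrow without needing the useArrow keyword.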

4. Comparison with a dataset

Let’s compare the performance of PySpark UDFs, Pandas UDFs and Arrow-optimized UDFs using a dataset with 40 million rows.

4.1 Cluster Configuration

A standard cluster configuration for Databricks Community Edition, with runtime 14.1 and 15.3 GB of RAM.

Databricks community cluster. Image by the author

4.2 Dataset

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, udf, pandas_udf
from pyspark.sql.types import IntegerType
import pandas as pd

# Customized spark object
spark = SparkSession.builder.appName("udf_comparison").getOrCreate()

# Create a PySpark DataFrame with 40,000,000 rows
df = spark.range(40000000).toDF("value")

4.3 UDFs

4.3.1 Standard UDF

def add_one(x):
    return x + 1

add_one_udf = udf(add_one, IntegerType())

# Apply traditional UDF
df_traditional_udf = df.withColumn("new_value", add_one_udf("value"))

4.3.2 Pandas UDF

@pandas_udf(IntegerType())
def pandas_add_one(series: pd.Series) -> pd.Series:
    return series + 1

# Apply Pandas UDF
df_pandas_udf = df.withColumn("new_value", pandas_add_one("value"))

4.3.3 Arrow-optimized UDF

@udf(returnType="int", useArrow=True)
def arrow_add_one(x):
    return x + 1

# Apply Arrow-optimized UDF
df_arrow_udf = df.withColumn("new_value", arrow_add_one("value"))

4.4 Test results and notes

UDF vs Pandas UDF vs Arrow-optimized UDF on the dataset described earlier. Image by the author
  • The Pandas UDF shows a gain of approximately 22% compared to the standard UDF.
  • The Arrow-optimized UDF shows a gain of 27% compared to the standard UDF, slightly better than the Pandas UDF.
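The article does not show how the timings were collected. The sketch below is one possible way to measure and compare them, not the author's actual method: the helper names time_action and gain_percent are hypothetical, and reading "gain" as the relative reduction in elapsed time is an assumption.

```python
import time
from typing import Callable

def time_action(action: Callable[[], object]) -> float:
    """Run a zero-argument callable once and return the elapsed wall-clock seconds."""
    start = time.perf_counter()
    action()
    return time.perf_counter() - start

def gain_percent(baseline_seconds: float, candidate_seconds: float) -> float:
    """Relative time saved by the candidate compared with the baseline, in percent."""
    return (baseline_seconds - candidate_seconds) / baseline_seconds * 100
```

Since Spark evaluates lazily, the timed callable has to trigger an action. One option is writing to the noop data source, for example time_action(lambda: df_pandas_udf.write.format("noop").mode("overwrite").save()), which forces every row to be computed without storing the result.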

CONCLUSION

By introducing Arrow-optimized UDFs, Spark addressed a number of issues that Pandas UDFs had left unresolved, such as support for a wider range of data types and use beyond the pandas API. Given their ease of use and configuration, I highly recommend switching your Spark UDFs to Arrow-optimized UDFs (runtime 14 and above on Databricks).

However, it’s important to remember that UDFs aren’t optimized by Spark’s Catalyst optimizer. Therefore, whenever feasible, use the native Spark transformation functions available in pyspark.sql.functions.
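As an illustration of that advice, the add-one logic used in the benchmark does not need a UDF at all. The sketch below expresses it with a built-in column expression; add_one_native is a hypothetical helper name, and the DataFrame is assumed to have a numeric value column as in section 4.2.

```python
def add_one_native(df, source_col: str = "value"):
    """Add 1 with a built-in column expression instead of a UDF."""
    # df[source_col] is a Column; "+ 1" builds a native expression that
    # Catalyst can optimize, and no data is sent to a Python worker
    return df.withColumn("new_value", df[source_col] + 1)
```

Because the expression stays inside the JVM, it avoids the serialization cost that all three UDF flavors still pay to some degree.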

For more articles and insights on data engineering and Spark, feel free to follow me on LinkedIn.

#dataengineering #spark #databricks

References

[1] B. Chambers and M. Zaharia, Spark: The Definitive Guide (2018), O’Reilly Media, Inc.
