Xinran Waibel

DATA ENGINEERING 101

Apache Spark Optimization Toolkit

A collection of useful tips for tuning Apache Spark jobs.

Apache Spark, an open-source distributed computing engine, is one of the most popular frameworks for in-memory batch data processing (and it supports real-time data streaming as well). Thanks to its advanced query optimizer, DAG scheduler, and execution engine, Spark is able to process and analyze large datasets very efficiently. However, running Spark jobs without careful tuning can still lead to poor performance.

In this blog post, I will share several tips for Spark performance tuning to help you troubleshoot and speed up slow-running Spark jobs.

(All functions mentioned in this article are from PySpark and you can find equivalent functions for Scala/Java using the Spark API documentation.)

Uneven partitions

When a dataset is initially loaded by Spark and becomes a resilient distributed dataset (RDD), the data is usually distributed fairly evenly among partitions. However, these partitions will likely become uneven after users apply certain types of data manipulation to them. For example, the groupByKey operation can result in skewed partitions since one key might contain substantially more records than another. Moreover, because Spark’s DataFrameWriter allows writing partitioned data to disk using partitionBy, it is possible for on-disk partitions to be uneven as well.

Rebalancing skewed partitions in a DataFrame can substantially improve Spark’s processing performance on the DataFrame, because a stage only finishes when its slowest (largest) partition has been processed. You can check the number of partitions in a DataFrame by calling the getNumPartitions function on its underlying RDD (df.rdd.getNumPartitions()) and find the number of records in each partition by running a light Spark job, such as:

from pyspark.sql.functions import spark_partition_id
# Parentheses let the chained calls span several lines
(df.withColumn("partition_id", spark_partition_id())
   .groupBy("partition_id")
   .count()
   .show())

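If you find the partition sizes of a DataFrame are highly uneven, use either the repartition or the coalesce function to repartition the DataFrame before running any analysis on it. repartition performs a full shuffle, can increase or decrease the number of partitions, and produces evenly sized partitions; coalesce only merges existing partitions without a full shuffle, so it is cheaper but can only reduce the partition count and may leave the result uneven. It is also recommended to repartition data in memory before writing it back to disk. The RDD module supports these repartitioning functions as well.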

The drawback of persisting RDDs

Due to the lazy evaluation principle, Spark does not execute any actual transformations on a dataset unless users explicitly call an action to collect the results. Moreover, if users want to apply additional transformations on the intermediate results, Spark will need to recompute everything from the beginning. To allow users to reuse data more efficiently, Spark can cache data in memory and/or on disk using the persist or cache functions.

However, caching is not always a good idea. After a dataset is cached by Spark, the Catalyst optimizer’s ability to optimize further transformations will be limited, because it can no longer improve pruning at the source data level. For instance, if a filter is applied to a column that is indexed in the source database, Catalyst will not be able to take advantage of the index to improve performance.

Thus, caching data is recommended only if it will be reused multiple times later, e.g. when iteratively exploring a dataset or tuning ML models.

Cost-Based Optimizer (CBO)

The cost-based optimizer (CBO) can speed up Spark SQL jobs by providing additional table-level statistics to Catalyst, which is especially helpful for jobs that join many datasets. Users can enable CBO by setting spark.sql.cbo.enabled to true (it is disabled by default).

To fully take advantage of the CBO, users need to keep both column-level and table-level statistics up-to-date, allowing CBO to optimize query plans with accurate estimates. To do so, use ANALYZE TABLE commands to collect statistics before running SQL queries on the tables. Remember to analyze tables again after tables are modified to make sure statistics are up-to-date.

Broadcast Join

Besides enabling CBO, another way to optimize joining datasets in Spark is by using the broadcast join. In a shuffle join, records from both tables will be transferred through the network to executors, which is suboptimal when one table is substantially bigger than the other. In a broadcast join, the smaller table will be sent to executors to be joined with the bigger table, avoiding sending a large amount of data through the network.

Users can control broadcast joins via the spark.sql.autoBroadcastJoinThreshold configuration, which sets the maximum size of a table that Spark will broadcast automatically (10 MB by default; -1 disables automatic broadcasting). Moreover, a broadcast hint can be used to tell Spark to broadcast a table even if the size of the table is bigger than spark.sql.autoBroadcastJoinThreshold:

from pyspark.sql.functions import broadcast
broadcast(spark.table("tbl_a")).join(spark.table("tbl_b"), "key")

Garbage collection (GC)

As Spark jobs are typically memory-intensive, it is important to ensure garbage collection is effective: we want to produce less memory “garbage” to reduce GC time. To find out whether your Spark jobs spend too much time in GC, check the Task Deserialization Time and GC Time in the Spark UI.

For example, using user-defined functions (UDFs) and lambda functions will lead to longer GC time since Spark will need to deserialize more objects. It is also recommended to avoid creating intermediate objects and caching unnecessary RDDs on the JVM heap.

TL;DR:

  • Rebalance uneven partitions using repartition or coalesce.
  • Persist data only if it will be reused multiple times.
  • Use ANALYZE TABLE commands to maintain up-to-date statistics for CBO.
  • Enable broadcast join for small tables to speed up joins.
  • Optimize GC by using fewer UDFs and avoiding caching large objects.
