Amine Kaabachi

Summary

Apache Spark 3.0 introduces Adaptive Query Execution (AQE) to enhance the performance of Spark SQL queries by optimizing query plans in real-time based on actual data statistics.

Abstract

The article discusses the integration of Adaptive Query Execution (AQE) in Apache Spark 3.0, which significantly improves the Catalyst Optimizer's ability to optimize SQL queries and DataFrame API operations. AQE allows Spark to adjust query plans dynamically by utilizing runtime statistics, leading to more efficient join strategies, optimal shuffle partition sizes, and mitigation of data skew issues. This adaptive approach represents a leap forward from the static, rule-based optimizations of Spark 2.X, where all optimizations were based on pre-execution estimations. AQE's ability to re-optimize plans during execution can result in faster query execution times and reduced need for manual tuning.

Opinions

  • The author suggests that AQE is a significant advancement for Spark SQL, indicating that it can lead to substantial performance improvements.
  • The article implies that the static nature of Spark 2.X's rule-based optimizations could lead to suboptimal query performance and require extensive manual tuning.
  • AQE is presented as a solution to the limitations of pre-runtime estimations, which could be inaccurate due to factors like filters and complex operators affecting data size.
  • The author expresses that AQE simplifies the process of configuring Spark by reducing the need for collecting statistics or engaging in complex configuration tuning.
  • It is noted that while AQE is not enabled by default, users can easily activate it by setting spark.sql.adaptive.enabled to true, which is expected to be beneficial for most users.

Apache Spark 3.0 Adaptive Query Execution

Today, Spark SQL is one of the most valuable components of Apache Spark. It powers both SQL queries and the DataFrame API. At its core is the Catalyst Optimizer, which leverages advanced Scala features to provide an extensible and extremely powerful query optimizer.

Photo by Balaji Malliswamy on Unsplash

In this article, we will try to understand the workflow of the Catalyst Optimizer and then dive deep into the new optimizations that Adaptive Query Execution enables.

Adaptive Query Execution (AQE) is a new feature available in Apache Spark 3.0 that allows it to optimize and adjust query plans based on runtime statistics collected while the query is running. To understand how it works, let’s first have a look at the optimization stages that the Catalyst Optimizer performs.

Catalyst Optimizer 101

The Catalyst Optimizer applies optimizations during the logical and physical planning stages. It first optimizes the query logically, then generates a range of physical plans and selects the most efficient one based on a cost model.

Image by Author
  1. Unresolved Logical Plan: For a SQL query or a DataFrame, the optimizer accepts the unresolved logical plan and checks it for syntax errors.
  2. Logical Plan: The optimizer uses the catalog to resolve the names that are still unresolved in the unresolved logical plan and then converts it into a logical plan.
  3. Optimized Logical Plan: The logical plan is improved by applying optimization rules that rewrite and reorder its operations so the query does less work.
  4. Physical Plans: A physical plan defines how the logical plan will physically execute across the cluster. In this stage, the optimizer produces several candidate physical plans.
  5. Selected Physical Plan: Physical plans are evaluated using a cost model and the best one is selected.
  6. RDDs: Finally, the optimizer converts the physical plan to RDDs, and then generates bytecode for the JVM.
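
To make the rule-based nature of step 3 more concrete, here is a toy sketch in Python of one classic rewrite rule, constant folding, applied to a tiny expression tree. It is not Catalyst code: the tuple-based tree and the `fold_constants` function are hypothetical names invented for this illustration, and Catalyst itself implements its rules in Scala over its own tree classes.

```python
import operator

# Hypothetical mini expression tree, for illustration only:
#   ("lit", value)        a literal constant
#   ("col", name)         a column reference
#   (op, left, right)     a binary operation such as "add", "mul" or "gt"
FOLDABLE = {"add": operator.add, "mul": operator.mul}


def fold_constants(expr):
    """Rewrite the tree bottom-up, replacing operations on two literals with one literal."""
    kind = expr[0]
    if kind in ("lit", "col"):
        return expr  # leaves are returned unchanged
    left = fold_constants(expr[1])
    right = fold_constants(expr[2])
    if kind in FOLDABLE and left[0] == "lit" and right[0] == "lit":
        # Both operands are known before execution, so compute the result now.
        return ("lit", FOLDABLE[kind](left[1], right[1]))
    return (kind, left, right)


# price > 2 * (3 + 4)
predicate = ("gt", ("col", "price"), ("mul", ("lit", 2), ("add", ("lit", 3), ("lit", 4))))
print(fold_constants(predicate))
```

The rule only looks at the shape of the plan, never at the data, which is why this kind of optimization can be done entirely before the query runs.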

In Spark 2.X, all of these optimizations are applied before the query runs. They rely on rules and on estimations calculated before runtime, so tuning opportunities that only appear as the query runs are missed, and issues that cannot be predicted before execution go unhandled.

Adaptive Query Execution

AQE improves the Catalyst Optimizer workflow by adjusting query plans based on runtime statistics collected during query execution.

Image by Author

When a stage completes, Spark reports statistics about the real size of the data in its shuffle files. Before the next stage starts, it uses them to re-optimize the logical plan so that it can dynamically switch join strategies, coalesce the number of shuffle partitions, or optimize skew joins.

  • Switch join strategies: Broadcast Hash Join (BHJ) is one of the most performant join strategies in Spark, but it requires broadcasting one side of the join. To keep it efficient, Spark only chooses it when that side is below a broadcast-size threshold (10 MB by default). The issue with the Spark 2.X optimizer is that it estimates the data size from input file sizes, and this estimate is not always accurate because filters and complex operators can change the size along the way. When AQE is enabled, Spark replans the join strategy at runtime based on the actual size of the join relation.
  • Coalesce shuffle partitions: If you have done any hands-on work with Spark, you know that sooner or later you have to tune the number of shuffle partitions. The main difficulty is that the best number depends on the data size. Coming up with a value is not easy, so people generally try several and keep the best one. If they get it wrong, it can lead to spilling data to disk (too few partitions) or to lots of small network fetches (too many partitions). AQE combines adjacent small partitions into bigger partitions at runtime by looking at the shuffle file statistics, so we can start with a large number of partitions and AQE will do the rest :)
  • Optimize skew joins: Data skew is a data distribution problem: you have skew when your partitions vary widely in size across the cluster. It can degrade query performance, especially for joins. AQE detects such skew automatically from shuffle file statistics. It then splits each skewed partition into smaller subpartitions, each of which is joined to the corresponding partition on the other side.
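
The three decisions above can be sketched in plain Python, working only from a list of measured shuffle partition sizes. This is a simplified illustration, not Spark's implementation: the function names are hypothetical, and the thresholds passed in the example are assumed values standing in for settings such as `spark.sql.autoBroadcastJoinThreshold`, `spark.sql.adaptive.advisoryPartitionSizeInBytes`, `spark.sql.adaptive.skewJoin.skewedPartitionFactor` and `spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes`.

```python
from statistics import median

MB = 1024 * 1024


def choose_join_strategy(runtime_size_bytes, broadcast_threshold=10 * MB):
    """Pick a join strategy from the measured (not estimated) size of one join side."""
    if runtime_size_bytes <= broadcast_threshold:
        return "broadcast_hash_join"
    return "sort_merge_join"


def coalesce_partitions(sizes, target_size):
    """Greedily group adjacent partitions while each group stays within target_size."""
    groups, current, current_size = [], [], 0
    for index, size in enumerate(sizes):
        if current and current_size + size > target_size:
            groups.append(current)  # close the group before it grows too large
            current, current_size = [], 0
        current.append(index)
        current_size += size
    if current:
        groups.append(current)
    return groups


def skewed_partitions(sizes, factor, min_size):
    """Indices of partitions far larger than the median and also above min_size."""
    mid = median(sizes)
    return [i for i, size in enumerate(sizes) if size > factor * mid and size > min_size]


def split_count(size, target_size):
    """Number of subpartitions needed so that each is at most target_size."""
    return -(-size // target_size)  # ceiling division on integers


# Assumed example: sizes reported for six shuffle partitions, one heavily skewed.
shuffle_sizes = [5 * MB, 8 * MB, 3 * MB, 900 * MB, 6 * MB, 4 * MB]

print(choose_join_strategy(8 * MB))
print(coalesce_partitions(shuffle_sizes, target_size=64 * MB))
for i in skewed_partitions(shuffle_sizes, factor=5, min_size=256 * MB):
    print(i, split_count(shuffle_sizes[i], target_size=64 * MB))
```

In this example the small neighbours are merged into two groups, while partition 3 is left alone by the coalescing step and flagged as skewed instead, so it would be split rather than merged.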

Conclusion

AQE is a great addition to the Apache Spark optimizer. It can dramatically speed up your queries. Under the hood, it improves your query plan as your query runs, reducing the need to collect statistics or to worry about inaccurate estimations and hard-core Spark config tuning.

Note that in Spark 3.0 it is turned off by default, but you can enable it by setting spark.sql.adaptive.enabled to true (since Spark 3.2.0 it is enabled by default). Please check the documentation for details on the configuration options.
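
As a minimal sketch of what enabling it can look like, the snippet below keeps the relevant settings in a dictionary and renders them as `--conf key=value` arguments for spark-submit. The helper `as_spark_submit_args` and the `AQE_SETTINGS` name are hypothetical; the two sub-feature flags are included only to make the coalescing and skew-join features explicit, and the `spark` session mentioned in the comments is assumed to exist already.

```python
# AQE switches as they could be passed to Spark (values are strings, as in Spark configs).
AQE_SETTINGS = {
    "spark.sql.adaptive.enabled": "true",
    "spark.sql.adaptive.coalescePartitions.enabled": "true",
    "spark.sql.adaptive.skewJoin.enabled": "true",
}


def as_spark_submit_args(settings):
    """Render settings as a flat list of `--conf key=value` arguments."""
    args = []
    for key, value in settings.items():
        args += ["--conf", f"{key}={value}"]
    return args


print(" ".join(as_spark_submit_args(AQE_SETTINGS)))

# With an existing SparkSession named `spark` (assumed, not created here),
# the same settings could be applied at runtime:
# for key, value in AQE_SETTINGS.items():
#     spark.conf.set(key, value)
```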
