Apache Spark Join Strategies in Depth
When you join data in Spark, it automatically chooses the most suitable algorithm internally to optimize performance. In this article, we will unravel the mysteries behind Spark joins and provide some tips to optimize them.
Spark Join Strategies
From Spark 3.0.0, the available join strategies are as follows:
- Broadcast Hash Join
- Shuffle Hash Join
- Sort Merge Join
- Cartesian Product Join
- Broadcast Nested Loop Join
In the coming section, we’ll explore each of them in detail.
How Does Spark Choose the Right Strategy?
Now, let’s explore how Spark selects the most appropriate join strategy. The choice is based on the factors below:
- Data Size: Spark chooses a join strategy based on the size of the data. To avoid costly shuffle and sort operations, it favors hash-based join strategies, especially when data can be broadcasted.
- Type of Join: Spark supports both Equi Joins (using “=”) and Non-Equi Joins (using “<”, “>”, “≥”, “≤”). A Non-Equi Join compares keys against ranges of values rather than exact matches, which requires a nested loop. For these joins, Spark only supports Broadcast Nested Loop Join and Cartesian Product Join.
- Join Hints: Users who want more control over the join strategy selection can provide join hints.
- Sortability of the join keys: Sort Merge Join can only be used when the join keys are sortable.
The following diagram explains the flow that Spark uses to decide on the join strategy to use.
Let’s delve into each of the join strategies.
Demo Code (Optional)
If you choose to follow along and execute the code as you read, ensure you have a Python environment with the PySpark dependency installed. Alternatively, you can simply read the code, as it is straightforward to comprehend.
The versions employed in this article are:
- Python: 3.11.6
- PySpark: 3.4.1
We’ll use two DataFrames, df_clients and df_orders, as shown by the code below. You can find the complete source code for this article in my GitHub repository.
from pyspark.sql import SparkSession

# Create a local Spark session for the demo
spark = SparkSession.builder \
    .master("local[*]") \
    .appName("join-strategies") \
    .getOrCreate()

# Small clients DataFrame
df_clients = spark.createDataFrame([
    (0, "client1"),
    (1, "client2"),
    (2, "client3")]) \
    .toDF("client_id", "name")

# Orders DataFrame keyed by client_id
df_orders = spark.createDataFrame([
    (0, "order1", 100),
    (1, "order2", 200),
    (2, "order3", 150)]) \
    .toDF("client_id", "order_id", "order_amount")
Broadcast Hash Join
It’s ideal when one DataFrame is small enough to fit in the memory of each executor. Spark broadcasts the smaller DataFrame to all workers. This minimizes data shuffling and accelerates the join operation, as the join occurs within the same node, resulting in a decrease in network overhead.
The default size threshold for a DataFrame to be broadcast is 10 MB, meaning the DataFrame needs to be smaller than 10 MB. You can adjust this default by configuring the spark.sql.autoBroadcastJoinThreshold setting. However, you should take into account the available memory in both your driver and executors: setting this value larger than the available memory can result in decreased performance or even Out-Of-Memory errors.
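For illustration, here is how you could raise the threshold to 50 MB, or disable automatic broadcasting entirely by setting it to -1 (the 50 MB figure is just an example value, not a recommendation):

# Raise the automatic broadcast threshold to 50 MB (the value is in bytes)
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", 50 * 1024 * 1024)

# Or disable automatic broadcast joins altogether
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)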
The BROADCAST hint indicates to Spark to use the Broadcast Hash Join strategy, as shown by the code below.
broadcast_df = df_clients.hint('BROADCAST').join(df_orders, on='client_id', how='inner')
broadcast_df.explain(mode="formatted")
broadcast_df.show()
In both the Physical Plan and the DAG (Directed Acyclic Graph) shown in the UI, you can notice that Spark used BroadcastHashJoin and only the left DataFrame was broadcasted, which is df_clients in our case.
Shuffle Hash Join
It’s suitable when neither of the joined tables can fit in memory. It involves a shuffle phase, where data is redistributed across partitions based on the join key. Be careful when using this strategy: the full shuffle can incur high network and disk I/O costs, which may significantly degrade performance.
You can steer Spark toward or away from this strategy with the spark.sql.join.preferSortMergeJoin parameter: setting it to false makes Spark prefer Shuffle Hash Join over Sort Merge Join when the other conditions are met. The SHUFFLE_HASH hint suggests that Spark use Shuffle Hash Join, as shown in the following code:
shuffle_hashed_df = df_clients.hint('SHUFFLE_HASH').join(df_orders, on='client_id', how='inner')
shuffle_hashed_df.explain(mode="formatted")
shuffle_hashed_df.show()
Within the Physical Plan and DAG below, we notice that Shuffle Hash Join was applied, and the two DataFrames were shuffled (the Exchange keyword indicates the shuffle).
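If you prefer configuration over hints, a minimal sketch is to lower Spark's default preference for Sort Merge Join; keep in mind that Spark will still only pick a hash join when its size conditions allow building the hash table:

# Tell the planner not to prefer sort merge join over shuffle hash join
spark.conf.set("spark.sql.join.preferSortMergeJoin", "false")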
Sort Merge Join
It’s appropriate when both tables are large and cannot fit in memory. It involves sorting both tables based on the join key and then merging them. It provides good performance for certain types of queries but requires sorting, which can be computationally expensive. This strategy is preferred by default; you can steer Spark away from it by setting the parameter spark.sql.join.preferSortMergeJoin to false (it defaults to true).
The MERGE join hint suggests that Spark use Shuffle Sort Merge Join. Its aliases are SHUFFLE_MERGE and MERGEJOIN.
sort_merge_join = df_clients.hint("MERGE").join(df_orders, on='client_id', how='inner')
sort_merge_join.explain(mode="formatted")
sort_merge_join.show()
As indicated by the Physical Plan and DAG, the Sort Merge Join occurs in three steps:
- Shuffle: The data from both tables is partitioned based on the join key. This partitioning ensures that records with the same join key are directed to the same partition.
- Sort: Within each partition, the data is then sorted based on the join key.
- Merge: The sorted data is subsequently merged across partitions to execute the join operation.
Cartesian Product Join
It involves joining every row from the first table with every row from the second table, making it highly resource-intensive. This strategy should be avoided for large datasets, since the result contains one row for every pair of input rows and therefore grows to the product of the two row counts.
To utilize this strategy, enable it by setting the spark.sql.crossJoin.enabled parameter to true. The SHUFFLE_REPLICATE_NL hint suggests that Spark use the Cartesian Product Join strategy, as demonstrated in the following code.
cartesian_product_join_df = df_clients.hint('SHUFFLE_REPLICATE_NL').join(df_orders, on='client_id', how='inner')
cartesian_product_join_df.explain(mode="formatted")
cartesian_product_join_df.show()
Broadcast Nested Loop Join
It’s useful when joining a large table with a small table when there is no equi-join condition (for example, a non-equi join or a join without any condition). The smaller table is broadcasted, and a nested loop is used to find matching records.
To instruct Spark to utilize the Broadcast Nested Loop Join, the following code begins by enabling cross joins through setting spark.sql.crossJoin.enabled to true. Subsequently, the BROADCAST hint is employed on a join without a join condition.
# Allow joins without a condition (cross joins)
spark.conf.set("spark.sql.crossJoin.enabled", True)

# Without a join condition, Spark falls back to a nested loop join;
# the BROADCAST hint makes it broadcast df_clients
broadcast_nested_loop_join_df = df_clients.hint("BROADCAST").join(df_orders)
broadcast_nested_loop_join_df.explain(mode="formatted")
broadcast_nested_loop_join_df.show()
In the Physical Plan, you can notice that Spark used BroadcastNestedLoopJoin, and only the left DataFrame was broadcasted.
Tips for Optimizing Joins in Spark
Here are some tips to consider that could enhance your join performance.
Partitioning
- Ensure that both tables are properly partitioned based on the join key to minimize data shuffling.
- Optimal partitioning can significantly improve join performance.
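As a sketch of this tip, you could repartition both DataFrames on the join key before joining (the partition count of 8 is an arbitrary example, and repartitioning itself triggers a shuffle, so it mainly pays off when the partitioned data is reused):

# Repartition both DataFrames on the join key so matching rows end up in matching partitions
clients_by_key = df_clients.repartition(8, "client_id")
orders_by_key = df_orders.repartition(8, "client_id")

partitioned_join_df = clients_by_key.join(orders_by_key, on="client_id", how="inner")
partitioned_join_df.explain(mode="formatted")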
Caching
- Cache or persist smaller tables in memory if they are reused in multiple stages of a Spark application.
- Reduces the need to recompute the smaller table for each operation.
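For example, if df_clients is reused in several joins, you could cache it once (a minimal sketch; cache() is lazy, so the data is only materialized on the first action):

# Cache the smaller DataFrame so it is not recomputed for every join that uses it
df_clients.cache()

inner_join_df = df_clients.join(df_orders, on="client_id", how="inner")
left_join_df = df_clients.join(df_orders, on="client_id", how="left")
inner_join_df.show()
left_join_df.show()

# Release the cached data once it is no longer needed
df_clients.unpersist()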
Broadcasting
- Use broadcast joins whenever applicable to leverage the benefits of data locality and reduce network overhead.
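Besides the BROADCAST hint used earlier, you can also mark a DataFrame for broadcasting with the broadcast() function, as in this short sketch:

from pyspark.sql.functions import broadcast

# Explicitly broadcast the smaller DataFrame to every executor
broadcast_func_join_df = df_orders.join(broadcast(df_clients), on="client_id", how="inner")
broadcast_func_join_df.explain(mode="formatted")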
Configuration Tuning
- Adjust Spark configuration parameters, such as executor memory and parallelism, based on the characteristics of your data and workload.
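For illustration, a few commonly tuned settings are sketched below; the values are placeholders to adapt to your data and cluster, not recommendations:

# Runtime-tunable settings (placeholder values)
spark.conf.set("spark.sql.shuffle.partitions", "200")  # parallelism of the shuffles used by joins
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", 50 * 1024 * 1024)  # broadcast threshold in bytes

# Resources such as executor memory are set when the application is launched, for example:
# spark-submit --executor-memory 4g --num-executors 10 your_app.py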
Conclusion
Choosing the right join strategy in Spark is crucial for optimizing performance and resource utilization. Each strategy has its strengths and weaknesses, and understanding the nature of your data and the requirements of your queries will guide you in selecting the most suitable approach. By leveraging these join strategies effectively, you can harness the full power of Apache Spark for large-scale data processing.
Thanks! I hope you found it helpful. If so, 👏