Performance Tuning of AWS EMR Spark Job With Hands-on Demo
We often come across cases where we need to optimize a Spark job. This write-up drills down into the details with hands-on examples.

What we will learn:
Spark optimization via:
- Identifying and setting correct Apache Spark Configurations
- Executor Numbers
- Caching
- Optimizing Joins
- Reducing the number of jobs and stages
Prerequisite:
Set up a quick notebook on EMR. Please follow the link below in case you are not sure how to do it.
Viewing and Setting Apache Spark Configurations
Let’s first have a look at how to find and then optimally change Spark configurations. It's important to know how to find the configurations of a Spark job, understand them, and change them to achieve the best performance.
To get configurations
We can access Spark’s current configuration through the Spark UI’s Environment tab:

You can also view only the Spark SQL–specific Spark configs:
// In Scala
spark.sql("SET -v").select("key", "value").show(5, false)
# In Python
spark.sql("SET -v").select("key", "value").show(n=5, truncate=False)
In our notebook, the output looks like this:

To set or modify an existing configuration programmatically, first check if the property is modifiable: spark.conf.isModifiable("<config_name>") will return true or false.
Let's try on our notebook:

We can also read a configuration value via spark.conf.get("<config_name>").
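As a minimal sketch (using spark.sql.shuffle.partitions purely as an example of a runtime-modifiable property), checking, reading, and changing a configuration from the notebook looks like this:
// In Scala
spark.conf.isModifiable("spark.sql.shuffle.partitions")   // true: can be changed at runtime
spark.conf.get("spark.sql.shuffle.partitions")            // read the current value, e.g. "200"
spark.conf.set("spark.sql.shuffle.partitions", "5")       // set a new value for this session
spark.conf.get("spark.sql.shuffle.partitions")            // now returns "5"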

To set configurations
The most common way to set configurations is to specify them directly in your Spark application or on the command line when submitting the application with spark-submit, using the --conf flag:
spark-submit --conf spark.sql.shuffle.partitions=5 \
  --conf "spark.executor.memory=2g" \
  --class com.spark.SparkConfig jars/my_spark.jar
Another way is to specify them in the application itself. Below is a code sample.
Note: we already have a Spark session in notebooks; the code below is more suitable for a standalone Spark application.
// In Scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder
  .config("spark.sql.shuffle.partitions", 5)
  .config("spark.executor.memory", "2g")
  .master("local[*]")
  .appName("SparkConfig")
  .getOrCreate()
Alright, so let's get into the configurations we should pay attention to:
Dynamic resource allocation
When we specify compute resources as command-line arguments to spark-submit, as we did earlier, we cap the limit. This means that if more resources are needed later as tasks queue up in the driver due to a larger-than-anticipated workload, Spark cannot accommodate or allocate extra resources.
If instead, you use Spark’s dynamic resource allocation configuration, the Spark driver can request more or fewer compute resources as the demand of large workloads flows and ebbs. In scenarios where your workloads are dynamic — that is, they vary in their demand for computing capacity — using dynamic allocation helps to accommodate sudden peaks.
One use case where this can be helpful is streaming, where the data flow volume may be uneven. Another is on-demand data analytics, where you might have a high volume of SQL queries during peak hours.
To enable and configure dynamic allocation, we can use settings like the following.
Note: numbers here are arbitrary; the appropriate settings will depend on the nature of your workload and they should be adjusted accordingly.
spark.dynamicAllocation.enabled true
spark.dynamicAllocation.minExecutors 2
spark.dynamicAllocation.schedulerBacklogTimeout 1m
spark.dynamicAllocation.maxExecutors 20
spark.dynamicAllocation.executorIdleTimeout 2min
By default, spark.dynamicAllocation.enabled is set to false.
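Since managed clusters often ship with their own defaults (EMR, for example, typically enables dynamic allocation out of the box), it is worth reading the values back from the running session. A small sketch, using get with a fallback default so unset keys don't throw:
// In Scala
spark.conf.get("spark.dynamicAllocation.enabled", "false")        // "true" if the cluster enables it
spark.conf.get("spark.dynamicAllocation.maxExecutors", "<unset>") // fallback shown if the key is not set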
Configuring Spark executors’ memory and the shuffle service
The amount of memory available to each executor is controlled by spark.executor.memory. This is divided into three sections: execution memory, storage memory, and reserved memory. The default Spark memory division is 60% for execution and 40% for storage, after allowing for 300 MB of reserved memory to safeguard against OOM errors.
During map and shuffle operations, Spark writes to and reads from the local disk’s shuffle files, so there is heavy I/O activity. This can result in a bottleneck because the default configurations are suboptimal for large-scale Spark jobs. Knowing what configurations to tweak can mitigate this risk during this phase of a Spark job.
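As an illustrative sketch only (the values are placeholders to show the knobs, not tuning advice), some of the memory- and shuffle-related settings could be supplied when the session is built for a standalone application:
// In Scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder
  .appName("ShuffleTuning")
  .config("spark.executor.memory", "4g")              // per-executor heap
  .config("spark.shuffle.file.buffer", "1m")          // default 32k; larger buffers mean fewer disk writes
  .config("spark.reducer.maxSizeInFlight", "96m")     // default 48m; map output fetched per reduce request
  .config("spark.shuffle.io.maxRetries", "5")         // default 3; retries on shuffle fetch failures
  .getOrCreate()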

How many Executors
This is a very common but important question.
Spark is very efficient at processing its tasks in parallel. For large-scale workloads, a Spark job will have many stages, and within each stage, there will be many tasks. Spark will at best schedule a thread per task per core, and each task will process a distinct partition.
To optimize resource utilization and maximize parallelism, the ideal is at least as many partitions as there are cores on the executor.
If there are more partitions than there are cores on each executor, all the cores are kept busy. You can think of partitions as atomic units of parallelism: a single thread running on a single core can work on a single partition.
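A small sketch of putting that into practice in the notebook (the multiplier of 2 is just a common starting point, not a rule):
// In Scala
val totalCores = spark.sparkContext.defaultParallelism   // roughly the total executor cores available
val df = spark.range(0, 1000000)                          // toy data standing in for a real DataFrame
val repartitioned = df.repartition(totalCores * 2)        // a small multiple of cores keeps every core busy
println(repartitioned.rdd.getNumPartitions)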
How about Caching
When to cache
Common use cases for caching are scenarios where you will want to access a large data set repeatedly for queries or transformations. Some examples include:
- DataFrames commonly used during iterative machine learning training
- DataFrames accessed commonly for doing frequent transformations during ETL or building data pipelines
When not to cache:
- DataFrames that are too big to fit in memory
- An inexpensive transformation on a DataFrame not requiring frequent use, regardless of size
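A minimal sketch of caching a DataFrame that will be reused (the input path and name are made up for illustration):
// In Scala
import org.apache.spark.storage.StorageLevel

val features = spark.read.parquet("features.parquet")    // hypothetical data set reused by many queries
features.persist(StorageLevel.MEMORY_AND_DISK)            // or simply features.cache()
features.count()                                           // an action materializes the cache
// ... run the repeated queries / transformations on features ...
features.unpersist()                                       // release the memory once you are done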
Optimizing joins
Join operations are a common type of transformation in big data analytics in which two data sets, in the form of tables or DataFrames, are merged over a common matching key. Similar to relational databases, the Spark DataFrame and Dataset APIs and Spark SQL offer a series of join transformations: inner joins, outer joins, left joins, right joins, etc. All of these operations trigger a large amount of data movement across Spark executors.
Spark has five distinct join strategies by which it exchanges, moves, sorts, groups, and merges data across executors:
the broadcast hash join (BHJ), the shuffle hash join (SHJ), the shuffle sort merge join (SMJ), the broadcast nested loop join (BNLJ), and the shuffle-and-replicated nested loop join (also known as the Cartesian product join).
Optimizing queries with the broadcast hash join:
The broadcast hash join is employed when two data sets, one small (fitting in the driver’s and executor’s memory) and another large enough to ideally be spared from movement, need to be joined over certain conditions or columns. Using a Spark broadcast variable, the smaller data set is broadcasted by the driver to all Spark executors and subsequently joined with the larger data set on each executor. This strategy avoids the large exchange.
By default, Spark will use a broadcast join if the smaller data set is less than 10 MB. This configuration is set in spark.sql.autoBroadcastJoinThreshold.
// In Scala
import org.apache.spark.sql.functions.broadcast
val joinedDF = playersDF.join(broadcast(clubsDF), playersDF("key1") === clubsDF("key2"))
The BHJ is the easiest and fastest join Spark offers since it does not involve any shuffle of the data set; all the data is available locally to the executor after a broadcast. You just have to be sure that you have enough memory both on the Spark driver’s and the executors’ side to hold the smaller data set in memory.
Use this type of join under the following conditions for maximum benefit:
- When each key within the smaller and larger data sets is hashed to the same partition by Spark
- When one data set is much smaller than the other (and within the default config of 10 MB, or more if you have sufficient memory)
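To confirm which strategy the optimizer actually picked, look at the physical plan. A short sketch reusing the DataFrames from the example above (the threshold tweak at the end is optional and shown only to illustrate the config):
// In Scala
import org.apache.spark.sql.functions.broadcast

spark.conf.get("spark.sql.autoBroadcastJoinThreshold")     // default is 10 MB (10485760 bytes)

val joinedDF = playersDF.join(broadcast(clubsDF), playersDF("key1") === clubsDF("key2"))
joinedDF.explain()                                          // look for BroadcastHashJoin in the physical plan

// Setting the threshold to -1 disables automatic broadcasting entirely:
// spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "-1")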
Optimizing the shuffle sort merge join
The sort-merge algorithm is an efficient way to merge two large data sets over a common key that is sortable, unique, and can be assigned to or stored in the same partition — that is, two data sets with a common hashable key that end up being on the same partition.
From Spark’s perspective, this means that all rows within each data set with the same key are hashed on the same partition on the same executor. Obviously, this means data has to be co-located or exchanged between executors.
As the name indicates, this join scheme has two phases: a sort phase followed by a merge phase. The sort phase sorts each data set by its desired join key; the merge phase iterates over each key in the row from each data set and merges the rows if the two keys match.
Use this type of join under the following conditions for maximum benefit:
- When each key within two large data sets can be sorted and hashed to the same partition by Spark
- When you want to perform only equi-joins to combine two data sets based on matching sorted keys
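A hedged sketch of forcing and verifying a sort merge join between two larger DataFrames (the file paths and the customer_id column are made up for illustration):
// In Scala
// With broadcasting disabled, an equi-join on a common key falls back to a sort merge join
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "-1")

val ordersDF    = spark.read.parquet("orders.parquet")      // hypothetical large data set
val customersDF = spark.read.parquet("customers.parquet")   // hypothetical large data set

val joined = ordersDF.join(customersDF, ordersDF("customer_id") === customersDF("customer_id"))
joined.explain()                                             // physical plan should show SortMergeJoin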
Reducing the number of jobs and stages
We should try to reduce the number of jobs and stages used by the application.
Let's learn with an example: Think about the difference between
spark.read.parquet("abc.parquet").show()
and
spark.read.schema(StructType(List(StructField("id", IntegerType, false)))).parquet("abc.parquet")
Let’s look at the Spark UI corresponding to both:
First:

Second:

So, you have probably already got the answer by now, but just to reiterate:
Always provide a schema while creating a DataFrame. If you don't, Spark has to run an additional job just to infer the schema, which can hurt performance badly when the dataset is big.
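For completeness, here is a runnable version of the second read with the imports it needs (same file and column as in the example above):
// In Scala
import org.apache.spark.sql.types.{IntegerType, StructField, StructType}

val schema = StructType(List(StructField("id", IntegerType, nullable = false)))
val df = spark.read.schema(schema).parquet("abc.parquet")
df.show()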
Conclusion
So, we walked through the possible optimizations and the reasoning behind them. Let me know your feedback in the comments.