avatarAbi

Summary

This article provides a comprehensive guide to optimizing AWS EMR Spark jobs through configuration tuning, dynamic resource allocation, caching strategies, join optimization, and reducing the number of jobs and stages.

Abstract

The write-up delves into the performance tuning of Apache Spark jobs running on AWS EMR, offering practical examples and best practices. It covers key areas for optimization such as setting correct Apache Spark configurations, determining the right number of executors, leveraging caching, optimizing various types of joins, and minimizing the number of jobs and stages to improve overall efficiency. The article emphasizes the importance of understanding Spark's dynamic resource allocation to handle varying workloads effectively and suggests using broadcast hash joins and shuffle sort merge joins for optimal data processing. It also advises on providing a schema when creating DataFrames to avoid unnecessary jobs for schema inference, which can significantly impact performance.

Opinions

  • The author advocates for the careful adjustment of Spark configurations to achieve the best performance, highlighting the significance of understanding and modifying these settings.
  • The article suggests that dynamic resource allocation is crucial for workloads with fluctuating demands, allowing Spark to adjust the number of executors dynamically.
  • Caching is recommended for data sets that are frequently accessed, but it should be avoided for data sets that are too large to fit in memory or for inexpensive transformations that are not used often.
  • Optimizing join operations is presented as a key strategy for improving Spark job performance, with a preference for broadcast hash joins for smaller data sets and shuffle sort merge joins for larger, sortable data sets.
  • The author emphasizes the importance of reducing the number of jobs and stages by, for example, providing a schema when reading data to prevent Spark from running an additional job just to infer the schema.
  • The author believes that the provided hands-on examples and best practices can significantly enhance the performance of Spark jobs on AWS EMR.

Performance Tuning of AWS EMR Spark Job With Hands-on Demo

Often we come across cases when we need to optimize our spark job, this writeup drills down in the details with hands-on examples

Image by author

What we will learn:

Spark optimization via :

  1. Identifying and setting correct Apache Spark Configurations
  2. Executor Numbers
  3. Caching
  4. Optimizing Joins
  5. Reducing the number of jobs and stages

Pre-Requisite:

Set up a quick notebook on EMR.Please follow the below link in case you are not sure how to do it.

Viewing and Setting Apache Spark Configurations

Let’s first have a look at how to set up find and then optimally change the spark Configurations. It's important to understand how to find configurations of spark jobs, understand them and change them to achieve the best performance.

To get configurations

We can access Spark’s current configuration through the Spark UI’s Environment tab:

You can also view only the Spark SQL–specific Spark configs:

// In Scala
spark.sql("SET -v").select("key", "value").show(5, false)
# In Python
spark.sql("SET -v").select("key", "value").show(n=5, truncate=False)

In our notebook, the output looks like this

To set or modify an existing configuration programmatically, first check if the property is modifiable. spark.conf.isModifiable("<config_name>") will return true or false

Let's try on our notebook:

We can also get a conf via spark.conf.get(“<>”).Let's find shuffle partitions on our notebook

To set configurations

The most common way for setting configurations is to specify Spark configurations directly in your Spark application or on the command line when submitting the application with spark-submit, using the --conf flag:

spark-submit --conf spark.sql.shuffle.partitions=5 --conf
"spark.executor.memory=2g" --class com.spark.SparkConfig jars/my_spark.jar

Another way is to specify in the application itself. Below is a code sample:

Note: we already have a spark session in notebooks. The below code is more suitable for a standalone spark application

// In Scala

 val spark = SparkSession.builder
   .config("spark.sql.shuffle.partitions", 5)
   .config("spark.executor.memory", "2g")
   .master("local[*]")
   .appName("SparkConfig")
   .getOrCreate()

Alright so let's get into the configurations we should pay attention to:

Dynamic resource allocation

When we specify compute resources as command-line arguments to spark-submit, as we did earlier, we cap the limit. This means that if more resources are needed later as tasks queue up in the driver due to a larger than anticipated workload, Spark cannot accommodate or allocate extra resources.

If instead, you use Spark’s dynamic resource allocation configuration, the Spark driver can request more or fewer compute resources as the demand of large workloads flows and ebbs. In scenarios where your workloads are dynamic — that is, they vary in their demand for computing capacity — using dynamic allocation helps to accommodate sudden peaks.

One use case where this can be helpful is streaming, where the data flow volume may be uneven. Another is on-demand data analytics, where you might have a high volume of SQL queries during peak hours

To enable and configure dynamic allocation, we can use settings like the following.

Note: numbers here are arbitrary; the appropriate settings will depend on the nature of your workload and they should be adjusted accordingly.

spark.dynamicAllocation.enabled true
spark.dynamicAllocation.minExecutors 2
spark.dynamicAllocation.schedulerBacklogTimeout 1m
spark.dynamicAllocation.maxExecutors 20
spark.dynamicAllocation.executorIdleTimeout 2min

By default spark.dynamicAllocation.enabled is set to false

Configuring Spark executors’ memory and the shuffle service

The amount of memory available to each executor is controlled by spark.executor.memory. This is divided into three sections: execution memory, storage memory, and reserved memory.

The default spark memory division is 60% for execution memory and 40% for storage, after allowing for 300 MB for reserved memory, to safeguard against OOM errors.

During map and shuffle operations, Spark writes to and reads from the local disk’s shuffle files, so there is heavy I/O activity. This can result in a bottleneck because the default configurations are suboptimal for large-scale Spark jobs. Knowing what configurations to tweak can mitigate this risk during this phase of a Spark job.

Recommended settings for Apache spark

Spreadsheet Link

How many Executors

This is a very common but important question.

Spark is very efficient at processing its tasks in parallel. For large-scale workloads, a Spark job will have many stages, and within each stage, there will be many tasks. Spark will at best schedule a thread per task per core, and each task will process a distinct partition.

To optimize resource utilization and maximize parallelism, the ideal is at least as many partitions as there are cores on the executor.

If there are more partitions than there are cores on each executor, all the cores are kept busy. You can think of partitions as atomic units of parallelism: a single thread running on a single core can work on a single partition.

How about Caching

When to cache

Common use cases for caching are scenarios where you will want to access a large data set repeatedly for queries or transformations. Some examples include:

  • DataFrames commonly used during iterative machine learning training
  • DataFrames accessed commonly for doing frequent transformations during ETL or building data pipelines

When not to cache :

  • DataFrames that are too big to fit in memory
  • An inexpensive transformation on a DataFrame not requiring frequent use, regardless of size

Optimizing joins

Join operations are a common type of transformation in big data analytics in which two data sets, in the form of tables or DataFrames, are merged over a common matching key. Similar to relational databases, the Spark DataFrame and Dataset APIs and Spark SQL offer a series of join transformations: inner joins, outer joins, left joins, right joins, etc. All of these operations trigger a large amount of data movement across Spark executors.

Spark has five distinct join strategies by which it exchanges, moves, sorts, groups, and merges data across executors:

the broadcast hash join (BHJ), shuffle hash join (SHJ), shuffle sort merge join (SMJ), broadcast nested loop join (BNLJ), and shuffle-and-replicated nested loop

Optimizing query with Broadcast Hash Join :

The broadcast hash join is employed when two data sets, one small (fitting in the driver’s and executor’s memory) and another large enough to ideally be spared from movement, need to be joined over certain conditions or columns. Using a Spark broadcast variable, the smaller data set is broadcasted by the driver to all Spark executors and subsequently joined with the larger data set on each executor. This strategy avoids the large exchange.

By default, Spark will use a broadcast join if the smaller data set is less than 10 MB. This configuration is set in spark.sql.autoBroadcastJoinThreshold.

// In Scala 
import org.apache.spark.sql.functions.broadcast
val joinedDF = playersDF.join(broadcast(clubsDF), "key1 === key2")

The BHJ is the easiest and fastest join Spark offers since it does not involve any shuffle of the data set; all the data is available locally to the executor after a broadcast. You just have to be sure that you have enough memory both on the Spark driver’s and the executors’ side to hold the smaller data set in memory.

Use this type of join under the following conditions for maximum benefit:

  • When each key within the smaller and larger data sets is hashed to the same partition by Spark
  • When one data set is much smaller than the other (and within the default config of 10 MB, or more if you have sufficient memory)

Optimizing the shuffle sort merge join

The sort-merge algorithm is an efficient way to merge two large data sets over a common key that is sortable, unique, and can be assigned to or stored in the same partition — that is, two data sets with a common hashable key that end up being on the same partition.

From Spark’s perspective, this means that all rows within each data set with the same key are hashed on the same partition on the same executor. Obviously, this means data has to be co-located or exchanged between executors.

As the name indicates, this join scheme has two phases: a sort phase followed by a merge phase. The sort phase sorts each data set by its desired join key; the merge phase iterates over each key in the row from each data set and merges the rows if the two keys match.

Use this type of join under the following conditions for maximum benefit:

  • When each key within two large data sets can be sorted and hashed to the same partition by Spark
  • When you want to perform only equi-joins to combine two data sets based on matching sorted keys

Reducing number of jobs and stages

We should try to reduce the number of jobs and stages used by application.

Let's learn with an example: Think about the difference between

spark.read.parquet("abc.parquet").show()

and

spark.read.schema(StructType(List(StructField("id",IntType,false)))).parquet("abc.parquet")

let’s look at spark UI corresponding to both:

First:

Second:

So, you already have got answer by now but just to reiterate:

Always provide schema while creating dataframe as if you dont provide,spark have to run an additional job just to get the schema and it can be very performance deteriorating if the dataset is big

Conclusion

So, we looked around the possible optimizations and reasoning. Let me know your feedback in the comments.

Reference:

AWS
Data Engineering
Data Science
Programming
Software Engineering
Recommended from ReadMedium