Georgia Deaconu


Understanding core concepts in Apache Spark

Apache Spark is one of the most popular frameworks for big data processing today. Its success is widely attributed to its speed: for some in-memory workloads it has been reported to run up to 100x faster than Hadoop MapReduce. This article summarizes some core concepts of the Spark architecture that will help you take full advantage of Spark’s capabilities.

1. Actions and transformations

The fundamental data structure of Apache Spark is the RDD, which stands for Resilient Distributed Dataset. It is an immutable collection of data that is partitioned across the cluster’s nodes. The DataFrame API (introduced in Spark 1.3) and the Dataset API (introduced in Spark 1.6) were added as abstractions on top of the RDD to handle more structured data, but the following concepts still apply.

A transformation is an operation that takes an RDD as input and produces a new RDD as output; because RDDs are immutable, the input RDD is never modified. Examples of transformations are map, select, filter, and join.

Actions are operations that trigger a computation and return a result to the driver program (or write it to external storage), such as count, collect, or reduce.

Transformations in Spark are lazy: they are only recorded, and it is the actions that actually trigger execution. The sequence of transformations applied to an RDD is stored internally in its lineage graph (also known as the logical execution plan). This is part of what makes Spark robust to hardware failures: if an executor is lost, Spark can use the lineage to recompute only the lost partitions instead of reprocessing the whole dataset.
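To make lazy evaluation and lineage concrete, here is a toy model in plain Python. It is not Spark's implementation, and the class ToyRDD and its methods are hypothetical names. Transformations only append a step to the lineage and return a new object, leaving the original untouched; an action such as collect replays the recorded lineage from the source data. Replaying recorded steps is, in spirit, how lost partitions can be recomputed.

```python
# Toy model of lazy transformations and lineage (not Spark's implementation).
from typing import Any, Callable, List, Tuple

Step = Tuple[str, Callable[[Any], Any]]


class ToyRDD:
    """Hypothetical stand-in for an RDD: immutable, lazy, replayable."""

    def __init__(self, source: List[Any], lineage: Tuple[Step, ...] = ()):
        self._source = source    # the original input data
        self._lineage = lineage  # recorded transformations, oldest first

    # Transformations: record a step and return a NEW object; nothing runs.
    def map(self, fn: Callable[[Any], Any]) -> "ToyRDD":
        return ToyRDD(self._source, self._lineage + (("map", fn),))

    def filter(self, pred: Callable[[Any], bool]) -> "ToyRDD":
        return ToyRDD(self._source, self._lineage + (("filter", pred),))

    def lineage(self) -> List[str]:
        return [name for name, _ in self._lineage]

    # Actions: replay the whole lineage from the source and return a value.
    def collect(self) -> List[Any]:
        data = list(self._source)
        for name, fn in self._lineage:
            if name == "map":
                data = [fn(x) for x in data]
            else:  # "filter"
                data = [x for x in data if fn(x)]
        return data

    def count(self) -> int:
        return len(self.collect())


calls = []


def square(x):
    calls.append(x)  # records when the function actually runs
    return x * x


base = ToyRDD([1, 2, 3, 4])
rdd = base.map(square).filter(lambda x: x > 4)
print(calls)           # [] (no action yet, so nothing has run)
print(rdd.lineage())   # ['map', 'filter']
print(rdd.collect())   # [9, 16]
print(base.lineage())  # [] (the original ToyRDD is unchanged)
```

In PySpark the analogous chain would start from something like sc.parallelize([1, 2, 3, 4]), and again nothing would run until an action such as collect() or count() is called.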

Spark distinguishes between narrow and wide transformations. In a narrow transformation, each partition of the parent RDD is used by at most one partition of the child RDD, so the work can be done without moving data between partitions (e.g. select, filter, cast). In a wide transformation, an output partition may need data from several partitions of the parent RDD, which usually requires a shuffle (e.g. groupBy, sort, join, distinct).
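The difference between narrow and wide dependencies can be checked mechanically. The sketch below is a toy model in plain Python: the function names are hypothetical, and key_partition is a deterministic stand-in for a hash partitioner. For each parent partition it records which child partitions receive its data. A filter keeps every record in the partition it came from, while a group-by-key routes records by key, so a single parent partition can feed several child partitions.

```python
# Toy model of narrow vs wide dependencies (not Spark's implementation).
from collections import defaultdict
from typing import Dict, List, Set, Tuple

Record = Tuple[str, int]
Deps = Dict[int, Set[int]]  # parent partition -> child partitions it feeds

parent: List[List[Record]] = [
    [("a", 1), ("b", 2)],  # partition 0
    [("a", 3), ("c", 4)],  # partition 1
    [("b", 5), ("c", 6)],  # partition 2
]


def filter_even(parts: List[List[Record]]) -> Tuple[List[List[Record]], Deps]:
    """filter: child partition i is computed from parent partition i only."""
    child = [[r for r in p if r[1] % 2 == 0] for p in parts]
    return child, {i: {i} for i in range(len(parts))}


def key_partition(key: str, n: int) -> int:
    # Deterministic stand-in for a hash partitioner (Python's built-in
    # hash() of str is salted per process, so it is avoided here).
    return sum(map(ord, key)) % n


def group_by_key(parts: List[List[Record]], n: int) -> Tuple[List[Dict[str, List[int]]], Deps]:
    """groupByKey: each record is routed to the child partition owning its key."""
    child = [defaultdict(list) for _ in range(n)]
    deps: Deps = defaultdict(set)
    for i, p in enumerate(parts):
        for k, v in p:
            j = key_partition(k, n)
            child[j][k].append(v)
            deps[i].add(j)
    return [dict(c) for c in child], dict(deps)


def is_narrow(deps: Deps) -> bool:
    # Narrow: every parent partition feeds at most one child partition.
    return all(len(children) <= 1 for children in deps.values())


_, filter_deps = filter_even(parent)
groups, group_deps = group_by_key(parent, 2)
print(is_narrow(filter_deps))  # True
print(is_narrow(group_deps))   # False
print(groups)                  # [{'b': [2, 5]}, {'a': [1, 3], 'c': [4, 6]}]
```

The routing step inside group_by_key is what a shuffle has to perform across the network in a real cluster, which is why wide transformations are the expensive ones.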

2. Optimized execution planning

Starting from the code you have written with the DataFrame, Dataset, or SQL APIs, Spark’s Catalyst optimizer performs a series of operations to compute and select the best physical execution plan. It starts by analyzing the logical execution plan to identify possible optimizations, such as reordering filters or combining them. A classic example is predicate pushdown, where a filtering operation is moved as early as possible in the plan, ahead of other transformations (and, for sources such as Parquet, into the data read itself), to reduce the amount of data that needs to be processed.
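Predicate pushdown can be illustrated with a toy rewrite rule in plain Python. This is not Catalyst's implementation; the Step class, push_down_filters, and run are hypothetical names. The rule swaps a filter with the step before it whenever that step does not create a column the filter reads, and run counts how many rows the (notionally expensive) derive step has to process.

```python
# Toy predicate pushdown (not Catalyst's implementation).
from dataclasses import dataclass
from typing import Callable, Dict, List, Tuple

Row = Dict[str, int]


@dataclass
class Step:
    kind: str                        # "filter" or "derive"
    fn: Callable                     # predicate (filter) or row -> row (derive)
    reads: frozenset                 # columns the step reads
    writes: frozenset = frozenset()  # columns a derive step creates


def push_down_filters(plan: List[Step]) -> List[Step]:
    """Move each filter earlier while the step before it is a derive step
    that does not create any column the filter reads."""
    plan = list(plan)
    changed = True
    while changed:
        changed = False
        for i in range(1, len(plan)):
            prev, cur = plan[i - 1], plan[i]
            if cur.kind == "filter" and prev.kind == "derive" and not (cur.reads & prev.writes):
                plan[i - 1], plan[i] = cur, prev
                changed = True
    return plan


def run(plan: List[Step], rows: List[Row]) -> Tuple[List[Row], int]:
    """Execute the plan; also count the rows processed by derive steps."""
    processed = 0
    for step in plan:
        if step.kind == "filter":
            rows = [r for r in rows if step.fn(r)]
        else:
            processed += len(rows)
            rows = [step.fn(r) for r in rows]
    return rows, processed


rows = [{"id": i, "country": i % 10} for i in range(1000)]
plan = [
    Step("derive", lambda r: {**r, "score": r["id"] * 2}, frozenset({"id"}), frozenset({"score"})),
    Step("filter", lambda r: r["country"] == 3, frozenset({"country"})),
]
naive_rows, naive_work = run(plan, rows)
opt_rows, opt_work = run(push_down_filters(plan), rows)
print(naive_work, opt_work)    # 1000 100
print(naive_rows == opt_rows)  # True
```

A filter on a column created by the derive step (for example score) stays where it is, because moving it earlier would change the result; the rule only reorders steps when the output is guaranteed to be identical.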

From the optimized logical plan, the Catalyst optimizer generates one or more physical plans that describe how the various steps can actually be executed on the cluster, and selects one based on estimates of execution cost and resource consumption. The whole process is more complex than this summary, so feel free to check out this article for a more comprehensive view and some examples of how to make good use of the explain function on your DataFrame.
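Selecting a physical plan by cost can be sketched as picking the cheapest candidate under some cost model. The cost formula below is a made-up simplification (shuffled data is paid once, a broadcast once per executor), the table sizes are invented, and it is not Spark's actual model; only the two strategy names, sort-merge join and broadcast hash join, correspond to real Spark join strategies.

```python
# Toy cost-based plan selection (made-up cost model, not Spark's).
from dataclasses import dataclass
from typing import List


@dataclass
class PhysicalPlan:
    name: str
    shuffled_mb: float   # data exchanged between executors
    broadcast_mb: float  # data copied to every executor


def estimated_cost(plan: PhysicalPlan, num_executors: int) -> float:
    # Hypothetical model: shuffled data is paid once, a broadcast once per executor.
    return plan.shuffled_mb + plan.broadcast_mb * num_executors


def choose_plan(candidates: List[PhysicalPlan], num_executors: int) -> PhysicalPlan:
    return min(candidates, key=lambda p: estimated_cost(p, num_executors))


big_mb, small_mb = 50_000.0, 20.0  # invented table sizes
candidates = [
    PhysicalPlan("sort-merge join", shuffled_mb=big_mb + small_mb, broadcast_mb=0.0),
    PhysicalPlan("broadcast hash join", shuffled_mb=0.0, broadcast_mb=small_mb),
]
print(choose_plan(candidates, num_executors=10).name)  # broadcast hash join
```

In Spark itself, whether a broadcast hash join is chosen depends largely on estimated table sizes compared against spark.sql.autoBroadcastJoinThreshold; calling df.explain(True) on a DataFrame prints the logical plans and the physical plan that was finally selected.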

The selected physical plan contains the definition of the stages and the associated tasks, commonly represented as a DAG (Directed Acyclic Graph). You can use the toDebugString function on an RDD to display its lineage, or you can visualize the DAG directly in the Spark History UI:

DAG visualization in Spark History UI (image source)
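Because the stage graph is a directed acyclic graph, a valid execution order always exists, and stages with no dependency between them can run in parallel. The sketch below uses Python's standard graphlib module (Python 3.9+) on a hypothetical four-stage job; the stage names and helper functions are illustrative only, and the code does not reproduce Spark's DAG scheduler.

```python
# Illustrating DAG properties with the standard library (Python 3.9+).
from graphlib import CycleError, TopologicalSorter

# Hypothetical job: each stage maps to the set of stages it depends on.
dag = {
    "stage_0_read_orders": set(),
    "stage_1_read_customers": set(),
    "stage_2_join": {"stage_0_read_orders", "stage_1_read_customers"},
    "stage_3_aggregate": {"stage_2_join"},
}


def execution_waves(graph):
    """Group stages into waves: every stage in a wave has all of its
    dependencies completed in earlier waves, so a wave can run in parallel."""
    sorter = TopologicalSorter(graph)
    sorter.prepare()
    waves = []
    while sorter.is_active():
        ready = sorted(sorter.get_ready())
        waves.append(ready)
        sorter.done(*ready)
    return waves


def has_cycle(graph) -> bool:
    try:
        TopologicalSorter(graph).prepare()
        return False
    except CycleError:
        return True


for i, wave in enumerate(execution_waves(dag)):
    print(i, wave)
print(has_cycle(dag))                       # False
print(has_cycle({"a": {"b"}, "b": {"a"}}))  # True
```

The two read stages land in the same wave, mirroring how independent branches of a Spark DAG can be scheduled concurrently, while a cycle would make any execution order impossible.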

Explicitly caching your data in the code can prevent Spark from optimizing the execution. For instance, caching data before a filtering operation eliminates the possibility of predicate pushdown: the cache must hold all the rows as they were when it was created, so the filter can only be applied afterwards. The problem is easy to spot in this simple example, but more complex applications require more attention.
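The caching caveat can be illustrated with a toy pipeline in plain Python; run_pipeline, is_recent, and the sample data are hypothetical, not Spark APIs. A cache stores the rows exactly as they are at that point, so when the cache comes first it has to hold the full dataset and the filter only runs afterwards, while putting the filter first produces the same result from a much smaller cache.

```python
# Toy illustration of caching as an optimization barrier (not Spark's code).
from typing import Callable, List, Optional, Tuple

Step = Tuple[str, Optional[Callable[[dict], bool]]]


def run_pipeline(rows: List[dict], steps: List[Step]) -> Tuple[List[dict], List[int]]:
    """Apply 'filter' and 'cache' steps in the given order and report how
    many rows each cache had to store."""
    cached_sizes = []
    for kind, fn in steps:
        if kind == "cache":
            rows = list(rows)  # materialize the rows exactly as they are now
            cached_sizes.append(len(rows))
        elif kind == "filter":
            rows = [r for r in rows if fn(r)]
        else:
            raise ValueError(f"unknown step: {kind}")
    return rows, cached_sizes


def is_recent(row: dict) -> bool:
    return row["year"] == 2024


events = [{"id": i, "year": 2015 + i % 10} for i in range(1000)]
cache_first = run_pipeline(events, [("cache", None), ("filter", is_recent)])
filter_first = run_pipeline(events, [("filter", is_recent), ("cache", None)])
print(cache_first[1], filter_first[1])    # [1000] [100]
print(cache_first[0] == filter_first[0])  # True
```

In PySpark the analogous choice is between df.cache().filter(...) and df.filter(...).cache(); only the second lets the filter be applied before the data is materialized.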

3. Stage planning

During the execution planning phase, Spark decides how the code will be split into stages and the order in which they can be executed. To do so, it starts from the lineage graph and works its way back from the action that triggers the computation. Whenever it encounters a wide transformation, which usually requires the data to be shuffled between executors, it introduces a stage boundary.
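The backward walk that places stage boundaries can be sketched on a simplified, linear lineage. In this toy model (plain Python, hypothetical function name) each operation is tagged as wide or narrow, and every wide transformation starts a new stage because its input has to be shuffled first. Real lineage graphs can branch, for example at joins, and Spark's DAG scheduler works on that graph rather than on a flat list.

```python
# Toy stage splitting on a linear lineage (not Spark's DAG scheduler).
from typing import List, Tuple

Op = Tuple[str, bool]  # (operation name, is_wide)


def split_into_stages(lineage: List[Op]) -> List[List[str]]:
    """Walk the lineage backwards from the action; every wide transformation
    closes the stage being built, since its input must be shuffled first."""
    stages: List[List[str]] = []
    current: List[str] = []
    for name, is_wide in reversed(lineage):
        current.insert(0, name)
        if is_wide:
            stages.insert(0, current)  # this stage starts with a shuffle read
            current = []
    if current:
        stages.insert(0, current)      # the first stage reads the input data
    return stages


lineage = [
    ("textFile", False),
    ("flatMap", False),
    ("map", False),
    ("reduceByKey", True),  # wide: needs all values for a key together
    ("filter", False),
    ("collect", False),     # the action that triggers the job
]
for i, ops in enumerate(split_into_stages(lineage)):
    print(f"stage {i}: {ops}")
```

For this word-count-like lineage the sketch yields two stages, one ending just before reduceByKey (with a shuffle write) and one starting at reduceByKey (with a shuffle read), which matches the usual shape of such a job in the Spark UI.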

A stage ends with a shuffle write step, which writes shuffle files to disk, and the next stage starts with a shuffle read step that fetches those files. This is somewhat similar to what happens when you explicitly cache the data, since in both cases intermediate results are materialized.
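The shuffle write and read steps can be pictured with a toy implementation in plain Python that uses real temporary files. Each map task writes one file per reducer partition at the end of its stage, and each reduce task in the next stage reads its file from every map task. The file naming, the JSON format, and partition_for are assumptions for illustration; Spark's actual shuffle file layout is different.

```python
# Toy shuffle write/read through temporary files (not Spark's file format).
import json
import os
import tempfile
from collections import defaultdict
from typing import Dict, List, Tuple


def partition_for(key: str, num_reducers: int) -> int:
    # Deterministic stand-in for a hash partitioner.
    return sum(map(ord, key)) % num_reducers


def shuffle_write(map_id: int, records: List[Tuple[str, int]], num_reducers: int, shuffle_dir: str) -> None:
    """End of a stage: each map task writes one file per reducer partition."""
    buckets: Dict[int, List] = defaultdict(list)
    for key, value in records:
        buckets[partition_for(key, num_reducers)].append([key, value])
    for reducer_id in range(num_reducers):
        path = os.path.join(shuffle_dir, f"shuffle_{map_id}_{reducer_id}.json")
        with open(path, "w") as f:
            json.dump(buckets[reducer_id], f)


def shuffle_read(reducer_id: int, num_maps: int, shuffle_dir: str) -> Dict[str, int]:
    """Start of the next stage: a reduce task fetches its file from every
    map task and sums the values per key."""
    totals: Dict[str, int] = defaultdict(int)
    for map_id in range(num_maps):
        path = os.path.join(shuffle_dir, f"shuffle_{map_id}_{reducer_id}.json")
        with open(path) as f:
            for key, value in json.load(f):
                totals[key] += value
    return dict(totals)


map_inputs = [
    [("spark", 1), ("data", 1), ("spark", 1)],
    [("data", 1), ("stage", 1)],
]
with tempfile.TemporaryDirectory() as shuffle_dir:
    for map_id, records in enumerate(map_inputs):
        shuffle_write(map_id, records, num_reducers=2, shuffle_dir=shuffle_dir)
    results = [shuffle_read(r, len(map_inputs), shuffle_dir) for r in range(2)]
print(results)  # [{'data': 2, 'stage': 1}, {'spark': 2}]
```

The total size of the files written here corresponds to a stage's Shuffle Write, and the data each reducer fetches corresponds to the next stage's Shuffle Read.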

When looking at the details of a Spark job in the Spark History Server, we can usually work out the order in which the various stages were executed just by looking at the sizes of their Shuffle Read and Shuffle Write.

Example of Shuffle Read/Write in job details from Spark’s History server (Image by the Author)

4. Partitioning

Spark splits the data across the cluster using partitions. Each partition resides on a single node of the cluster.

The repartition function can be used to increase or reduce the number of partitions, but it is a wide transformation that shuffles all the data. The coalesce function can be used to reduce the number of partitions, and it has the advantage of being a narrow transformation, because it merges existing partitions instead of shuffling them. However, it does not guarantee that the data is evenly balanced among the new partitions.
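The balancing difference between repartition and coalesce shows up even in a toy model. In the plain-Python sketch below (hypothetical functions, not the Spark implementations), repartition redistributes every record round-robin, while coalesce merges whole neighboring partitions, so an initial skew of 70/10/10/10 records becomes 80/20 rather than 50/50.

```python
# Toy comparison of repartition vs coalesce (not Spark's implementation).
from typing import List


def repartition(parts: List[List[int]], n: int) -> List[List[int]]:
    """Wide: every record is redistributed round-robin, so sizes are balanced,
    but every record moves, which is what makes it a full shuffle."""
    out: List[List[int]] = [[] for _ in range(n)]
    i = 0
    for part in parts:
        for record in part:
            out[i % n].append(record)
            i += 1
    return out


def coalesce(parts: List[List[int]], n: int) -> List[List[int]]:
    """Narrow: whole existing partitions are merged into n groups of
    neighbors; records stay with their original partition, so skew remains."""
    out: List[List[int]] = [[] for _ in range(n)]
    for idx, part in enumerate(parts):
        out[idx * n // len(parts)].extend(part)
    return out


# Four skewed partitions holding 70, 10, 10 and 10 records.
parts = [list(range(0, 70)), list(range(70, 80)), list(range(80, 90)), list(range(90, 100))]
print([len(p) for p in coalesce(parts, 2)])     # [80, 20]
print([len(p) for p in repartition(parts, 2)])  # [50, 50]
```

In PySpark, df.repartition(n) and df.coalesce(n) are the corresponding DataFrame calls, and df.rdd.getNumPartitions() reports the current number of partitions.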

Fewer partitions allow work to be done in larger chunks, with less scheduling and shuffling overhead, but there are physical limits to how large a partition (or shuffle block) can be, which depend on the cluster configuration, such as the memory and number of cores per executor.
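The trade-off can be made concrete with a little arithmetic. The helpers below are a rule-of-thumb sketch, not a Spark API: the target partition size (128 MB here, which matches the default of spark.sql.files.maxPartitionBytes for file reads) and the idea of rounding up to a multiple of the total core count are assumptions to tune for your own cluster. The second helper shows why the cores configuration matters: tasks running at the same time on one executor share its memory.

```python
# Rule-of-thumb partition sizing (assumptions, not a Spark API).
import math


def suggest_num_partitions(total_mb: float, target_partition_mb: float, total_cores: int) -> int:
    """Enough partitions to keep each near the target size, rounded up to a
    multiple of the total core count so that, with evenly sized partitions,
    the last wave of tasks does not leave cores idle."""
    by_size = math.ceil(total_mb / target_partition_mb)
    waves = math.ceil(by_size / total_cores)
    return max(1, waves) * total_cores


def memory_per_task_mb(executor_memory_mb: float, cores_per_executor: int) -> float:
    # Rough estimate: tasks running concurrently on one executor share its
    # memory, so more cores per executor leaves less room per partition.
    return executor_memory_mb / cores_per_executor


print(suggest_num_partitions(total_mb=50_000, target_partition_mb=128, total_cores=40))  # 400
print(memory_per_task_mb(executor_memory_mb=8192, cores_per_executor=4))                # 2048.0
```

With these invented numbers, 50,000 MB split into roughly 128 MB chunks needs 391 partitions, which rounds up to 400 so that 40 cores process exactly 10 full waves of tasks.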

For a more comprehensive view of the partitioning options in Apache Spark you can check out this article.

I hope this post provided an overview of some of the main concepts in Apache Spark. For an example of using PySpark to process data, you can check out this article. For an example of how to set up the Spark History Server and look at your DAG and other metrics, check out this post.

Big Data
Spark
Data Engineering