Robert Sanders


Improving your Apache Spark Application Performance

Simple Tips and Tricks to Improve the Performance of your Spark Applications


Apache Spark has quickly become one of the most heavily used processing engines in the Big Data space since it became a Top-Level Apache Project in February 2014. It runs in a variety of environments (locally, on a Standalone Spark cluster, Apache Mesos, YARN, etc.) and ships with libraries that cover most processing needs on Hadoop, including SQL queries, streaming, and machine learning, all running on an optimized execution engine.

We at Clairvoyant have built many data pipelines with Apache Spark over the years, both batch and streaming. Along the way we’ve found some simple ways to improve the performance of Spark applications. Here are a few of those tips and tricks:

Use DataFrames Instead of RDDs

Instead of using the RDD API:

val rdd = sc.textFile("/path/to/file.txt")

use the DataFrame API:

val df = spark.read.text("/path/to/file.txt")

By staying in the DataFrame API and not falling back to RDDs, you allow Spark to use the Catalyst Optimizer to improve the execution plan of your Spark job.
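One way to see the optimizer at work is to ask Spark for the query plan. The following is a minimal sketch, assuming Spark is on the classpath and using a local session and a small in-memory dataset (both hypothetical stand-ins for a real cluster and file):

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, length}

// Local session for illustration only; a real job would usually get its master from spark-submit.
val spark = SparkSession.builder().appName("catalyst-sketch").master("local[*]").getOrCreate()
import spark.implicits._

// Hypothetical in-memory rows standing in for the lines of a text file.
val df = Seq("spark", "catalyst optimizer", "rdd").toDF("value")

// The query is declarative, so Catalyst can analyze and rewrite it before execution.
val longLines = df.filter(length(col("value")) > 5).select(col("value"))

// Prints the parsed, analyzed, optimized, and physical plans.
longLines.explain(true)
```

The same filter written as an RDD lambda is opaque to Spark, so no comparable plan is available for it.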

Avoid Using Regexes

Java regular expressions are a great tool for parsing data that follows an expected structure. Unfortunately, regex matching is generally slow, and when you have to process millions of rows, a small increase in the time it takes to parse a single row can noticeably increase the processing time of the entire job. If at all possible, avoid regexes and try to ensure your data is loaded in a more structured format.
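As an illustration, here is a sketch that parses the same records twice: once with `regexp_extract` and once by declaring a schema and letting Spark's CSV reader do the parsing. The sample records, column names, and pattern are assumptions made up for this example, and it runs on a local session:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, regexp_extract}
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

val spark = SparkSession.builder().appName("regex-sketch").master("local[*]").getOrCreate()
import spark.implicits._

// Hypothetical comma-delimited records.
val lines = Seq("1,alice,30", "2,bob,25").toDS()

// Regex approach: the pattern is evaluated for every row and every extracted column.
val pattern = "^(\\d+),(\\w+),(\\d+)$"
val viaRegex = lines.toDF("value").select(
  regexp_extract(col("value"), pattern, 1).cast("int").as("id"),
  regexp_extract(col("value"), pattern, 2).as("name"),
  regexp_extract(col("value"), pattern, 3).cast("int").as("age")
)

// Structured approach: declare the schema and let the CSV reader split and type the fields.
val schema = StructType(Seq(
  StructField("id", IntegerType),
  StructField("name", StringType),
  StructField("age", IntegerType)
))
val viaSchema = spark.read.schema(schema).csv(lines)
```

Both DataFrames hold the same rows; the second one simply never touches the regex engine. In a real pipeline the schema would typically be passed to `spark.read` when loading files directly.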

Optimize Joins

Put the Largest Dataset on the Left

When you’re joining together two datasets where one is smaller than the other, put the larger dataset on the “Left”:

val joinedDF = largerDF.join(smallerDF, largerDF("id") === smallerDF("some_id"), "left")

When Spark shuffles data for the join, it keeps the data you specified on the left static on the executors and transfers the data you designated on the right between the executors. If the dataset on the right is the larger one, serializing and transferring it will take longer.

Use Broadcast Joins for Joining Smaller Datasets to Larger Ones

In many cases, we will be joining smaller datasets (a couple dozen or so rows, maybe a bit more) with larger ones. In this case, it’s more performant to use a Broadcast Join, which sends a full copy of the small dataset to every executor so that the large dataset does not have to be shuffled:

import org.apache.spark.sql.functions._
val joinedDF = largeDF.join(broadcast(smallDF), largeDF("id") === smallDF("some_id"))
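To check that the hint is honored, you can inspect the physical plan. This is a sketch on a local session with made-up data; the column names `country_id` and `country` are assumptions for the example:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.broadcast

val spark = SparkSession.builder().appName("broadcast-sketch").master("local[*]").getOrCreate()
import spark.implicits._

// Hypothetical data: a "large" fact table and a tiny lookup table.
val largeDF = (1 to 1000).map(i => (i, i % 3)).toDF("id", "country_id")
val smallDF = Seq((0, "US"), (1, "DE"), (2, "IN")).toDF("some_id", "country")

// The broadcast() hint asks Spark to ship smallDF to every executor.
val joinedDF = largeDF.join(broadcast(smallDF), largeDF("country_id") === smallDF("some_id"))

// The physical plan should mention a broadcast join rather than a sort-merge join.
joinedDF.explain()
```

Spark can also choose a broadcast join on its own when it estimates one side to be smaller than `spark.sql.autoBroadcastJoinThreshold` (10 MB by default), so the explicit hint matters most when those estimates are missing or wrong.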

Use Caching

If you find you are repeatedly using the same DataFrame in multiple queries, it’s recommended to cache or persist it:

val df = spark.read.text("/path/to/file.txt").cache()

Note: Avoid overusing this. Due to Spark’s caching strategy (in memory first, then spilling to disk), the cache can end up in slightly slower storage. Also, storage space used for caching is not available for processing. In the end, caching might cost more than simply re-reading the DataFrame.¹
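A sketch of the persist-reuse-release pattern follows. It assumes a local session and a small generated dataset, and it picks `MEMORY_AND_DISK` explicitly only to make the storage level visible:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.storage.StorageLevel

val spark = SparkSession.builder().appName("cache-sketch").master("local[*]").getOrCreate()
import spark.implicits._

// Hypothetical dataset that several queries will reuse.
val df = (1 to 100).toDF("n")

// persist() with an explicit level; caching is lazy, so nothing is stored yet.
val cached = df.persist(StorageLevel.MEMORY_AND_DISK)

// The first action materializes the cache; the second query can read from it.
val total = cached.count()
val evens = cached.filter($"n" % 2 === 0).count()

// Release the storage once the DataFrame is no longer needed.
cached.unpersist()
```

Calling `unpersist()` as soon as the reuse is over is one way to limit the storage cost described in the note above.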

COMPUTE STATISTICS of Tables Before Processing

Before querying a series of tables, it can be helpful to tell Spark to compute statistics for those tables so that the Catalyst Optimizer can come up with a better plan for processing them.

spark.sql("ANALYZE TABLE dbName.tableName COMPUTE STATISTICS")

In some cases, Spark doesn’t get everything it needs from the table-level COMPUTE STATISTICS call above. It also helps to collect statistics for specific columns so the Catalyst Optimizer can better estimate how selective operations on those columns are. It’s recommended to COMPUTE STATISTICS for any columns that are involved in filtering and joining.

spark.sql("ANALYZE TABLE dbName.tableName COMPUTE STATISTICS FOR COLUMNS joinColumn, filterColumn")
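The following sketch runs both statements end to end and then reads the collected statistics back. The table name `stats_sketch`, its rows, and the temporary warehouse directory are assumptions made for this example; it uses a local session with Spark's built-in catalog:

```scala
import java.nio.file.Files
import org.apache.spark.sql.SparkSession

// A throwaway warehouse directory keeps the example from touching an existing one.
val spark = SparkSession.builder()
  .appName("stats-sketch")
  .master("local[*]")
  .config("spark.sql.warehouse.dir", Files.createTempDirectory("spark-warehouse").toString)
  .getOrCreate()
import spark.implicits._

// Hypothetical managed table with one join column and one filter column.
Seq((1, "a"), (2, "b"), (3, "a")).toDF("joinColumn", "filterColumn")
  .write.mode("overwrite").saveAsTable("stats_sketch")

// Table-level statistics (size and row count), then column-level statistics.
spark.sql("ANALYZE TABLE stats_sketch COMPUTE STATISTICS")
spark.sql("ANALYZE TABLE stats_sketch COMPUTE STATISTICS FOR COLUMNS joinColumn, filterColumn")

// Table-level statistics appear in the "Statistics" row of DESCRIBE EXTENDED.
val tableStats = spark.sql("DESCRIBE EXTENDED stats_sketch").filter($"col_name" === "Statistics")
tableStats.show(false)

// Column-level statistics (min, max, distinct count, and so on) for a single column.
spark.sql("DESCRIBE EXTENDED stats_sketch joinColumn").show(false)
```

Note that column statistics are mainly consumed by the cost-based optimizer, which is controlled by `spark.sql.cbo.enabled` and is disabled by default in open-source Spark, so it may be worth checking that setting in your environment.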

Set the spark.sql.shuffle.partitions Configuration Parameter

The default value for this parameter is 200, which can be too high for some jobs. As a starting point, set it to the number of cores available across all your executors, for example 10 for a cluster with 10 executor cores in total:

spark.conf.set("spark.sql.shuffle.partitions", 10)
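Rather than hard-coding the number, it can be derived at runtime. This sketch assumes that `defaultParallelism` is a reasonable proxy for the total core count (in local mode it is the number of local cores; on a cluster it normally reflects the executor cores, but verify that for your deployment). Adaptive query execution is switched off here only so the partition count stays visible in the demo:

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("shuffle-sketch").master("local[*]").getOrCreate()
import spark.implicits._

// Assumption: defaultParallelism approximates the cores available to this application.
val cores = spark.sparkContext.defaultParallelism
spark.conf.set("spark.sql.shuffle.partitions", cores.toString)

// Demo only: prevent adaptive execution from coalescing the shuffle partitions.
spark.conf.set("spark.sql.adaptive.enabled", "false")

// groupBy forces a shuffle, so the result has spark.sql.shuffle.partitions partitions.
val counts = (1 to 1000).toDF("n").groupBy(($"n" % 10).as("bucket")).count()
println(counts.rdd.getNumPartitions)
```

Treat the core count as a starting value and adjust it based on the data volume per partition that you observe in the Spark UI.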

References

1 — https://readmedium.com/spark-performance-tuning-from-the-trenches-7cbde521cf60
