Kalpan Shah

Spark ETL Chapter 3 with Cloud data lakes (Azure Blob | Azure ADLS)

Previous blog/Context:

In an earlier blog, we discussed Spark ETL with NoSQL databases (MongoDB). Please see that blog post for more details.

Introduction:

In this blog, we will discuss Spark ETL with cloud data lakes, using Azure Blob storage as the example. We will read from a public blob storage account and walk through each ETL step against it. In upcoming blogs, we will also do Spark ETL with AWS S3 buckets and Google Cloud buckets.

Spark ETL with different Data sources (Image by Author)

Today, we will perform the following Spark ETL operations:

  • Install required spark libraries
  • Create a connection with Azure Blob storage
  • Read data from the blob and store it in a data frame
  • Transform data
  • Write data into a Parquet file
  • Write data into a CSV file

First, clone the GitHub repo below, which contains all the required sample files and the solution.

If you don’t have a Spark instance set up, follow the earlier blog on setting up Data Engineering tools on your system. (The Data Engineering suite tool sets up Spark, MySQL, PostgreSQL, and MongoDB on your system.) That Spark instance already has the packages for Azure Blob storage and Azure Data Lake Storage installed.

Public Azure Blob Storage

We are going to access public Azure Blob storage from Spark. As part of Azure Open Datasets, several kinds of data are publicly available for learning purposes.

Microsoft Open Data set (Image captured by Author)

We will be using NYC yellow taxi data.

Install required spark libraries & Start Spark Session

Our Spark instance already has the Azure Blob storage and ADLS packages installed, so we do not need to specify them explicitly. You can check the jar files in the /opt/spark/jars location.

If your Spark instance does not have the package, you can take its coordinates from the Maven repo link below and pass them in the Spark config.

https://mvnrepository.com/artifact/com.microsoft.azure/azure-storage/7.0.1

Azure Blob storage Maven Properties (Image captured by Author)

We will start our Spark session.

Starting Spark Session (Image by Author)
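
The session start is only shown as a screenshot, so here is a minimal sketch of what it might look like. The application name, the extra_packages parameter, and both helper names are assumptions made for illustration; the only Maven coordinate taken from this post is com.microsoft.azure:azure-storage:7.0.1. If the jars are already in /opt/spark/jars, no packages need to be passed.

```python
# Maven coordinate taken from the mvnrepository link above.
AZURE_STORAGE_PACKAGE = "com.microsoft.azure:azure-storage:7.0.1"


def packages_conf(coordinates):
    """Join Maven coordinates into the comma-separated value
    that the spark.jars.packages setting expects."""
    return ",".join(c.strip() for c in coordinates if c.strip())


def start_session(app_name="spark-etl-azure-blob", extra_packages=()):
    """Create (or reuse) a SparkSession, optionally pulling jars from Maven.

    app_name and extra_packages are hypothetical names for this sketch.
    """
    # Imported here so packages_conf() can be used without Spark installed.
    from pyspark.sql import SparkSession

    builder = SparkSession.builder.appName(app_name)
    value = packages_conf(extra_packages)
    if value:
        # Only set the option when there is something to download.
        builder = builder.config("spark.jars.packages", value)
    return builder.getOrCreate()
```

With the jars preinstalled, spark = start_session() should be enough. Otherwise something like start_session(extra_packages=[AZURE_STORAGE_PACKAGE]) could be tried. Note that the wasbs:// file system itself lives in the hadoop-azure module, so that jar (in a version matching your Hadoop build) also has to be available.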

Create a connection with Azure Blob storage from Spark

To connect to Azure Blob storage, we need the following details: the account name, the container name, the relative path of the data, and the key (SAS token) for that storage. We have three different datasets:

  • NYC Yellow taxi data -> very high volume
  • Covid public data -> Medium volume
  • Public holidays -> low volume

Let’s start here with NYC yellow taxi data.

This dataset is very large: about 1.5 billion rows and 50 GB of data, all in Parquet format.

# Azure storage for NYC Yellow taxi
blob_account_name = "azureopendatastorage"
blob_container_name = "nyctlc"
blob_relative_path = "yellow"
blob_sas_token = r""

If you want to try the Covid public data instead, use the blob details below. (The NYC taxi data set is very large, so loading all of it takes time.)

The volume of data is medium: about 125,000 rows in a Parquet file of around 40 MB.

# Azure storage for Covid dataset
blob_account_name = "pandemicdatalake"
blob_container_name = "public"
blob_relative_path = "curated/covid-19/bing_covid-19_data/latest/bing_covid-19_data.parquet"
blob_sas_token = r""

If you want to try a very small data set, use the public holidays details below. This data set is about 500 KB with 70,000 rows.

# Azure storage for Holiday 
blob_account_name = "azureopendatastorage"
blob_container_name = "holidaydatacontainer"
blob_relative_path = "Processed"
blob_sas_token = r""
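
All three snippets assign the same variable names, so if they are run one after another only the last one takes effect. One possible way to keep the three sources side by side and pick one explicitly is sketched below. The BlobSource class and the dictionary keys are made up for this example; the account, container, and path values are copied from the snippets above.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class BlobSource:
    """Connection details for one public blob data set (hypothetical helper)."""
    account_name: str
    container_name: str
    relative_path: str
    sas_token: str = ""  # empty: the open data sets are publicly readable


SOURCES = {
    "nyc_yellow_taxi": BlobSource("azureopendatastorage", "nyctlc", "yellow"),
    "covid": BlobSource(
        "pandemicdatalake",
        "public",
        "curated/covid-19/bing_covid-19_data/latest/bing_covid-19_data.parquet",
    ),
    "holidays": BlobSource("azureopendatastorage", "holidaydatacontainer", "Processed"),
}

# Pick one data set; "holidays" is the smallest and quickest to experiment with.
source = SOURCES["holidays"]
blob_account_name = source.account_name
blob_container_name = source.container_name
blob_relative_path = source.relative_path
blob_sas_token = source.sas_token
```

Switching to another data set is then a one-word change in the SOURCES[...] lookup, and the rest of the code keeps using the same four variables.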

Once we have declared variables for all of these details, we build the storage path and set the SAS token in the Spark configuration as below. I am going ahead with the yellow taxi data, but I suggest trying public holidays first: the volume is low, so all the operations will run faster.

# Allow SPARK to read from Blob remotely
wasbs_path = 'wasbs://%s@%s.blob.core.windows.net/%s' % (blob_container_name, blob_account_name, blob_relative_path)
spark.conf.set(
  'fs.azure.sas.%s.%s.blob.core.windows.net' % (blob_container_name, blob_account_name),
  blob_sas_token)
print('Remote blob path: ' + wasbs_path)
Connecting to Azure Blob storage from Spark (Image by Author)
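
If you plan to try more than one data set, the same two strings can be built by small helpers. This is only a sketch: the function names are invented here, and spark is assumed to be an already running SparkSession passed in by the caller.

```python
def wasbs_url(container, account, relative_path=""):
    """Build wasbs://<container>@<account>.blob.core.windows.net/<path>."""
    if not container or not account:
        raise ValueError("container and account must be non-empty")
    # A leading slash in the relative path would produce a double slash.
    path = relative_path.lstrip("/")
    return f"wasbs://{container}@{account}.blob.core.windows.net/{path}"


def sas_conf_key(container, account):
    """Name of the Spark/Hadoop setting that holds the SAS token for a container."""
    return f"fs.azure.sas.{container}.{account}.blob.core.windows.net"


def configure_blob_access(spark, container, account, relative_path, sas_token=""):
    """Register the SAS token on an existing SparkSession and return the wasbs path."""
    spark.conf.set(sas_conf_key(container, account), sas_token)
    return wasbs_url(container, account, relative_path)
```

For example, wasbs_path = configure_blob_access(spark, "holidaydatacontainer", "azureopendatastorage", "Processed") would set the configuration and return the path in one call.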

Read data from the blob and store it in a data frame

The next step is to read data from this location. The files stored there are in Parquet format, so we will use spark.read.parquet.

df = spark.read.parquet(wasbs_path)

We now have data available in the data frame. So, we will print the schema and check the data.

df.printSchema()
df.show(n=2)
Checking schema and data ( Image by Author)

Transform data

We will create a temporary view on the data frame so that we can write Spark SQL and do the transformation.

df.createOrReplaceTempView('tempSource')

We will select 10 rows and create a new data frame.

newdf = spark.sql('SELECT * FROM tempSource LIMIT 10')
Creating a Hive table and exploring Spark SQL (Image by Author)
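
A LIMIT without an ORDER BY does not guarantee which rows come back. If you want a repeatable "top N", the query needs a sort column. Below is a small sketch of a query builder for that; the function name is invented, and the column name in the usage note is an assumption about the taxi schema that you should check against the printSchema() output.

```python
def top_n_query(view, n, order_by=None, descending=True):
    """Build a 'SELECT * ... LIMIT n' query, optionally sorted by one column."""
    # Only plain identifiers are accepted, since the names are pasted into SQL.
    if not view.isidentifier():
        raise ValueError(f"not a valid view name: {view!r}")
    if order_by is not None and not order_by.isidentifier():
        raise ValueError(f"not a valid column name: {order_by!r}")
    if not isinstance(n, int) or n < 1:
        raise ValueError("n must be a positive integer")

    query = f"SELECT * FROM {view}"
    if order_by is not None:
        direction = "DESC" if descending else "ASC"
        query += f" ORDER BY {order_by} {direction}"
    return f"{query} LIMIT {n}"
```

For example, spark.sql(top_n_query("tempSource", 10, order_by="tripDistance")) would return the 10 longest trips, assuming the data frame has a tripDistance column. Without order_by the helper produces the same query as above.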

Write data into a local file (Parquet | CSV)

We have data available in the data frame. Using the data frame's write interface (newdf.write), we can write the data in different file formats.

newdf.write.format("parquet").option("compression","snappy").save("parquetdata",mode='append')
newdf.write.format("csv").option("header","true").save("csvdata",mode='append')

Once this is executed, it will create two folders named “parquetdata” and “csvdata”.

Parquet files created on the local warehouse (Image by Author)
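
Because the calls above use mode='append', running the notebook twice leaves two copies of the same 10 rows in each folder. The sketch below shows one way to make the write step repeatable by defaulting to overwrite. The helper names and the base_dir parameter are made up for this example, and JSON is included only as a third format for illustration.

```python
def plan_writes(base_dir="."):
    """Return (format, options, path) for every output we want to produce."""
    formats = [
        ("parquet", {"compression": "snappy"}),
        ("csv", {"header": "true"}),
        ("json", {}),
    ]
    return [(fmt, opts, f"{base_dir}/{fmt}data") for fmt, opts in formats]


def write_all(df, base_dir=".", mode="overwrite"):
    """Write a Spark data frame in each planned format and return the paths.

    mode="overwrite" replaces earlier output; pass mode="append" to keep
    the behaviour of the original snippet.
    """
    written = []
    for fmt, opts, path in plan_writes(base_dir):
        df.write.format(fmt).options(**opts).mode(mode).save(path)
        written.append(path)
    return written
```

Calling write_all(newdf) would create the parquetdata, csvdata, and jsondata folders in the current directory.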

Conclusion:

Once we set the configuration for Azure Blob storage or Azure Data Lake Storage, we can read files from there with spark.read and write files there with the data frame's write interface.
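
This post only walked through Azure Blob storage. For ADLS Gen2 the pattern is similar, but the path uses the abfss scheme and the dfs endpoint. The sketch below assumes authentication with a storage account key; the function names are invented, and the container, account, and key are placeholders you would replace with your own.

```python
def abfss_url(container, account, relative_path=""):
    """Build abfss://<container>@<account>.dfs.core.windows.net/<path>."""
    path = relative_path.lstrip("/")
    return f"abfss://{container}@{account}.dfs.core.windows.net/{path}"


def adls_account_key_conf(account):
    """Name of the setting that holds the account key for an ADLS Gen2 account."""
    return f"fs.azure.account.key.{account}.dfs.core.windows.net"


def configure_adls_access(spark, container, account, relative_path, account_key):
    """Register the account key on an existing SparkSession and return the abfss path."""
    spark.conf.set(adls_account_key_conf(account), account_key)
    return abfss_url(container, account, relative_path)
```

The returned path can then be passed to spark.read.parquet in the same way as the wasbs path. Keep the account key out of source control, for example by reading it from an environment variable.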

Video explanation:

https://www.youtube.com/watch?v=D64_XFbcZNw
