Kalpan Shah

Spark ETL Chapter 2 with NoSQL Database (MongoDB | Cassandra)

Previous blog/Context:

In an earlier blog, we discussed Spark ETL with SQL databases (MySQL and PostgreSQL). Please refer to that post for more details.

Introduction:

In this blog, we will discuss Spark ETL with a NoSQL database, using MongoDB for all the ETL steps. Other NoSQL databases such as Cassandra, Azure Cosmos DB, and Amazon DynamoDB follow the same pattern for Spark ETL as MongoDB. The only difference is that each database needs its own Spark connector package.

Spark ETL with data sources (Image by Author)
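To make the "same pattern" point concrete, here is a minimal sketch that drives one generic read helper with per-database settings. The MongoDB entry mirrors the options used later in this post (connection string shown as a placeholder). The Cassandra entry assumes the DataStax Spark Cassandra Connector, whose data source is `org.apache.spark.sql.cassandra` with `keyspace` and `table` options. The `SOURCES` dict, the `read_source` helper, and the keyspace name are hypothetical, for illustration only.

```python
# Hypothetical per-database settings: only the data source name and the
# options that locate the data change from one NoSQL database to another.
SOURCES = {
    "mongodb": {
        "format": "mongo",  # MongoDB Spark Connector 3.x short name
        "options": {
            "uri": "mongodb://root:<password>@<host>:27017/",  # placeholder
            "database": "dataengineering",
            "collection": "employee",
        },
    },
    "cassandra": {
        "format": "org.apache.spark.sql.cassandra",  # DataStax connector
        "options": {
            "keyspace": "my_keyspace",  # hypothetical keyspace
            "table": "employee",        # hypothetical table
        },
    },
}


def read_source(spark, name):
    """Build a DataFrame for the named source using one shared read chain."""
    source = SOURCES[name]
    reader = spark.read.format(source["format"])
    for key, value in source["options"].items():
        reader = reader.option(key, value)
    return reader.load()


# In a notebook with the matching connector package loaded:
# df = read_source(spark, "mongodb")
```

Each connector still has to be listed in `spark.jars.packages`, and the Cassandra connector typically also needs the `spark.cassandra.connection.host` setting.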

Today, we will perform the following Spark ETL operations:

  1. Install required Spark libraries
  2. Create a connection with MongoDB
  3. Read data from MongoDB
  4. Transform data
  5. Write data into MongoDB

First, clone the accompanying GitHub repo, which contains all the required sample files and the solution.

If you don’t have Spark and MongoDB set up on your system, follow the earlier blog on setting up the Data Engineering tools. (The Data Engineering suite sets up Spark, MySQL, PostgreSQL, and MongoDB on your system.)

Spark ETL with NoSQL Database

First, open Jupyter Notebook and copy the Chapter 2 content from the GitHub repo into it.

Our existing Spark setup does not include the MongoDB Spark connector, so we need to download and install it.

All available versions of the MongoDB Spark connector are listed on the Maven page below:

https://mvnrepository.com/artifact/org.mongodb.spark/mongo-spark-connector

Maven packages info for MongoDB (Image by Author)

Joining these values as “groupId:artifactId:version” gives the value for the spark.jars.packages setting:

'spark.jars.packages' : 'org.mongodb.spark:mongo-spark-connector_2.12:3.0.1'
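The join is plain string concatenation. As a small sketch (the helper name `maven_coordinate` is ours, not part of Spark), the snippet below builds the coordinate and the config entry Spark reads at startup. The `_2.12` suffix in the artifact ID is the Scala version the connector was built for, and it should match the Scala version of your Spark build.

```python
def maven_coordinate(group_id, artifact_id, version, scala_version=None):
    """Join Maven coordinates as groupId:artifactId:version.

    Scala libraries such as the MongoDB connector publish one artifact per
    Scala version (e.g. mongo-spark-connector_2.12), so the suffix is optional.
    """
    if scala_version:
        artifact_id = f"{artifact_id}_{scala_version}"
    return f"{group_id}:{artifact_id}:{version}"


package = maven_coordinate(
    "org.mongodb.spark", "mongo-spark-connector", "3.0.1", scala_version="2.12"
)
spark_conf = {"spark.jars.packages": package}
print(spark_conf)
# {'spark.jars.packages': 'org.mongodb.spark:mongo-spark-connector_2.12:3.0.1'}
```

`spark.jars.packages` accepts a comma-separated list, so further connectors can be appended to the same string.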

MongoDB Setup and load data for Spark ETL

The Data Engineering suite also sets up MongoDB, which Spark will read from and write to. To have a collection ready for Spark to read, we import the sample files from the GitHub repo into MongoDB.

Open MongoDB Compass, connect to the local MongoDB instance, and click “Create database”.

Loading data to MongoDB collection (Image by Author)

Enter ‘dataengineering’ as the database name and ‘employee’ as the collection name. Then open the collection, click Import, and select the employee.csv file from the GitHub repo.

Selecting file and format (Image by Author)

Check all the columns and click Import.

Checking data and importing to collection (Image by Author)

Once the import finishes, the data is available in MongoDB.

Select query on MongoDB collection (Image by Author)
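If you prefer a script to the Compass import dialog, the sketch below loads the same CSV with Python's `csv` module and the PyMongo driver. It is an alternative under assumptions, not the author's method: it assumes `employee.csv` has a header row with at least `first_name` and `salary` columns, and it converts `salary` to an integer so the later `salary > 50000` filter compares numbers (Compass lets you choose column types for the same reason). The connection string is a placeholder and the helper names are hypothetical.

```python
import csv
import io


def parse_employees(csv_file):
    """Read employee rows from a CSV file object, typing salary as int."""
    rows = []
    for row in csv.DictReader(csv_file):
        row["salary"] = int(row["salary"])  # assumes whole-number salaries
        rows.append(row)
    return rows


def load_employees(path, uri="mongodb://root:<password>@<host>:27017/"):
    """Insert the CSV rows into dataengineering.employee (needs pymongo)."""
    from pymongo import MongoClient  # imported lazily: pip install pymongo

    with open(path, newline="") as f:
        docs = parse_employees(f)
    client = MongoClient(uri)
    try:
        result = client["dataengineering"]["employee"].insert_many(docs)
        return len(result.inserted_ids)
    finally:
        client.close()


# Offline check of the parsing step with a made-up two-row sample.
sample = io.StringIO("first_name,salary\nAlice,62000\nBob,48000\n")
print(parse_employees(sample))
# [{'first_name': 'Alice', 'salary': 62000}, {'first_name': 'Bob', 'salary': 48000}]
```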

Install required Spark libraries

As discussed above, we specify the required MongoDB package when starting the Spark session, so Spark downloads and installs the package as the session starts.

Starting a spark session with MongoDB packages (Image by Author)

The required packages are now available to the Spark session.
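The session-start code is shown only as a screenshot. A minimal equivalent, assuming a local PySpark install and the connector version chosen earlier (the app name and the `create_spark_session` helper are our own), passes the package through the session builder:

```python
MONGO_PACKAGE = "org.mongodb.spark:mongo-spark-connector_2.12:3.0.1"


def create_spark_session(app_name="spark-etl-mongodb", packages=MONGO_PACKAGE):
    """Start (or reuse) a SparkSession that resolves the given Maven packages."""
    from pyspark.sql import SparkSession  # requires pyspark

    return (
        SparkSession.builder
        .appName(app_name)
        .config("spark.jars.packages", packages)  # downloaded on first start
        .getOrCreate()
    )


# In the notebook:
# spark = create_spark_session()
```

If a SparkSession already exists in the notebook, `getOrCreate()` returns it unchanged and the package setting has no effect, so restart the kernel after changing `spark.jars.packages`.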

Create a connection with MongoDB & Read the data

We use the code below to connect to MongoDB and read data from the collection.

# Replace <password> and <host> with the credentials and host you use in MongoDB Compass.
mongodf = spark.read.format("mongo") \
    .option("uri", "mongodb://root:<password>@<host>:27017/") \
    .option("database", "dataengineering") \
    .option("collection", "employee") \
    .load()

The “uri” option holds the connection string, the same one we used to log in from MongoDB Compass. Next, we print the schema and check the data in the DataFrame.

mongodf.printSchema()
mongodf.show(n=10)
Checking schema and checking data from MongoDB (Image by Author)
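One note on the connection string before moving on. Hard-coding credentials in the URI is fine for a local sandbox, but if the username or password contains characters such as `@`, `:` or `/`, MongoDB requires them to be percent-encoded. The sketch below builds the URI with `urllib.parse.quote_plus` and wraps the same read chain in a helper. The helper names, the `localhost` default, and the `MONGO_PASSWORD` environment variable in the usage comment are assumptions for illustration.

```python
from urllib.parse import quote_plus


def mongo_uri(user, password, host="localhost", port=27017):
    """Build a MongoDB URI, percent-encoding credentials (e.g. '@' -> '%40')."""
    return f"mongodb://{quote_plus(user)}:{quote_plus(password)}@{host}:{port}/"


def read_collection(spark, uri, database, collection):
    """Same read as above, using the MongoDB Spark Connector 3.x 'mongo' source."""
    return (
        spark.read.format("mongo")
        .option("uri", uri)
        .option("database", database)
        .option("collection", collection)
        .load()
    )


print(mongo_uri("root", "p@ss:word"))
# mongodb://root:p%40ss%3Aword@localhost:27017/

# In the notebook (hypothetical environment variable):
# import os
# uri = mongo_uri("root", os.environ["MONGO_PASSWORD"])
# mongodf = read_collection(spark, uri, "dataengineering", "employee")
```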

Transform data

First, we register the DataFrame as a temporary view (scoped to the current Spark session, not a Hive table) so we can query it with Spark SQL.

mongodf.createOrReplaceTempView("tempMongo")

We can also check the data with a SELECT query:

# spark.sql is the SparkSession entry point; a standalone notebook may not define sqlContext.
spark.sql("SELECT * FROM tempMongo").show(n=5)
Exploring Spark SQL (Image by Author)

Next, we keep only the employees with a salary above 50,000 and select just their first name and salary:

newdf = spark.sql("SELECT first_name, salary FROM tempMongo WHERE salary > 50000")
Exploring Spark SQL (Image by Author)
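The same transformation can also be written with the DataFrame API instead of SQL; both forms go through Spark's Catalyst optimizer. A minimal sketch (the function name and the `threshold` parameter are ours):

```python
def high_earners(df, threshold=50000):
    """DataFrame-API version of:
    SELECT first_name, salary FROM tempMongo WHERE salary > 50000
    """
    return df.filter(df["salary"] > threshold).select("first_name", "salary")


# newdf = high_earners(mongodf)
# newdf.show()
```

The SQL form is handy for exploration in a notebook, while the function form is easier to reuse and unit-test.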

Write data to MongoDB

Now we load this filtered data (employees earning more than 50K) back into MongoDB, into a collection named ‘employee1’, using the code below.

# Replace <password> and <host> with your own connection details.
newdf.write.format("mongo") \
    .option("uri", "mongodb://root:<password>@<host>:27017/") \
    .option("database", "dataengineering") \
    .option("collection", "employee1") \
    .mode("append").save()

In append mode, if the ‘employee1’ collection already exists, the rows are added to it; if it does not exist, it is created as the data is loaded.

Checking data in MongoDB loaded from Spark data frame (Image by Author)
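The write can be wrapped the same way as the read. This sketch (hypothetical helper name) adds a check on the save mode; `append`, `overwrite`, `ignore` and `error`/`errorifexists` are Spark's standard save modes. One thing to watch with `append`: `newdf` carries only `first_name` and `salary`, with no `_id`, so re-running the cell should insert the same employees again as new documents.

```python
VALID_SAVE_MODES = {"append", "overwrite", "ignore", "error", "errorifexists"}


def write_collection(df, uri, database, collection, mode="append"):
    """Write a DataFrame to MongoDB with the connector's 'mongo' source."""
    if mode not in VALID_SAVE_MODES:
        raise ValueError(f"unsupported save mode: {mode!r}")
    (
        df.write.format("mongo")
        .option("uri", uri)
        .option("database", database)
        .option("collection", collection)
        .mode(mode)
        .save()
    )


# write_collection(newdf, "mongodb://root:<password>@<host>:27017/",
#                  "dataengineering", "employee1")
```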

In the same way, we can write this DataFrame to MySQL or PostgreSQL, or to any file format (CSV, Parquet, JSON, ORC, Avro, etc.). The earlier blog covers loading data into SQL databases and files.
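For the file formats listed, the writer only needs a format name and an output path. A minimal sketch, with a hypothetical helper and output directory; note that `avro` is not on Spark's default classpath and needs the separate `spark-avro` package in `spark.jars.packages`, much like the MongoDB connector.

```python
FILE_FORMATS = {"csv", "parquet", "json", "orc", "avro"}


def save_as_files(df, fmt, path, mode="overwrite"):
    """Write df as a directory of files in the given format."""
    fmt = fmt.lower()
    if fmt not in FILE_FORMATS:
        raise ValueError(f"unsupported file format: {fmt!r}")
    writer = df.write.mode(mode).format(fmt)
    if fmt == "csv":
        writer = writer.option("header", True)  # keep column names in the CSV
    writer.save(path)


# save_as_files(newdf, "parquet", "output/high_earners_parquet")
```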

Conclusion:

Here we learned:

  • How to create a connection with MongoDB from Spark
  • How to read data from a MongoDB collection in Spark with “spark.read”
  • How to load data into a collection from Spark with “DataFrame.write”
