Spark ETL Chapter 2 with NoSQL Database (MongoDB | Cassandra)
Previous blog/Context:
In an earlier blog, we discussed Spark ETL with SQL databases (MySQL and PostgreSQL). Please refer to that blog post for more details.
Introduction:
In this blog, we will discuss Spark ETL with a NoSQL database. We will use MongoDB for every step. Other NoSQL databases such as Cassandra, Azure Cosmos DB, and Amazon DynamoDB follow the same pattern for Spark ETL as MongoDB. (The only difference is that we need to use a specific Spark package for each database.)
Today, we will perform the following Spark ETL operations:
- Install required spark libraries
- Create a connection with MongoDB
- Read data from MongoDB
- Transform data
- Write data into MongoDB
First, clone the GitHub repo, where we have all the required sample files and the solution.
If you don’t have Spark and MongoDB set up on your system, follow the earlier blog on setting up Data Engineering tools. (The Data Engineering suite sets up Spark, MySQL, PostgreSQL, and MongoDB on your system.)
Spark ETL with NoSQL Database
First, open Jupyter Notebook and copy all the content of Chapter 2 from GitHub into it.
Our existing Spark setup does not include the MongoDB Spark package, so we also need to download and install it.
We can check all the available versions of the MongoDB Spark package on the Maven page below.
https://mvnrepository.com/artifact/org.mongodb.spark/mongo-spark-connector
So, if we join the values from that page as “groupId:artifactId:version”, the setting will be as below.
'spark.jars.packages' : 'org.mongodb.spark:mongo-spark-connector_2.12:3.0.1'
MongoDB Setup and load data for Spark ETL
As part of the Data Engineering suite, we have also set up MongoDB. From Spark, we will read and write data to MongoDB. We can import the files from GitHub into MongoDB so that a collection is ready to read from Spark.
Open MongoDB Compass, connect to the local MongoDB instance, and click on create a database.
Pass the database name as ‘dataengineering’ and the collection name as ‘employee’. Open the collection, click on import, and select the employee.csv file from the GitHub repo.
Check all the columns and click on import.
Once you click on import, the data will be available in MongoDB.
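If you prefer a script to the Compass steps above, the import can be sketched in Python. This is only a sketch under assumptions: it uses the pymongo package (not part of the original setup), the helper names are hypothetical, and it assumes employee.csv has a header row and a numeric salary column, as the later query suggests.

```python
import csv


def to_number(text):
    """Convert a CSV string to int when possible, otherwise to float."""
    try:
        return int(text)
    except ValueError:
        return float(text)


def read_employee_rows(csv_path, numeric_columns=("salary",)):
    """Read the CSV into a list of dicts, converting the listed columns to numbers."""
    rows = []
    with open(csv_path, newline="", encoding="utf-8") as handle:
        for row in csv.DictReader(handle):
            for column in numeric_columns:
                if row.get(column):  # leave empty cells untouched
                    row[column] = to_number(row[column])
            rows.append(row)
    return rows


def import_employees(csv_path, uri, database="dataengineering", collection="employee"):
    """Insert the CSV rows into MongoDB and return how many were written."""
    # Imported lazily; assumes pymongo has been installed separately.
    from pymongo import MongoClient

    rows = read_employee_rows(csv_path)
    if not rows:
        return 0  # insert_many rejects an empty list
    client = MongoClient(uri)
    try:
        result = client[database][collection].insert_many(rows)
        return len(result.inserted_ids)
    finally:
        client.close()
```

Calling import_employees("employee.csv", uri) with the same connection string you use in Compass should leave the collection in the same state as the manual import. Without the number conversion, every value would be stored as a string and a later numeric filter on salary would not behave as expected.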
Install required spark libraries
As we discussed earlier, we specify the required MongoDB package when starting the Spark session. Spark then downloads and installs the package as the session starts.
So, the required packages are now available in the Spark session.
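As a minimal sketch of this step, the session can be started with the connector coordinate set in 'spark.jars.packages'. The helper names and the app name "chapter2" are assumptions for illustration; the coordinate is the one shown earlier.

```python
def maven_coordinate(group_id, artifact_id, version):
    """Join the three Maven fields into the groupId:artifactId:version form."""
    return f"{group_id}:{artifact_id}:{version}"


MONGO_CONNECTOR = maven_coordinate(
    "org.mongodb.spark", "mongo-spark-connector_2.12", "3.0.1"
)


def build_spark_session(app_name="chapter2", packages=MONGO_CONNECTOR):
    """Start (or reuse) a Spark session configured with the given packages."""
    # Imported lazily so the coordinate helper works without PySpark installed.
    from pyspark.sql import SparkSession

    return (
        SparkSession.builder
        .appName(app_name)
        .config("spark.jars.packages", packages)
        .getOrCreate()
    )
```

With this in place, spark = build_spark_session() gives the session used in the rest of the chapter. Note that the package setting is read when Spark starts; if a session is already running in the notebook, getOrCreate() returns it unchanged, so restart the kernel before adding a new package.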
Create a connection with MongoDB & Read the data
We will use the code below to create a connection with MongoDB and read data from the collection.
mongodf = spark.read.format("mongo") \
    .option("uri", "mongodb://root:[email protected]:27017/") \
    .option("database", "dataengineering") \
    .option("collection", "employee") \
    .load()
Here in “uri”, we have passed the connection string, which is the same one we used to log in from MongoDB Compass. The second step is to print the schema and check the data in the data frame.
mongodf.printSchema()
mongodf.show(n=10)
Transform data
First, we will create a temporary view on the data frame so we can write Spark SQL against it.
mongodf.createOrReplaceTempView("tempMongo")
Also, check the data by writing a select query.
spark.sql("SELECT * FROM tempMongo").show(n=5)
We will filter employees who have a salary of more than 50000, and we only need their first name and salary. So, our query will be as below.
newdf = spark.sql("SELECT first_name, salary FROM tempMongo WHERE salary > 50000")
Write data to MongoDB
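Before writing, it can be worth cross-checking the filtered result. As a sketch, the same filter can be expressed with the DataFrame API instead of SQL; high_earners is a hypothetical helper name, and it assumes the first_name and salary columns used in the query.

```python
def high_earners(df, threshold=50000):
    """Return first_name and salary for rows whose salary exceeds threshold."""
    # df["salary"] > threshold only builds a column expression; Spark runs
    # nothing until an action such as show() or count() is called.
    return df.filter(df["salary"] > threshold).select("first_name", "salary")
```

For example, high_earners(mongodf).count() should match newdf.count() if both express the same filter.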
Now, we will load the same data (employees with a salary of more than 50K) into MongoDB. For that, we will use the code below.
newdf.write.format("mongo") \
    .option("uri", "mongodb://root:[email protected]:27017/") \
    .option("database", "dataengineering") \
    .option("collection", "employee1") \
    .mode("append").save()
This checks whether the ‘employee1’ collection exists in MongoDB. If it does, the data is appended to it. If not, the collection is created and then the data is loaded.
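One consequence of append is that re-running the cell adds the same rows again. The sketch below wraps the write so the save mode is an explicit argument; write_collection is a hypothetical helper, and the behaviour of "overwrite" (replacing the existing collection contents) is our understanding of the connector, so please check it against the connector documentation for your version.

```python
def write_collection(df, uri, database, collection, mode="append"):
    """Write a data frame to a MongoDB collection using the given save mode."""
    (
        df.write.format("mongo")
        .option("uri", uri)
        .option("database", database)
        .option("collection", collection)
        .mode(mode)  # "append" adds rows; "overwrite" is assumed to replace them
        .save()
    )
```

Calling write_collection(newdf, uri, "dataengineering", "employee1") behaves like the code above, while passing mode="overwrite" is one way to keep repeated notebook runs from duplicating rows.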
In the same way, we can use the same data frame to load data into MySQL, PostgreSQL, or any file format (CSV, Parquet, JSON, ORC, Avro, etc.). Please check the earlier blog on how to load data into SQL databases or files if you are not familiar with it.
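As a small illustration of the file case, the sketch below writes a data frame as Parquet and as CSV. The helper name export_to_files and the output folder names are assumptions; the writer calls are the standard Spark DataFrameWriter ones.

```python
import os


def export_to_files(df, output_dir):
    """Write df as Parquet and as CSV (with a header row) under output_dir."""
    parquet_path = os.path.join(output_dir, "employee_parquet")
    csv_path = os.path.join(output_dir, "employee_csv")
    # "overwrite" replaces any earlier output so the cell can be re-run.
    df.write.mode("overwrite").parquet(parquet_path)
    df.write.mode("overwrite").option("header", True).csv(csv_path)
    return parquet_path, csv_path
```

For example, export_to_files(newdf, "output") creates two folders under "output". Spark writes each one as a directory of part files rather than a single file.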
Conclusion:
Here we learned
- How to create a connection with MongoDB from Spark
- How to read data from a collection from Spark -> “spark.read”
- How to load data into the collection from Spark -> “dataframe.write”

