Kalpan Shah

Spark ETL Chapter 5 with Hive (HIVE tables | Temp View | Global view)

Previous blog/Context:

In an earlier blog, we discussed Spark ETL with cloud data lakes (AWS S3 bucket). Please see that blog post for more details.

Introduction:

In this blog, we will discuss HIVE tables and views and do ETL with Hive tables. We will learn how to create Hive tables, temporary views, and global temporary views, and how to work with them. We will create all of these on our Spark instance only. (Hive tables can also be created at external locations.)

Today, we will perform the following Spark ETL operations

  • Read data from one of the sources (We take the source as our MongoDB collection)
  • Create a data frame from the source
  • Create a Hive table from the data frame
  • Create a temp Hive view from the data frame
  • Create a global Hive view from the data frame
  • List database and tables in the database
  • Drop all the created tables and views in the default database
  • Create a ‘dataeng’ database and create global and temp views using SQL
  • Access the global table from another session

First, clone the GitHub repo below, which contains all the required sample files and the solution.

If you don’t have a Spark instance set up, follow the earlier blog on setting up the Data Engineering tools on your system. (The Data Engineering suite will set up Spark, MySQL, PostgreSQL, and MongoDB on your system.) That Spark instance already has the packages for Azure Blob Storage and Azure Data Lake Services installed.

Read Data from the Source (MongoDB) & create a data frame from the source

For this ETL, we will use MongoDB as our source: we will read data from MongoDB and load it into HIVE tables. We need the packages for both MongoDB and HIVE. Our Spark instance already has the HIVE packages installed, but not the MongoDB packages, so when we start the Spark session we will pass the configuration that downloads and installs the MongoDB connector.

Hive packages

Starting Spark session with HIVE and MongoDB Packages

# Load the required library
from pyspark.sql import SparkSession
# Start the Spark session and pull in the MongoDB connector package
spark = SparkSession.builder.appName("chapter5")\
        .config('spark.jars.packages', 'org.mongodb.spark:mongo-spark-connector_2.12:3.0.1')\
        .getOrCreate()
# SparkSession already exposes .sql(), so sqlContext is simply an alias for it
sqlContext = spark
# Show only errors, not warnings
spark.sparkContext.setLogLevel("ERROR")

Our Spark session is now started with the MongoDB and HIVE packages. Next, we will read data from the MongoDB collection.

In the last session (ETL with NoSQL), we loaded sample data into MongoDB. Here we read that same collection and load it into a Spark data frame.

mongodf = spark.read.format("mongo") \
    .option("uri", "mongodb://root:[email protected]:27017/") \
    .option("database", "dataengineering") \
    .option("collection", "employee") \
    .load()

This reads the data from the “employee” collection and stores it in “mongodf”. In the next step, we will check the schema of the data frame and look at the first few rows.

mongodf.printSchema()
mongodf.show(n=2)

Create a Hive table from a data frame

When we create HIVE tables, Spark creates a “spark-warehouse” directory and stores all the table data in it. By default, this directory is created in the folder from which we start the Spark session. To change the location of the warehouse directory, we need to pass it in the configuration when we start the Spark session.

Example of changing the location of the warehouse directory while starting the Spark session:

# warehouse_location must be defined beforehand as a path string of your choice
spark = SparkSession \
    .builder \
    .appName("chapter5") \
    .config("spark.sql.warehouse.dir", warehouse_location) \
    .enableHiveSupport() \
    .getOrCreate()

Here, we are using the default location, so Spark creates the directory in the same folder.

mongodf.write.saveAsTable("hivesampletable")

This creates the warehouse directory and loads the data into it.
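To check which directory the session is actually using, a small sketch like the one below can help. It reads the `spark.sql.warehouse.dir` setting from the session and lists the directory contents. The helper names `warehouse_path` and `list_warehouse` are illustrative, not part of the original notebook, and the sketch assumes the warehouse lives on the local file system.

```python
import os
from urllib.parse import urlparse

def warehouse_path(spark):
    """Return the warehouse location of this session as a local path (hypothetical helper)."""
    location = spark.conf.get("spark.sql.warehouse.dir")
    parsed = urlparse(location)
    # Spark usually reports a local directory as a "file:" URI.
    return parsed.path if parsed.scheme == "file" else location

def list_warehouse(spark):
    """List the entries (table and database directories) in a local warehouse directory."""
    return sorted(os.listdir(warehouse_path(spark)))

# Usage, assuming the `spark` session created earlier:
# print(warehouse_path(spark))
# print(list_warehouse(spark))
```

After the `saveAsTable` call above, the listing should contain a directory for the saved table.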

By default, the data is stored as parquet files. You can also specify the file format in which you want to store the data.

Currently, the following file formats are supported.

  • sequencefile
  • rcfile
  • orc
  • parquet
  • textfile
  • Avro
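As a minimal sketch of choosing the storage format, the helper below passes a format name to the data frame writer before calling `saveAsTable`. The helper name `save_table_as` and the table name in the usage comment are illustrative assumptions. The sketch is only meant for built-in data source names such as "orc" or "parquet"; Hive-specific formats such as sequencefile or rcfile require Hive support and are configured differently, which is not shown here.

```python
def save_table_as(df, table_name, file_format="parquet", mode="overwrite"):
    """Write `df` as a table stored in `file_format` (hypothetical helper)."""
    # format(), mode() and saveAsTable() are standard DataFrameWriter methods.
    df.write.format(file_format).mode(mode).saveAsTable(table_name)
    return table_name

# Usage, assuming `mongodf` from earlier; the table name is illustrative:
# save_table_as(mongodf, "hivesampletable_orc", "orc")
```

Using `mode="overwrite"` makes the call safe to rerun; drop that argument if an existing table should raise an error instead.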

Global and Temp HIVE Views

The difference between temporary and global temporary views is subtle, and it can be a source of mild confusion among developers new to Spark. A temporary view is tied to a single Spark Session within a Spark application. In contrast, a global temporary view is visible across multiple Spark Sessions within a Spark application. Yes, you can create multiple Spark Sessions within a single Spark application. This can be handy, for example, when you want to access (and combine) data from two different Spark Sessions that don’t share the same Hive metastore configurations.

First, we will create a temp HIVE view from the data frame (which holds the data from the MongoDB collection).

mongodf.createOrReplaceTempView("sampletempview")

Now, we will create a Global HIVE view using the code below.

mongodf.createOrReplaceGlobalTempView("sampleglobalview")

List database and tables in the database

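When we create a table or view without telling Spark to use a specific database, Spark uses the “default” database as the current database. (Global temp views are always registered under the reserved “global_temp” database, which is why they are queried with that prefix later on.) We will first check which databases are available in our Spark session.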

List Databases

spark.sql("show databases").show()

Or we can use the PySpark catalog command:

spark.catalog.listDatabases()

Now, we will check which tables and views exist in this database.

List tables & Views in Database

spark.sql("show tables").show()

The PySpark catalog command for the same:

spark.catalog.listTables()

List details of Table or view

spark.catalog.listColumns("hivesampletable")

Read data from the HIVE table

sqlContext.sql("SELECT * FROM hivesampletable").show()

Or we can use spark.table(), shown here reading the temp view:

spark.table("sampletempview").show()

Read data from the Global HIVE view

sqlContext.sql("SELECT * FROM global_temp.sampleglobalview").show()

Drop all the created tables and views in the default database

spark.catalog.dropGlobalTempView("sampleglobalview")
spark.catalog.dropTempView("sampletempview")
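The two calls above remove the views; the saved table itself stays in place. A possible way to drop all three objects in one go is sketched below. The helper name `drop_created_objects` is hypothetical, and the default names are the ones used in this walkthrough.

```python
def drop_created_objects(spark, table="hivesampletable",
                         temp_view="sampletempview",
                         global_view="sampleglobalview"):
    """Drop the views and the table created in this walkthrough (hypothetical helper)."""
    spark.catalog.dropGlobalTempView(global_view)
    spark.catalog.dropTempView(temp_view)
    # Dropping a managed table also deletes its files from the warehouse directory.
    spark.sql(f"DROP TABLE IF EXISTS {table}")

# Usage, assuming the `spark` session from earlier:
# drop_created_objects(spark)
```

`DROP TABLE IF EXISTS` keeps the helper safe to rerun when the table has already been removed.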

Create dataeng database and create global and temp view using SQL

spark.sql("CREATE DATABASE dataeng")
spark.sql("USE dataeng")

Here, we are creating a new database named “dataeng” and asking Spark to use that database for all subsequent operations.

Once we do this, Spark creates a new directory for the database (named “dataeng.db”) in the warehouse directory.

Now, if we list the databases, the new database appears as below

spark.sql("show databases").show()

We will create the HIVE table, the temp view, and the global view again in this database.

mongodf.write.saveAsTable("hivesampletable")
mongodf.createOrReplaceGlobalTempView("sampleglobalview")
mongodf.createOrReplaceTempView("sampletempview")
spark.sql("show tables").show()
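The same kind of views can also be created with SQL statements instead of the data frame methods. The sketch below builds a temp view and a global temp view on top of the saved table using Spark SQL's `CREATE OR REPLACE ... VIEW` syntax. The helper name `create_views_with_sql` and the `_sql` view names are illustrative assumptions, chosen so they do not overwrite the views created above.

```python
def create_views_with_sql(spark, source_table="hivesampletable",
                          temp_view="sampletempview_sql",
                          global_view="sampleglobalview_sql"):
    """Create a temp view and a global temp view over `source_table` using SQL."""
    statements = [
        f"CREATE OR REPLACE TEMPORARY VIEW {temp_view} "
        f"AS SELECT * FROM {source_table}",
        f"CREATE OR REPLACE GLOBAL TEMPORARY VIEW {global_view} "
        f"AS SELECT * FROM {source_table}",
    ]
    for statement in statements:
        spark.sql(statement)
    return statements

# Usage, assuming the `spark` session and the table saved above:
# create_views_with_sql(spark)
# spark.sql("SELECT * FROM global_temp.sampleglobalview_sql").show()
```

Because the unqualified table name is resolved against the current database, this picks up the table in “dataeng” after `USE dataeng`.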

Access the global table from another session

Our current Spark session is stored in the “spark” variable. We will create one more session, store it in another variable named “newSpark”, and then try to read data from the global view.

newSpark = spark.newSession()

If we want to check the properties of the session, we can evaluate the session variable by name in a notebook cell.

Now, with the “newSpark” session, we will try to read data from the global view.
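For a more detailed look at a session, its configuration can also be read programmatically. The sketch below collects the key/value pairs from the session's Spark configuration; the helper name `session_properties` is an illustrative assumption.

```python
def session_properties(session):
    """Return the session's Spark configuration as a dict (hypothetical helper)."""
    # SparkConf.getAll() returns a list of (key, value) tuples.
    return dict(session.sparkContext.getConf().getAll())

# Usage, assuming the `newSpark` session created above:
# for key, value in sorted(session_properties(newSpark).items()):
#     print(key, "=", value)
```

Sessions created with `newSession()` share the same underlying SparkContext, so both sessions report the same context-level configuration.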

newSpark.sql("SELECT * FROM global_temp.sampleglobalview").show()

If we try to read the local (temp) view from the new session, it gives an error, because a temp view is only registered in the session that created it.
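This visibility difference can be checked with a small sketch like the one below, which tries to resolve a view in a given session and reports whether that worked. The helper name `can_read` is hypothetical. PySpark raises an `AnalysisException` for an unknown table or view; the sketch catches the generic `Exception` only to stay short, which also hides unrelated failures.

```python
def can_read(session, view_name):
    """Return True if `view_name` resolves in `session`, else False (hypothetical helper)."""
    try:
        session.table(view_name)  # the name is resolved as soon as table() is called
        return True
    except Exception as err:  # PySpark raises AnalysisException for unknown views
        print(f"{view_name}: not visible ({type(err).__name__})")
        return False

# Usage, assuming the `spark` and `newSpark` sessions from above:
# can_read(spark, "sampletempview")                   # created in this session
# can_read(newSpark, "sampletempview")                # temp views are per session
# can_read(newSpark, "global_temp.sampleglobalview")  # global views are shared
```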

Conclusion:

Here, we learned

  • How to create HIVE Database
  • How to create a HIVE table
  • How to create HIVE temp and global view
  • How to do SQL transformation in HIVE views by writing SQL Queries
  • How to drop tables and views

Video Explanation:

https://www.youtube.com/watch?v=e1z0oOZMGcg
