Customer 360 in E-commerce: Real-Life Use Case with Delta Lake on Databricks
The superpowers of delta tables in action
Data engineering isn’t just about pipelines and dashboards; it’s about solving real-world problems that affect businesses every day. One of the most versatile tools in the modern data engineer’s toolkit is Delta Lake, an open-source storage layer that brings ACID transactions, schema evolution, change data capture (CDC), versioning, deduplication, optimizations and Z-ordering to data lakes.
In this article, I’ll walk you through a real-world scenario where Delta Lake and Delta Tables in Databricks make the process easier. And yes, there will be code because what’s a data engineering story without some code magic?
The Scenario: Customer 360 in E-commerce
Imagine you’re building a Customer 360 platform for an e-commerce company. You need to combine data from customer updates, the loyalty program and the discount store into a single Delta table. The challenges include:
- Schema Evolution: New data fields keep appearing, e.g. discount_code.
- Deduplication: Multiple updates for the same customer ID.
- Upserts and CDC: Update existing records and add new ones.
- Versioning: Need to view and audit older versions of data.
- Optimized Queries: Analysts want lightning-fast queries on customer activity.
All of the above are perfect use cases for Delta Lake. So, let's jump in!
Prerequisite: Mount ADLS Container to Databricks
We run the following in a Databricks notebook to mount our ADLS Gen2 storage container under a path, i.e. /mnt/delta:
val storageAccountName = "<your_storage_account_name>"
val containerName = "<your_container_name>"
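val accessKey = "<your_access_key>" // better: read it with dbutils.secrets.get(scope = "<scope>", key = "<key>") instead of hard-coding it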
// Mount the ADLS Gen2 container in Databricks
dbutils.fs.mount(
source = s"abfss://$containerName@$storageAccountName.dfs.core.windows.net/",
mountPoint = "/mnt/delta",
extraConfigs = Map(s"fs.azure.account.key.$storageAccountName.dfs.core.windows.net" -> accessKey)
)
Step 1: Setting Up Delta Tables
Let’s start by creating mock data for our Delta table, assuming you already have a Databricks workspace:
Converting Raw Data to Delta Format
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._
val spark = SparkSession.builder()
.appName("Customer360DeltaLake")
.config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
.config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
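.getOrCreate()
// On Databricks, `spark` already exists and getOrCreate() simply returns it
import spark.implicits._ // enables Seq(...).toDF(...) outside a Databricks notebook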
// Create mockup data. You can also read an existing raw file
val rawDF = Seq(
(1, "Jessica", "Standard", "2024-11-01"),
(2, "Tracey", "Premium", "2024-11-02"),
(3, "Steve", "Standard", "2024-11-05")
).toDF("customer_id", "name", "loyalty_tier", "last_updated")
// Write to Delta format
rawDF.write.format("delta")
.mode("overwrite")
.save("/mnt/delta/customer360_delta/")
Congratulations! You’ve just created your first Delta table.
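If you want to double-check that the write really produced a Delta table (Parquet data files plus a `_delta_log` folder of JSON commits), you can read it back. The sketch below is a self-contained local version for spark-shell or a Scala script, assuming Spark and the Delta Lake library (`delta-spark`, or `delta-core` on older releases) are on the classpath; the temporary directory is a stand-in for `/mnt/delta/customer360_delta/`.

```scala
import io.delta.tables.DeltaTable
import org.apache.spark.sql.SparkSession
import java.nio.file.{Files, Paths}

// Local session with the same Delta extensions as in Step 1 (on Databricks, use the notebook's `spark`)
val spark = SparkSession.builder()
  .master("local[*]")
  .appName("Customer360DeltaCheck")
  .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
  .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
  .getOrCreate()
import spark.implicits._

// Assumption: a temporary directory replaces the mounted ADLS path for local runs
val path = Files.createTempDirectory("customer360_delta").toString

Seq(
  (1, "Jessica", "Standard", "2024-11-01"),
  (2, "Tracey", "Premium", "2024-11-02"),
  (3, "Steve", "Standard", "2024-11-05")
).toDF("customer_id", "name", "loyalty_tier", "last_updated")
  .write.format("delta").mode("overwrite").save(path)

// A Delta table is Parquet data plus a _delta_log directory holding the transaction log
val hasLog = Files.isDirectory(Paths.get(path, "_delta_log"))
val isDelta = DeltaTable.isDeltaTable(spark, path)

// Read the table back and inspect it
val readBack = spark.read.format("delta").load(path)
readBack.orderBy("customer_id").show()
val rowCount = readBack.count()
```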
Step 2: Handling Schema Evolution
Then the requirements change: the marketing team now wants to track each customer’s discount_code. No problem; Delta Lake supports schema evolution like a pro.
import org.apache.spark.sql.functions.{col, lit, when}
import io.delta.tables.DeltaTable
// First, let's create new data to append
val newData = Seq(
(4, "Charles", "Gold", "2024-11-02"),
(5, "Antony", "Premium", "2024-11-06"),
(6, "Emily", "Standard", "2024-11-10")
).toDF("customer_id", "name", "loyalty_tier", "last_updated")
// Add the discount_code column based on loyalty_tier in the new data
val newDataWithDiscount = newData.withColumn(
"discount_code",
when(col("loyalty_tier") === "Standard", "10p")
.when(col("loyalty_tier") === "Gold", "15p")
.when(col("loyalty_tier") === "Premium", "20p")
.otherwise("Unknown") // Default case if loyalty_tier is unexpected
)
// Schema evolution in delta table, the old rows will have discount_code as null
newDataWithDiscount.write.format("delta")
.mode("append")
.option("mergeSchema", "true")
.save("/mnt/delta/customer360_delta/")
// Update rows to set discount_code to "Unknown" if it is null
val deltaTable = DeltaTable.forPath(spark, "/mnt/delta/customer360_delta/")
// Perform update operation for older rows
deltaTable.update(
col("discount_code").isNull, // Identify rows with null discount_code
Map("discount_code" -> lit("Unknown")) // Set discount_code to "Unknown"
)
Schema evolution ensures your pipelines won’t break when new fields are added.
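Why is `mergeSchema` needed at all? By default Delta enforces the table's schema on write, so an append that carries an extra column is rejected. This local sketch (assuming Spark and Delta Lake on the classpath, with a temporary directory instead of the mounted path and a trimmed-down version of the data) shows the rejected append, then the same append succeeding with `mergeSchema`, which leaves `discount_code` null for the older row.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.col
import java.nio.file.Files
import scala.util.Try

val spark = SparkSession.builder()
  .master("local[*]")
  .appName("Customer360SchemaEvolution")
  .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
  .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
  .getOrCreate()
import spark.implicits._

// Assumption: temporary directory instead of /mnt/delta/customer360_delta/
val path = Files.createTempDirectory("schema_evolution").toString

// Version 0: the original four-column schema
Seq((1, "Jessica", "Standard", "2024-11-01"))
  .toDF("customer_id", "name", "loyalty_tier", "last_updated")
  .write.format("delta").mode("overwrite").save(path)

// New rows carry an extra discount_code column
val withDiscount = Seq((4, "Charles", "Gold", "2024-11-02", "15p"))
  .toDF("customer_id", "name", "loyalty_tier", "last_updated", "discount_code")

// Schema enforcement: without mergeSchema the append throws, so the Try fails
val rejected = Try(withDiscount.write.format("delta").mode("append").save(path)).isFailure

// Schema evolution: with mergeSchema the column is added to the table schema
withDiscount.write.format("delta").mode("append").option("mergeSchema", "true").save(path)

val evolved = spark.read.format("delta").load(path)
val columns = evolved.columns.toSeq
// The pre-existing row has no value for the new column
val nullDiscounts = evolved.filter(col("discount_code").isNull).count()
```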
Step 3: Upsert operation with Merge
Let’s say you receive an incoming dataset with both updated and new customer records. Here’s how Delta Lake handles the upsert using MERGE.
import io.delta.tables.DeltaTable
import org.apache.spark.sql.functions._
// Create new incoming data
val incomingData = Seq(
(4, "Charles", "Platinum", "2024-11-20","20p"), // Update loyalty_tier for customer_id 4
(7, "Diana", "Standard", "2024-11-21","10p"), // Insert new customer_id 7
(8, "Eve", "Gold", "2024-11-22","15p") // Insert new customer_id 8
).toDF("customer_id", "name", "loyalty_tier", "last_updated", "discount_code")
// Define the Delta table path
val deltaTablePath = "/mnt/delta/customer360_delta/"
// Convert the Delta table path to a DeltaTable object
val deltaTable = DeltaTable.forPath(spark, deltaTablePath)
// Perform the Merge (Upsert) operation
deltaTable.as("target")
.merge(
incomingData.as("source"),
"target.customer_id = source.customer_id" // Matching condition on customer_id
)
.whenMatched("target.last_updated < source.last_updated").updateAll() // Update all columns only if the incoming record is newer
.whenNotMatched.insertAll() // Insert all columns when there is no match
.execute()
Because of the last_updated condition, a matched record is overwritten only when the incoming version is newer, and customers not yet in the table are inserted.
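The MERGE above assumes each `customer_id` appears at most once in the incoming batch. If a batch carries several updates for the same customer (the deduplication challenge listed at the start), Delta's MERGE can fail with an error about multiple source rows matching the same target row, or, for customers not yet in the table, insert duplicate rows. One common fix, sketched here with plain Spark window functions (no Delta needed for this part), is to keep only the latest record per key before merging. The sample rows are hypothetical, and ties on `last_updated` would be broken arbitrarily.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{col, row_number}

val spark = SparkSession.builder()
  .master("local[*]")
  .appName("Customer360Dedup")
  .getOrCreate()
import spark.implicits._

// Hypothetical batch: two updates for existing customer 4, two rows for new customer 9
val rawIncoming = Seq(
  (4, "Charles", "Gold", "2024-11-18", "15p"),
  (4, "Charles", "Platinum", "2024-11-20", "20p"),
  (9, "Frank", "Standard", "2024-11-23", "10p"),
  (9, "Frank", "Gold", "2024-11-24", "15p")
).toDF("customer_id", "name", "loyalty_tier", "last_updated", "discount_code")

// Keep only the most recent row per customer_id (ISO date strings sort chronologically)
def latestPerCustomer(df: DataFrame): DataFrame = {
  val newestFirst = Window.partitionBy("customer_id").orderBy(col("last_updated").desc)
  df.withColumn("rn", row_number().over(newestFirst))
    .filter(col("rn") === 1)
    .drop("rn")
}

val dedupedIncoming = latestPerCustomer(rawIncoming)
val tiers = dedupedIncoming.orderBy("customer_id")
  .select("loyalty_tier").as[String].collect().toSeq
// dedupedIncoming could then be used as the MERGE source in place of incomingData
```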
Step 5: Optimizing and Z-ordering
To keep queries fast, we can optimize the Delta table with compaction and Z-ordering.
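Compaction rewrites many small files into fewer, larger ones, so queries open fewer files. The idea can be illustrated with a simplified, hypothetical bin-packing planner in plain Scala: small files are grouped greedily until a target size would be exceeded, and files already at or above the target are left alone. This is only a model with made-up sizes; Delta's OPTIMIZE has its own planning logic and a configurable target file size.

```scala
// Simplified model of compaction planning (hypothetical, not Delta's actual implementation)
def planCompaction(fileSizesMb: Seq[Long], targetMb: Long): Seq[Vector[Long]] = {
  // Files already at or above the target size are not rewritten
  val small = fileSizesMb.filter(_ < targetMb).sorted
  // Greedily fill the current bin; start a new one when the next file would exceed the target
  small.foldLeft(Vector.empty[Vector[Long]]) { (bins, size) =>
    bins.lastOption match {
      case Some(last) if last.sum + size <= targetMb => bins.init :+ (last :+ size)
      case _                                         => bins :+ Vector(size)
    }
  }
}

// Ten hypothetical 100 MB files with a 256 MB target are packed into five output files
val plan = planCompaction(Seq.fill(10)(100L), targetMb = 256L)
println(plan.map(_.sum).mkString(", ")) // 200, 200, 200, 200, 200
```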
Understanding Z-Ordering: Z-ordering arranges data based on a Z-order curve, also known as a Morton curve. This curve maps multi-dimensional data into a single dimension while preserving the locality of the data points. In practical terms, rows with similar values in the indexed columns are stored close together on disk. It is most useful in the following cases:
- Filtering on high-cardinality columns (e.g. customer_id)
- Filters applied on multiple columns
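To make the Z-order curve concrete, here is a small plain-Scala sketch of a two-column Morton code: the bits of two integer column values are interleaved, and sorting rows by the resulting code keeps rows that are close in both columns close together. It illustrates the principle only; real implementations must first map arbitrary column values (strings, dates) to integers, and Delta's details differ.

```scala
// Interleave the low `bits` bits of x and y: x fills even bit positions, y fills odd ones
def mortonCode(x: Int, y: Int, bits: Int = 16): Long =
  (0 until bits).foldLeft(0L) { (code, i) =>
    code |
      (((x >> i) & 1).toLong << (2 * i)) |
      (((y >> i) & 1).toLong << (2 * i + 1))
  }

// A 4x4 grid of (x, y) values, standing in for two integer-encoded filter columns
val grid = for (y <- 0 until 4; x <- 0 until 4) yield (x, y)

// Sorting by Morton code traces the "Z" pattern: each 2x2 block is completed before moving on
val zOrdered = grid.sortBy { case (x, y) => mortonCode(x, y) }
println(zOrdered.take(4).mkString(" ")) // (0,0) (1,0) (0,1) (1,1)
```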
Benefits of Z-Ordering:
- Efficient Data Skipping: By clustering related data, Delta Lake can skip over irrelevant data during query execution, reducing the amount of data read and speeding up queries.
- Improved Query Performance: It’s especially beneficial for queries that filter on multiple columns, as Z-ordering co-locates the relevant data, minimizing the need to scan large portions of the dataset.
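Data skipping works because Delta records per-file statistics, including the minimum and maximum value of indexed columns, and a query can skip any file whose [min, max] range cannot contain the filter value. The plain-Scala model below uses hypothetical files of `customer_id` values to show why clustering matters: when ids are spread across files, every file's range covers the filter value; when they are clustered, only one file does.

```scala
// Per-file min/max statistics, a simplified stand-in for Delta's file-level stats
final case class FileStats(min: Int, max: Int)

def stats(files: Seq[Seq[Int]]): Seq[FileStats] =
  files.map(f => FileStats(f.min, f.max))

// A file must be read only if the filter value falls inside its [min, max] range
def filesToRead(files: Seq[FileStats], customerId: Int): Int =
  files.count(s => s.min <= customerId && customerId <= s.max)

val ids = (1 to 1000).toVector

// Unclustered: ids spread round-robin over 10 files, so every file spans roughly 1..1000
val unclustered = stats(ids.groupBy(_ % 10).values.toSeq)
// Clustered (what sorting or Z-ordering aims for): contiguous id ranges per file
val clustered = stats(ids.grouped(100).toSeq)

println(filesToRead(unclustered, 500)) // 10
println(filesToRead(clustered, 500))   // 1
```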
Code sample:
// Compact small files into larger ones (bin-packing)
deltaTable.optimize().executeCompaction()
// Z-order by frequently filtered columns
deltaTable.optimize().executeZOrderBy("customer_id", "loyalty_tier")
Step 6: Querying Delta Table Versions
So, what do we do if we need to debug or compare data over time? Delta Lake versions every commit to the table.
This lets us travel back in time to previous versions:
// Query a previous version of the table
// val deltaDF = DeltaTable.forPath(spark, "/mnt/delta/customer360_delta/")
// deltaDF.history().show() // display the version history of the table, including the version numbers, timestamps, and operation types
val version1Data = spark.read.format("delta")
.option("versionAsOf", 1)
.load("/mnt/delta/customer360_delta/")
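version1Data.show()
Assuming each step above ran once, version 1 shows the table right after newDataWithDiscount was appended. The three original rows still have a null discount_code there, because the update that set them to "Unknown" was committed as version 2. You can also read version 0, the state of the table after the initial write.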
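To pick the right number for `versionAsOf`, the table history (the `history()` call commented out above) lists each commit with its version and operation. Below is a self-contained local sketch, assuming Spark and Delta Lake on the classpath and a temporary directory instead of the mounted path: it creates three commits (write, append, update) and then uses the history to read the table as of its first version.

```scala
import io.delta.tables.DeltaTable
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, lit}
import java.nio.file.Files

val spark = SparkSession.builder()
  .master("local[*]")
  .appName("Customer360TimeTravel")
  .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
  .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
  .getOrCreate()
import spark.implicits._

// Assumption: temporary directory instead of /mnt/delta/customer360_delta/
val path = Files.createTempDirectory("time_travel").toString

Seq((1, "Jessica", "Standard")).toDF("customer_id", "name", "loyalty_tier")
  .write.format("delta").mode("overwrite").save(path)                    // version 0
Seq((2, "Tracey", "Premium")).toDF("customer_id", "name", "loyalty_tier")
  .write.format("delta").mode("append").save(path)                       // version 1
DeltaTable.forPath(spark, path)
  .update(col("customer_id") === 1, Map("loyalty_tier" -> lit("Gold")))  // version 2

// One row per commit, newest first
val history = DeltaTable.forPath(spark, path).history()
history.select("version", "operation").show()

val versions = history.select("version").as[Long].collect().sorted.toSeq
val firstVersion = versions.head

// Time travel: only the initial row, with its original tier
val v0 = spark.read.format("delta").option("versionAsOf", firstVersion).load(path)
val v0Tiers = v0.select("loyalty_tier").as[String].collect().toSeq
```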
Benefits of Delta Lake Versioning:
- Data Auditing: Track changes to your data over time, which is essential for compliance and auditing purposes.
- Reproducibility: Reproduce experiments or reports by accessing the exact state of the data at a specific point in time.
- Data Recovery: Recover from accidental data deletions or updates by reverting to a previous version.
This is incredibly useful for audits, debugging, or historical analysis.
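For the data-recovery case, recent Delta Lake releases (1.2 and later, as far as I know) can roll a table back in place with `restoreToVersion`, which is itself recorded as a new commit. It only works while the old data files still exist, i.e. before VACUUM removes them. The local sketch below assumes Spark and Delta Lake on the classpath and a temporary directory instead of the mounted path; it simulates an accidental delete and restores the previous version.

```scala
import io.delta.tables.DeltaTable
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.col
import java.nio.file.Files

val spark = SparkSession.builder()
  .master("local[*]")
  .appName("Customer360Restore")
  .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
  .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
  .getOrCreate()
import spark.implicits._

// Assumption: temporary directory instead of /mnt/delta/customer360_delta/
val path = Files.createTempDirectory("restore_demo").toString
Seq((1, "Jessica"), (2, "Tracey"), (3, "Steve"))
  .toDF("customer_id", "name")
  .write.format("delta").mode("overwrite").save(path)   // version 0

val table = DeltaTable.forPath(spark, path)
table.delete(col("customer_id") > 1)                     // version 1: the "accidental" delete
val countAfterDelete = spark.read.format("delta").load(path).count()

// Roll back to version 0; the restore itself becomes version 2
table.restoreToVersion(0)
val countAfterRestore = spark.read.format("delta").load(path).count()
val latestOperation = table.history(1).select("operation").as[String].head()
```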
Wrap-Up: Why Delta Lake Rocks
Delta Lake is something of a Swiss Army knife for data engineers. In our Customer 360 use case, it helped us:
- Handle schema evolution without breaking pipelines.
- Deduplicate records with ease.
- Implement upserts and Change Data Capture (CDC) to keep data fresh.
- Optimize query performance with compaction and Z-ordering.
- Debug and audit with versioning.
Whether you’re working on streaming data, batch processing, or analytics, Delta Lake is a must-have tool for building robust, high-performance data pipelines.
Do you have a use case where Delta Lake saved the day? Share it in the comments! Or, if you’re new to Delta, try the above example in Databricks and let me know how it works for you. 🚀