Ritam Mukherjee

Summary

The article discusses the implementation of a Customer 360 platform in e-commerce using Delta Lake on Databricks, highlighting the benefits of Delta Lake's features such as ACID transactions, schema evolution, and Z-ordering.

Abstract

The article delves into a real-world use case of leveraging Delta Lake and Delta Tables within Databricks to construct a Customer 360 platform for an e-commerce business. It outlines the challenges faced, including schema evolution, deduplication, upserts, CDC, versioning, and the need for optimized queries. The author provides a step-by-step guide on setting up Delta tables, handling schema changes, performing upsert operations with merge, and optimizing query performance through compaction and Z-ordering. The article also emphasizes the importance of versioning for data auditing, reproducibility, and recovery. By utilizing Delta Lake's capabilities, the solution ensures data integrity, facilitates efficient data management, and maintains high-performance data pipelines.

Opinions

  • The author believes that data engineering is not just about pipelines and dashboards but also about solving real-world business problems.
  • Delta Lake is portrayed as a versatile and essential tool in a data engineer's toolkit, offering robust features for managing data lakes effectively.
  • The article suggests that Delta Lake's schema evolution capability is particularly beneficial as it allows data pipelines to adapt to new data fields without breaking.
  • The author states that Delta Lake's support for ACID transactions, deduplication, and optimizations such as Z-ordering significantly improves query performance.
  • The author is enthusiastic about the ease of use and the power of Delta Lake, encouraging readers to try it out and share their experiences.
  • The article conveys the opinion that Delta Lake's versioning feature is crucial for auditing, debugging, and historical data analysis.
  • The author concludes by likening Delta Lake to a Swiss Army knife for data engineers, implying its indispensability in various data processing scenarios, including streaming, batch processing, and analytics.

Customer 360 in E-commerce: Real-Life Use Case with Delta Lake on Databricks

The superpowers of delta tables in action

Data engineering isn’t just about pipelines and dashboards; it’s about solving real-world problems that impact businesses daily. One of the most versatile tools in the modern data engineer’s toolkit is Delta Lake, an open-source storage layer that brings ACID transactions, schema evolution, change data capture (CDC), versioning, deduplication, optimizations and Z-ordering to data lakes.

In this article, I’ll walk you through a real-world scenario where Delta Lake and Delta Tables in Databricks make the process easier. And yes, there will be code because what’s a data engineering story without some code magic?

The Scenario: Customer 360 in E-commerce

Imagine you’re building a Customer 360 platform for an e-commerce company. You need to combine data from customer updates, the loyalty program, and the discount store into a single Delta table. The challenges include:

  1. Schema Evolution: New data fields keep appearing, e.g. discount_code.
  2. Deduplication: Multiple updates for the same customer ID.
  3. Upserts and CDC: Update existing records and add new ones.
  4. Versioning: Need to view and audit older versions of data.
  5. Optimized Queries: Analysts want lightning-fast queries on customer activity.

All of the above are perfect use cases for Delta Lake. So, let’s jump in!

Prerequisite: Mount ADLS Container to Databricks

We run the following in a Databricks notebook to mount our ADLS Gen2 storage container under a path such as /mnt/delta:

val storageAccountName = "<your_storage_account_name>"
val containerName = "<your_container_name>"
val accessKey = "<your_access_key>" 

// Mount the ADLS Gen2 container in Databricks
dbutils.fs.mount(
  source = s"abfss://$containerName@$storageAccountName.dfs.core.windows.net/",
  mountPoint = "/mnt/delta",
  extraConfigs = Map(s"fs.azure.account.key.$storageAccountName.dfs.core.windows.net" -> accessKey)
)
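
Two things tend to bite with the cell above: the access key sits in the notebook in plain text, and re-running the cell fails once the mount point already exists. Here is a minimal sketch of one way around both. It assumes a Databricks secret scope named customer360-scope holding a secret named adls-access-key; both names are hypothetical and you would need to create them first.

```scala
// Runs only inside a Databricks notebook, where dbutils is predefined.
val storageAccountName = "<your_storage_account_name>"
val containerName = "<your_container_name>"
val mountPoint = "/mnt/delta"

// Hypothetical scope and key names; replace them with your own
val accessKey = dbutils.secrets.get(scope = "customer360-scope", key = "adls-access-key")

// Mount only if nothing is mounted at this path yet
if (!dbutils.fs.mounts().exists(_.mountPoint == mountPoint)) {
  dbutils.fs.mount(
    source = s"abfss://$containerName@$storageAccountName.dfs.core.windows.net/",
    mountPoint = mountPoint,
    extraConfigs = Map(s"fs.azure.account.key.$storageAccountName.dfs.core.windows.net" -> accessKey)
  )
}
```

With the guard in place the cell can be re-run safely, which helps when the notebook is scheduled as a job.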

Step 1: Setting Up Delta Tables

Let’s start by creating mock data for our Delta table, assuming you already have a Databricks workspace:

Converting Raw Data to Delta Format

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._

// In a Databricks notebook a session already exists, so getOrCreate() simply returns it
val spark = SparkSession.builder()
  .appName("Customer360DeltaLake")
  .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
  .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
  .getOrCreate()

// Brings .toDF on a Seq into scope (notebooks import this automatically)
import spark.implicits._

// Create mock data. You can also read an existing raw file
val rawDF = Seq(
  (1, "Jessica", "Standard", "2024-11-01"),
  (2, "Tracey", "Premium", "2024-11-02"),
  (3, "Steve", "Standard", "2024-11-05")
).toDF("customer_id", "name", "loyalty_tier", "last_updated")

// Write to Delta format
rawDF.write.format("delta")
  .mode("overwrite")
  .save("/mnt/delta/customer360_delta/")

Congratulations! You’ve just created your first Delta table.

Step 2: Handling Schema Evolution

Then the requirements suddenly change: the marketing team now wants to track each customer’s discount_code. No problem: Delta Lake supports schema evolution like a pro.

import org.apache.spark.sql.functions.{col, lit, when}
import io.delta.tables.DeltaTable

// First, let's create new data to append
val newData = Seq(
  (4, "Charles", "Gold", "2024-11-02"),
  (5, "Antony", "Premium", "2024-11-06"),
  (6, "Emily", "Standard", "2024-11-10")
).toDF("customer_id", "name", "loyalty_tier", "last_updated")

// Add the discount_code column based on loyalty_tier in the new data
val newDataWithDiscount = newData.withColumn(
  "discount_code",
  when(col("loyalty_tier") === "Standard", "10p")
    .when(col("loyalty_tier") === "Gold", "15p")
    .when(col("loyalty_tier") === "Premium", "20p")
    .otherwise("Unknown") // Default case if loyalty_tier is unexpected
)

// Schema evolution in delta table, the old rows will have discount_code as null
newDataWithDiscount.write.format("delta")
  .mode("append") 
  .option("mergeSchema", "true")
  .save("/mnt/delta/customer360_delta/")

// Update rows to set discount_code to "Unknown" if it is null
val deltaTable = DeltaTable.forPath(spark, "/mnt/delta/customer360_delta/")

// Perform update operation for older rows
deltaTable.update(
  col("discount_code").isNull, // Identify rows with null discount_code
  Map("discount_code" -> lit("Unknown")) // Set discount_code to "Unknown"
)

With mergeSchema enabled, the append adds discount_code to the table schema instead of failing on a schema mismatch, so your pipelines won’t break when new fields are added.
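
If you want to confirm that the new column really landed, a quick read-back is enough. This is a small sketch that assumes the table path used in this article and a Databricks notebook session:

```scala
import org.apache.spark.sql.SparkSession

// Returns the notebook's existing session
val spark = SparkSession.builder().getOrCreate()

val evolved = spark.read.format("delta").load("/mnt/delta/customer360_delta/")

// discount_code should now be listed next to the four original columns
evolved.printSchema()

// Look at old and new customers side by side
evolved.orderBy("customer_id").show()
```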

Step 3: Upsert operation with Merge

Let’s say you receive an incoming dataset containing both updated and brand-new customer records. Here’s how Delta Lake handles the upsert using MERGE.

import io.delta.tables.DeltaTable
import org.apache.spark.sql.functions._

// Create new incoming data
val incomingData = Seq(
  (4, "Charles", "Platinum", "2024-11-20", "20p"), // Update loyalty_tier for customer_id 4
  (7, "Diana", "Standard", "2024-11-21", "10p"),   // Insert new customer_id 7
  (8, "Eve", "Gold", "2024-11-22", "15p")          // Insert new customer_id 8
).toDF("customer_id", "name", "loyalty_tier", "last_updated", "discount_code")

// Define the Delta table path
val deltaTablePath = "/mnt/delta/customer360_delta/"

// Load the table at that path as a DeltaTable object
val deltaTable = DeltaTable.forPath(spark, deltaTablePath)

// Perform the Merge (Upsert) operation
deltaTable.as("target")
  .merge(
    incomingData.as("source"),
    "target.customer_id = source.customer_id" // Matching condition on customer_id
  )
  .whenMatched("target.last_updated < source.last_updated") // Only when the incoming row is newer
  .updateAll()                                              // Overwrite all columns of the matched row
  .whenNotMatched()                                         // No row with this customer_id yet
  .insertAll()                                              // Insert the incoming row as-is
  .execute()

The merge updates a matched customer only when the incoming record is newer, and inserts customers that are not yet in the table.
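
One caveat on deduplication: MERGE raises an error when several source rows match the same target row, so an incoming batch with repeated customer IDs has to be reduced to one row per customer first. Here is a sketch of one common way to do that, keeping only the newest record per customer_id. The rawIncoming batch is made-up sample data, not part of the dataset above.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{col, row_number}

// Returns the notebook's existing session
val spark = SparkSession.builder().getOrCreate()
import spark.implicits._

// Hypothetical batch in which customer 4 appears twice
val rawIncoming = Seq(
  (4, "Charles", "Gold", "2024-11-18", "15p"),
  (4, "Charles", "Platinum", "2024-11-20", "20p"),
  (7, "Diana", "Standard", "2024-11-21", "10p")
).toDF("customer_id", "name", "loyalty_tier", "last_updated", "discount_code")

// Rank each customer's rows from newest to oldest
val newestFirst = Window.partitionBy("customer_id").orderBy(col("last_updated").desc)

// Keep only the newest row per customer
val dedupedIncoming = rawIncoming
  .withColumn("row_num", row_number().over(newestFirst))
  .filter(col("row_num") === 1)
  .drop("row_num")

dedupedIncoming.show()
```

dedupedIncoming can then be used as the source of the merge. The ordering relies on last_updated being an ISO-formatted date string, which sorts chronologically.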

Step 5: Optimizing and Z-ordering

To keep queries fast, we can optimize the Delta table with compaction and Z-ordering.

Understanding Z-Ordering: Z-ordering arranges data based on a Z-order curve, also known as a Morton curve. This curve maps multi-dimensional data into a single dimension while preserving the locality of the data points. In practical terms, it means that rows with similar values in the Z-ordered columns are stored close together on disk. This is most useful in the following cases:

  • Filter columns with high cardinality (e.g. customer_id)
  • When there are filters applied on multiple columns

Benefits of Z-Ordering:

  • Efficient Data Skipping: By clustering related data, Delta Lake can skip over irrelevant data during query execution, reducing the amount of data read and speeding up queries.
  • Improved Query Performance: It’s especially beneficial for queries that filter on multiple columns, as Z-ordering ensures that the relevant data is co-located, minimizing the need to scan large portions of the dataset.
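
To make the Morton curve less abstract, here is a toy sketch in plain Scala that interleaves the bits of two small integers and sorts a 4x4 grid of points by the result. It only illustrates the idea of the curve; it is not how Delta Lake implements Z-ordering internally, and MortonSketch and interleave are names made up for this example.

```scala
object MortonSketch {
  // Interleave the low 16 bits of x and y:
  // bit i of x goes to bit 2i, bit i of y goes to bit 2i + 1.
  def interleave(x: Int, y: Int): Long = {
    var z = 0L
    var i = 0
    while (i < 16) {
      z |= ((x >> i) & 1).toLong << (2 * i)
      z |= ((y >> i) & 1).toLong << (2 * i + 1)
      i += 1
    }
    z
  }

  def main(args: Array[String]): Unit = {
    // All points of a 4x4 grid
    val points = for (y <- 0 until 4; x <- 0 until 4) yield (x, y)

    // Print the points in Z-order together with their Morton code
    points
      .sortBy { case (x, y) => interleave(x, y) }
      .foreach { case (x, y) => println(s"($x,$y) -> ${interleave(x, y)}") }
  }
}
```

Calling MortonSketch.main(Array.empty) prints the grid in Z-order. Points that are neighbours in both dimensions tend to end up with nearby codes, which is the locality property that lets similar rows land in the same files.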

Code sample:

// Compact small files into larger ones
deltaTable.optimize().executeCompaction()

// Z-order by frequently filtered columns (this also rewrites and compacts the files)
deltaTable.optimize().executeZOrderBy("customer_id", "loyalty_tier")
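
To see whether compaction changed anything, one option is to compare the table's file count before and after the optimize step. This is a small sketch that assumes the table path used in this article; run it once before and once after optimizing.

```scala
import org.apache.spark.sql.SparkSession

// Returns the notebook's existing session
val spark = SparkSession.builder().getOrCreate()

// DESCRIBE DETAIL reports table-level metadata, including the current file count
val detail = spark.sql("DESCRIBE DETAIL delta.`/mnt/delta/customer360_delta/`")

detail.select("numFiles", "sizeInBytes").show()
```

On a table as small as this demo the gain is negligible. The effect becomes visible on tables that have accumulated many small files from frequent appends and merges.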

Step 6: Querying Delta Table Versions

So what do we do if we need to debug or compare data over time? Delta Lake supports versioning.

This lets us travel back in time to previous versions:

// Query a previous version of the table

// val deltaDF = DeltaTable.forPath(spark, "/mnt/delta/customer360_delta/")
// deltaDF.history().show()  // display the version history of the table, including the version numbers, timestamps, and operation types

val version1Data = spark.read.format("delta")
  .option("versionAsOf", 1)
  .load("/mnt/delta/customer360_delta/")

version1Data.show()

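Version 1 shows the data right after the append of newDataWithDiscount and before the update that back-fills "Unknown", so the three original customers still have a null discount_code. You can also read version 0, which is the state of the Delta table after the initial write.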

Benefits of Delta Lake Versioning:

  • Data Auditing: Track changes to your data over time, which is essential for compliance and auditing purposes.
  • Reproducibility: Reproduce experiments or reports by accessing the exact state of the data at a specific point in time.
  • Data Recovery: Recover from accidental data deletions or updates by reverting to a previous version.

This is incredibly useful for audits, debugging, or historical analysis.
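
For the recovery case, reading an old version is only half of the story; the live table can also be rolled back. Here is a sketch of how that might look, assuming the table path from this article and a Delta Lake release recent enough to include restoreToVersion. Be aware that it changes the current state of the table.

```scala
import io.delta.tables.DeltaTable
import org.apache.spark.sql.SparkSession

// Returns the notebook's existing session
val spark = SparkSession.builder().getOrCreate()
val tablePath = "/mnt/delta/customer360_delta/"
val customerTable = DeltaTable.forPath(spark, tablePath)

// List the commits so you can pick the version to go back to
customerTable.history()
  .select("version", "timestamp", "operation")
  .orderBy("version")
  .show(false)

// Roll the live table back to the state of version 1.
// The restore is recorded as a new commit, so later versions stay in the history.
customerTable.restoreToVersion(1)
```

Because the restore is itself a commit, it can be undone the same way by restoring to the version that preceded it.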

Wrap-Up: Why Delta Lake Rocks

Delta Lake can be compared to a Swiss Army knife for data engineers. In our Customer 360 use case, it helped us:

  1. Handle schema evolution without breaking pipelines.
  2. Deduplicate records with ease.
  3. Implement upserts and Change Data Capture (CDC) to keep data fresh.
  4. Optimize query performance with compaction and Z-ordering.
  5. Debug and audit with versioning.

Whether you’re working on streaming data, batch processing, or analytics, Delta Lake is a must-have tool for building robust, high-performance data pipelines.

Do you have a use case where Delta Lake saved the day? Share it in the comments! Or, if you’re new to Delta, try the above example in Databricks and let me know how it works for you. 🚀
