Building End-to-End Customer Insights Pipeline by Integrating Multiple Data Sources in Spark With Airflow Scheduler
My articles are open to everyone; non-members can read the full article by clicking this link .
In today’s fast-paced world, businesses thrive by making regular, data-driven decisions. Imagine you’re working for a retail company that wants to gain up-to-date insights on customer behaviour to personalize recommendations, optimize marketing, and manage inventory more effectively. But the data is spread across multiple systems — a SQL database, a cloud storage solution, and a daily CSV file with sales records.
In this guide, I’ll walk you through building a data pipeline that uses Apache Spark to process all this data and Apache Airflow to keep things running smoothly on a schedule. By the end, we’ll have an automated pipeline that gives daily customer insights. Let’s dive in!
The Problem: Regular Customer Insights from Multiple Data Sources
Our goal is to create a pipeline that:
- Ingests data from multiple sources: A SQL database for customer profiles, an S3 bucket for web interactions, and a daily CSV file of sales records.
- Processes and integrates data in Spark to generate a single dataset for analysis.
- Schedules the entire pipeline with Airflow so it runs automatically each day and supports daily reporting.
This setup will allow our business analysts and machine learning teams to access updated insights every day, helping them make faster, smarter decisions.
Step 1: Ingesting Data into Spark
In our use case, we need to pull in data from three sources —
- Customer profiles stored in a PostgreSQL database.
- Web interactions ie. clicks, page views events are stored as JSON files in an S3 bucket.
- Daily sales records provided as a CSV file in a shared directory.
Connecting to PostgreSQL
Let’s start by connecting to the SQL database to get customer profile data. In Scala, we can use Spark’s JDBC connector to connect and load data from PostgreSQL:
import org.apache.spark.sql.SparkSession
val spark = SparkSession.builder()
.appName("CustomerInsightsPipeline")
.getOrCreate()
// Load customer profiles from PostgreSQL
val customerProfilesDF = spark.read
.format("jdbc")
.option("url", "jdbc:postgresql://db-server:5432/retail_db")
.option("dbtable", "public.customers")
.option("user", "<your_username>")
.option("password", "<your_password>")
.load()Customer Profile Schema :
+-------------------+-----------+-----------------------------------+
| Column | Data Type | Description |
+-------------------+-----------+-----------------------------------+
| customer_id | INT | Unique ID for each customer |
| first_name | STRING | Customer’s first name |
| last_name | STRING | Customer’s last name |
| email | STRING | Customer’s email address |
| date_of_birth | DATE | Customer’s birth date |
| registration_date | DATE | Date the customer registered |
| gender | STRING | Customer’s gender |
| country | STRING | Customer’s country |
| loyalty_points | INT | Loyalty points earned by customer |
+-------------------+-----------+-----------------------------------+
Reading Web Interactions Json Logs from S3
Next, we pull web interactions logs from Amazon S3. These logs are stored in JSON format, capturing user clicks, page views, and other behaviour.
val webInteractionsDF = spark.read
.option("inferSchema", "true")
.json("s3a://bucket-name/path/to/logs/")This JSON data gives us valuable information on customer activity on the website, like browsing behaviour and products viewed.
Web Interactions Schema :
+------------------+-----------+-----------------------------------------------------------+
| Column | Data Type | Description |
+------------------+-----------+-----------------------------------------------------------+
| interaction_id | STRING | Unique ID for the interaction |
| customer_id | INT | ID of the customer (foreign key to customers) |
| interaction_type | STRING | Type of interaction (e.g., page_view, click, add_to_cart) |
| interaction_time | TIMESTAMP | Timestamp of the interaction |
| page_url | STRING | URL of the page interacted with |
| product_id | STRING | Product ID if interaction was product-related (optional) |
| session_id | STRING | ID of the customer’s session |
| device_type | STRING | Device used (e.g., mobile, desktop) |
| browser | STRING | Browser used (e.g., Chrome, Safari) |
| referral_source | STRING | Source of referral (e.g., google, email) |
+------------------+-----------+-----------------------------------------------------------+Reading Daily Sales CSV
Finally, we bring in the daily sales CSV. This file contains transactional records with details on what each customer purchased.
val salesDF = spark.read
.option("header", "true")
.option("inferSchema", "true")
.csv("/path/to/daily_sales.csv")Daily Sales Schema :
+-----------------+----------------+-------------------------------------------------------------------+
| Column | Data Type | Description |
+-----------------+----------------+-------------------------------------------------------------------+
| transaction_id | STRING | Unique ID for the transaction |
| customer_id | INT | ID of the customer making the purchase (foreign key to customers) |
| product_id | STRING | ID of the purchased product |
| product_name | STRING | Name of the product |
| quantity | INT | Quantity of the product sold |
| purchase_amount | DECIMAL(10, 2) | Total amount for the transaction |
| purchase_date | TIMESTAMP | Date and time of the transaction |
| payment_method | STRING | Method of payment (e.g., card, cash) |
| store_location | STRING | Store or location where sale was made |
+-----------------+----------------+-------------------------------------------------------------------+Now we have all the raw data we need in three DataFrames. The next step is processing it.

If you’re finding this article useful, your claps and share would mean a lot — it inspires me to keep creating valuable content like this!
You can also follow me on Medium and LinkedIn to stay connected and catch all my latest insights.
Step 2: Processing and Integrating Data with Spark
With the data loaded, it’s time to clean and transform it into something more useful. We’ll perform a series of operations to create a unified dataset that provides insights into customer behaviour and transactions.
Joining Data
First, let’s join these DataFrames on customer_id to create a complete view of each customer’s profile, interactions, and purchases.
val joinedDF = customerProfilesDF
.join(webInteractionsDF, customerProfilesDF("customer_id") === webInteractionsDF("customer_id"), "left")
.join(salesDF, customerProfilesDF("customer_id") === salesDF("customer_id"), "left")Aggregating Web Interactions
To get a sense of how engaged each customer is, let’s calculate the total number of interactions per customer. This will be helpful for targeting loyal or highly engaged customers.
import org.apache.spark.sql.functions._
val interactionsPerCustomerDF = joinedDF
.groupBy("customer_id")
.agg(count("interaction_type").alias("total_interactions"))Calculating Average Purchase Value
We can also calculate the average purchase value per customer to understand spending habits. This gives us insights into high-value customers.
val averagePurchaseDF = joinedDF
.groupBy("customer_id")
.agg(avg("purchase_amount").alias("avg_purchase"))With these transformations, we’ve created two key output datasets: interactionsPerCustomerDF and averagePurchaseDF. Now we’re ready to move on to automating this pipeline!
Step 3: Creating & Packaging the Spark Job
We will be using a scala sbt project to package the file.
Under the project folder open the build.sbt file and add the following configurations:
name := "CustomerInsightsPipeline"
version := "0.1"
scalaVersion := "2.13.8"
libraryDependencies ++= Seq(
"org.apache.spark" %% "spark-core" % "3.2.0",
"org.apache.spark" %% "spark-sql" % "3.2.0",
"org.apache.hadoop" % "hadoop-aws" % "3.2.0",
"com.amazonaws" % "aws-java-sdk-bundle" % "1.11.901",
"org.postgresql" % "postgresql" % "42.3.1"
)To make it easy to schedule, we’ll put all our Spark code into a Scala script — CustomerInsightsJob.scala.
You can configure Spark to use your specific aws profile from your ~/.aws/credentials file for accessing your s3 data.
Here’s what the final scala file looks like:
package com.example
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._
object CustomerInsightsJob {
def main(args: Array[String]): Unit = {
val spark = SparkSession.builder.appName("CustomerInsightsPipeline")
.config("spark.hadoop.fs.s3a.aws.credentials.provider", "com.amazonaws.auth.profile.ProfileCredentialsProvider")
.config("spark.hadoop.fs.s3a.profile", "<your-profile-name>")
.getOrCreate()
// Load data from PostgreSQL, S3, and CSV
val customerProfilesDF = spark.read.format("jdbc")
.option("url", "jdbc:postgresql://db-server:5432/retail_db")
.option("dbtable", "public.customers")
.option("user", "<your_username>")
.option("password", "<your_password>")
.load()
val webInteractionsDF = spark.read.option("inferSchema", "true").json("s3a://bucket-name/path/to/logs/")
val salesDF = spark.read
.option("header", "true")
.option("inferSchema", "true")
.csv("/path/to/daily_sales.csv")
// Data processing steps
val joinedDF = customerProfilesDF
.join(webInteractionsDF, customerProfilesDF("customer_id") === webInteractionsDF("customer_id"), "left")
.join(salesDF, customerProfilesDF("customer_id") === salesDF("customer_id"), "left")
val interactionsPerCustomerDF = joinedDF
.groupBy("customer_id")
.agg(count("interaction_type").alias("total_interactions"))
val averagePurchaseDF = joinedDF
.groupBy("customer_id")
.agg(avg("purchase_amount").alias("avg_purchase"))
// Save final results
interactionsPerCustomerDF.write.mode("overwrite").parquet("/output/interactions")
averagePurchaseDF.write.mode("overwrite").parquet("/output/avg_purchases")
spark.stop()
}
}Project Structure:
project-root/
├── dags/
│ └── customer_insights_pipeline.py
├── src/
│ └── main/
│ └── scala/
│ └── CustomerInsightsJob.scala
├── project/
│ └── assembly.sbt
├── build.sbt
├── target/
│ └── scala-2.13/
│ └── CustomerInsightsPipeline-assembly-0.1.jar
├── conf/
│ └── application.conf
├── logs/
│ └── spark/
│ └── application.log
└── README.mdCompile and package the application: In your project directory, run —
sbt clean assemblyLocate the JAR file:
After the build completes, you’ll find the JAR file in the target/scala-2.13/ directory, named something like CustomerInsightsPipeline-assembly-0.1.jar.
Step 3: Scheduling the Pipeline with Airflow
Now that we have our Spark jobs in place, we want to automate everything so it runs daily without manual intervention. This is where Apache Airflow steps in.
Setting Up Airflow to Run the Spark Job
In Airflow, a workflow is defined as a Directed Acyclic Graph (DAG). Here’s how to set up a DAG to run this Spark job every day:
from airflow import DAG
from airflow.providers.apache.spark.operators.spark_submit import SparkSubmitOperator
from datetime import datetime, timedelta
default_args = {
'owner': 'airflow',
'depends_on_past': False,
'start_date': datetime(2024, 11, 1),
'email_on_failure': True,
'email_on_retry': True,
'retries': 1,
'retry_delay': timedelta(minutes=10),
}
with DAG(
'customer_insights_pipeline',
default_args=default_args,
schedule_interval='@daily', #Runs everyday at midnight UTC
catchup=False,
) as dag:
# Start node
start = DummyOperator(task_id='start')
# Spark job task
run_spark_job = SparkSubmitOperator(
task_id='run_spark_job',
application='/path/to/CustomerInsightsPipeline-assembly-0.1.jar',
conn_id='spark_default',
java_class='com.example.CustomerInsightsJob',
verbose=True,
)
# End node
end = DummyOperator(task_id='end')
# Define task dependencies
start >> run_spark_job >> endIn this DAG:
applicationspecifies the path to our Spark job script.schedule_interval='@daily'means this job will run every day.
Trigger the DAG
- In the Airflow UI, make sure the DAG
customer_insights_pipelineis On. - The DAG will be triggered as per the scheudle.
- Monitor the task in the Graph View or Tree View
Additional Points :
a. Airflow Monitoring
Airflow provides a user-friendly interface to monitor DAG runs, task statuses, and logs:
- DAG Runs: View the history of DAG executions, including success and failure statuses.
- Task Instances: Drill down into individual task runs to inspect logs and troubleshoot issues.
b. Spark Logging
Ensure your Spark job is configured to log relevant information:
- Log Levels: Set appropriate log levels (e.g., INFO, WARN, ERROR) to capture necessary details without overwhelming the logs.
- Log Aggregation: Configure Spark to aggregate logs from all executors, making it easier to diagnose issues.
5. Visualization
Visualizing the processed data can provide valuable insights:
- Data Storage: Store the output data in a format compatible with visualization tools (e.g., Parquet, ORC).
- Visualization Tools: Use tools like Tableau, Power BI, or Jupyter Notebooks to create dashboards and reports based on the processed data.
Wrapping It All Up
And there you have it — a full end-to-end customer insights pipeline that:
- Pulls in data from multiple sources
- Processes and integrates it in Spark
- Runs on a daily schedule with Airflow
With this setup, your retail company now has daily updated insights into customer behaviour, making it easy to personalize recommendations, adjust marketing strategies, and manage inventory more effectively. The best part? It’s fully automated and ready to scale.
Thanks for following along, and happy data engineering!
I hope you enjoyed this article. If so, your claps and share would mean a lot — it inspires me to keep creating valuable content like this!
You can also follow me on Medium and LinkedIn to stay connected and catch all my latest insights.
#SQL #DataPipeline #DataEngineering #ApacheSpark #Airflow #Technology






