Building Real-Time Recommendations with Spark, ALS, and Kafka
Ever wondered how online stores know exactly what you’d like to buy next?
Whether it’s Netflix suggesting your next binge-worthy series or Amazon recommending the perfect gadget, real-time recommendations can make or break user engagement. It’s not magic — it’s machine learning paired with some real-time data wizardry. Today, we’re going to build a simple, real-time recommendation engine using Apache Spark, Kafka, and a pre-trained ALS model. So grab your coffee, and let’s get started! ☕
The Big Picture
Our goal is to build a real-time recommendation system that combines Spark’s stream processing with a pre-trained ALS (Alternating Least Squares) model, a popular collaborative filtering algorithm for predicting user preferences. Here’s the plan:
- Listen to user activity events (likes, clicks, ratings, or purchases) streamed in from Kafka.
- Process the events to identify user preferences.
- Predict what users might like using a pre-trained recommendation model like ALS.
- Output those recommendations to the console, a database, or back to Kafka for further processing.
Cool, right? Let’s get started!
1. Setting the Stage with Spark
First things first, we need a Spark session, which acts as the gateway to all of Spark’s features:
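import org.apache.spark.sql.SparkSession
val spark = SparkSession.builder()
  .appName("RealTimeRecommendation")
  .master("local[*]")
  .getOrCreate()
import spark.implicits._ // Encoders needed later for .as[...] and Dataset.map
The local[*] setting tells Spark to use all available CPU cores, which keeps our local development setup fast.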
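To compile the snippets in this article, the project needs Spark SQL, MLlib, and the Kafka connector on the classpath. A minimal build.sbt sketch is below; the Scala and Spark versions are assumptions, so align them with your environment.

```scala
// build.sbt (sketch): versions are assumptions, match them to your cluster
ThisBuild / scalaVersion := "2.12.18"

val sparkVersion = "3.5.1"

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-sql"            % sparkVersion, // SparkSession, Structured Streaming
  "org.apache.spark" %% "spark-mllib"          % sparkVersion, // ALS and ALSModel
  "org.apache.spark" %% "spark-sql-kafka-0-10" % sparkVersion  // the "kafka" source and sink
)
```

When running on a cluster through spark-submit, spark-sql and spark-mllib are usually marked "provided", while the Kafka connector can be supplied with --packages.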
2. Structuring User Events
Every system needs structured data. For our incoming Kafka messages, we define a UserEvent case class that serves as the event schema:
// Define these at the top level (outside any method) so Spark can derive encoders for them
case class UserEvent(userId: String, eventType: String, productId: String, timestamp: Long)
case class Recommendation(userId: String, recommendedProducts: List[String])
We also define the Recommendation case class as the schema for our output.
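Step 4 treats each Kafka message as a CSV line in the order userId,eventType,productId,timestamp. To make that wire format concrete, here is a plain-Scala sketch of a more defensive parser; the parseEvent helper and the sample messages are hypothetical, not part of the original pipeline. Returning an Option lets a malformed message be skipped instead of throwing and failing the streaming query.

```scala
import scala.util.Try

// Same schema as the UserEvent case class above, repeated so this compiles on its own
case class UserEvent(userId: String, eventType: String, productId: String, timestamp: Long)

object EventParser {
  // Hypothetical helper: parses "userId,eventType,productId,timestamp".
  // Returns None for a wrong field count or a non-numeric timestamp.
  def parseEvent(line: String): Option[UserEvent] =
    line.split(",").map(_.trim) match {
      case Array(userId, eventType, productId, ts) =>
        Try(ts.toLong).toOption.map(UserEvent(userId, eventType, productId, _))
      case _ => None
    }

  def main(args: Array[String]): Unit = {
    println(parseEvent("42, click, 101, 1700000000000")) // Some(UserEvent(42,click,101,1700000000000))
    println(parseEvent("not,a,valid,event"))             // None (timestamp is not a number)
    println(parseEvent("too,few"))                       // None (wrong field count)
  }
}
```

Inside the streaming query of step 4, the same helper could replace the map with .flatMap(line => EventParser.parseEvent(line).toList), which silently drops bad events; a production pipeline would more likely route them to a separate topic for inspection.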
3. Loading the Star of the Show — ALS Model
We assume you already have a pre-trained ALS model (if not, see step 5). Load it with:
import org.apache.spark.ml.recommendation.ALSModel
val alsModel = ALSModel.load("path/to/saved_model")
This is the heart of our recommendation engine, providing predictions based on user-item interactions.
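Under the hood, an ALSModel holds a learned factor vector of length rank for every user and every item (exposed as the userFactors and itemFactors DataFrames), and it predicts a user's preference for an item as the dot product of the two vectors. ALS learns those vectors by alternating: it fixes the item factors and solves a regularized least-squares problem for the user factors, then swaps roles. The plain-Scala sketch below uses made-up rank-2 factors (not from a trained model) to show how ranking items by that score yields top-N recommendations.

```scala
object AlsScoringSketch {
  // Made-up rank-2 factors for illustration; a real model learns these from ratings
  val userFactor: Array[Float] = Array(1.0f, 0.5f)
  val itemFactors: Map[Int, Array[Float]] = Map(
    101 -> Array(2.0f, 1.0f),
    102 -> Array(0.5f, 2.0f),
    103 -> Array(1.0f, 0.0f)
  )

  // Predicted preference = dot product of the user and item factor vectors
  def score(u: Array[Float], i: Array[Float]): Float =
    u.zip(i).map { case (a, b) => a * b }.sum

  // Score every item, sort by predicted preference (highest first), keep the top k
  def topK(k: Int): List[(Int, Float)] =
    itemFactors.toList
      .map { case (itemId, f) => itemId -> score(userFactor, f) }
      .sortWith(_._2 > _._2)
      .take(k)

  def main(args: Array[String]): Unit =
    println(topK(2)) // List((101,2.5), (102,1.5))
}
```

With the real model, alsModel.userFactors and alsModel.itemFactors are DataFrames with id and features columns, and alsModel.rank gives the vector length.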
4. Reading Events from Kafka
Enter Kafka, the real-time data pipeline. User events (like a product view or purchase) are streamed into a Kafka topic named user-events. Spark reads these events, processes them, and maps them to our UserEvent case class:
import org.apache.spark.sql.Dataset
// Requires the spark-sql-kafka-0-10 connector on the classpath
val userEventsStreamDF = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "user-events")
  .load()
val userEventsStream: Dataset[UserEvent] = userEventsStreamDF
  .selectExpr("CAST(value AS STRING)") // Kafka values arrive as bytes
  .as[String] // converts to Dataset[String]
  .map { line =>
    // Assumes each message is a CSV line: userId,eventType,productId,timestamp
    val fields = line.split(",").map(_.trim)
    UserEvent(fields(0), fields(1), fields(2), fields(3).toLong)
  } // userEventsStream is used in step 6
5. Training the ALS Model
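- Training typically happens offline, as a batch job. Because step 3 loads the model with ALSModel.load, it must be trained with Spark MLlib (from Scala or PySpark, since MLlib's saved-model format works across languages) rather than a library like TensorFlow or Scikit-learn.
- Use historical user-product interaction data to train the model. For our use case, we will use mock rating data.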
- Example in Spark MLlib:
import org.apache.spark.ml.recommendation.ALS
import org.apache.spark.sql.SparkSession
val spark = SparkSession.builder()
  .appName("ALS Recommendation")
  .master("local[*]")
  .getOrCreate()
import spark.implicits._
// Example historical interaction data with ratings: userId, productId, rating
val data = Seq(
  (1, 101, 5.0),
  (1, 102, 3.0),
  (2, 101, 4.0),
  (2, 103, 2.0)
)
val interactionDF = data.toDF("userId", "productId", "rating")
val als = new ALS()
  .setUserCol("userId")
  .setItemCol("productId")
  .setRatingCol("rating")
  .setMaxIter(10)
  .setRank(10)
  .setRegParam(0.01)
  .setColdStartStrategy("drop") // Drop rows with NaN predictions (users/items unseen during training)
val model = als.fit(interactionDF)
// Save the model for the streaming job to load in step 3; overwrite() replaces an existing copy
model.write.overwrite().save("path/to/saved_model")
6. Generating Recommendations
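Now for the fun part: recommendations! For every user event, we:
- Fetch the user ID.
- Look up the top 10 ALS recommendations for that user.
- Extract the recommended product IDs into a list.
There is one catch: model methods like recommendForUserSubset operate on DataFrames, and DataFrame operations can only be issued from the driver, so calling the model inside a map over the stream fails at runtime. Instead, we precompute every known user's top 10 with recommendForAllUsers and join the event stream against that static table:
import org.apache.spark.sql.functions.col
// Top-10 recommendations for every user the model knows, computed once and then cached
val allUserRecs = alsModel.recommendForAllUsers(10)
  .select(
    col("userId").cast("string").as("userId"), // match the String userId in UserEvent
    col("recommendations.productId").cast("array<string>").as("recommendedProducts"))
  .cache() // avoid recomputing the static side for every micro-batch
// Stream-static join: each incoming event picks up its user's recommendations
val recommendationsStream = userEventsStream
  .join(allUserRecs, Seq("userId"))
  .select("userId", "recommendedProducts")
  .as[Recommendation]
Selecting recommendations.productId pulls the productId field out of every struct in the nested recommendations array while keeping ALS's ranking order. Events from users the model has never seen produce no output, and the recommendations only change when you reload a retrained model.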
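Because ALS model methods work on DataFrames, they have to be called on the driver rather than inside a per-event map. If you would rather score only the users who are active in each micro-batch instead of every known user, foreachBatch provides that hook: Spark hands each micro-batch to a function running on the driver as an ordinary Dataset. The sketch below repeats the step 2 case classes so it compiles on its own; MicroBatchRecommender and its method names are hypothetical.

```scala
import org.apache.spark.ml.recommendation.ALSModel
import org.apache.spark.sql.Dataset
import org.apache.spark.sql.functions.col
import org.apache.spark.sql.streaming.StreamingQuery

// Same schemas as in step 2, repeated so this sketch compiles on its own
case class UserEvent(userId: String, eventType: String, productId: String, timestamp: Long)
case class Recommendation(userId: String, recommendedProducts: List[String])

object MicroBatchRecommender {
  // Hypothetical helper: runs on the driver, so calling model methods here is allowed
  def recommendForBatch(model: ALSModel, batch: Dataset[UserEvent], k: Int): Dataset[Recommendation] = {
    val spark = batch.sparkSession
    import spark.implicits._
    // Score only the distinct users in this micro-batch; ALS expects integer IDs
    val users = batch.select(col("userId").cast("int").as("userId")).distinct()
    model.recommendForUserSubset(users, k)
      .select(
        col("userId").cast("string").as("userId"),
        // Take productId from each struct in the ranked array, as strings
        col("recommendations.productId").cast("array<string>").as("recommendedProducts"))
      .as[Recommendation]
  }

  // Hypothetical entry point: prints each micro-batch's recommendations to stdout
  def start(model: ALSModel, events: Dataset[UserEvent]): StreamingQuery = {
    // A typed function value sidesteps the Scala/Java foreachBatch overload ambiguity
    val handleBatch: (Dataset[UserEvent], Long) => Unit =
      (batch, _) => recommendForBatch(model, batch, 10).show(false)
    events.writeStream.foreachBatch(handleBatch).start()
  }
}
```

The trade-off versus precomputing: each micro-batch launches an ALS scoring job, but only for users who actually sent events, and a freshly reloaded model takes effect on the next batch.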
7. Writing the Recommendations
Finally, we stream the recommendations wherever we need them. Here, we’re printing to the console, but you could just as easily send them back to Kafka, save them to a database, or feed them into another downstream system.
val query = recommendationsStream.writeStream
  .outputMode("append")
  .format("console") // For Kafka instead: format("kafka") plus a string "value" column, a "topic" option, and a "checkpointLocation"
  .start()
query.awaitTermination()
Fire up your terminal, and you’ll see something like:
+----------+--------------------+
|    userId| recommendedProducts|
+----------+--------------------+
|     12345|  [101, 202, 303...]|
+----------+--------------------+
Where to Go from Here?
You just built the foundation of a real-time recommendation engine. How cool is that? But the journey doesn’t stop here. You can:
- Send recommendations back to Kafka for other systems to consume.
- Exclude products the user has already interacted with (no one likes redundant suggestions!).
- Continuously retrain the ALS model with fresh data to keep it sharp.
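For the second idea, excluding products a user has already interacted with, here is a sketch that joins the recommendations with a per-user set of seen products and filters them out using Spark SQL's filter higher-order function, which keeps the remaining items in ALS's ranking order. The history DataFrame (one row per past interaction) and the SeenFilter helper are hypothetical inputs, not part of the pipeline above.

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.{coalesce, col, collect_set, expr, typedLit}

object SeenFilter {
  // Hypothetical helper.
  // recs:    userId (string), recommendedProducts (array<string>)
  // history: userId (string), productId (string), one row per past interaction
  def excludeSeen(recs: DataFrame, history: DataFrame): DataFrame = {
    // One array of already-seen product IDs per user
    val seen = history
      .groupBy("userId")
      .agg(collect_set(col("productId")).as("seenProducts"))
    recs
      .join(seen, Seq("userId"), "left")
      // Users without history get an empty array, so nothing is filtered out for them
      .withColumn("seenProducts", coalesce(col("seenProducts"), typedLit(Seq.empty[String])))
      // Keep only products not seen before, preserving the original order
      .withColumn("recommendedProducts",
        expr("filter(recommendedProducts, p -> NOT array_contains(seenProducts, p))"))
      .drop("seenProducts")
  }
}
```

Since filtering can leave a user with fewer than 10 items, you might request a few extra candidates from ALS (for example, 20) and trim the result with Spark's slice function.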
Final Thoughts
What we’ve built here is just the tip of the iceberg. With a bit of tweaking, this setup could become the foundation of a production-grade recommendation engine. Whether you’re a data engineer, machine learning enthusiast, or just a curious tinkerer, I hope this gave you the spark to dive deeper into real-time systems.
Go forth and recommend! 🎉