Ritam Mukherjee

Summary

The article outlines the process of building a real-time recommendation system using Apache Spark, Kafka, and a pre-trained ALS (Alternating Least Squares) model, which predicts user preferences and suggests relevant products to boost engagement.

Abstract

The article delves into the construction of a real-time recommendation engine that leverages the computational power of Apache Spark, the streaming capabilities of Kafka, and the predictive accuracy of a pre-trained ALS model. It details the steps from setting up a Spark session to structuring user events, loading a pre-trained ALS model, reading events from Kafka, training the ALS model with historical data, generating personalized recommendations, and finally, outputting these recommendations to a console or other systems. The author emphasizes the importance of real-time data processing in providing timely and relevant suggestions to users, akin to the recommendation systems used by giants like Netflix and Amazon.

Opinions

  • The author believes that real-time recommendations are crucial for user engagement and retention.
  • The use of a pre-trained ALS model is advocated for its effectiveness in predicting user preferences.
  • Structuring data with case classes in Spark is highlighted as a key step in handling incoming Kafka messages.
  • Continuous retraining of the ALS model with fresh data is suggested to maintain the relevance and accuracy of recommendations.
  • The author expresses enthusiasm and encouragement for readers to explore and build upon the foundational system they have outlined.
  • The article suggests that the described setup is scalable and can be refined for production-grade applications.
  • The author values community engagement and invites readers to follow their work on Medium and LinkedIn for further insights and articles on related topics.

Building Real-Time Recommendations with Spark, ALS, and Kafka

Ever wondered how online stores know exactly what you’d like to buy next?


Photo by Alessandro Bianchi on Unsplash

Whether it’s Netflix suggesting your next binge-worthy series or Amazon recommending the perfect gadget, real-time recommendations can make or break user engagement. It’s not magic — it’s machine learning paired with some real-time data wizardry. Today, we’re going to build a simple, real-time recommendation engine using Apache Spark, Kafka, and a pre-trained ALS model. So grab your coffee, and let’s get started! ☕

The Big Picture

Our goal is to build a real-time recommendation system that combines Spark’s processing capabilities with a pre-trained ALS (Alternating Least Squares) model. ALS is a popular collaborative filtering algorithm that learns user preferences from past user-item interactions. Here’s the plan:

  1. Listen to user activity events (likes, clicks, ratings, or purchases) streamed in from Kafka.
  2. Process the events to identify user preferences.
  3. Predict what users might like using a pre-trained recommendation model such as ALS.
  4. Output those recommendations to the console, a database, or back to Kafka for further processing.

Cool, right? Let’s dive in!

1. Setting the Stage with Spark

First things first, we need a Spark session, which acts as the gateway to all of Spark’s features:

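import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("RealTimeRecommendation")
  .master("local[*]")
  .getOrCreate()

import spark.implicits._ // encoders for .as[...] and .map, plus the $"col" syntax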

The local[*] master runs Spark locally with one worker thread per available CPU core, which keeps the development setup simple while still using the whole machine.

2. Structuring User Events

Every system needs structured data. For the incoming Kafka messages, we define a UserEvent case class that acts as the event schema:

// Define case classes at the top level (not inside a method) so Spark can derive encoders for them
case class UserEvent(userId: String, eventType: String, productId: String, timestamp: Long)
case class Recommendation(userId: String, recommendedProducts: List[String])

We also define a Recommendation case class as the schema for the output.
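The UserEvent schema carries an eventType field, but nothing later in the pipeline uses it, so step 2 of the plan (processing events to identify preferences) stays implicit. As a hedged illustration, one common approach is to give each event type a weight and sum the weights per user and product to get an implicit-feedback score. The weights and the helper names eventWeight and preferenceScores below are hypothetical, chosen only for this sketch.

```scala
// Mirrors the article's UserEvent case class.
case class UserEvent(userId: String, eventType: String, productId: String, timestamp: Long)

// Hypothetical weights: stronger signals (purchases) count more than weak ones (clicks).
def eventWeight(eventType: String): Double = eventType.toLowerCase match {
  case "purchase" => 3.0
  case "like"     => 2.0
  case "click"    => 1.0
  case _          => 0.0 // unknown event types carry no preference signal
}

// Sum the weights per (userId, productId) pair to get an implicit preference score.
def preferenceScores(events: Seq[UserEvent]): Map[(String, String), Double] =
  events
    .groupBy(e => (e.userId, e.productId))
    .map { case (key, evs) => key -> evs.map(e => eventWeight(e.eventType)).sum }

val events = Seq(
  UserEvent("1", "click", "101", 1000L),
  UserEvent("1", "purchase", "101", 1005L),
  UserEvent("2", "like", "103", 1010L)
)
val scores = preferenceScores(events)
```

Scores like these could serve as the rating column for Spark’s ALS trained with setImplicitPrefs(true), which treats the values as confidence in a preference rather than as explicit ratings.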

3. Loading the Star of the Show — ALS Model

We assume you already have a pre-trained ALS model (if not, see step 5). Load it with:

import org.apache.spark.ml.recommendation.ALSModel

val alsModel = ALSModel.load("path/to/saved_model")

This is the heart of our recommendation engine, providing predictions based on user-item interactions.
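Under the hood, a trained ALS model is essentially two tables of latent factors (exposed in Spark as alsModel.userFactors and alsModel.itemFactors): one vector per user and one per item. The predicted preference of a user for an item is the dot product of their two vectors, and recommending the top N items means scoring every item and keeping the highest N. The plain-Scala sketch below, with made-up rank-2 factors and hypothetical predict and topN helpers, shows that idea; Spark’s recommendForAllUsers and recommendForUserSubset do the same scoring in a distributed, blocked way.

```scala
// Predicted score = dot product of the user's and the item's latent factors.
def predict(userFactors: Array[Double], itemFactors: Array[Double]): Double =
  userFactors.zip(itemFactors).map { case (u, i) => u * i }.sum

// Score every item for one user and keep the n highest-scoring item IDs
// (ties broken by the smaller item ID).
def topN(userFactors: Array[Double], items: Map[Int, Array[Double]], n: Int): List[Int] =
  items.toList
    .map { case (itemId, f) => itemId -> predict(userFactors, f) }
    .sortWith((a, b) => a._2 > b._2 || (a._2 == b._2 && a._1 < b._1))
    .take(n)
    .map(_._1)

// Made-up rank-2 factors, purely for illustration.
val user = Array(1.0, 0.5)
val items = Map(
  101 -> Array(2.0, 0.0), // score 2.0
  102 -> Array(0.0, 1.0), // score 0.5
  103 -> Array(1.0, 1.0)  // score 1.5
)
val recs = topN(user, items, 2)
```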

4. Reading Events from Kafka

Enter Kafka, the real-time data pipeline. User events (such as a product view or purchase) are streamed into a Kafka topic named user-events. Spark reads these events, parses them, and maps them to our UserEvent case class. Reading from Kafka requires the spark-sql-kafka-0-10 connector on the classpath:

import org.apache.spark.sql.Dataset
import spark.implicits._

val userEventsStreamDF = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "user-events")
  .load()

// Kafka delivers the payload as binary in the "value" column.
// Each message is assumed to be a CSV line: userId,eventType,productId,timestamp
val userEventsStream: Dataset[UserEvent] = userEventsStreamDF
  .selectExpr("CAST(value AS STRING)")
  .as[String] // converts to Dataset[String]
  .map { line =>
    val fields = line.split(",")
    UserEvent(fields(0), fields(1), fields(2), fields(3).toLong)
  } // userEventsStream is used in step 6
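The map above assumes every Kafka message is a well-formed CSV line; a single malformed message (too few fields, or a non-numeric timestamp) throws an exception and fails the streaming query. A more defensive variant, sketched below with a hypothetical parseUserEvent helper, returns an Option so bad lines can simply be skipped. In the streaming job it could replace the map { ... } with .flatMap(line => parseUserEvent(line).toList).

```scala
import scala.util.Try

// Mirrors the article's UserEvent case class.
case class UserEvent(userId: String, eventType: String, productId: String, timestamp: Long)

// Parse "userId,eventType,productId,timestamp"; return None instead of throwing
// when the line has the wrong number of fields or a non-numeric timestamp.
def parseUserEvent(line: String): Option[UserEvent] =
  line.split(",").map(_.trim) match {
    case Array(userId, eventType, productId, ts) =>
      Try(ts.toLong).toOption.map(t => UserEvent(userId, eventType, productId, t))
    case _ => None
  }

val good  = parseUserEvent("42,click,101,1700000000")
val bad   = parseUserEvent("42,click")
val badTs = parseUserEvent("42,click,101,yesterday")
```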

5. Training the ALS Model

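  • Training typically happens offline, as a batch job over historical data. Because the streaming job loads the model with Spark’s ALSModel.load, the model must be trained and saved with Spark MLlib (Scala or PySpark); a model trained in another library such as TensorFlow or scikit-learn cannot be loaded this way.
  • Use historical user-product interaction data to train the model. For our use case, we will use mock rating data.
  • Example in Spark MLlib: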
import org.apache.spark.ml.recommendation.ALS
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("ALS Recommendation")
  .master("local[*]")
  .getOrCreate()

import spark.implicits._

// Example historical interaction data: (userId, productId, rating).
// ALS requires numeric user and item IDs that fit in an Int.
val data = Seq(
  (1, 101, 5.0),
  (1, 102, 3.0),
  (2, 101, 4.0),
  (2, 103, 2.0)
)

val interactionDF = data.toDF("userId", "productId", "rating")

val als = new ALS()
  .setUserCol("userId")
  .setItemCol("productId")
  .setRatingCol("rating")
  .setMaxIter(10)
  .setRank(10)
  .setRegParam(0.01)
  .setColdStartStrategy("drop") // drop rows whose prediction is NaN (users/items unseen in training)

val model = als.fit(interactionDF)

// Save the model for the streaming job, replacing any previous version
model.write.overwrite().save("path/to/saved_model")
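To make the "alternating" part concrete, here is a deliberately tiny plain-Scala sketch of ALS with rank 1 (a single latent factor per user and per item) on the same mock ratings. It illustrates the technique and is not Spark’s implementation: with the item factors held fixed, each user factor has the closed-form regularized least-squares solution x_u = Σ r_ui·y_i / (Σ y_i² + λ) over that user’s rated items; then the roles swap for the items, and the two steps repeat. Because each step exactly minimizes the regularized error for one side, the objective never increases. Spark generalizes this to rank-k vectors, solving a small linear system per user or item across the cluster.

```scala
// Tiny rank-1 ALS, for illustration only (not Spark's implementation).
// Ratings: (userId, productId, rating), the same mock data as the Spark example.
val ratings = Seq((1, 101, 5.0), (1, 102, 3.0), (2, 101, 4.0), (2, 103, 2.0))
val lambda = 0.01 // regularization strength, like setRegParam(0.01)

// Regularized squared error: sum of (r - x_u * y_i)^2 plus lambda * (sum x^2 + sum y^2).
def objective(x: Map[Int, Double], y: Map[Int, Double]): Double =
  ratings.map { case (u, i, r) => math.pow(r - x(u) * y(i), 2) }.sum +
    lambda * (x.values.map(v => v * v).sum + y.values.map(v => v * v).sum)

// Closed-form update for one side: each factor f minimizes sum (r - f * g)^2 + lambda * f^2,
// giving f = sum(r * g) / (sum(g^2) + lambda), where g are the fixed factors on the other side.
def solve(pairs: Seq[(Int, Int, Double)], fixed: Map[Int, Double]): Map[Int, Double] =
  pairs.groupBy(_._1).map { case (id, rs) =>
    val num = rs.map { case (_, other, r) => r * fixed(other) }.sum
    val den = rs.map { case (_, other, _) => fixed(other) * fixed(other) }.sum + lambda
    id -> num / den
  }

var x: Map[Int, Double] = ratings.map(_._1).distinct.map(_ -> 1.0).toMap // user factors
var y: Map[Int, Double] = ratings.map(_._2).distinct.map(_ -> 1.0).toMap // item factors
val initialLoss = objective(x, y)

for (_ <- 1 to 10) { // like setMaxIter(10)
  x = solve(ratings, y)                                     // fix items, solve users
  y = solve(ratings.map { case (u, i, r) => (i, u, r) }, x) // fix users, solve items
}
val finalLoss = objective(x, y)
val predicted = x(1) * y(101) // model's estimate of user 1's rating for product 101
```

Here the loop count and lambda play the same roles as setMaxIter and setRegParam in the Spark example.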

6. Generating Recommendations

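Now, the fun part: recommendations! For every user event, we:

  • Fetch the user ID.
  • Look up the top 10 products the ALS model recommends for that user.
  • Return those product IDs as a list.

import org.apache.spark.sql.Dataset
import spark.implicits._

// DataFrame operations such as toDF, recommendForUserSubset and collect cannot
// run inside map(): that closure executes on the executors, where the
// SparkSession is not available. Because the pre-trained model does not change
// while the query runs, we compute every user's top 10 products once instead.
val topRecs = alsModel.recommendForAllUsers(10)
  .select(
    $"userId".cast("string").as("userId"), // UserEvent.userId is a String
    $"recommendations.productId".cast("array<string>").as("recommendedProducts")
  )

// Stream-static inner join: each event picks up its user's precomputed list.
// Events from users the model has never seen find no match and are dropped.
val recommendationsStream: Dataset[Recommendation] = userEventsStream
  .select("userId")
  .join(topRecs, "userId")
  .as[Recommendation]

recommendForAllUsers returns, for each user, a nested array of (productId, rating) structs. Selecting recommendations.productId pulls the productId field out of every struct in that array, so no explode-and-collect round trip is needed. For a very large user base, precomputing everyone’s list may be too expensive; one alternative is a foreachBatch sink that calls recommendForUserSubset once per micro-batch on the driver.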
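Conceptually, this step is a lookup: each incoming event is paired with the top-N list the model has for that user. The plain-Scala sketch below models that lookup with an in-memory Map; the data and the recommendFor helper are hypothetical. In this sketch, events from users the model has never seen produce no output, since ALS has no latent factors for them.

```scala
// Mirrors the article's case classes.
case class UserEvent(userId: String, eventType: String, productId: String, timestamp: Long)
case class Recommendation(userId: String, recommendedProducts: List[String])

// Hypothetical precomputed top-N lists keyed by userId (what the ALS model would produce).
val topRecs: Map[String, List[String]] = Map(
  "1" -> List("101", "103", "102"),
  "2" -> List("103", "101")
)

// Turn each event into a Recommendation; events from unknown users yield nothing.
def recommendFor(events: Seq[UserEvent]): Seq[Recommendation] =
  events.flatMap(e => topRecs.get(e.userId).map(products => Recommendation(e.userId, products)))

val out = recommendFor(Seq(
  UserEvent("1", "click", "101", 1L),
  UserEvent("99", "click", "101", 2L), // unknown user: skipped
  UserEvent("2", "purchase", "103", 3L)
))
```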

7. Writing the Recommendations

Finally, we stream the recommendations wherever we need them. Here, we’re printing to the console, but you could just as easily send them back to Kafka, save them to a database, or feed them into another downstream system.

val query = recommendationsStream.writeStream
  .outputMode("append")
  .format("console") // Or "kafka": needs a "value" column, a "topic" option and a checkpointLocation
  .start()

query.awaitTermination()

Fire up your terminal, and you’ll see something like:

+----------+--------------------+
| userId   | recommendedProducts|
+----------+--------------------+
| 12345    | [101, 202, 303...] |
+----------+--------------------+
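If you switch the sink to Kafka, Spark expects a value column (string or binary), an optional key column, and a topic option. One possible message format is sketched below in plain Scala: the user ID as the key, so Kafka’s default partitioner sends all of a user’s recommendations to the same partition, and the product IDs as a comma-separated value. The toKafkaRecord helper and the CSV value format are assumptions for illustration, not part of the original code.

```scala
// Mirrors the article's Recommendation case class.
case class Recommendation(userId: String, recommendedProducts: List[String])

// Hypothetical format: key = userId, value = comma-separated product IDs.
def toKafkaRecord(rec: Recommendation): (String, String) =
  (rec.userId, rec.recommendedProducts.mkString(","))

val (key, value) = toKafkaRecord(Recommendation("12345", List("101", "202", "303")))
```

In the streaming query, the same shape could be produced with selectExpr("userId AS key", "concat_ws(',', recommendedProducts) AS value") before .format("kafka").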

Where to Go from Here?

You just built the foundation of a real-time recommendation engine. How cool is that? But the journey doesn’t stop here. You can:

  1. Send recommendations back to Kafka for other systems to consume.
  2. Exclude products the user has already interacted with (no one likes redundant suggestions!).
  3. Continuously retrain the ALS model with fresh data to keep it sharp.
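Item 2 is easy to prototype, since ALS does not filter out products a user has already interacted with: ask the model for more candidates than you need, drop the ones the user has already seen, and keep the first N that remain. A plain-Scala sketch, with a hypothetical excludeSeen helper and made-up data:

```scala
// Keep the first n recommended products the user has not interacted with yet,
// preserving the model's ranking order.
def excludeSeen(recommended: List[String], seen: Set[String], n: Int): List[String] =
  recommended.filterNot(seen.contains).take(n)

// Over-fetch (here 5 candidates) so that n = 3 results remain after filtering.
val candidates = List("101", "202", "303", "404", "505")
val alreadySeen = Set("202", "404")
val fresh = excludeSeen(candidates, alreadySeen, 3)
```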

Final Thoughts

What we’ve built here is just the tip of the iceberg. With a bit of tweaking, this setup could become the foundation of a production-grade recommendation engine. Whether you’re a data engineer, a machine learning enthusiast, or just a curious tinkerer, I hope this gave you the spark to dive deeper into real-time systems.

Go forth and recommend! 🎉

Thank you for taking the time to read my article! If you found this useful, your claps 👏 would motivate me to keep on writing such valuable content. You can also follow me on Medium and LinkedIn to stay connected and catch all my latest insights.



#ApacheSpark #Kafka #DataEngineering #ALS #Recommendation #MachineLearning
