Real-Time Use Case: Fraud Detection in Financial Transactions with Kafka and Spark Streaming
Leveraging Kafka and Spark Streaming for Real-Time Fraud Detection
Introduction
Let’s face it: fraud is a big problem for financial institutions. With a staggering number of transactions happening every second, spotting fraudulent activity before it affects customers and the business is crucial. That’s where real-time analytics comes into the picture, and we’re diving into how to build a streamlined fraud detection system using Apache Kafka and Spark Streaming.
Imagine monitoring transaction data in real time, identifying suspicious activity, and flagging potentially fraudulent transactions instantly. In this article, we’ll use Kafka to stream transaction data and Spark Streaming in Scala to detect fraud patterns as they occur.
Excited? Let’s jump right in!
Why Kafka and Spark?
Kafka is an amazing tool for ingesting real-time data and can handle millions of messages per second. Spark Streaming, on the other hand, is excellent for fast, scalable data processing. Together, they’re a perfect duo for building real-time fraud detection systems that require low latency, high throughput, and quick insights.
The Setup
Let’s start with a high-level picture of what we’ll build:
- Kafka will act as the data source, continuously streaming financial transactions.
- Spark Streaming will process the data, applying rules to detect suspicious activities.
- An alerting system (such as an external API or a database) will record flagged transactions.
Use Case: Detecting Suspicious Transactions
Our use case is straightforward: monitor transactions for patterns that suggest fraud. For example:
- Large withdrawals
- Rapid transactions from the same account within a specified duration
- High-risk international transactions
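Before involving Kafka or Spark at all, it can help to pin these rules down as plain Scala and unit-test them. The sketch below is illustrative only: the `Txn` case class and the `FraudRules` object are hypothetical names, the thresholds are the ones used later in Step 3.4, and "rapid" here is assumed to mean any other transaction from the same account within 60 seconds, before or after.

```scala
// Hypothetical in-memory model of one transaction (same fields as the JSON messages)
final case class Txn(accountId: String, amount: Double, location: String, timestampMs: Long)

// Hypothetical rule set; thresholds mirror the Spark rules in Step 3.4
object FraudRules {
  val LargeAmount: Double = 10000.0
  val InternationalAmount: Double = 5000.0
  val RapidWindowMs: Long = 60000L

  def isLargeWithdrawal(t: Txn): Boolean = t.amount > LargeAmount

  def isHighRiskInternational(t: Txn): Boolean =
    t.location == "international" && t.amount > InternationalAmount

  // True if some *other* transaction from the same account is within RapidWindowMs of t
  def isRapid(t: Txn, history: Seq[Txn]): Boolean =
    history.exists { other =>
      (other ne t) &&
      other.accountId == t.accountId &&
      math.abs(other.timestampMs - t.timestampMs) <= RapidWindowMs
    }

  def isSuspicious(t: Txn, history: Seq[Txn]): Boolean =
    isLargeWithdrawal(t) || isHighRiskInternational(t) || isRapid(t, history)
}

// Usage with four hypothetical transactions
val a = Txn("42", 120.0, "domestic", 0L)
val b = Txn("42", 80.0, "domestic", 30000L) // same account, 30 s later
val c = Txn("7", 6000.0, "international", 0L)
val d = Txn("99", 50.0, "domestic", 0L)
val history = Seq(a, b, c, d)

println(history.map(t => t.accountId -> FraudRules.isSuspicious(t, history)))
```

Keeping the rules as pure functions like this makes the thresholds easy to test in isolation; the Spark version later expresses the same conditions as column expressions.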
Now, let’s get into the code!
Step 1: Set Up Kafka
Assuming Kafka is running locally, we’ll start by creating a topic named transactions. This is where we’ll publish our financial transaction data.
kafka-topics.sh --create --topic transactions --bootstrap-server localhost:9092 --partitions 3 --replication-factor 1
Step 2: Simulate Transaction Data
We’ll need a data producer to generate mock transaction data. In a real-world scenario, this data would come from your payment processing system, but here we can use a simple Scala producer to push data into Kafka:
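If you want reproducible test data instead of a fresh random stream on every run, one option is to move message construction into a small function that takes a seeded `Random`. This is a sketch under our own naming (`mockTransaction` is hypothetical, not part of any library). It rounds amounts to two decimal places and returns the account id alongside the JSON, in case you later decide to use it as the record key.

```scala
import scala.util.Random

// Hypothetical helper: builds one mock transaction as (accountId, json).
// Field names and types match the schema defined later in Step 3.2.
def mockTransaction(rnd: Random, nowMs: Long): (String, String) = {
  val accountId = rnd.nextInt(1000).toString
  // Round to cents so the JSON carries amounts like 1234.57 instead of long fractions
  val amount = BigDecimal(rnd.nextDouble() * 20000).setScale(2, BigDecimal.RoundingMode.HALF_UP)
  val location = if (rnd.nextBoolean()) "domestic" else "international"
  val json =
    s"""{"account_id": "$accountId", "amount": $amount, "location": "$location", "timestamp": $nowMs}"""
  (accountId, json)
}

// The same seed always yields the same sequence of messages
val rnd = new Random(42L)
val (key, json) = mockTransaction(rnd, System.currentTimeMillis())
println(s"key=$key value=$json")
```

Inside the producer loop you would call `mockTransaction` once per iteration and send the returned JSON as the record value.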
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
import java.util.Properties
import scala.util.Random

// Properties for the KafkaProducer
val props = new Properties()
props.put("bootstrap.servers", "localhost:9092")
props.put("acks", "all") // Most durable setting: the partition leader waits for all in-sync replicas to acknowledge each record
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")

val producer = new KafkaProducer[String, String](props)
// Flush buffered records and release resources when the script is stopped (e.g. Ctrl+C)
sys.addShutdownHook(producer.close())

// Simulate a stream of transactions: one JSON record every 100 ms
while (true) {
  val transaction = s"""{
    "account_id": "${Random.nextInt(1000)}",
    "amount": ${Random.nextDouble() * 20000},
    "location": "${if (Random.nextBoolean()) "domestic" else "international"}",
    "timestamp": ${System.currentTimeMillis()}
  }"""
  // Send the record to the "transactions" topic with a null key, so the producer's partitioner picks the partition
  producer.send(new ProducerRecord[String, String]("transactions", null, transaction))
  Thread.sleep(100)
}
Step 3: Spark Streaming for Fraud Detection
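Let’s get into Spark Streaming. The code below uses Spark’s Structured Streaming API to read from the transactions Kafka topic, deserialize the JSON records, and apply simple fraud detection rules (explained in Step 3.4). The Kafka source ships as a separate connector, so the job needs the spark-sql-kafka-0-10 package on its classpath.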
Step 3.1: Set Up Spark Session
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._
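// Entry point for the DataFrame and streaming APIs
val spark = SparkSession.builder
  .appName("FraudDetection")
  .master("local[*]") // Run locally using all available cores
  .getOrCreate()
spark.sparkContext.setLogLevel("ERROR") // Hide INFO/WARN noise so the console output stays readable
Step 3.2: Define the Schema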
We’ll define a schema for our transaction data, which will make it easier to parse JSON into DataFrame rows.
val transactionSchema = new StructType()
.add("account_id", StringType)
.add("amount", DoubleType)
.add("location", StringType)
.add("timestamp", LongType)
Step 3.3: Stream Data from Kafka
Here, we connect to Kafka and read transaction data, parsing it as JSON.
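Before pointing the job at Kafka, you can sanity-check the JSON parsing on a static DataFrame. This is a minimal sketch, assuming Spark 3.x in local mode. `session`, `sampleSchema`, and the sample record are our own illustrative names and data; the parsing chain is the same one the streaming job uses.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._

// getOrCreate() reuses an existing local session if one is already running
val session = SparkSession.builder
  .appName("ParseCheck")
  .master("local[*]")
  .getOrCreate()
import session.implicits._

// Same fields as transactionSchema in Step 3.2
val sampleSchema = new StructType()
  .add("account_id", StringType)
  .add("amount", DoubleType)
  .add("location", StringType)
  .add("timestamp", LongType)

// A static stand-in for the Kafka `value` column (hypothetical sample record)
val sample = Seq(
  """{"account_id": "102", "amount": 11000.0, "location": "domestic", "timestamp": 1679926400000}"""
).toDF("json")

// Parse the JSON string into a struct, then flatten it into top-level columns
val parsed = sample
  .select(from_json(col("json"), sampleSchema).as("data"))
  .select("data.*")

parsed.show(false)
```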
val kafkaDf = spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9092")
.option("subscribe", "transactions")
.load()
// Kafka delivers each record's payload as bytes in the `value` column:
// cast it to a string, parse it with our schema, and flatten the struct into top-level columns
val transactions = kafkaDf
.selectExpr("CAST(value AS STRING) as json")
.select(from_json(col("json"), transactionSchema).as("data"))
.select("data.*")
Step 3.4: Apply Fraud Detection Rules
Now, we’ll add simple fraud detection rules. In this case, let’s flag transactions that:
- Exceed $10,000.
- Occur too rapidly (within 1 minute of another transaction from the same account).
- Are international and exceed $5,000.
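One subtlety in the rapid-transaction rule: `window()` with a single 1-minute duration creates tumbling (non-overlapping) windows aligned to the Unix epoch, so "in the same window" is not quite the same as "within 1 minute". Two transactions two seconds apart can straddle a minute boundary and land in different windows. The plain-Scala sketch below mimics that bucketing for epoch-millisecond timestamps. `tumblingWindowStart` is a hypothetical helper, not a Spark API, and the timestamps are made up.

```scala
// Hypothetical helper: start of the tumbling window containing tsMs, with windows
// aligned to the Unix epoch (Spark's default when no startTime offset is given)
def tumblingWindowStart(tsMs: Long, windowMs: Long = 60000L): Long =
  tsMs - java.lang.Math.floorMod(tsMs, windowMs)

val first  = 1679926439000L // 59 s into a minute
val second = 1679926441000L // 2 s later, 1 s into the next minute

println(s"gap = ${second - first} ms")
println(s"first  -> window starting at ${tumblingWindowStart(first)}")
println(s"second -> window starting at ${tumblingWindowStart(second)}")
```

Sliding windows, e.g. `window(col, "1 minute", "30 seconds")`, overlap and narrow this gap (any two transactions at most 30 seconds apart then share a window), at the cost of placing each transaction in two windows, so a burst can be reported twice.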
// window() and watermarks need a TimestampType column, but the producer sends
// epoch milliseconds as a Long, so derive an event-time column first (Spark 3.1+)
val events = transactions
  .withColumn("event_time", timestamp_seconds(col("timestamp") / 1000))

// Stateless per-row rules: large withdrawals and high-risk international transactions
val flaggedTransactions = events
  .withColumn("is_large_withdrawal", col("amount") > 10000)
  .withColumn("is_high_risk_location", col("location") === "international" && col("amount") > 5000)

// Rule for rapid transactions: more than one transaction from the same account in a 1-minute window.
// The watermark lets Spark finalize old windows and drop their state; streaming aggregations
// need one to run in append mode, and each window's count is emitted once the watermark passes its end.
val rapidTransactions = events
  .withWatermark("event_time", "1 minute")
  .groupBy(window(col("event_time"), "1 minute"), col("account_id"))
  .count()
  .filter("count > 1")
  .select(
    col("account_id"),
    col("window.start").as("window_start"),
    col("window.end").as("window_end"),
    col("count")
  )

// Get final suspicious transactions from the per-row rules.
// A plain left outer join of rapidTransactions onto this stream is not supported by
// Structured Streaming (outer stream-stream joins need watermarks and an event-time
// range condition), so run rapidTransactions as its own query, for example:
// rapidTransactions.writeStream.outputMode("append").format("console").start()
val suspiciousTransactions = flaggedTransactions
  .filter(col("is_large_withdrawal") || col("is_high_risk_location"))
  .select(
    col("account_id"),
    col("amount"),
    col("location"),
    col("timestamp")
  )
Step 3.5: Write Output
You can write suspicious transactions to the console or a database, or use them to trigger an alert.
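Two of the sinks below use `foreachBatch`, which hands you each micro-batch as an ordinary (non-streaming) DataFrame. That also makes it a convenient place to evaluate all three rules together with regular batch operations, instead of combining separate streaming queries. The sketch below is a hypothetical `detectInBatch` function, assuming the parsed columns from Step 3.3 (`timestamp` in epoch milliseconds). It uses the `lag`/`lead` window functions to flag a transaction when its nearest same-account neighbour in the batch is at most 60 seconds away. The trade-off: a burst that spans two micro-batches is not detected, because each batch is evaluated on its own.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._

// Hypothetical helper: applies all three rules to one micro-batch of parsed transactions
// (columns account_id, amount, location, timestamp in epoch ms)
def detectInBatch(batch: DataFrame): DataFrame = {
  val byAccount = Window.partitionBy("account_id").orderBy("timestamp")
  batch
    // Nearest earlier and later transaction for the same account in this batch
    .withColumn("prev_ts", lag(col("timestamp"), 1).over(byAccount))
    .withColumn("next_ts", lead(col("timestamp"), 1).over(byAccount))
    // Rapid if either neighbour is within 60 s; a missing neighbour counts as "not rapid"
    .withColumn("is_rapid",
      coalesce(col("timestamp") - col("prev_ts") <= 60000, lit(false)) ||
        coalesce(col("next_ts") - col("timestamp") <= 60000, lit(false)))
    .filter(
      col("amount") > 10000 ||
        (col("location") === "international" && col("amount") > 5000) ||
        col("is_rapid"))
    .select("account_id", "amount", "location", "timestamp")
}

// Usage on a small static batch (hypothetical data)
val session = SparkSession.builder.appName("BatchRules").master("local[*]").getOrCreate()
import session.implicits._

val batch = Seq(
  ("1", 100.0, "domestic", 0L),
  ("1", 200.0, "domestic", 30000L),     // 30 s after the previous one: rapid
  ("2", 11000.0, "domestic", 0L),       // large withdrawal
  ("3", 6000.0, "international", 0L),   // high-risk international
  ("4", 50.0, "domestic", 0L),
  ("4", 60.0, "domestic", 120000L)      // 2 min apart: not rapid
).toDF("account_id", "amount", "location", "timestamp")

detectInBatch(batch).show(false)

// Streaming wiring, assuming `transactions` from Step 3.3 (not run here):
// transactions.writeStream
//   .foreachBatch { (batchDF: DataFrame, batchId: Long) => detectInBatch(batchDF).show(false) }
//   .start()
```

Comparing each transaction only with its immediate neighbours is enough here: if any other transaction from the same account lies within 60 seconds, the closest one in time is either the previous or the next row in timestamp order.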
a. Viewing the output stream in the console:
val query = suspiciousTransactions.writeStream
.outputMode("append")
.format("console")
.start()
query.awaitTermination()
b. Sending to a database for later analysis:
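import org.apache.spark.sql.DataFrame

val query = suspiciousTransactions.writeStream
  // Offsets are tracked here so a restarted query resumes where it left off (example path)
  .option("checkpointLocation", "/tmp/checkpoints/suspicious-jdbc")
  // Explicit parameter types avoid an overload ambiguity with foreachBatch in Scala 2.12
  .foreachBatch { (batchDF: DataFrame, batchId: Long) =>
    // Requires the MySQL JDBC driver on the classpath
    batchDF.write
      .format("jdbc")
      .option("url", "jdbc:mysql://localhost:3306/fraud_db")
      .option("dbtable", "suspicious_transactions")
      .option("user", "username")     // Placeholder credentials: load real ones from config or a secret store
      .option("password", "password")
      .mode("append")
      .save()
  }
  .start()
query.awaitTermination()
c. Writing the rows to an alerting system: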
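import org.apache.spark.sql.DataFrame

val query = suspiciousTransactions.writeStream
  .option("checkpointLocation", "/tmp/checkpoints/suspicious-alerts") // Example path
  .foreachBatch { (batchDF: DataFrame, batchId: Long) =>
    // collect() brings the whole micro-batch to the driver, which is fine for small
    // alert volumes; for larger batches, consider sending alerts per partition instead
    batchDF.collect().foreach { row =>
      AlertingSystem.sendAlert(row) // Hypothetical client: replace this with your alerting logic
    }
  }
  .start()
query.awaitTermination()
Step 4: Testing the Pipeline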
To test the pipeline:
- Start the Kafka producer script to publish transactions.
- Run the Spark Streaming job to consume data and flag suspicious transactions.
Watch the console output to see which transactions are flagged. If you see output like the following, the pipeline is working:
+----------+-------+-------------+-------------+
|account_id|amount |location     |timestamp    |
+----------+-------+-------------+-------------+
|102       |11000.0|domestic     |1679926400000|
|540       |7000.0 |international|1679926460000|
+----------+-------+-------------+-------------+
These transactions meet one or more fraud criteria.
Scaling It Up
Keep in mind that this example works well at small scale. Here are some tips for scaling it up in a production environment:
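- Kafka Partitioning: Increase the number of partitions on the transactions topic to support more parallelism; by default, Spark's Kafka source reads each Kafka partition as a separate task.
- Replication: Increase the topic's replication factor to make the system more reliable.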
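- Checkpoints and State Management: Set a checkpointLocation on every streaming query so Spark can persist Kafka offsets and operator state (such as windowed counts) and resume from them after a restart.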
- Alerting: Integrate with notification services for immediate fraud alerts.
- Model-Based Detection: Consider applying machine learning models in Spark to detect fraud more accurately.
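A caveat on adding partitions: the producer in Step 2 sends records without a key, but if you key them (for example by account_id, so each account's transactions stay in order within a single partition), Kafka's default partitioner chooses the partition by hashing the key, and changing the partition count remaps existing keys. The sketch below reproduces that hashing step with `Utils.murmur2` and `Utils.toPositive` from kafka-clients. These are internal utility methods rather than a stable public API, and `partitionFor` is a hypothetical helper name.

```scala
import java.nio.charset.StandardCharsets
import org.apache.kafka.common.utils.Utils

// Hypothetical helper mirroring the default partitioner for non-null keys:
// murmur2 hash of the serialized key bytes, made non-negative, modulo the partition count.
// StringSerializer encodes keys as UTF-8 by default, so we hash the same bytes here.
def partitionFor(key: String, numPartitions: Int): Int =
  Utils.toPositive(Utils.murmur2(key.getBytes(StandardCharsets.UTF_8))) % numPartitions

// How many of 1,000 hypothetical account ids change partition when growing from 3 to 6?
val accountIds = (0 until 1000).map(_.toString)
val moved = accountIds.count(id => partitionFor(id, 3) != partitionFor(id, 6))
println(s"$moved of ${accountIds.size} keys map to a different partition after 3 -> 6")
```

Records already written stay in their original partitions, so after a resize one account's history can be split across two partitions. If per-account ordering matters, it is simpler to size the topic generously up front.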
Wrapping Up!
With Kafka and Spark Streaming, building a real-time fraud detection pipeline is straightforward and scalable. This example only scratches the surface: you can extend it with more complex rules, machine learning models, or integrations with alerting systems.
Whether you’re a fintech startup or a large financial institution, real-time analytics can transform how you respond to fraud.
So, give it a try, and happy coding!