Apache Kafka with Spring Boot: A Beginner’s Guide

Introduction
Apache Kafka is a distributed streaming platform that was originally developed by LinkedIn and later open-sourced as an Apache Software Foundation project. It was created to address the challenges of handling large volumes of data in real-time and has since become a cornerstone of modern data architectures.
If you want to learn the fundamentals of Kafka first, the official Apache Kafka documentation is a good place to start.
In this guide, we’ll delve into Spring Boot integration with Kafka, examining how it simplifies the use of Kafka’s native Java client APIs. Implementing Kafka with Spring Boot involves several steps, including setting up a Kafka cluster, creating producers and consumers, and integrating them into a Spring Boot application. Below is a detailed guide on how to do this:
Prerequisites
- Basic knowledge of Java and Spring Boot.
- Java Development Kit (JDK).
- Apache Kafka installed and running on your local machine.
Installation and Setup
- Download and Install Kafka: download Kafka and follow the official quickstart guide to install and start it.
- Create Spring Boot Application: The easiest way to get a skeleton for our app is to navigate to start.spring.io, fill in the basic details for our project, and select Kafka as a dependency. Then, download the zip file and use your favorite IDE to load the sources.
- Add Dependencies: In the Spring Boot project, we need to add the Spring Kafka dependency to the pom.xml file. Only one spring-kafka entry is needed; pick the version that matches your Spring Boot release.

<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
    <version>3.1.4</version>
</dependency>
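For projects built with Gradle rather than Maven, the equivalent declaration would be the following (the version shown mirrors the Maven example above; match it to your Spring Boot release):

```groovy
implementation 'org.springframework.kafka:spring-kafka:3.1.4'
```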
- Configure Kafka Properties: We can define Kafka properties in the application.properties or application.yml file. Note that consumer-specific settings such as auto-offset-reset and group-id belong under spring.kafka.consumer:

spring:
  kafka:
    bootstrap-servers: localhost:9092
    consumer:
      auto-offset-reset: earliest
      group-id: demo-group
    topic-name: test # custom property read by TransactionProducerService below
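The same settings can be expressed in application.properties form; a sketch (spring.kafka.topic-name is the custom property that the producer service later in this guide reads, and the consumer-specific keys live under spring.kafka.consumer):

```properties
spring.kafka.bootstrap-servers=localhost:9092
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.consumer.group-id=demo-group
spring.kafka.topic-name=test
```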
Producing Messages
- Create a KafkaProducerConfig class to configure the Kafka producer properties.
package com.example.kafka.config;

import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;

import java.util.HashMap;
import java.util.Map;

@Configuration
public class KafkaProducerConfig {

    @Value(value = "${spring.kafka.bootstrap-servers}")
    private String bootstrapAddress;

    @Bean
    public ProducerFactory<String, String> producerFactory() {
        Map<String, Object> configProps = new HashMap<>();
        configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
        configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        return new DefaultKafkaProducerFactory<>(configProps);
    }

    @Bean
    public KafkaTemplate<String, String> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }
}
This class’s role is to establish the configuration for the Kafka producer. To send messages, the initial step is configuring a ProducerFactory, which defines the approach for creating Kafka Producer instances. Subsequently, a KafkaTemplate is required, encapsulating a Producer instance and offering convenient methods for dispatching messages to Kafka topics. Notably, Producer instances are designed to be thread-safe. Hence, employing a single instance across the application context will yield improved performance. Accordingly, it is advisable to use a single KafkaTemplate instance, as they are also designed to be thread-safe.
- Publishing Messages: In this example, we implement a service that publishes transaction events to a Kafka topic.
import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.RequiredArgsConstructor;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;

@Service
@RequiredArgsConstructor
@Slf4j
public class TransactionProducerService {

    // ObjectMapper is thread-safe; reuse one instance instead of creating one per call
    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();

    @Value(value = "${spring.kafka.topic-name}")
    private String topicName;

    private final KafkaTemplate<String, String> kafkaTemplate;

    @SneakyThrows
    public void sendTransaction(final Transaction message) {
        kafkaTemplate.send(topicName, OBJECT_MAPPER.writeValueAsString(message));
        log.info("Transaction published to Kafka");
    }
}
The send API provides a CompletableFuture object as its result. To obtain information about the sent message and block the sending thread, we can use the get API of the CompletableFuture object. However, this approach can lead to decreased producer performance.
Kafka is known for its high-speed stream processing capabilities. Consequently, it is more advisable to manage results asynchronously, ensuring that subsequent messages are not held up by the outcome of the previous message.
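The two styles can be contrasted with a small, self-contained sketch. The send method below is only a stand-in for kafkaTemplate.send (which, as noted above, also returns a CompletableFuture in Spring Kafka 3.x); the class and method names are hypothetical:

```java
import java.util.concurrent.CompletableFuture;

public class AsyncResultDemo {

    // Stand-in for kafkaTemplate.send(...): completes asynchronously with an acknowledgement
    static CompletableFuture<String> send(String message) {
        return CompletableFuture.supplyAsync(() -> "acked:" + message);
    }

    public static void main(String[] args) {
        // Blocking style: join()/get() waits for the result and stalls the producing thread
        String blocking = send("tx-1").join();
        System.out.println(blocking); // prints acked:tx-1

        // Non-blocking style: register a callback; the thread is free to send the next message
        send("tx-2").whenComplete((result, ex) -> {
            if (ex == null) {
                System.out.println("callback saw " + result);
            } else {
                System.err.println("send failed: " + ex.getMessage());
            }
        }).join(); // join here only so this demo finishes before the JVM exits
    }
}
```

In the real service, the same pattern would look like kafkaTemplate.send(topicName, payload).whenComplete((result, ex) -> ...), logging the outcome instead of blocking.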
Consuming Messages
- Create a KafkaConsumerConfig class to configure the Kafka consumer/listener properties.
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;

import java.util.HashMap;
import java.util.Map;

@Configuration
public class KafkaConsumerConfig {

    @Value(value = "${spring.kafka.bootstrap-servers}")
    private String bootstrapAddress;

    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        return new DefaultKafkaConsumerFactory<>(props);
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        return factory;
    }
}
For consuming messages, we need to configure a ConsumerFactory and a KafkaListenerContainerFactory. The DefaultKafkaConsumerFactory class, which implements the ConsumerFactory interface, provides the default configuration for creating consumer instances.
The KafkaListenerContainerFactory bean configures the underlying ConcurrentMessageListenerContainer.
The ConcurrentMessageListenerContainer manages a collection of KafkaMessageListenerContainers; each KafkaMessageListenerContainer polls records on a single thread and dispatches them to a MessageListener.
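As a sketch of why the container factory matters, setConcurrency controls how many KafkaMessageListenerContainers the concurrent container spins up. The value 3 below is an arbitrary example; useful concurrency is bounded by the topic's partition count:

```java
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<String, String> factory =
            new ConcurrentKafkaListenerContainerFactory<>();
    // Up to 3 listener threads, each backed by its own KafkaMessageListenerContainer
    factory.setConcurrency(3);
    factory.setConsumerFactory(consumerFactory());
    return factory;
}
```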
- Consume Messages
import lombok.extern.slf4j.Slf4j;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;

@Service
@Slf4j
public class KafkaConsumerService {

    @KafkaListener(id = "demo-kafka-group", topics = "test")
    public void consume(final String transaction) {
        log.info("Received transaction: {}", transaction);
    }
}
@KafkaListener allows a method to consume messages from one or more Kafka topics. Under the hood, Spring Boot registers the annotated method as the listener of a KafkaMessageListenerContainer, which connects to Kafka and polls records.
We can implement multiple listeners for a topic, each with a different group ID. Furthermore, one consumer can listen for messages from several topics:
@KafkaListener(topics = { "topic1", "topic2" }, groupId = "demo")
Spring also supports retrieval of one or more message headers using the @Header annotation in the listener.
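A sketch of a listener that also reads record metadata via @Header; the header constants come from Spring Kafka's KafkaHeaders class (names shown are the Spring Kafka 3.x spellings), and @Payload is from org.springframework.messaging:

```java
@KafkaListener(topics = "test", groupId = "demo")
public void listenWithHeaders(
        @Payload String message,
        @Header(KafkaHeaders.RECEIVED_PARTITION) int partition,
        @Header(KafkaHeaders.OFFSET) long offset) {
    // Log the payload together with where it came from in the topic
    log.info("Received '{}' from partition {} at offset {}", message, partition, offset);
}
```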
Implement the Application To Publish Messages
Create REST endpoints in the Spring Boot application for handling banking transactions, and use the TransactionProducerService to send transactions to Kafka.
@RestController
@RequestMapping("/transactions")
@RequiredArgsConstructor
public class TransactionController {

    private final TransactionProducerService producerService;

    @PostMapping("/publish")
    public ResponseEntity<String> publishTransaction(@RequestBody Transaction transaction) {
        try {
            // Send the transaction to the Kafka topic
            producerService.sendTransaction(transaction);
            return ResponseEntity.ok("Transaction published to Kafka");
        } catch (Exception e) {
            return ResponseEntity.status(500).body("Error publishing transaction to Kafka: " + e.getMessage());
        }
    }
}
Create a new Java class, Transaction, to represent the message we will send over Kafka.
@AllArgsConstructor
@NoArgsConstructor
@Builder
@Data
@EqualsAndHashCode
@ToString
public class Transaction {

    private String transactionId;
    private String sender;
    private String receiver;
    private double amount;
    private String description;
    private long timestamp;
}
Start Kafka and Test the Application
Start the Kafka cluster and ensure that the test topic (or whatever topic name is configured in your application) exists. We can create it with the Kafka command-line tools or enable automatic topic creation in the Kafka server properties.
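For example, with the command-line tools shipped in the Kafka distribution, the test topic can be created like this (a single-broker local setup is assumed, hence one partition and replication factor 1):

```shell
bin/kafka-topics.sh --create \
  --topic test \
  --bootstrap-server localhost:9092 \
  --partitions 1 \
  --replication-factor 1
```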
When we start the Spring Boot application, it connects to Kafka, and the consumer group (demo-kafka-group) subscribes to the topic “test”, which can be verified in the startup logs.

Once the application is started, post a transaction to the REST endpoint; this publishes the message to the Kafka topic “test” and logs the successful publication. A sample curl request to publish an event:
curl --location 'http://localhost:8080/transactions/publish' \
--header 'Content-Type: application/json' \
--data '{
"transactionId": "a80a71f3-3d15-450d-a28e-4fc409e92f13",
"sender": "Reetesh kumar",
"receiver": "Demo Doe",
"amount": 1000.0,
"description": "Payment for services",
"timestamp": 1633969378000
}'

Once the message is published, the consumer will consume it and log the payload:
2023-10-11T22:04:44.434+01:00 INFO 56930 --- [fka-group-0-C-1] c.e.kafka.service.KafkaConsumerService : Received transaction: {"transactionId":"a80a71f3-3d15-450d-a28e-4fc409e92f13","sender":"Reetesh kumar","receiver":"Demo Doe","amount":1000.0,"description":"Payment for services","timestamp":1633969378000}
Conclusion
In this article, we explored the fundamentals of integrating Spring Boot with Apache Kafka and gave a brief overview of the classes involved in sending and receiving messages.
The complete source code for this article can be found on GitHub. Before executing the code, make sure to confirm that the Kafka server is operational and that the necessary topics have been manually created.
Happy Learning !!!