Ivan Franchin

Spring Boot | Spring Cloud Stream | Cloudevents

Using Cloudevents in a Kafka Producer and Consumer that use Spring Cloud Stream

Step-by-step guide on how to configure Cloudevents in News Producer and Consumer apps

Photo by engin akyurt on Unsplash

In this article, we will explain how to configure Cloudevents in two Spring Boot Kafka applications: News Producer and News Consumer.

Cloudevents is a specification for describing event data in common formats to provide interoperability across services, platforms and systems.

You can find the complete code and implementation in the article linked below. Feel free to follow the steps explained in the article and get started.

This article is part of a series that uses the News Producer and News Consumer as base applications. Throughout the series, we cover topics such as implementing unit tests, implementing end-to-end tests and deploying to Kubernetes.

So, let’s get started.

Updating News Producer

Update the pom.xml

In the pom.xml file, let’s include the Cloudevents dependency by adding the cloudevents.version property and the cloudevents-core dependency shown below:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" ...>
    ...
    <properties>
        ...
        <cloudevents.version>3.0.0</cloudevents.version>
    </properties>
    <dependencies>
        ...
        <dependency>
            <groupId>io.cloudevents</groupId>
            <artifactId>cloudevents-core</artifactId>
            <version>${cloudevents.version}</version>
        </dependency>
        ...
    </dependencies>
    ...
</project>

Update the NewsPublisher class

Let’s replace the entire content of the NewsPublisher class with the code below:

package com.example.newsproducer.publisher;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.cloud.function.cloudevent.CloudEventMessageBuilder;
import org.springframework.cloud.stream.function.StreamBridge;
import org.springframework.messaging.Message;
import org.springframework.stereotype.Service;

@Service
public class NewsPublisher {

    private static final Logger log = LoggerFactory.getLogger(NewsPublisher.class);

    private final StreamBridge streamBridge;

    public NewsPublisher(StreamBridge streamBridge) {
        this.streamBridge = streamBridge;
    }

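    public void send(News news) {
        // Wrap the payload in a Cloudevents message; only the source is set explicitly
        Message<News> newsMessage = CloudEventMessageBuilder.withData(news)
                .setSource("news-app/news-producer")
                .build();
        // Publish the message to the "news-out-0" output binding
        streamBridge.send("news-out-0", newsMessage);
        log.info("{} sent!", news);
    }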
}

The publisher now wraps the news object in a Cloudevents message. When building newsMessage, we set only the source and keep the default values for the other attributes.
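To make those defaults more concrete, here is a minimal plain-Java sketch of the kind of ce- headers that end up on the message. It is not Spring's implementation: the class CloudEventHeadersSketch and its build method are hypothetical names, the News record is a stand-in for the article's class, and the default values (spec version 1.0, a random UUID as id, the payload class name as type) are assumptions inferred from the consumer log shown later in this article.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.UUID;

// Hypothetical illustration only; in the real app CloudEventMessageBuilder does this work.
public class CloudEventHeadersSketch {

    // Stand-in for the article's News record (assumed shape: title and createdOn).
    record News(String title, String createdOn) {}

    // Builds the ce-* headers for a payload, filling in assumed defaults.
    public static Map<String, String> build(Object data, String source) {
        Map<String, String> headers = new LinkedHashMap<>();
        headers.put("ce-specversion", "1.0");                // assumed default
        headers.put("ce-id", UUID.randomUUID().toString());  // assumed default: random UUID
        headers.put("ce-type", data.getClass().getName());   // assumed default: payload class name
        headers.put("ce-source", source);                    // the only value set explicitly
        return headers;
    }

    public static void main(String[] args) {
        News news = new News("Article about Spring Cloud Stream and Kafka",
                "2023-11-04T08:51:13.886572Z");
        build(news, "news-app/news-producer")
                .forEach((key, value) -> System.out.println(key + "=" + value));
    }
}
```

Running main prints four ce- entries; the id changes on every run because it is generated randomly.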

Updating News Consumer

Update the NewsListener class

Let’s replace the entire content of the NewsListener class with the code below:

package com.example.newsconsumer.listener;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.messaging.Message;

import java.util.function.Consumer;

@Configuration
public class NewsListener {

    private static final Logger log = LoggerFactory.getLogger(NewsListener.class);

    @Bean
    public Consumer<Message<News>> news() {
        return news -> log.info("Received News! \"{}\" created on '{}'. Headers: {}",
                news.getPayload().title(),
                news.getPayload().createdOn(),
                news.getHeaders());
    }
}

Unlike the previous version, the log message now also includes the message headers.

Cloudevents in Action

Start Docker Compose services

In a terminal, inside the news-app folder, run the following command to start the Docker Compose services:

docker compose up -d

Start News Producer and Consumer apps

In a terminal, inside the news-producer root folder, run the command below:

./mvnw clean spring-boot:run

In another terminal, inside the news-consumer root folder, run the following command:

./mvnw clean spring-boot:run

Sending News

In another terminal, run the following cURL command to send a news item:

curl -i -X POST localhost:8080/api/news \
  -H 'Content-Type: application/json' \
  -d '{"title":"Article about Spring Cloud Stream and Kafka"}'
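If you prefer to trigger the endpoint from Java instead of cURL, the request can be sketched with the JDK's built-in HTTP client. This is only an illustration: the class name SendNewsRequest and the build helper are hypothetical, and the JSON body is assembled naively, which is only safe for titles without quotes or other special characters.

```java
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

// Hypothetical helper that mirrors the cURL command above.
public class SendNewsRequest {

    // Builds the POST request; nothing is sent until HttpClient.send is called.
    public static HttpRequest build(String title) {
        String json = "{\"title\":\"" + title + "\"}"; // naive JSON, no escaping
        return HttpRequest.newBuilder(URI.create("http://localhost:8080/api/news"))
                .header("Content-Type", "application/json")
                .POST(HttpRequest.BodyPublishers.ofString(json))
                .build();
    }

    public static void main(String[] args) throws Exception {
        HttpRequest request = build("Article about Spring Cloud Stream and Kafka");
        // Requires the News Producer to be running on localhost:8080.
        HttpResponse<String> response = HttpClient.newHttpClient()
                .send(request, HttpResponse.BodyHandlers.ofString());
        System.out.println("Status: " + response.statusCode());
    }
}
```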

In the News Producer application logs, we should see:

INFO 37021 --- [nio-8080-exec-1] c.e.n.publisher.NewsPublisher            : News[title=Article about Spring Cloud Stream and Kafka, createdOn=2023-11-04T08:51:13.886572Z] sent!

The News Consumer should receive the message and log the following (the log was formatted to improve readability):

Received News! "Article about Spring Cloud Stream and Kafka" created on '2023-11-04T08:51:13.886572Z'.
  Headers: {
    ce-specversion=1.0,
    deliveryAttempt=1,
    ce-id=dfe6d032-9403-43bb-a545-37536ba70fbb,
    kafka_timestampType=CREATE_TIME,
    kafka_receivedTopic=com.example.news-producer.news,
    ce-source=http://news-app/news-producer,
    kafka_offset=2,
    scst_nativeHeadersPresent=true,
    ce-type=com.example.newsproducer.publisher.News,
    message-type=cloudevent,
    kafka_consumer=org.apache.kafka.clients.consumer.KafkaConsumer@7afa8785,
    source-type=kafka,
    id=50a244ed-4ce2-256e-2b14-f98d5731bd47,
    kafka_receivedPartitionId=1,
    contentType=application/json,
    kafka_receivedTimestamp=1699087873966,
    kafka_groupId=newsConsumerGroup,
    timestamp=1699087874112
  }

Cloudevents introduces new headers, identifiable by the ce- prefix: ce-specversion, ce-id, ce-source and ce-type. It’s also worth noting that the message-type header is set to cloudevent.
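If you want to log or inspect only the Cloudevents attributes, the ce- prefix makes them easy to pick out. Below is a minimal sketch in plain Java; the class CloudEventHeaderFilter and the method cloudEventAttributes are hypothetical names, and the sample values are copied from the consumer log above.

```java
import java.util.Map;
import java.util.TreeMap;

// Hypothetical helper, not part of the News Consumer code.
public class CloudEventHeaderFilter {

    // Returns only the entries whose key starts with "ce-", sorted by key.
    public static Map<String, Object> cloudEventAttributes(Map<String, Object> headers) {
        Map<String, Object> attributes = new TreeMap<>();
        headers.forEach((key, value) -> {
            if (key.startsWith("ce-")) {
                attributes.put(key, value);
            }
        });
        return attributes;
    }

    public static void main(String[] args) {
        // A few header values taken from the consumer log.
        Map<String, Object> headers = Map.of(
                "ce-specversion", "1.0",
                "ce-id", "dfe6d032-9403-43bb-a545-37536ba70fbb",
                "ce-source", "http://news-app/news-producer",
                "ce-type", "com.example.newsproducer.publisher.News",
                "message-type", "cloudevent",
                "kafka_offset", 2L,
                "contentType", "application/json");
        System.out.println(cloudEventAttributes(headers));
    }
}
```

Since Spring's MessageHeaders implements Map<String, Object>, the same method could be called with news.getHeaders() inside the listener.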

Shutdown

In the terminals where the News Producer and Consumer are running, press Ctrl+C to stop the applications.

To stop the Docker Compose services, make sure you are inside the news-app root folder in a terminal and run the following command:

docker compose down -v

Conclusion

In this article, we’ve explained how to use Cloudevents in two Spring Boot Kafka applications, News Producer and News Consumer. Cloudevents is a helpful standard for describing event data consistently, making it simpler for various services and systems to collaborate. Finally, we put the setup to the test by sending a news item through the News Producer API and verifying in the News Consumer that the message included the Cloudevents headers.
