This article discusses a solution for polling batches of Kafka messages using confluent-kafka-python with ThreadPoolExecutor.
Abstract
The article presents a challenge the author faced when consuming messages from Apache Kafka using Python and the confluent-kafka-python library. The goal was to speed up the consumption of Kafka messages by polling them in batches. The author proposes a solution that uses asyncio.gather() and ThreadPoolExecutor to poll a batch of messages from Apache Kafka. The solution involves creating a Kafka consumer and a thread pool, then using asyncio.gather() to collect the return values of the batch_poll() method. The article concludes that this solution is useful when microservices run asynchronously, and that it can speed up blocking I/O by running each blocking call concurrently in a separate thread.
Opinions
The author believes that polling messages in batches can speed up the consumption of Kafka messages and ultimately increase the speed of microservices.
The author suggests using the confluent-kafka-python library for consuming messages from Apache Kafka, as it provides an Avro deserializer and a Schema Registry client.
The author recommends using asyncio.gather() and ThreadPoolExecutor to poll a batch of messages from Apache Kafka.
The author mentions that this solution is useful when microservices are running asynchronously.
The author suggests that ThreadPoolExecutor can speed up blocking I/O processes by running each of them in a separate thread concurrently.
The author recommends checking out the docs for more information on asyncio and ThreadPoolExecutor.
The author suggests checking out the multi-threaded version of Kafka consumer for further reading.
Poll batches of Kafka messages with confluent-kafka-python with ThreadPoolExecutor
You should have some knowledge of writing Python code and how Apache Kafka consumer polling works.
The Challenge
I was tasked with creating event-driven microservices that consume messages streamed from our MySQL databases into Apache Kafka by Debezium.
I am going to use Python with confluent-kafka-python for this, because all the messages are stored in Avro format and the library provides an Avro deserializer and a Schema Registry client. These make serializing and deserializing easier, without the need to maintain the schemas manually.
To speed up the consumption of Kafka messages, and ultimately the microservices themselves, the messages should ideally be polled in batches. Unfortunately, I was not able to retrieve messages in batches with the confluent-kafka-python consumer.
The Solution
We are going to use asyncio.gather() and ThreadPoolExecutor to poll a batch of messages from Apache Kafka.
First, create a Kafka consumer. You can get the full example code from the confluent-kafka-python GitHub repository.
from confluent_kafka import DeserializingConsumer
consumer = DeserializingConsumer(consumer_conf)
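For orientation, here is a rough sketch of what consumer_conf might contain. The helper name build_consumer_conf, the broker address, and the group id are placeholders invented for this illustration; the dictionary keys follow the configuration style used in the library's Avro consumer example. The deserializer is left as None so the snippet runs without a broker or Schema Registry.

```python
def build_consumer_conf(bootstrap_servers, group_id, value_deserializer=None):
    """Hypothetical helper: assemble a config dict for DeserializingConsumer.

    All values passed in below are placeholders, not settings from the article.
    """
    return {
        "bootstrap.servers": bootstrap_servers,    # Kafka broker address(es)
        "group.id": group_id,                      # consumer group name
        "value.deserializer": value_deserializer,  # e.g. an AvroDeserializer instance
        "auto.offset.reset": "earliest",           # start from the oldest offset if none is committed
    }


consumer_conf = build_consumer_conf("localhost:9092", "example-group")
print(consumer_conf)
```

In a real service the value deserializer would typically be an AvroDeserializer backed by a SchemaRegistryClient, and the resulting dictionary would be passed to DeserializingConsumer as shown above.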
Create a thread pool that will help us poll Kafka messages concurrently.
import concurrent.futures

# Submit the blocking poll() call to a worker thread.
with concurrent.futures.ThreadPoolExecutor() as executor:
    future = executor.submit(consumer.poll)
    ...
The asyncio.gather() call runs the batch_poll() method 10 times concurrently, which effectively polls 10 messages from Apache Kafka, and collects the return values into a list. In this case it returns a list of Message objects.
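A minimal sketch of what such a batch_poll() and asyncio.gather() combination could look like follows. It is an illustration under assumptions, not the author's exact code: FakeConsumer is a hypothetical stand-in for DeserializingConsumer so the example runs without a broker, and the poll_batch() helper, the 1.0 second timeout, and the filtering of None results are choices made for this sketch.

```python
import asyncio
import concurrent.futures
import threading
import time


class FakeConsumer:
    """Hypothetical stand-in for DeserializingConsumer (no broker required)."""

    def __init__(self, messages):
        self._messages = list(messages)
        self._lock = threading.Lock()

    def poll(self, timeout=None):
        time.sleep(0.05)  # simulate blocking network I/O
        with self._lock:
            # Like a real poll() that times out, return None when nothing is left.
            return self._messages.pop(0) if self._messages else None


async def batch_poll(consumer, executor, timeout=1.0):
    """Run one blocking poll() in a worker thread and await its result."""
    loop = asyncio.get_running_loop()
    return await loop.run_in_executor(executor, consumer.poll, timeout)


async def poll_batch(consumer, executor, batch_size=10):
    """Issue batch_size polls concurrently and drop empty (None) results."""
    results = await asyncio.gather(
        *(batch_poll(consumer, executor) for _ in range(batch_size))
    )
    return [message for message in results if message is not None]


def main():
    consumer = FakeConsumer(f"message-{i}" for i in range(10))
    with concurrent.futures.ThreadPoolExecutor(max_workers=10) as executor:
        return asyncio.run(poll_batch(consumer, executor))


if __name__ == "__main__":
    print(len(main()))
```

asyncio.gather() returns results in the order the awaitables were passed in, but which worker thread receives which message is not deterministic, so the order of the batch should not be relied on in this sketch.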
This solution makes use of async/await and threads to retrieve a batch of messages from Apache Kafka. It is useful when our microservices run asynchronously. ThreadPoolExecutor can speed up blocking I/O by running each blocking call concurrently in a separate thread.
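To make the claim about blocking I/O concrete, the following sketch compares running a blocking call five times in sequence against running it in a thread pool. The blocking_io() function is a hypothetical placeholder for a call such as consumer.poll(), and the 0.2 second delay is arbitrary; actual gains depend on the workload.

```python
import concurrent.futures
import time


def blocking_io(delay=0.2):
    """Hypothetical blocking call standing in for something like consumer.poll()."""
    time.sleep(delay)
    return delay


def run_sequential(n):
    """Call blocking_io() n times one after another; return elapsed seconds."""
    start = time.perf_counter()
    for _ in range(n):
        blocking_io()
    return time.perf_counter() - start


def run_threaded(n):
    """Call blocking_io() n times in n worker threads; return elapsed seconds."""
    start = time.perf_counter()
    with concurrent.futures.ThreadPoolExecutor(max_workers=n) as executor:
        futures = [executor.submit(blocking_io) for _ in range(n)]
        for future in futures:
            future.result()  # wait for completion and surface any exception
    return time.perf_counter() - start


sequential = run_sequential(5)
threaded = run_threaded(5)
print(f"sequential: {sequential:.2f}s, threaded: {threaded:.2f}s")
```

Because the threads spend their time waiting rather than computing, the threaded run should take roughly as long as a single call, while the sequential run takes about five times as long.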