Kirshi Yin


Kafka Streams: How To Process a CSV File To Perform Calculations

Process a big CSV file and calculate the average daily temperature from historical weather records

Photo by Mediamodifier on Unsplash

Kafka Streams is a popular library for building streaming applications. It offers a robust solution for applications and microservices that must process data in real time.

In this tutorial, you’ll learn how to process a large CSV file with historical weather records. The goal is to calculate the average daily temperature. For that purpose, we’ll consume a Kafka topic, transform the data, and send it to another topic. Moreover, you’ll get familiar with commonly used stream operations, such as aggregations.

Let’s get started!

Create the Project

Install and start Kafka

There are multiple ways to start Kafka. For example, you could use a Docker image. Here, I chose to install it locally.

  1. Download a Kafka release from the official Apache Kafka downloads page. This tutorial uses Kafka 2.8.0 built for Scala 2.13.
  2. Extract the package and navigate to the Kafka folder:
$ tar -xzf kafka_2.13-2.8.0.tgz
$ cd kafka_2.13-2.8.0

3. Start ZooKeeper in a terminal window with this command:

$ bin/zookeeper-server-start.sh config/zookeeper.properties

4. Start the Kafka broker service from another terminal with this command:

$ bin/kafka-server-start.sh config/server.properties

Create the Kafka topics

In another terminal tab, create the input and output topics, as shown below:

$ bin/kafka-topics.sh --create --topic weather-tmp-input --bootstrap-server localhost:9092
$ bin/kafka-topics.sh --create --topic weather-tmp-output --bootstrap-server localhost:9092

Get the data set

Download the CSV containing the weather records. I got this data set from kaggle.com. You can also find the file in my GitHub repository linked in the “References” section.

Just to give you an idea, the records look like this:

Weather records sample

Write the Java code

  1. Create a new Maven project in your favourite IDE, and put the following dependencies in the pom.xml file:
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-streams</artifactId>
    <version>2.8.0</version>
</dependency>
<dependency>
    <groupId>com.fasterxml.jackson.core</groupId>
    <artifactId>jackson-databind</artifactId>
    <version>2.12.4</version>
</dependency>

2. Create the Weather.java class. Two things to note about it:

  • We only need the date and the temperature from the CSV record.
  • We split the date string on the regex pattern "\\s" (a whitespace character) to extract the day.
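A minimal sketch of a class with these two properties could look like the following. It assumes the CSV date column holds a timestamp such as 2016-09-30 22:00:00.000 +0200 (day, time, and UTC offset separated by spaces); the constructor shape and the accessor names getDay() and getTemperature() are hypothetical, not taken from the original Weather.java.

```java
/**
 * Hypothetical sketch of Weather.java: models only the two fields the
 * article needs (the day and the temperature). Names are assumptions.
 */
public class Weather {
    private final String day;          // calendar day, e.g. "2016-09-30"
    private final double temperature;  // temperature value from the CSV record

    public Weather(String formattedDate, double temperature) {
        // "\\s" matches one whitespace character. The first token is the day;
        // the remaining tokens (time of day, UTC offset) are dropped.
        this.day = formattedDate.split("\\s")[0];
        this.temperature = temperature;
    }

    public String getDay() {
        return day;
    }

    public double getTemperature() {
        return temperature;
    }

    @Override
    public String toString() {
        return day + " " + temperature;
    }

    public static void main(String[] args) {
        // Hypothetical timestamp in the "date time offset" shape the split assumes.
        Weather weather = new Weather("2016-09-30 22:00:00.000 +0200", 17.5);
        System.out.println(weather); // 2016-09-30 17.5
    }
}
```

Reducing the timestamp to a plain day string is what lets it serve directly as the grouping key later in the stream.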

3. Create a new Java file called KafkaStreamsDemo.java to hold the stream-processing code.

Here is a step-by-step explanation of the code:

  • The createAvgTempCalcStream() method defines the stream that computes the average daily temperature from the CSV records.
  • First, we map each record of the inputStream to our custom Weather object. The Kafka Streams map operation transforms each input record into a new record in the output stream and can change both the key and the value types.
  • The mapLineToWeather() method constructs a new Weather object. It splits each line on commas and uses the first and fourth values to extract the date and the temperature.
  • The groupByKey operator groups the records in the stream by their existing key. Grouping is required before we can perform an aggregation, and the result is a KGroupedStream. Note that records with a null key are not included in the result.
  • Since the default Serdes do not match our key/value types, we must specify the Serdes explicitly, in this case with(Serdes.String(), Serdes.Double()): the key (the date) is a String, and the value (the temperature) is a Double.
  • The aggregate operation combines all values that share the same grouped key. Unlike reduce, it allows the output value to have a different type than the input value.
  • The TempCalculator class helps us calculate the average daily temperature. For each day, we count the records and sum up their temperatures, then divide the sum by the count to get the average.
  • The aggregation result is materialized into a KTable. Because Kafka has no built-in Serde for our TempCalculator object, we create a JsonSerializer and a JsonDeserializer and use them to construct the CustomSerdes class.
  • Finally, we use the mapValues operation to turn each aggregate into its average temperature. mapValues changes only the value and keeps the key of the original record, so each result pairs the day (a String) with the average temperature (a Double). We send the results to the output Kafka topic we created earlier.
  • To keep the app running and listening to the input topic, we use a CountDownLatch, which allows one or more threads to wait until a set of operations being performed in other threads completes.
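To make the data flow of these steps concrete, the following plain-Java sketch reproduces the same per-day computation in memory, without any Kafka dependency. mapLine() plays the role of mapLineToWeather(), the computeIfAbsent call plays the role of groupByKey plus the aggregate initializer and aggregator, and the final loop plays the role of mapValues. Only the column positions (first and fourth values) come from the article; the class name AvgTempSketch, the helper names, the fields inside this TempCalculator, and the sample lines are hypothetical. A LinkedHashMap stands in for the KTable, so this is an illustration of the arithmetic, not a streaming implementation.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical in-memory sketch of the groupByKey + aggregate + mapValues logic. */
public class AvgTempSketch {

    /** Running aggregate per day (hypothetical fields: count and sum). */
    static final class TempCalculator {
        private long count;
        private double sum;

        // Aggregator step: fold one more temperature into the running totals.
        TempCalculator add(double temperature) {
            count++;
            sum += temperature;
            return this;
        }

        // mapValues step: turn the aggregate into the average.
        double average() {
            return count == 0 ? 0.0 : sum / count;
        }
    }

    /** Stand-in for mapLineToWeather(): key = day, value = temperature. */
    public static Map.Entry<String, Double> mapLine(String csvLine) {
        String[] fields = csvLine.split(",");
        String day = fields[0].split("\\s")[0];                     // first value: date, keep the day
        double temperature = Double.parseDouble(fields[3].trim()); // fourth value: temperature
        return Map.entry(day, temperature);
    }

    /** Groups lines by day, aggregates them, and maps each aggregate to its average. */
    public static Map<String, Double> averageByDay(List<String> csvLines) {
        Map<String, TempCalculator> table = new LinkedHashMap<>(); // stands in for the KTable
        for (String line : csvLines) {
            Map.Entry<String, Double> entry = mapLine(line);
            table.computeIfAbsent(entry.getKey(), key -> new TempCalculator()) // initializer
                 .add(entry.getValue());                                       // aggregator
        }
        Map<String, Double> averages = new LinkedHashMap<>();
        table.forEach((day, calc) -> averages.put(day, calc.average()));
        return averages;
    }

    public static void main(String[] args) {
        // Hypothetical sample lines, not taken from the real data set.
        List<String> lines = List.of(
                "2016-09-30 01:00:00.000 +0200,Clear,rain,16.0,16.0",
                "2016-09-30 02:00:00.000 +0200,Clear,rain,18.0,18.0",
                "2016-10-01 01:00:00.000 +0200,Cloudy,rain,10.0,9.0");
        System.out.println(averageByDay(lines)); // {2016-09-30=17.0, 2016-10-01=10.0}
    }
}
```

The sketch computes everything in one pass. In the real topology, Kafka Streams keeps each day's aggregate in a state store and can emit an updated average whenever a new record for that day arrives.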

Run the app

  1. Run the Kafka program from your IDE.
  2. From your terminal, send the CSV file records to the input topic.
cat /home/{user}/kafka-playground/weatherHistory.csv | bin/kafka-console-producer.sh --broker-list localhost:9092 --topic weather-tmp-input

3. In another window, start the console consumer:

./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 \
        --topic weather-tmp-output \
        --from-beginning \
        --formatter kafka.tools.DefaultMessageFormatter \
        --property print.key=true \
        --property key.deserializer=org.apache.kafka.common.serialization.StringDeserializer \
        --property value.deserializer=org.apache.kafka.common.serialization.DoubleDeserializer
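The two deserializer properties matter because the application writes each average to the output topic as a binary Double, not as text. As far as we understand, Kafka's DoubleSerializer stores a value as the 8 bytes of its IEEE 754 bit pattern in big-endian order, and DoubleDeserializer reverses that. The JDK-only sketch below mimics that round trip with java.nio.ByteBuffer; it uses no Kafka classes, and the class name DoubleBytesSketch is hypothetical.

```java
import java.nio.ByteBuffer;

/** Hypothetical JDK-only sketch of an 8-byte big-endian double encoding. */
public class DoubleBytesSketch {

    /** Encodes a double as 8 bytes: its IEEE 754 bit pattern, big-endian. */
    public static byte[] serialize(double value) {
        // A freshly allocated ByteBuffer uses big-endian byte order by default.
        return ByteBuffer.allocate(Double.BYTES).putDouble(value).array();
    }

    /** Decodes 8 big-endian bytes back into a double. */
    public static double deserialize(byte[] bytes) {
        if (bytes.length != Double.BYTES) {
            throw new IllegalArgumentException(
                    "expected " + Double.BYTES + " bytes, got " + bytes.length);
        }
        return ByteBuffer.wrap(bytes).getDouble();
    }

    public static void main(String[] args) {
        double average = 17.090740740740742; // a value like those in the sample output
        byte[] raw = serialize(average);
        System.out.println(raw.length);                  // 8
        System.out.println(deserialize(raw) == average); // true
    }
}
```

Without the value.deserializer property, the console consumer would print these raw bytes as unreadable characters instead of a number.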

Once the producer has sent the records, the application processes them and the consumer prints the results. You should see output like the following. Note that I’ve truncated the result for the sake of brevity.

......
2016-09-30 17.090740740740742
2016-09-04 22.15300925925926
2016-09-05 16.838657407407407
2016-09-06 17.289583333333336
2016-09-07 21.448379629629628
2016-09-08 22.427314814814817
2016-09-09 22.702546296296294
......

Great! The Kafka program processed the input topic and calculated the average daily temperature as expected.

Conclusion

In this tutorial, you learned how to use Kafka Streams operations such as groupByKey and aggregate. You also learned how to create a custom Serde for your own data types, such as the TempCalculator aggregate.

I hope you’ve learned useful information from this article.

If you’re interested in more Java and message-processing topics, you might like my related article, “Java: Process Messages From RabbitMQ and Upload Data to MinIO Cloud” (betterprogramming.pub).

Thank you for reading, and happy coding!

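References

  • GitHub repo: https://github.com/kirshiyin89/kafka-streams-demo
  • Apache Kafka Quickstart: https://kafka.apache.org/quickstart
  • Kafka Streams DSL developer guide, “Aggregating”: https://kafka.apache.org/20/documentation/streams/developer-guide/dsl-api.html#aggregating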
