0<span class="hljs-tag"></<span class="hljs-name">version</span>></span>
<span class="hljs-tag"></<span class="hljs-name">dependency</span>></span>
<span class="hljs-tag"><<span class="hljs-name">dependency</span>></span>
<span class="hljs-tag"><<span class="hljs-name">groupId</span>></span>com.fasterxml.jackson.core<span class="hljs-tag"></<span class="hljs-name">groupId</span>></span>
<span class="hljs-tag"><<span class="hljs-name">artifactId</span>></span>jackson-databind<span class="hljs-tag"></<span class="hljs-name">artifactId</span>></span>
<span class="hljs-tag"><<span class="hljs-name">version</span>></span>2.12.4<span class="hljs-tag"></<span class="hljs-name">version</span>></span>
<span class="hljs-tag"></<span class="hljs-name">dependency</span>></span></pre></div><p id="3f7d">2. Create the <code>Weather.java</code> class, as shown below:</p>
<figure id="7a85">
<div>
<div>
<iframe class="gist-iframe" src="/gist/kirshiyin89/54c2de208cb755fa397b6d77a93b94d6.js" allowfullscreen="" frameborder="0" height="undefined" width="undefined">
</div>
</div>
</figure></iframe></div></div></figure><ul><li>We only need the date and the temperature from the CSV record.</li><li>We use a regex pattern, <code>“\\s”</code>, to extract the day from the date.</li></ul><p id="f97a">3. Create a new Java file called <code>KafkaStreamsDemo.java</code>, and copy the following code:</p>
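<p>The code that follows groups the records by day and averages them with a <code>TempCalculator</code> that counts the readings and sums their temperatures. To illustrate only that count-and-sum idea without a Kafka cluster, here is a plain-Java sketch that mimics the <code>groupByKey</code>, <code>aggregate</code>, and <code>mapValues</code> steps over an in-memory list. The <code>AvgTempSketch</code> and <code>Reading</code> names are hypothetical, and the <code>TempCalculator</code> below is a stand-in, not the author's class.</p>

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical in-memory sketch of the per-day averaging; it mimics the Streams steps
// groupByKey -> aggregate -> mapValues but does not use Kafka at all.
public class AvgTempSketch {

    // Stand-in accumulator: how many readings were seen and their running sum.
    static final class TempCalculator {
        long count;
        double sum;

        // "Aggregator" step: fold one temperature reading into the accumulator.
        TempCalculator add(double temperature) {
            count++;
            sum += temperature;
            return this;
        }

        double average() {
            return sum / count;
        }
    }

    // Stand-in for a Kafka record: key = day, value = temperature.
    static final class Reading {
        final String day;
        final double temperature;

        Reading(String day, double temperature) {
            this.day = day;
            this.temperature = temperature;
        }
    }

    static Map<String, Double> averageByDay(List<Reading> readings) {
        // "groupByKey + aggregate": one accumulator per day, created on first use
        // (the role an Initializer plays in Kafka Streams).
        Map<String, TempCalculator> table = new LinkedHashMap<>();
        for (Reading r : readings) {
            if (r.day == null) {
                continue; // groupByKey drops records whose key is null
            }
            table.computeIfAbsent(r.day, k -> new TempCalculator()).add(r.temperature);
        }
        // "mapValues": keep each key, replace the accumulator with its average.
        Map<String, Double> averages = new LinkedHashMap<>();
        table.forEach((day, calc) -> averages.put(day, calc.average()));
        return averages;
    }

    public static void main(String[] args) {
        List<Reading> readings = List.of(
                new Reading("2016-09-08", 20.0),
                new Reading("2016-09-08", 24.0),
                new Reading("2016-09-09", 22.0));
        System.out.println(averageByDay(readings)); // prints {2016-09-08=22.0, 2016-09-09=22.0}
    }
}
```

<p>Unlike this batch sketch, the Streams app updates a <code>KTable</code> as records arrive, so the output topic can receive several intermediate averages for the same day before the last one.</p>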
<figure id="74ff">
<div>
<div>
<iframe class="gist-iframe" src="/gist/kirshiyin89/8d29a68dfe6e0bb5a29619d9ef06f40e.js" allowfullscreen="" frameborder="0" height="undefined" width="undefined">
</div>
</div>
</figure></iframe></div></div></figure><p id="cb8c">A step-by-step code explanation is shown below:</p><ul><li>The <code>createAvgTempCalcStream()</code> method defines the stream that computes the average day temperature based on the CSV records.</li><li>First, we take the <code>inputStream</code> to map the file records to our custom <code>Weather</code> object. The Kafka <a href="https://kafka.apache.org/11/javadoc/org/apache/kafka/streams/kstream/KStream.html#map-org.apache.kafka.streams.kstream.KeyValueMapper-">map</a> operation transforms the input record to a new record in the output stream. It also gives the ability to change the key/value types.</li><li>The <code>mapLineToWeather()</code> method constructs a new <code>Weather</code> object. We split each record line by a comma. We use the first and the fourth value to extract the date and the temperature.</li><li>The <a href="https://kafka.apache.org/11/javadoc/org/apache/kafka/streams/kstream/KStream.html#groupByKey--"><code>groupBy</code>Key</a> operator groups the records in the stream by an existing key. This operation is required before we can perform an aggregation. It results in a <code>KGroupStream</code>. Note that <code>null </code>records are not included in the result.</li><li>Since the default Serdes do not match the key/value types, we must explicitly specify new Serdes. In this case - <code>(with(Serdes.String(), Serdes.Double())</code>. The date is a String, and the average temperature is a Double.</li><li>The <a href="https://kafka.apache.org/11/javadoc/org/apache/kafka/streams/kstream/SessionWindowedKStream.html#aggregate-org.apache.kafka.streams.kstream.Initializer-org.apache.kafka.streams.kstream.Aggregator-org.apache.kafka.streams.kstream.Merger-org.apache.kafka.streams.kstream.Materialized-"><code>aggreg</code>ate</a> operation allows the output value to have a different type than the input value. 
It aggregates the values per grouped key.</li><li>The <code>TempCalculator</code> class helps us calculate the average daily temperature. For each day, we count the records and sum up their temperatures. Then we divide the <code>sum</code> by the <code>count</code> to get the average.</li><li>The aggregation result is materialized into a <code>KTable</code>. Because our <code>TempCalculator</code> object needs a custom Serde, we create a <code>JsonSerializer</code> and a <code>JsonDeserializer</code> and use them to construct the <code>CustomSerdes</code> class.</li><li>Finally, we use the <a href="https://kafka.apache.org/11/javadoc/org/apache/kafka/streams/kstream/KStream.html#mapValues-org.apache.kafka.streams.kstream.ValueMapper-"><code>mapValues</code></a> operation to transform each aggregated value into the average temperature. Unlike <code>map</code>, it keeps the key of the original record unchanged. The day is a String, and the average temperature is a Double. We send the results to the output Kafka topic we created earlier.</li><li>Since we want the app to keep running and listening to the input topic, we use a <code>CountDownLatch</code>. It lets one or more threads wait until a set of operations performed in other threads completes, which keeps the application from exiting.</li></ul><h2 id="de88">Run the app</h2><ol><li>Run the Kafka program from your IDE.</li><li>From your terminal, send the CSV file records to the input topic.</li></ol><div id="e67f"><pre><span class="hljs-built_in">cat</span> /home/{user}/kafka-playground/weatherHistory.csv | bin/kafka-console-producer.sh --broker-list localhost:9092 --topic weather-tmp-input</pre></div><p id="3298">3. In another window, start the consumer, as shown below:</p><div id="ad2f"><pre>./bin/kafka-console-consumer<span class="hljs-selector-class">.sh</span> <span class="hljs-attr">--bootstrap-server</span> localhost:<span class="hljs-number">9092</span> \
<span class="hljs-attr">--topic</span> weather-tmp-output \
<span class="hljs-attr">--from-beginning</span> \
<span class="hljs-attr">--formatter</span> kafka<span class="hljs-selector-class">.tools</span><span class="hljs-selector-class">.DefaultMessageFormatter</span> \
<span class="hljs-attr">--property</span> print.key=true \
<span class="hljs-attr">--property</span> key.deserializer=org<span class="hljs-selector-class">.apache</span><span class="hljs-selector-class">.kafka</span><span class="hljs-selector-class">.common</span><span class="hljs-selector-class">.serialization</span><span class="hljs-selector-class">.StringDeserializer</span> \
<span class="hljs-attr">--property</span> value.deserializer=org<span class="hljs-selector-class">.apache</span><span class="hljs-selector-class">.kafka</span><span class="hljs-selector-class">.common</span><span class="hljs-selector-class">.serialization</span>.DoubleDeserializer</pre></div><p id="2f95">Once the producer has sent the records, the running Streams app processes them and writes the daily averages to the output topic, where the consumer prints them. You should see output similar to the one below. Note that I’ve truncated the result for the sake of brevity.</p><div id="b377"><pre><span class="hljs-code">......
2016-09-30 17.090740740740742
2016-09-04 22.15300925925926
2016-09-05 16.838657407407407
2016-09-06 17.289583333333336
2016-09-07 21.448379629629628
2016-09-08 22.427314814814817
2016-09-09 22.702546296296294
......</span></pre></div><p id="64c9">Great! The Kafka program processed the input topic and calculated the average daily temperature as expected.</p><h1 id="143a">Conclusion</h1><p id="1bb2">In this tutorial, you learned how to use Kafka Streams operations such as <code>aggregate</code>, <code>groupByKey</code>. Also, you know how to create a custom Serdes to transform data for output topics.</p><p id="f8d6">I hope you’ve learned useful information from this article.</p><p id="cf0e">If you’re interested in more Java/Message processing topics, you might like my related article:</p><div id="0e82" class="link-block">
<a href="https://betterprogramming.pub/java-process-messages-from-rabbitmq-and-upload-data-to-minio-cloud-b70ecd2e82be">
<div>
<div>
<h2>Java: Process Messages From RabbitMQ and Upload Data to MinIO Cloud</h2>
<div><h3>Consume messages from RabbitMQ and upload epub files to MinIO Cloud Storage</h3></div>
<div><p>betterprogramming.pub</p></div>
</div>
<div>
<div style="background-image: url(https://miro.readmedium.com/v2/resize:fit:320/0*0jSTjVOmXsTzyswF)"></div>
</div>
</div>
</a>
</div><p id="9763">Thank you for reading, and happy coding!</p><h1 id="e70c">References</h1><ul><li><a href="https://github.com/kirshiyin89/kafka-streams-demo">GitHub Repo</a></li><li><a href="https://kafka.apache.org/quickstart">https://kafka.apache.org/quickstart</a></li><li><a href="https://kafka.apache.org/20/documentation/streams/developer-guide/dsl-api.html#aggregating">https://kafka.apache.org/20/documentation/streams/developer-guide/dsl-api.html#aggregating</a></li></ul></article></body>