Building CQRS Views with Debezium, Kafka, Materialize, and Apache Pinot — Part 2
How do you build an incrementally updated materialized view that serves queries quickly and at scale?
This post is the second and final installment of the article series that explores building a read-optimized view with Debezium, Kafka, Materialize, and Apache Pinot.
Part 1 discussed the problem space of building an online pizza order tracker and a possible solution architecture. Part 2 will walk you through the solution implementation in detail.

If you haven’t already, check out Part 1 first.
Step 1: Event-enabling Microservices data
Our first step is to capture data from the Microservices as it is produced and move it to a place where we can build a materialized view. I’ve omitted the DeliveryService from the project to reduce the number of moving parts and keep it easy to understand.
We have orders and order items in the OrderService and order status updates in the KitchenService. Let’s move this data out first.
Streaming orders from OrderService with Debezium
Assume that the OrderService has a MySQL database called pizzashop with two tables: orders and order_items. Together, they model the one-to-many relationship between a pizza order and its items.

When you start the Docker stack, the MySQL container automatically creates the above database with some mock order records. You can find the seed script in the <project_root>/mysql folder.
When the OrderService creates or updates orders in these tables, we can use Debezium to stream them to Kafka as change events. So, let’s register a Debezium MySQL connector to enable that.
Type the following in a terminal.
```shell
curl -H 'Content-Type: application/json' localhost:8083/connectors --data '
{
  "name":"orders-connector",
  "config":{
    "connector.class":"io.debezium.connector.mysql.MySqlConnector",
    "tasks.max":"1",
    "database.hostname":"mysql",
    "database.port":"3306",
    "database.user":"debezium",
    "database.password":"dbz",
    "database.server.id":"184054",
    "database.server.name":"mysql",
    "database.include.list":"pizzashop",
    "database.history.kafka.bootstrap.servers":"kafka:9092",
    "database.history.kafka.topic":"mysql-history"
  }
}'
```

Once the command returns, you should see two Kafka topics created, mysql.pizzashop.orders and mysql.pizzashop.order_items, containing the change event streams from the orders and order_items tables, respectively. Debezium first takes a snapshot of both tables and streams the existing rows to those topics, then keeps streaming new changes as they happen.
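If you’d rather script this step, the following Python sketch builds the same registration request with only the standard library and works out which topics Debezium should create. It assumes Kafka Connect is reachable at localhost:8083, as in the curl command, and that database.include.list names a single database; build_register_request and expected_topics are hypothetical helper names, not part of Debezium or Kafka Connect.

```python
import json
import urllib.request

# Same connector definition as the curl command above.
CONNECTOR = {
    "name": "orders-connector",
    "config": {
        "connector.class": "io.debezium.connector.mysql.MySqlConnector",
        "tasks.max": "1",
        "database.hostname": "mysql",
        "database.port": "3306",
        "database.user": "debezium",
        "database.password": "dbz",
        "database.server.id": "184054",
        "database.server.name": "mysql",
        "database.include.list": "pizzashop",
        "database.history.kafka.bootstrap.servers": "kafka:9092",
        "database.history.kafka.topic": "mysql-history",
    },
}


def build_register_request(connect_url, connector):
    """Build (but don't send) a POST to Kafka Connect's /connectors endpoint."""
    body = json.dumps(connector).encode("utf-8")
    return urllib.request.Request(
        connect_url.rstrip("/") + "/connectors",
        data=body,
        headers={"Content-Type": "application/json"},
        method="POST",
    )


def expected_topics(connector, tables):
    """Debezium MySQL topics follow <database.server.name>.<database>.<table>.

    Assumes database.include.list names exactly one database.
    """
    cfg = connector["config"]
    prefix = cfg["database.server.name"] + "." + cfg["database.include.list"]
    return [prefix + "." + table for table in tables]


if __name__ == "__main__":
    req = build_register_request("http://localhost:8083", CONNECTOR)
    print(req.get_method(), req.full_url)
    print(expected_topics(CONNECTOR, ["orders", "order_items"]))
    # urllib.request.urlopen(req) would actually register the connector.
```

Passing the request to urllib.request.urlopen() sends it just like the curl command; Kafka Connect answers with 201 Created once the connector is registered.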
Capturing order status changes from KitchenService
Let’s assume the KitchenService publishes an event like the following directly to a Kafka topic whenever an order transitions between states.
```json
{
  "id":"1",
  "order_id":1,
  "status":"CREATED",
  "updated_at":1453535342
}
```

Publish some sample events to the order_updates Kafka topic by running:

```shell
kcat -b localhost:29092 -t order_updates -T -P -l data/updates.txt
```

I’ve used kcat here, but feel free to use any other tool. The -l flag sends each line of the file as a separate message, and -T echoes what is sent. The order_id values in these events match the orders that came from MySQL, which allows us to join them later.
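Because kcat’s -l flag turns every line of the file into one message, data/updates.txt needs one JSON event per line. The file’s actual contents aren’t shown here, so the Python sketch below only illustrates how such a file could be generated; make_status_events, to_json_lines, and the 3-second spacing are hypothetical, with the CREATED and PROCESSING values and timestamps borrowed from the sample event in Step 3.

```python
import json


def make_status_events(order_id, statuses, start_ts, step_seconds=3, first_id=1):
    """Build one KitchenService-style event per status transition (hypothetical data)."""
    return [
        {
            "id": str(first_id + i),  # the sample event encodes id as a string
            "order_id": order_id,
            "status": status,
            "updated_at": start_ts + i * step_seconds,  # epoch seconds
        }
        for i, status in enumerate(statuses)
    ]


def to_json_lines(events):
    """One compact JSON object per line, since kcat -l sends each line as a message."""
    return "".join(json.dumps(e, separators=(",", ":")) + "\n" for e in events)


if __name__ == "__main__":
    events = make_status_events(1, ["CREATED", "PROCESSING"], 1453535342)
    print(to_json_lines(events), end="")
```

Redirecting the output to data/updates.txt gives kcat a file it can publish; with the arguments above, the second line is {"id":"2","order_id":1,"status":"PROCESSING","updated_at":1453535345}.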
Step 2: Building the order_summary materialized view
After completing Step 1, you should see three topics in Kafka: mysql.pizzashop.orders, mysql.pizzashop.order_items, and order_updates.

In this step, we build a materialized view by joining the event streams from these topics and keep it incrementally updated as new events arrive. We will use Materialize for that.
Materialize is a streaming database designed to build data-intensive applications and services in SQL, without pipelines or caches. It is built on Timely Dataflow and Differential Dataflow, which are instrumental in maintaining incrementally updated materialized views.
You can check out my previous article on Materialize for more information.
Define sources and views
We will create three sources to represent the above event streams inside Materialize.
Our Docker stack already contains a running Materialize container. You can connect to it with the Materialize CLI or psql (yes, Materialize is wire-compatible with Postgres):
```shell
psql -U materialize -h localhost -p 6875 materialize
```

Now that you’re in the Materialize CLI, define Kafka sources for the two pizzashop tables and the order_updates topic:

```sql
CREATE SOURCE orders
FROM KAFKA BROKER 'kafka:9092' TOPIC 'mysql.pizzashop.orders'
FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY 'http://schema-registry:8081' ENVELOPE DEBEZIUM;

CREATE SOURCE items
FROM KAFKA BROKER 'kafka:9092' TOPIC 'mysql.pizzashop.order_items'
FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY 'http://schema-registry:8081' ENVELOPE DEBEZIUM;

CREATE SOURCE updates_source
FROM KAFKA BROKER 'kafka:9092' TOPIC 'order_updates'
FORMAT BYTES;
```

In this setup, the CDC streams written by Debezium are Avro-formatted and use the Confluent Schema Registry container included in the Docker stack. The orders and items sources pull the message schemas from the registry and derive the column types for each attribute, while ENVELOPE DEBEZIUM tells Materialize to interpret each event’s before and after fields as changes to the table rather than as independent rows.
The updates_source source represents the order_updates topic, whose messages are JSON.
With JSON-formatted messages, Materialize doesn’t know the schema, so the JSON is pulled in as raw bytes, and we still need to cast the data into the proper columns and types. For that, we create the following view:
```sql
CREATE VIEW updates AS
SELECT
  (data->>'id')::int AS id,
  (data->>'order_id')::int AS order_id,
  data->>'status' AS status,
  data->>'updated_at' AS updated_at
FROM (SELECT CONVERT_FROM(data, 'utf8')::jsonb AS data FROM updates_source);
```

This view is not materialized and doesn’t store the query results. It provides an alias for the embedded SELECT statement and lets us shape the status update data into the format we need.
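To see what the view does to each message, here is a rough Python equivalent of its per-row logic. It is only an illustration, not how Materialize executes the view, and it assumes the events contain JSON strings and integers: for those types, str() matches the text that ->> produces, and a missing key becomes None the way ->> yields NULL. The helper names are hypothetical.

```python
import json


def _text(data, key):
    """Approximate data->>key: the value as text, or None when missing or null."""
    value = data.get(key)
    return None if value is None else str(value)


def parse_update(raw):
    """Rough Python equivalent of the updates view for one message.

    str() matches ->> only for JSON strings and integers, which is all this
    sketch assumes the order_updates events contain.
    """
    data = json.loads(raw.decode("utf-8"))  # CONVERT_FROM(data, 'utf8')::jsonb
    id_text = _text(data, "id")
    order_id_text = _text(data, "order_id")
    return {
        "id": None if id_text is None else int(id_text),  # (data->>'id')::int
        "order_id": None if order_id_text is None else int(order_id_text),
        "status": _text(data, "status"),          # data->>'status'
        "updated_at": _text(data, "updated_at"),  # stays text: the view has no cast
    }


if __name__ == "__main__":
    msg = b'{"id":"1","order_id":1,"status":"CREATED","updated_at":1453535342}'
    print(parse_update(msg))
```

Note that updated_at remains a string in the view, which is fine here because order_summary only concatenates it.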
Run SHOW SOURCES and SHOW VIEWS to verify the sources and views created so far.
```
materialize=> show sources;
name
----------------
items
orders
updates_source
(3 rows)

materialize=> show views;
name
---------
updates
(1 row)
```

Define the order_summary materialized view
Next, we join the above streams on order_id to create the order_summary materialized view, which holds all the information needed to populate the UI.
```sql
CREATE MATERIALIZED VIEW order_summary AS
SELECT
  orders.order_id AS order_id,
  orders.total AS total,
  orders.created_at as created_at,
  array_agg(distinct concat( items.name,'|',items.quantity)) as items,
  array_agg(distinct concat( updates.status,'|',updates.updated_at)) as status
FROM orders
JOIN items ON orders.order_id=items.order_id
JOIN updates ON orders.order_id=updates.order_id
GROUP BY orders.order_id, orders.created_at, orders.total;
```

Orders, order items, and status updates have different cardinalities: a single order can have many order items and many status updates. So we model the items and updates as arrays with the array_agg() aggregate function, which lets us fit all the information related to an order into a single row of the view. Because the two joins produce one row for every combination of an order’s items and status updates, the distinct inside array_agg() removes the resulting duplicates. Also note that, since these are inner joins, an order only appears in the view once it has at least one item and one status update.
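The following Python sketch reproduces the view’s logic on in-memory lists, which makes the effect of the joins and of distinct easy to see. It is an illustration with hypothetical sample rows, not how Materialize evaluates the query: Materialize maintains the result incrementally, while this function recomputes it from scratch. The joined_rows field is an extra added only to expose the join fan-out.

```python
from collections import defaultdict


def order_summary(orders, items, updates):
    """Recompute order_summary from scratch (Materialize maintains it incrementally)."""
    items_by_order = defaultdict(list)
    for item in items:
        items_by_order[item["order_id"]].append(item)
    updates_by_order = defaultdict(list)
    for update in updates:
        updates_by_order[update["order_id"]].append(update)

    rows = []
    for order in orders:
        oid = order["order_id"]
        # Both joins together yield one row per (item, update) pair.
        joined = [(i, u) for i in items_by_order[oid] for u in updates_by_order[oid]]
        if not joined:
            continue  # inner joins drop orders without items or updates
        rows.append({
            "order_id": oid,
            "total": order["total"],
            "created_at": order["created_at"],
            # Sets play the role of distinct; sorted() only makes the output stable,
            # since array_agg without ORDER BY promises no particular order.
            "items": sorted({f"{i['name']}|{i['quantity']}" for i, _ in joined}),
            "status": sorted({f"{u['status']}|{u['updated_at']}" for _, u in joined}),
            "joined_rows": len(joined),  # not a real column; shows the fan-out
        })
    return rows


if __name__ == "__main__":
    orders = [
        {"order_id": 1, "total": 50.0, "created_at": "1660194991000"},
        {"order_id": 2, "total": 20.0, "created_at": "1660194992000"},
    ]
    items = [
        {"order_id": 1, "name": "Chicken BBQ", "quantity": 1},
        {"order_id": 1, "name": "Sri Lankan Spicy Chicken Pizza", "quantity": 1},
    ]
    updates = [
        {"order_id": 1, "status": "CREATED", "updated_at": "1453535342"},
        {"order_id": 1, "status": "PROCESSING", "updated_at": "1453535345"},
    ]
    for row in order_summary(orders, items, updates):
        print(row)
```

With two items and two status updates, order 1 fans out into four joined rows, yet each array ends up with two entries; order 2 has no items or updates and disappears, exactly as the inner joins dictate.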
Running the following query will show you the content of the view. It has already been populated with the existing data.
```
materialize=> select * from order_summary;
```

Results in:

Define the Kafka sink
The order_summary materialized view is updated continuously as new events arrive. Each change to a row in the view emits a changelog event containing the latest version of that row. We can capture these events as a CDC stream and move them to Kafka with a sink.
```sql
CREATE SINK results
FROM order_summary
INTO KAFKA BROKER 'kafka:9092' TOPIC 'orders_enriched'
CONSISTENCY TOPIC 'orders_enriched-consistency'
CONSISTENCY FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY 'http://schema-registry:8081'
WITH (reuse_topic=true)
FORMAT JSON;
```

When you create a sink, Materialize streams out all changes to the view from the time of sink creation onwards.
Remember, materialized views are maintained in memory. If you restart Materialize and use a basic sink, you will stream duplicate change events as Materialize re-ingests the upstream data and recomputes the view. To ensure users don’t unknowingly push duplicate events, Materialize creates new, distinct topics for sinks after each restart.
We can avoid these issues with the reuse_topic option, which lets the sink keep writing to the existing topic after a restart. Along with it, you specify a consistency topic, named with the CONSISTENCY TOPIC parameter, where Materialize stores the information it uses to identify the last completed write.
Since this is a JSON-formatted sink, you must specify that the consistency topic format is AVRO. That is done through the CONSISTENCY FORMAT parameter and a pointer to the schema registry URL.
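The orders_enriched topic therefore carries a changelog rather than a list of final rows. As a hedged illustration of what a downstream consumer has to do with it, here is a Python sketch that folds events into the latest version of each order. It assumes the before/after envelope shown in Step 3 and keys rows by order_id; apply_change is a hypothetical helper, not part of Materialize.

```python
def apply_change(state, event):
    """Fold one orders_enriched changelog event into state keyed by order_id.

    Assumes the envelope shown in Step 3: "before" and "after" are each
    either null or {"row": {...}}.
    """
    after = event.get("after")
    before = event.get("before")
    if after is not None:
        row = after["row"]
        state[row["order_id"]] = row  # insert or update: keep the newest version
    elif before is not None:
        old = before["row"]
        # Only delete if we still hold exactly the retracted version; a newer
        # version that already arrived must not be wiped out.
        if state.get(old["order_id"]) == old:
            del state[old["order_id"]]


if __name__ == "__main__":
    v1 = {"order_id": 1, "status": ["CREATED|1453535342"]}
    v2 = {"order_id": 1, "status": ["CREATED|1453535342", "PROCESSING|1453535345"]}
    state = {}
    apply_change(state, {"before": None, "after": {"row": v1}})
    apply_change(state, {"before": None, "after": {"row": v2}})
    apply_change(state, {"before": {"row": v1}, "after": None})
    print(state)  # state still holds v2; the stale retraction of v1 is ignored
```

Deleting only when the stored row equals the retracted one keeps the state correct even if the retraction of an old version is processed after the insertion of its replacement.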
Run the following query to see the Kafka topic created by the sink. Note down its name, as we will need it in the next step.
```sql
SELECT sink_id, name, topic
FROM mz_sinks
JOIN mz_kafka_sinks ON mz_sinks.id = mz_kafka_sinks.sink_id;
```
Step 3: Serving the enriched materialized view
Our work is not done yet, but we have just finished the tricky part. What remains is to ingest the materialized view’s CDC stream into Apache Pinot so that the UI can query it with low latency at high throughput.
Why Apache Pinot?
We could have served the enriched materialized view to the UI directly. But a pizza order tracker is Internet-facing: potentially millions of concurrent users access it, and the results must appear in the UI within a few milliseconds.
To meet that demand, we will use Apache Pinot, a real-time OLAP database capable of ingesting streaming data and making it available for fast, scalable querying.
You can check out my previous article to learn more about Pinot.
Ingesting nested JSON objects and multi-value fields
The CDC stream coming to Kafka from Materialize consists of JSON-formatted events. The following is a single event, which includes nested structures and JSON arrays.
```json
{
  "before":null,
  "after":{
    "row":{
      "order_id":1,
      "total":50.0,
      "created_at":"1660194991000",
      "items":[
        "Chicken BBQ|1",
        "Sri Lankan Spicy Chicken Pizza|1"
      ],
      "status":[
        "CREATED|1453535342",
        "PROCESSING|1453535345"
      ]
    }
  },
  "transaction":{
    "id":"1660196614999"
  }
}
```

We can configure Pinot to unnest the JSON structure during ingestion and use multi-value columns to store the order items and status updates of a single order.
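Pinot performs this unnesting through its table and schema configuration, so no Python is involved in the real pipeline. Purely to visualize the target row shape, here is a sketch that flattens the event above into one record with multi-value fields. The column names (item_names, item_quantities, statuses, status_times) are hypothetical and not taken from the project’s Pinot schema, and splitting on | assumes the name|quantity and STATUS|timestamp encodings produced by the order_summary view.

```python
from typing import Optional


def _split_pairs(entries):
    """Split "left|right" strings into two parallel lists, right side as int."""
    lefts, rights = [], []
    for entry in entries:
        left, right = entry.rsplit("|", 1)
        lefts.append(left)
        rights.append(int(right))
    return lefts, rights


def flatten_for_pinot(event) -> Optional[dict]:
    """Flatten one orders_enriched event into a row with multi-value fields."""
    after = event.get("after")
    if after is None:
        return None  # a pure retraction carries no row to ingest in this sketch
    row = after["row"]
    item_names, item_quantities = _split_pairs(row["items"])  # "name|quantity"
    statuses, status_times = _split_pairs(row["status"])      # "STATUS|epoch seconds"
    return {
        "order_id": row["order_id"],
        "total": row["total"],
        "created_at": int(row["created_at"]),  # arrives as a string (epoch millis)
        "item_names": item_names,              # multi-value column (hypothetical name)
        "item_quantities": item_quantities,
        "statuses": statuses,
        "status_times": status_times,
    }


if __name__ == "__main__":
    event = {
        "before": None,
        "after": {"row": {
            "order_id": 1,
            "total": 50.0,
            "created_at": "1660194991000",
            "items": ["Chicken BBQ|1", "Sri Lankan Spicy Chicken Pizza|1"],
            "status": ["CREATED|1453535342", "PROCESSING|1453535345"],
        }},
        "transaction": {"id": "1660196614999"},
    }
    print(flatten_for_pinot(event))
```

Keeping names and quantities in parallel multi-value fields preserves the one-row-per-order layout that the UI reads, which is the same trade-off the order_summary view makes with its arrays.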
For more information on ingesting complex JSON objects into Pinot, you can watch the following video by my good friend Mark Needham.







