Abstract
This article focuses on utilizing PostgreSQL's Listen/Notify mechanism to implement a pub/sub pattern for Go-based microservices, allowing efficient communication between them. It explains the importance of the pub/sub messaging pattern in microservice-oriented architectures and how PostgreSQL's Listen/Notify commands enable this pattern directly within a PostgreSQL database environment. The advantages of using PostgreSQL as a message broker are highlighted, such as no additional complexity and guaranteed atomicity of transactions. The article also demonstrates building a simple microservice application for data processing using the Listen/Notify mechanism.
PostgreSQL as message broker for Go applications
Using PostgreSQL LISTEN/NOTIFY mechanism to implement a pub/sub pattern for Go-based microservices.
In microservice-oriented architectures, seamless communication between services stands as a cornerstone for ensuring the coherent behavior of the entire application. Among the myriad messaging patterns available, the Publish/Subscribe (Pub/Sub) paradigm emerges as a focal point of interest within the scope of this article. Herein, we delve into the implementation of this pattern through the utilization of the PostgreSQL Listen/Notify mechanism. By harnessing this mechanism, we enable efficient communication between two microservices, both built using the Go programming language.
Pub/Sub messaging pattern
Pub/Sub stands as an architectural design pattern integral to distributed systems, facilitating asynchronous communication between disparate components or services. Within this paradigm, the publisher, also known as the producer, generates an event each time a predefined scenario unfolds. These events adhere to a standardized API. Subsequently, various services, referred to as subscribers or consumers, express interest in receiving notifications pertaining to these events by subscribing to them. Consequently, whenever the producer generates an event, each subscribed consumer receives a duplicate instance of the event. This inter-service communication typically transpires through an intermediary service known as a message broker. The essence of this pattern is succinctly captured in the illustrative depiction originally disseminated by Microsoft, as shown below.
While Apache Kafka and RabbitMQ represent quintessential examples of message brokers in the realm of distributed systems, it’s worth noting that alternative solutions also exist. One such solution, particularly noteworthy in the context of PostgreSQL users, is the database’s built-in Listen/Notify mechanism.
Intro to Listen/Notify in PostgreSQL
Listen and Notify are PostgreSQL commands that empower the implementation of a Pub/Sub pattern directly within a PostgreSQL database environment. In this architectural design, the producer component leverages the Notify command to dispatch notifications to a designated channel, optionally accompanied by a payload. Conversely, the consumer component utilizes the Listen command to register interest in specific channels and commence receipt of corresponding notifications.
The Listen command has the following signature:
LISTEN channel
where channel is any valid SQL identifier.
The Notify command is more elaborate and has the following signature:
NOTIFY channel [ , payload ]
where channel is again any valid SQL identifier and payload is an optional string message, which will be delivered to each consumer.
Another method to send messages involves utilizing the helper function pg_notify(text, text), where the initial argument denotes the channel name and the subsequent one signifies the payload destined for delivery to consumers.
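As a quick illustration, the commands can be tried in two separate psql sessions (the channel name new_signals is just an example):

```sql
-- Session A (consumer): subscribe to the channel.
LISTEN new_signals;

-- Session B (producer): send a notification with a payload.
NOTIFY new_signals, 'hello';

-- Equivalent form using the helper function:
SELECT pg_notify('new_signals', 'hello');
```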
In comparison to other message brokers like RabbitMQ, the maximum payload size is significantly smaller — in the default configuration it is 8000 bytes for PostgreSQL versus 2 GB for RabbitMQ (using the AMQP protocol).
It is important to mention that the Listen/Notify mechanism does not provide any out-of-the-box support for persistent subscribers. Notifications sent before a client starts listening will be lost.
However, using PostgreSQL as a message broker comes with two great advantages:
No additional complexity — given that PostgreSQL is already used as the primary database, there is no need to integrate additional services; the requisite functionality is available within PostgreSQL itself.
Guaranteed atomicity of transactions — in a typical microservice architecture, the workflow of a service often involves consuming notifications, processing data, saving results to a database, and notifying downstream services. However, ensuring the atomicity of the last two steps — saving data and notifying downstream services — can be a challenge, particularly when separate services provide the database and message broker functionalities. While patterns like the outbox pattern exist to mitigate the risks of non-atomic operations, the ideal solution lies in minimizing the complexity arising from communication between disparate services. By leveraging Listen/Notify, the system consolidates dependencies, thereby reducing complexity. Notably, the Notify command offers a crucial feature: notifications issued within a transaction remain pending until the transaction is committed, ensuring atomicity.
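The transactional behavior can be illustrated with a plain SQL sketch (the table and channel names are only examples):

```sql
BEGIN;

-- Persist the result of the processing step.
INSERT INTO signals (name, value) VALUES ('test_signal', 0.42);

-- Queue a notification for downstream services.
NOTIFY new_signals, 'test_signal inserted';

-- Listeners see nothing yet; the notification is delivered only
-- after COMMIT and is discarded entirely on ROLLBACK.
COMMIT;
```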
Data processing application
Now that we have an understanding of the PostgreSQL Listen/Notify mechanism, let's use it in practice to build a simple microservice application for data processing. It will consist of three main elements:
data producer — a Go-based service that exposes an HTTP API with a POST endpoint; when called, it generates random data and saves it to the table,
data consumer — a Go-based service that listens for notifications about new data and processes them (for brevity, processing means logging them to the console),
PostgreSQL database — it will serve both as a database to store the data and as a message broker connecting the data producer with the data consumer.
In both the data producer and the data consumer we will use pgx as the PostgreSQL driver for the Go parts. The pgx driver is a low-level, high-performance interface that, in addition to including an adapter for the standard database/sql interface, exposes PostgreSQL-specific features such as Listen/Notify.
Data producer
Let's start with the data producer. As stated above, it is a straightforward HTTP server that exposes one endpoint (POST /ingest), generates random data, and saves it to the database. Although in practical scenarios data would be parsed from the request body, for the sake of simplicity in this illustration we'll stick to random data generation.
Beyond the standard configuration — initializing the context, setting a read header timeout, configuring the endpoint, and subsequently launching the server — the crux of the functionality lies within the createIngestHandler function.
This function creates a closure over the pgxpool.Pool struct, which is then employed as an http.HandlerFunc within the HTTP server. The pgxpool.Pool is a collection (or pool) of connections available for the application’s utilization. pgxpool.Pool is thread-safe, making it suitable for deployment within concurrent applications.
Inside the handler, the insertion into the signals table is executed, incorporating the generated data. While the exact schema and database setup will be presented subsequently, it is worth highlighting that upon successful insertion, a status code of 200 OK is returned, accompanied by information about the generated data. Conversely, in the event of an unsuccessful operation, a status code of 500 Internal Server Error is dispatched, along with a descriptive error message.
The establishment of the connection (or more precisely, the pool of connections) to the database is performed within the connectDB function. Here, the pgxpool.New function is invoked to initiate the connection to the database based on the provided connection string. The connection string itself is sourced from an environment variable, thereby adhering to the principles outlined in The Twelve-Factor App.
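Since the full listing is embedded separately, here is a minimal sketch of what the producer might look like. The function names (connectDB, createIngestHandler) follow the text, but details such as the port, the response format, and the shape of the random data are assumptions; the routing pattern requires Go 1.22+ and the pgx v5 module.

```go
package main

import (
	"context"
	"fmt"
	"log"
	"math/rand"
	"net/http"
	"os"
	"time"

	"github.com/jackc/pgx/v5/pgxpool"
)

// connectDB creates a connection pool from the DATABASE_URL
// environment variable, following the Twelve-Factor App approach.
func connectDB(ctx context.Context) (*pgxpool.Pool, error) {
	return pgxpool.New(ctx, os.Getenv("DATABASE_URL"))
}

// createIngestHandler closes over the connection pool and returns
// an http.HandlerFunc that inserts a random data point.
func createIngestHandler(pool *pgxpool.Pool) http.HandlerFunc {
	return func(w http.ResponseWriter, r *http.Request) {
		name, value := "test_signal", rand.Float64()
		_, err := pool.Exec(r.Context(),
			"INSERT INTO signals (name, value) VALUES ($1, $2)", name, value)
		if err != nil {
			http.Error(w, err.Error(), http.StatusInternalServerError)
			return
		}
		fmt.Fprintf(w, "inserted %s=%f\n", name, value)
	}
}

func main() {
	ctx := context.Background()

	pool, err := connectDB(ctx)
	if err != nil {
		log.Fatal(err)
	}
	defer pool.Close()

	mux := http.NewServeMux()
	// Method-qualified patterns require Go 1.22 or newer.
	mux.HandleFunc("POST /ingest", createIngestHandler(pool))

	srv := &http.Server{
		Addr:              ":8080",
		Handler:           mux,
		ReadHeaderTimeout: 5 * time.Second,
	}
	log.Fatal(srv.ListenAndServe())
}
```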
Let's leverage Docker to build a containerized version of the data producer.
We use a multistage build and a scratch base image to produce a very light Docker image. We also use a trick that mounts the local file system into the Docker filesystem at build time. Thanks to this, there is no need to copy the go.mod and go.sum files into the build context.
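The exact Dockerfile is embedded separately; a sketch of the idea might look as follows, where the Go version and the binary name are assumptions:

```dockerfile
# Build stage: compile a static binary.
FROM golang:1.22 AS build
WORKDIR /src
# Bind-mount the build context instead of copying go.mod/go.sum in;
# the binary is written outside the mount.
RUN --mount=type=bind,target=. \
    CGO_ENABLED=0 go build -o /producer main.go

# Final stage: minimal image containing only the binary.
FROM scratch
COPY --from=build /producer /producer
EXPOSE 8080
ENTRYPOINT ["/producer"]
```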
If you are interested in knowing more about building minimal Go Docker images, check out my other article!
Database
Let’s now turn our attention to database setup.
To simplify and speed up the setup process, we will use the PostgreSQL database in the Docker container. The entire application and especially the database will be run using a docker-compose orchestrator. The section related to PostgreSQL looks as follows
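A sketch of that section, with paths matching the description and assumed healthcheck parameters:

```yaml
services:
  db:
    image: postgres:15
    env_file:
      - .env
    volumes:
      # SQL scripts used to further configure the database on startup.
      - ./db:/init_queries
      # Startup script run automatically by the postgres image.
      - ./db/initdb.sh:/docker-entrypoint-initdb.d/initdb.sh
    healthcheck:
      # $$ escapes compose interpolation; the variables are expanded
      # inside the container.
      test: ["CMD-SHELL", "pg_isready -U $${POSTGRES_USER} -d $${POSTGRES_DB}"]
      interval: 5s
      start_period: 10s
      retries: 5
```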
First, we define the image to be used — in our case postgres:15 — then the .env file with environment variables is provided to the runtime. The .env file contains 3 Postgres-specific variables and 1 (DATABASE_URL) which will later be used by the data producer and consumer.
The variables POSTGRES_USER, POSTGRES_PASSWORD, and POSTGRES_DB initialize, respectively, a user, the password for this user, and a database in the containerized PostgreSQL instance. The full list of available variables can be found here.
After providing a correct environment, two specific volumes are mounted into the container. The first one — "./db:/init_queries" — will contain SQL scripts that allow us to further configure the database on startup, in particular to create the tables, functions, and triggers. The second one — "./db/initdb.sh:/docker-entrypoint-initdb.d/initdb.sh" — mounts the initdb.sh script written by us into the special Docker location — docker-entrypoint-initdb.d. Every bash script placed there is run at container start-up. Let's now take a closer look at the two scripts to be placed in the init_queries folder.
The first one creates the table signals, which will be used to store the generated data. It has a very straightforward schema — a timestamp of when the data was generated, the signal value, and the signal name.
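The schema in the sketch below follows that description; the exact column names and types are assumptions:

```sql
CREATE TABLE IF NOT EXISTS signals (
    -- When the data point was generated.
    created_at TIMESTAMPTZ NOT NULL DEFAULT now(),
    -- The signal value and its name.
    value DOUBLE PRECISION NOT NULL,
    name  TEXT NOT NULL
);
```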
The second one creates the function fn_new_signals — this function is responsible for parsing a newly inserted record into a JSON string and then using the pg_notify function to send the notification. The channel to which the notification is sent is called new_signals, although any other name would work. Once the function is ready, a trigger is attached to the table signals — after every insert into the table, the function is called. In other words, each newly inserted record is parsed into a JSON string and a notification is sent with that JSON as the payload.
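Based on that description, fn_new_signals and its trigger might be sketched like this (the trigger name is an assumption):

```sql
CREATE OR REPLACE FUNCTION fn_new_signals() RETURNS trigger AS $$
BEGIN
    -- Serialize the newly inserted row to JSON and publish it
    -- on the new_signals channel.
    PERFORM pg_notify('new_signals', row_to_json(NEW)::text);
    RETURN NEW;
END;
$$ LANGUAGE plpgsql;

CREATE TRIGGER tr_new_signals
    AFTER INSERT ON signals
    FOR EACH ROW
    EXECUTE FUNCTION fn_new_signals();
```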
Finally, in the database-related section of docker-compose.yml file, we set up a healthcheck for PostgreSQL. This will help us correctly orchestrate the startup order of services.
Data consumer
There is one piece missing — the data consumer. Before jumping into the code, let's first think about what needs to happen to consume a database notification. We need to:
create a database connection,
issue the Listen command to the database,
wait for notification to arrive,
parse the notification payload,
pass the payload for consumption.
It is important to mention that in the pgx implementation, waiting for a notification (implemented by WaitForNotification) is a blocking operation — until a notification arrives, the code will be "stuck" on this method. In our simplified service implementation this is not a problem — processing the notification is extremely fast and simple, as we merely log the payload to the console. But in real-world scenarios, one will typically spawn a separate goroutine responsible for waiting for notifications and separate goroutines for the processing logic.
We will wrap the waiting and payload-passing logic into a separate struct, Listener. It will accept as arguments a database connection pool and a variadic number of callbacks to call on the parsed payload, and it will have one public method — Start — which accepts a context.Context and makes the Listener wait for notifications. As mentioned above, it will be a blocking method.
But let's start with defining the type representing the database notification. As we remember from the database configuration, after each insert we convert the new record, consisting of 3 columns, into a JSON string and pass it as the payload. So to correctly represent the payload, we will use a struct with 3 fields.
To correctly handle parsing the PostgreSQL timestamp into a Go time.Time, we override the default UnmarshalJSON method attached to the struct. The important part is the timestamp parsing: we first try the RFC 3339 format with fractional seconds, and if that fails we fall back to the RFC 3339 format without them.
Let's now define the Listener struct and its constructor.
First, we define the type for the callback function accepted by Listener — it is any function that accepts one argument of type dbNotification and returns nothing. Then, we define the Listener type itself. It keeps a reference to the provided connection pool (the pool field of type *pgxpool.Pool), the name of the channel to listen to, and a slice of callbacks to call on the notification payload. The constructor is very straightforward: it accepts the connection pool and a variadic number of callbacks and returns a pointer to the Listener struct. The channel name is not passed as an argument, because in the final code it is a global constant.
The main Listener functionality is contained in the Start function presented below. There are also two additional helper functions to parse the payload from a JSON string into a Go struct and to handle the notification.
Let’s take a closer look at the Start function.
First, we acquire a connection from the pool. In other words, we select a specific connection on which we will receive notifications — PostgreSQL will use this particular TCP connection to send them. Then, if acquiring the connection succeeds, we defer releasing the connection, i.e. returning it to the pool — regardless of what happens, we don't want to leak resources! Next, we let the database know that we want to receive notifications from l.channelName on this connection. Finally, if all previous operations were successful, we start an infinite for loop in which we first wait for a notification to arrive and then process it. As mentioned previously, waiting for a notification is a blocking operation in pgx, so we do not have to worry that the loop will spin pointlessly. Calling the Conn method transforms the connection from *pgxpool.Conn to *pgx.Conn.
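For reference, a condensed sketch of the Listener and its Start method, assuming pgx v5 and reusing the dbNotification type defined earlier (error handling is abbreviated):

```go
package main

import (
	"context"
	"encoding/json"
	"log"

	"github.com/jackc/pgx/v5/pgxpool"
)

// callback is any function invoked with the parsed payload.
type callback func(n dbNotification)

// Listener waits for notifications on channelName and fans the
// parsed payload out to the registered callbacks.
type Listener struct {
	pool        *pgxpool.Pool
	channelName string
	callbacks   []callback
}

func NewListener(pool *pgxpool.Pool, callbacks ...callback) *Listener {
	return &Listener{pool: pool, channelName: "new_signals", callbacks: callbacks}
}

// Start blocks, waiting for notifications until ctx is cancelled.
func (l *Listener) Start(ctx context.Context) error {
	// Pin one connection from the pool; PostgreSQL delivers
	// notifications over this particular TCP connection.
	conn, err := l.pool.Acquire(ctx)
	if err != nil {
		return err
	}
	// Return the connection to the pool no matter what happens.
	defer conn.Release()

	// Register interest in the channel on this connection.
	if _, err := conn.Exec(ctx, "LISTEN "+l.channelName); err != nil {
		return err
	}

	for {
		// Conn() unwraps *pgxpool.Conn into *pgx.Conn; the call
		// blocks until a notification arrives or ctx is cancelled.
		notification, err := conn.Conn().WaitForNotification(ctx)
		if err != nil {
			return err
		}
		l.handleNotification(notification.Payload)
	}
}

// handleNotification parses the JSON payload and calls the callbacks.
func (l *Listener) handleNotification(payload string) {
	var n dbNotification
	if err := json.Unmarshal([]byte(payload), &n); err != nil {
		log.Printf("cannot parse payload: %v", err)
		return
	}
	for _, cb := range l.callbacks {
		cb(n)
	}
}
```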
A quick look at the handleNotification function reveals that all it does is parse the payload from JSON into a dbNotification struct and then call all the provided callbacks on it.
Now that we have the Listener type, let's create the main function for the data consumer and run it.
First, we create a context and defer its cancellation. Then we obtain the connection pool using the same connectDB function as in the data producer to connect to PostgreSQL. Finally, we create the Listener instance and start consuming the notifications. To the Listener constructor, we provide one callback function — all it does is log the payload to the console, so we will be able to confirm that our mechanism works as expected.
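Under the same assumptions as the earlier sketches (connectDB and NewListener as named in the text, log format assumed), the consumer's main function might look like this:

```go
func main() {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	// Same helper as in the data producer: pool from DATABASE_URL.
	pool, err := connectDB(ctx)
	if err != nil {
		log.Fatal(err)
	}
	defer pool.Close()

	log.Println("Starting listener")
	listener := NewListener(pool, func(n dbNotification) {
		// The single callback just logs the payload to the console.
		log.Printf("notification received: %v-%s-%f", n.CreatedAt, n.Name, n.Value)
	})
	// Blocks until the context is cancelled or an error occurs.
	if err := listener.Start(ctx); err != nil {
		log.Fatal(err)
	}
}
```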
Similarly, we pack the data consumer into a Docker image. The Dockerfile is very similar to the data producer's, except that no port is exposed and the build command must take all the files, not only main.go.
Orchestrating the application
Finally, it's time to put all the pieces together and run our application.
The exact file structure is as follows.
The only piece we are missing right now is the full docker-compose.yml file.
The section related to the database was already discussed. The part related to the data producer consists of 4 directives:
the correct build context is specified according to the presented file structure,
the dependency on the database is specified — Docker will wait to create the data producer container until the database is ready to accept connections,
the environment variables are provided via .env file,
port is exposed from the container to the host for us to trigger the Pub/Sub mechanism.
Similarly, the data consumer section consists of 4 directives:
the correct build context is specified according to the presented file structure,
the dependency on the database is specified — Docker will wait to create the data consumer container until the database is ready to accept connections,
the environment variables are provided via .env file,
the deployment specification is provided — Docker will run 2 copies of the services. This will allow us to prove, that both of the copies (imitating different consumers) will receive the same notification.
The presented docker-compose.yml is not very elaborate; for example, it does not specify healthchecks for the data producer and consumer, but it should be enough to test the entire application.
Testing
It’s time to run and test the application!
In a terminal, from the location where the docker-compose.yml file is placed, run
docker compose up -d --build
This command will build and run the application in detached mode. If everything went correctly, you should see the following output
⠦ Network pubsub_default Created 11.5s
✔ Container pubsub-db-1 Healthy 10.8s
✔ Container pubsub-consumer-2 Started 11.3s
✔ Container pubsub-consumer-1 Started 11.1s
✔ Container pubsub-producer-1 Started 11.1s
The start-up time is related to the start_period of the database service specified in docker-compose.yml.
Running docker compose logs producer and docker compose logs consumer should indicate that both the data producer and the two copies of the data consumer are ready to work.
Let's now trigger the data producer to insert data into the table. This operation should fire the notification, which should be delivered to both copies of the data consumer, and a proper log should be produced. Using the curl tool, run the following command
curl -X POST http://localhost:8080/ingest
Now, the command docker compose logs producer should show output similar to this
producer-1 | 2024/04/02 16:23:17 INFO Inserting data into database "signal name"=test_signal "signal value"=0.32082221112355913
A new data point was correctly created and inserted into the table.
Let’s now check the crucial part — logs from the data consumers.
docker compose logs consumer
consumer-1 | 2024/04/02 16:16:18 INFO Starting listener
consumer-2 | 2024/04/02 16:16:18 INFO Starting listener
consumer-1 | 2024/04/02 16:23:17 INFO notification received: 2024-04-02 16:23:17 +0000 UTC-test_signal-0.320822
consumer-2 | 2024/04/02 16:23:17 INFO notification received: 2024-04-02 16:23:17 +0000 UTC-test_signal-0.320822
Congratulations! The notification was correctly created and delivered to both copies of the data consumer.
Conclusion
The integration of PostgreSQL Listen/Notify presents a robust mechanism, facilitating the transformation of a standard PostgreSQL database into a proficient message broker, operating within the paradigm of a publisher/subscriber architecture. Leveraging this capability not only harnesses the inherent power of PostgreSQL but also obviates the need for introducing additional dependencies into the project ecosystem.