Using Postgres as a Message Queue

"Databases are not message queues" is a well-established claim that has been discussed in many blog posts and conference presentations. But with the advancements in relational databases, does this claim still stand up to scrutiny? Looking at modern versions of Postgres, the answer is often no. This article therefore looks into Postgres’ lightweight notification mechanism and discusses how it can be leveraged to implement a simple but effective push-based message queue. It also looks into using this queue for communication among replicas in a Kubernetes deployment, and into implementing a generic task processing framework.

Postgres as a message queue

Postgres is, of course, a relational database that implements a large fraction of the SQL standard. Beyond that, Postgres offers many non-standardized features that are also available through its extensions to SQL. One such feature is the LISTEN and NOTIFY mechanism, which allows for sending asynchronous messages across database connections. These commands can also be issued via JDBC. For a simple hello-world example, consider a JVM that listens on a given hello_world_channel:

try (Connection conn = getConnection()) {

  // Subscribe this connection to the channel.
  try (Statement stmt = conn.createStatement()) {
    stmt.execute("LISTEN hello_world_channel");
  }

  // Block until a notification arrives; a timeout of 0 waits indefinitely.
  PGNotification[] notifications = conn
    .unwrap(PgConnection.class)
    .getNotifications(0);

  System.out.println(
    "Hello " + notifications[0].getParameter() + "!");
}

To receive notifications, one needs to specify the name of a channel to LISTEN to. The name of the channel can be chosen arbitrarily. The connection then needs to be unwrapped to the Postgres JDBC driver’s PgConnection. From there, received notifications can be read with a timeout in milliseconds, or with 0 if one wants to wait indefinitely. A second JVM can now send a notification using a similarly simple setup:

try (
    Connection conn = getConnection(); 
    Statement stmt = conn.createStatement()) {
  stmt.execute("NOTIFY hello_world_channel, 'World'");
}

which will cause the first JVM to print Hello World!.
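
The channel name above is hard-coded, but LISTEN and NOTIFY are utility commands that do not accept bind parameters, so a name that comes from configuration has to be placed into the statement text. The following is a minimal sketch of how such a name could be quoted as an SQL identifier first. The class NotifyChannels and its methods are hypothetical helpers and not part of the JDBC driver, and the payload check assumes Postgres’ default limit of less than 8000 bytes and a UTF-8 encoding.

```java
import java.nio.charset.StandardCharsets;

/** Hypothetical helper for building LISTEN statements; not part of any driver. */
final class NotifyChannels {

  private NotifyChannels() { }

  /** Quotes a name as an SQL identifier by doubling any embedded double quotes. */
  static String quoteIdentifier(String name) {
    if (name.isEmpty() || name.indexOf('\0') >= 0) {
      throw new IllegalArgumentException("Invalid channel name");
    }
    return "\"" + name.replace("\"", "\"\"") + "\"";
  }

  /** Builds the statement text for listening on the given channel. */
  static String listenStatement(String channel) {
    return "LISTEN " + quoteIdentifier(channel);
  }

  /** Assumes the default limit: a payload must be shorter than 8000 bytes. */
  static boolean fitsDefaultPayloadLimit(String payload) {
    return payload.getBytes(StandardCharsets.UTF_8).length < 8000;
  }
}
```

With this helper, stmt.execute(NotifyChannels.listenStatement("hello_world_channel")) would issue LISTEN "hello_world_channel". Note that a quoted identifier is case-sensitive, whereas an unquoted channel name is folded to lower case. On the sending side, SELECT pg_notify(?, ?) can be used with a PreparedStatement, as it accepts both the channel and the payload as ordinary parameters.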

Defining triggers to create a simple message queue

Often, a notification is not sent directly but via a trigger on a table. For example, to implement the mentioned message queue, one could start with a simple table such as:

CREATE TABLE MY_MESSAGES (
  RECEIVER VARCHAR(200),
  ID SERIAL,
  PAYLOAD JSON,
  PROCESSED BOOLEAN);

To fire a notification whenever a message is inserted into the table, a function such as the following can be written in Postgres’ procedural language PL/pgSQL. It sends the notification without altering the inserted row:

CREATE FUNCTION MY_MESSAGES_FCT()
RETURNS TRIGGER AS
$BODY$
BEGIN
  PERFORM pg_notify('my_message_queue', NEW.RECEIVER);
  RETURN NEW;
END;
$BODY$
LANGUAGE PLPGSQL;

In the above function, pg_notify is invoked, which simply issues a NOTIFY with the second argument as its payload but avoids the SQL injection that could occur with string concatenation. This function can now be installed as a trigger on any insertion into MY_MESSAGES:

CREATE TRIGGER MY_MESSAGES_TRG
AFTER INSERT ON MY_MESSAGES
FOR EACH ROW
EXECUTE PROCEDURE MY_MESSAGES_FCT();

This way, one or several listeners, for example the replicas of a Kubernetes deployment, can be notified of the arrival of new messages.

Postgres notifications and connection pooling

One caveat of Postgres’ notification mechanism is that it typically requires a dedicated Connection for receiving notifications. This is because the server sends notifications back over the very connection on which the JDBC client executed the LISTEN statement. The connection therefore has to be long-lived, which does not normally play well with pooled DataSources. Instead, one should create a dedicated Connection via the DriverManager API.

Note that this also occupies a full connection on the Postgres server, where connections are typically pooled as well. For this reason, a Postgres server might start rejecting new connection attempts if too many JVMs already occupy a dedicated connection for listening for notifications. It might therefore become necessary to increase the maximum number of concurrent connections that the Postgres server instance allows. As connections for receiving notifications are often idle and require few machine resources, this is not normally a consequential change. Quite the opposite: if listening for notifications can replace frequent polling of the database, this approach might even free resources.

Along with this downside, Postgres’ approach also brings a less obvious upside. With Oracle, for example, receiving notifications does not require a dedicated connection. Instead, it requires that the database can actively call the notified application on a given host and port. This might not always be possible, for example on Kubernetes when multiple replicas share a common host.

Using Spring Integration’s JDBC message queue on Postgres

This functionality will be available in Spring Integration with the imminent arrival of version six. Spring Integration already offers a JDBC-based queue implementation. But as of today, it only supports polling for messages, or receiving push messages when operating on the same queue object within a single JVM. By defining a trigger similar to the one above, as suggested in Spring Integration’s schema-postgres.sql file, Spring Integration allows for receiving push notifications for messages that are sent via a regular JdbcChannelMessageStore.

The message store allows sending a message with any serializable payload to a given channel as follows:

JdbcChannelMessageStore messageStore = 
  new JdbcChannelMessageStore(getDataSource());
messageStore.setChannelMessageStoreQueryProvider(
  new PostgresChannelMessageStoreQueryProvider());

messageStore.addMessageToGroup(
  "some-channel",
  new GenericMessage<>("World"));

which Spring Integration 6 now allows to be received via push notification from any other connected JVM:

PostgresChannelMessageTableSubscriber subscriber =
  new PostgresChannelMessageTableSubscriber(() ->
    DriverManager.getConnection(
      getJdbcUrl(),
      getUsername(),
      getPassword()).unwrap(PgConnection.class));
subscriber.start();

PostgresSubscribableChannel channel = 
  new PostgresSubscribableChannel(
    messageStore, 
    "some-channel",
     subscriber);
channel.subscribe(message -> System.out.println(
  "Hello " + message.getPayload() + "!"));

Previously, inter-JVM communication like this was only possible by polling the channel for new messages, whereas the above mechanism allows for quasi-instant communication among different VMs. When creating a multi-node application that already uses Postgres, this can be used as an easy way to communicate between VMs. For example, one could use Spring Integration’s LockRegistryLeaderInitiator to determine a node that executes unshared work. If multiple nodes can receive an HTTP message that is meant for this leader node to process, those nodes can now forward this call via a JDBC message store, which notifies the leader instantaneously. This can be achieved with only a few lines of code and without the need to expand the technical stack with additional technology such as ZooKeeper.

Implementing a generic task processor with push notifications to workers

For a real-world example of inter-JVM communication using Postgres, the Norwegian tax authority offers a thin library for generic task processing that uses the database’s notification API. When a batch of new tasks is created, multiple worker nodes are notified of the additional work and wake up to poll for new tasks. This work continues until no additional tasks are available, at which point the workers go back to sleep.
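
The wake-up-and-drain cycle described above can be sketched independently of the database. The following is only an illustration and not the code of the mentioned library: NotifiedWorker, onNotification and awaitAndDrain are hypothetical names, the task source is passed in as a Supplier that would typically run a query, and a Semaphore stands in for the signal sent by the thread that reads notifications from the dedicated connection.

```java
import java.util.Optional;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.function.Supplier;

/** Hypothetical worker that sleeps until notified and then drains all tasks. */
final class NotifiedWorker<T> {

  private final Semaphore wakeUps = new Semaphore(0);
  private final Supplier<Optional<T>> nextTask;
  private final Consumer<T> handler;

  NotifiedWorker(Supplier<Optional<T>> nextTask, Consumer<T> handler) {
    this.nextTask = nextTask;
    this.handler = handler;
  }

  /** Called by the thread that receives database notifications. */
  void onNotification() {
    wakeUps.release();
  }

  /**
   * Sleeps until a notification arrives or the timeout expires, then
   * processes tasks until none are left. Returns the number processed.
   */
  int awaitAndDrain(long timeout, TimeUnit unit) {
    try {
      // The result is ignored on purpose: a timeout triggers a poll, too.
      wakeUps.tryAcquire(timeout, unit);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      return 0;
    }
    // Collapse queued wake-ups, as the pass below covers all of them.
    wakeUps.drainPermits();
    int processed = 0;
    Optional<T> task;
    while ((task = nextTask.get()).isPresent()) {
      handler.accept(task.get());
      processed++;
    }
    return processed;
  }
}
```

Draining the permits before polling collapses a burst of notifications into a single pass, while a notification that arrives during the pass leaves a permit behind so that the next call returns immediately. The timeout serves as a safety net in case a notification is missed, for example while the listening connection is being re-established.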

This shows another strength of the notification mechanism: it allows any number of listeners on a given channel to be notified simultaneously, without preallocating table rows to a given node. Thanks to Postgres’ multiversion concurrency control, the allocation can be decided when selecting from the table, where each node acquires row locks to determine its tasks. This avoids the separate allocation implementation that an alternative notification framework might require. All this makes Postgres a good choice for use as a queue, especially if Postgres is already part of the technology stack.
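
How a node could determine its tasks via row locks can be sketched against the MY_MESSAGES table from earlier. The source does not state which statement the mentioned library uses; the sketch below assumes the commonly used FOR UPDATE SKIP LOCKED clause, which is available as of Postgres 9.5. MessageClaimer, Claimed and claimNext are hypothetical names.

```java
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.Optional;

/** Hypothetical helper that claims rows from the MY_MESSAGES table. */
final class MessageClaimer {

  /** A claimed row; the JSON payload is read as text. */
  static final class Claimed {
    final int id;
    final String payload;

    Claimed(int id, String payload) {
      this.id = id;
      this.payload = payload;
    }
  }

  // The subquery locks one unprocessed row and skips rows that another
  // node has already locked, so concurrent workers neither block each
  // other nor claim the same row.
  static final String CLAIM_NEXT_SQL =
      "UPDATE MY_MESSAGES SET PROCESSED = TRUE "
    + "WHERE ID = ("
    + "SELECT ID FROM MY_MESSAGES "
    + "WHERE RECEIVER = ? AND PROCESSED IS NOT TRUE "
    + "ORDER BY ID LIMIT 1 "
    + "FOR UPDATE SKIP LOCKED) "
    + "RETURNING ID, PAYLOAD";

  private MessageClaimer() { }

  /** Claims the oldest unclaimed message for a receiver, if there is one. */
  static Optional<Claimed> claimNext(Connection conn, String receiver)
      throws SQLException {
    try (PreparedStatement stmt = conn.prepareStatement(CLAIM_NEXT_SQL)) {
      stmt.setString(1, receiver);
      try (ResultSet rs = stmt.executeQuery()) {
        return rs.next()
          ? Optional.of(new Claimed(rs.getInt("ID"), rs.getString("PAYLOAD")))
          : Optional.empty();
      }
    }
  }
}
```

If auto-commit is disabled, the row lock is held until the transaction ends, so a worker can process the message within the same transaction and roll back to release it on failure. With auto-commit enabled, the message counts as processed as soon as it is claimed.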

Author: Rafael Winterhalter

Rafael works as a software consultant in Oslo, Norway. He is a proponent of static typing and a JVM enthusiast with particular interest in code instrumentation, concurrency and functional programming. Rafael blogs about software development, regularly presents at conferences and was named a JavaOne Rock Star. When coding outside of his workplace, he contributes to a wide range of open source projects and often works on Byte Buddy, a library for simple runtime code generation for the Java virtual machine. For his work, Rafael received a Duke’s Choice Award and an Oracle Groundbreaker Award, and was elected a Java Champion.