Tobias T–

Get back to Howtos I create to explain things

PostgreSQL live streaming options

In a recent project I needed access to data inside a PostgreSQL database as soon as is was inserted into a database table. My initial ideas to solve the problem have been quite naive. Luckily a tip from a friend pointed me into a better direction and reading the manual changed things for the better.

The problem

We are living in a fast world. Everything needs to be delivered right on time. Data is no different and everyone wants to consume it in realtime. In this project multiple separate processes collect different metrics and stores them inside a PostgreSQL database. Along with other information it can be evaluated to produce complex reports. This all works really great on demand. A user will execute a SQL query and get the report that is needed. The database will evaluate this huge amount of records and return the result. That’s what a relational database is made for, and that is what it is good at.

The new feature should now provide realtime access to the information that was just inserted. The idea is to display it for example inside a browser and provide the information as a realtime stream. To make the problem description complete: new data can arrive two times per hour, but also more then 3000 records a second. The number of clients is more or less not defined. Let's just say it can grow. It can be just one, but it can also be more then multiple thousand subscribers to this stream of information at the same time.

Naive solutions

The easier the solutions, the faster the implemenation.

Polling

The immediate idea might to be send a SELECT query to the database every X milliseconds and check if new data is available.

The problem with this approach is that it does not scale very well. When you send that many queries to a huge database table, no index in the world will save you from the performance overhead this will require. More importantly this is just checking for changes. If nothing did change, the database will still have to perform the query to find out that nothing did change indeed.

Putting more pressure on an already busy database by looking for changes with polling will definitely bring more trouble to the database. Polling the table was not an option.

Polling a last changed timestamp

Another idea might be to store the time, a table was last changed, inside a separate table. This could be a small in memory table which just stores the timestamp.

One option to implement this would be inside the clients that create records. As different languages will be used, it would require duplicate work to implement it in all of them separately. A better option is to create a trigger that will do this update independently from the clients. Both implementations would send one UPDATE statement for every INSERT statement that is executed.

It is a reasonable solution, but the amount of performance it would require seemed to be unnecessary for this realtime stream implementation.

Message queues

The last possible idea might be to use one of the many message queues out there. For example Redis has a publish and subscribe implementation. The implementation would, also in a client, send one request to the database and one to the Redis server. A daemon would subscribe to the Redis channel and wait until new data arrives. Once it does, it will query the database for the new record and send it to all connected clients that want to be notified.

In my initial tests this scaled pretty well. There was no additional pressure on the database and Redis seemed to do a good job with the subscriptions. The problem: this project had Redis not inside it’s technoloqy stack. Using the correct tool is always a good choice, but adding a new dependency to a project is also always a risk. Someone has to know how to handle it. It has to be installed, updated, and once there are problems with it, they need to get fixed. The team was quite comfortable with PostgreSQL, but never used Redis in a production environment.

Native PostgreSQL message queue

I was quite dissatisfied with the solutions I came up with. I asked some friends what I could do. Christian seemed to have a similar problem before and pointed to me to LISTEN and NOTIFY. This feature of PostgreSQL does more or less the same thing as the Redis publish and subscribe feature, except that it is native to the database. No idea why I never heard about it before, but it’s always a good idea to learn about things you didn’t know before.

LISTEN and NOTIFY are easy to explain. To start listening inside a "channel", you use the LISTEN command. It will require the name of the channel as a second parameter:

LISTEN [channel name];

To listen for changes inside a new_tweet channel, we execute this statement:

LISTEN new_tweet;

To publish a new message inside the channel we use the NOTIFY command. It again requires a second parameter for the channel name. An optional third parameter "payload" can be used to send additional information to the subscribers of this channel:

NOTIFY [channel name] [, payload];

When we want to send a new message to the new_tweet channel, with an URL of a tweet as a payload, we can execute it like this:

NOTIFY new_tweet, 'https://twitter.com/tobiastom/status/820706307023851520';

When we run this command we will get the following result:

realtime=# NOTIFY new_tweet, 'https://twitter.com/tobiastom/status/820706307023851520';
NOTIFY
Asynchronous notification "new_tweet" with payload "https://twitter.com/tobiastom/status/820706307023851520" received from server process with PID 77562.

As soon as the notification was send, the psql client got an asynchronous notification with the URL as a payload. This is exactly what we need.

My implementation

Let’s see how LISTEN/NOTIFY can help us to get some realtime notifications. First of all we need to create the database:

$ createdb realtime

This will create the database and we can create the test table. We open the database with the psql command and add the database name as an additional parameter:

$ psql realtime
psql (9.6.1)
Type "help" for help.

realtime=# 

Next we create the database table that will store our information. The SQL statement looks like this:

CREATE TABLE tweets (url varchar(255) NOT NULL);

Inside the psql command prompt we enter the query:

realtime=# CREATE TABLE tweets (url varchar(255) NOT NULL);
CREATE TABLE

The CREATE TABLE at the end tells us that it created the table successfully. The next step is to create a function that will be executed once a new record is inserted into the database.

CREATE OR REPLACE FUNCTION did_insert_tweet() RETURNS trigger AS $$
DECLARE
BEGIN
    PERFORM pg_notify('new_tweet',NEW.url);
    RETURN NEW;
END;
$$ LANGUAGE plpgsql;

Executed inside the database, it looks like this:

realtime=# CREATE OR REPLACE FUNCTION did_insert_tweet() RETURNS trigger AS $$
realtime$# DECLARE
realtime$# BEGIN
realtime$# PERFORM pg_notify('new_tweet',NEW.url);
realtime$# RETURN NEW;
realtime$# END;
realtime$# $$ LANGUAGE plpgsql;
CREATE FUNCTION

As before, the last line tells us if it was executed successfully. CREATE FUNCTION is the response we needed.

Databases use „triggers“ that will be executed once an event happens. For this case we need to add a trigger that will call the function tweet_notify once a new record is inserted into the table. The create trigger statement looks like this:

CREATE TRIGGER notify_about_tweet
AFTER INSERT ON tweets
FOR EACH ROW
EXECUTE PROCEDURE did_insert_tweet();

Executed in the database, it looks like this:

realtime=# CREATE TRIGGER notify_about_tweet
realtime-# AFTER INSERT ON tweets
realtime-# FOR EACH ROW
realtime-# EXECUTE PROCEDURE tweet_notify();
CREATE TRIGGER

We did create a new database table, created a function that can be used inside a trigger and added a trigger that will be executed each time a new row is inserted into the table.

Let’s open a new terminal window, and connect ourselves to the same database:

$ psql realtime
psql (9.6.1)
Type "help" for help.

realtime=# 

Next we execute the NOTIFY command to get immediate feedback when a new tweet was inserted. The command takes the channel as a second parameter. As our channel is new_tweet, the command looks like this:

LISTEN new_tweet;

Executed inside the psql command it looks like this:

realtime=# LISTEN new_tweet;
LISTEN

LISTEN again confirms that the command was executed successfully and psql will now await new commands.

When we insert a new row into the tweets table, we should get a realtime notification about it inside this terminal window. Let’s switch back to the first window and insert a new URL:

INSERT INTO tweets (url) VALUES ('https://twitter.com/tobiastom/status/820706307023851520');

Inside psql it looks like this:

realtime=# INSERT INTO tweets (url) VALUES ('https://twitter.com/tobiastom/status/820706307023851520');
INSERT 0 1

The INSERT 0 1 confirms that we did successfully insert one new record. Next we switch back to the second window and see that nothing did change. This is expected as psql waits for us to execute a new query. Notifications get delivered immediately, but the psql command will only tell us about it after a new query was executed. Let’s just execute an empty query with ; and see what happens:

realtime=# ;
Asynchronous notification "new_tweet" with payload "https://twitter.com/tobiastom/status/820706307023851520" received from server process with PID 77538.

There we see the new_tweeet notification and the URL we inserted was provided as payload. If we insert two more URLS in the first window:

realtime=# INSERT INTO tweets (url) VALUES ('https://twitter.com/tobiastom/status/818204521021210624');
INSERT 0 1
realtime=# INSERT INTO tweets (url) VALUES ('https://twitter.com/tobiastom/status/813430157415424000');
INSERT 0 1

and execute an empty statement in the second window again:

realtime=# ;
Asynchronous notification "new_tweet" with payload "https://twitter.com/tobiastom/status/818204521021210624" received from server process with PID 77538.
Asynchronous notification "new_tweet" with payload "https://twitter.com/tobiastom/status/813430157415424000" received from server process with PID 77538.

we can see that two asynchronous notifications got delivered. When we use something like pg for Node, we can attach a Javascript callback function to this notify events. Inside them we can handle the distribution of the notifications with socket.io. This will make sure the connected browsers will receive an immediate update.

All we needed is a client library that supports the LISTEN/NOTIFY features of PostgreSQL.

Summary

There are always multiple ways to solve a problem. Given the choices I did pick the one that uses the fewest resources and does not increase the technical debt of this project. Redis might be a valid approach if you are using a MySQL database, but for this case the native PostgreSQL solution was the right choice.

Sometimes you just have to ask other people to point you into the right direction. Thank you Christian for doing it in this case.

Also available at

This section try to explain what I do for a living. If you think I could help you with a project, you can hire me though my little company succont.