Replicating all tables, logically

pglogical is an extremely useful extension, allowing forms of replication not supported by WAL shipping; but the downside is that you have to select which tables are included.

If you are using it to perform a major version upgrade, for example, the answer is EVERYTHING (just about).

You can write a script, listing the tables; but once you’ve done that a few times, you start to think about automating it, to ensure that any new/updated tables are included correctly.

It’s relatively straightforward to get a list of all tables from PG:

SELECT table_schema, table_name
FROM information_schema.tables
WHERE table_schema NOT IN ('information_schema', 'pglogical', 'public', 'pg_catalog', ...)
    AND NOT (table_schema = '...' AND (table_name = '...' OR table_name = '...' OR table_name= '...'))
    AND NOT (table_schema = '...' AND table_name = '...')
ORDER BY 1, 2;

Excluding anything that causes problems (e.g. ref data added by migrations). This gives you a pipe-separated list:

    table_schema    |          table_name
--------------------+------------------------------
 foo                | bar

That you can plonk into a Google Sheet (other cloud-based applications are available).

And finally, you need a horrifying spreadsheet formula:

=CONCATENATE("SELECT pglogical.replication_set_add_table('", IF(ISBLANK(C2), "default", C2), "', '", A2, ".", B2, "', true, null, ", IF(ISBLANK(D2),"null", CONCATENATE("'", D2, "'")), ");")

Glorious!

(You don’t need all the escaped quotes if your tables/schemas have sensible names; and you can ignore column D if you aren’t filtering any data.)
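
For the foo/bar row above, with columns C and D left blank, that produces something like:

SELECT pglogical.replication_set_add_table('default', 'foo.bar', true, null, null);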

And then something similar, for sequences:

SELECT sequence_schema, sequence_name
FROM information_schema.sequences
WHERE sequence_schema NOT IN ('public', ...)
ORDER BY 1, 2;

And:

=CONCATENATE("SELECT pglogical.replication_set_add_sequence('default', '", A2, ".", B2, "');")
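
Which, for a sequence such as foo.bar_id_seq, would give:

SELECT pglogical.replication_set_add_sequence('default', 'foo.bar_id_seq');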

Do you even lift?

As well as the built-in scenarios, pgbench allows you to run a custom “transaction script”, making it a viable alternative (for some scenarios) to driving load testing via your application, e.g. using a Locust swarm.

It’s not a full-blown programming language (or even really a DSL), but you can, for example, insert a row and then update it:

\set uid random(1, 100000 * :scale)

BEGIN;
-- \gset stores the columns returned by the INSERT into variables with the given prefix, i.e. :g_id
INSERT INTO foo (user_id, ...) VALUES (:uid, ...) RETURNING id \gset g_
COMMIT;

\sleep 1s

BEGIN;
UPDATE foo SET bar = 1 WHERE id = :g_id;
COMMIT;

You can then point pgbench at your DB, and this script, with say 20 clients for 10 minutes:

docker run -it --rm --network=host -v $PWD:/app -w /app -e PGPASSWORD=postgres postgres:15 pgbench -h ... -U ... -d ... -f foo.sql  -c 20 -T 600 -j 8
...

...
number of transactions actually processed: 81752
number of failed transactions: 0 (0.000%)
latency average = 146.670 ms
initial connection time = 560.452 ms
tps = 136.360106 (without initial connection time)

The transaction count here is actually the number of times the script ran (if you have multiple transactions inside it).

Replicating sequences with pglogical

Another advantage of pglogical over other solutions (cough, DMS) is that it allows you to replicate the current value of a sequence, rather than having to update all those values post-failover.

SELECT pglogical.replication_set_add_sequence('default', 'foo.foo_id_seq');

It is important to understand how it is implemented though:

We periodically capture the current state of the sequence and send an update to the replication queue with some additional buffer. This means that in a normal situation the sequence values on the subscriber will be ahead of those on the provider

This can be a bit of a surprise when you check if the replication is working!
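
For example, comparing the current value on each node (plain SQL, nothing pglogical-specific):

-- run on both provider and subscriber; the subscriber will usually report a higher value
SELECT last_value FROM foo.foo_id_seq;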

Test driving pglogical

We have been using DMS for two different tasks: upgrading to a new major pg version (when the db is too big to just click the button), and removing some old data during failover (sadly we don’t have some partitions we can just unlink).

Each time we have used it has been a “success”, i.e. we haven’t had any major disasters, or had to bail out of the failover; but it has never been a comfortable ride. Sometimes that was self-inflicted, e.g. not running analyze/freeze after the import; but just setting up stable replication was a struggle, and there’s very little insight when it fails.

As we don’t need any of the more exotic features (e.g. moving from an on-prem Oracle DB to PG RDS), it felt like there must be an easier way. Postgres has had logical replication since v10, which would solve one of our problems. But it doesn’t (yet) provide a way to only replicate a subset of data.

There are a number of people selling alternative solutions, but one option that regularly crops up is pglogical. The situation isn’t entirely clear, as 2ndQ are now part of EDB, which is offering v3 (there is no obvious way to download it, so I assume that is part of their consultancy package). But afaict it is only supported as part of BDR, and the v2 repo still seems to be active.

If you’re willing to accept that risk, it’s pretty easy to do some local testing, using docker:

FROM postgres:11-bullseye

RUN apt-get update
RUN apt-get install -y postgresql-11-pglogical

...

Create an image with the (right version of the) extension installed, and whatever scripts you have to create a shell db (you’re using migrations, right?), and spin up two instances with compose:

version: '3'
services:
  db1:
    image: pg11
    volumes:
      - "./postgresql-11.conf:/var/lib/postgresql/data/postgresql.conf"
    environment:
      POSTGRES_PASSWORD: postgres
  db2:
    image: pg11
    volumes:
      - "./postgresql-11.conf:/var/lib/postgresql/data/postgresql.conf"
    environment:
      POSTGRES_PASSWORD: postgres

You can get the pg config template from the base image, and make the necessary changes. You need to set a password for the user, because reasons.
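
As for the necessary changes: the pglogical README lists the settings that matter; at a minimum, something along these lines (the worker/slot/sender numbers are just the README’s suggestions):

wal_level = 'logical'
shared_preload_libraries = 'pglogical'
max_worker_processes = 10
max_replication_slots = 10
max_wal_senders = 10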

Once this is all in place, bring it up:

docker-compose up

You can connect to the publisher:

docker-compose exec db1 psql -U postgres -d foodb

And create a node:

CREATE EXTENSION pglogical;

SELECT pglogical.create_node(node_name := 'db1', dsn := 'host=db1 port=5432 dbname=foodb user=postgres password=postgres');

Then set up the tables you want to replicate:

SELECT pglogical.replication_set_add_table('default', 'foo.bar', true, null, null);

Or with a row filter:

SELECT pglogical.replication_set_add_table('default', 'foo.bar', true, null, E'started_at > \'2022-1-1\'');

Finally, connect to the other instance, and create the subscription:

CREATE EXTENSION pglogical;

SELECT pglogical.create_node(node_name := 'db2', dsn := 'host=db2 port=5432 dbname=foodb user=postgres password=postgres');

SELECT pglogical.create_subscription(subscription_name := 'subscription1', provider_dsn := 'host=db1 port=5432 dbname=foodb user=postgres password=postgres');

(this dsn is where the password is needed)

If you get bored of typing this in, you can pipe a script through:

cat publisher.sql | docker-compose exec -T db1 psql -U postgres -d foodb

You should now be able to insert a row in db1, and see it appear in db2 (or not, depending on the filter).
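
If nothing is coming through, the first thing to check (on db2) is the subscription status; it should settle on “replicating” once the initial copy has finished:

SELECT * FROM pglogical.show_subscription_status();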

You can also easily test replication from, e.g., 11 -> 14. The bigger questions are how long the initial import takes, and how stable replication is, but those will need to be answered in a production-like environment.

There is also some evidence online that RDS allows the extension, but I haven’t tried it yet.

RDS Postgresql WalWriteLock

We recently had a service degradation/outage, which manifested as WalWriteLock in Performance Insights.

The direct cause was autovacuum on a large (heavily updated) table, but it had run ~1 hour earlier, without any issues.

Our short term solution was to raise the av threshold, and kill the process. But it had to run again, at some point, or we’d be in real trouble.
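
(Something along these lines; big_table and the threshold are placeholder values, and you need sufficient privileges, e.g. rds_superuser on RDS, to terminate the autovacuum worker.)

-- stop autovacuum from immediately re-triggering on this table
ALTER TABLE big_table SET (autovacuum_vacuum_threshold = 1000000);

-- and kill the worker that is currently running
SELECT pg_terminate_backend(pid)
FROM pg_stat_activity
WHERE backend_type = 'autovacuum worker'
  AND query LIKE '%big_table%';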

We checked the usual suspects for av slowdown, but couldn’t find any transaction older than the av process itself, or any abandoned replication slots.
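
(For reference, the sort of checks in question:)

-- any transactions older than the autovacuum worker?
SELECT pid, backend_type, xact_start, state, left(query, 50)
FROM pg_stat_activity
WHERE xact_start IS NOT NULL
ORDER BY xact_start;

-- any abandoned replication slots holding back the xmin horizon?
SELECT slot_name, active, xmin, restart_lsn
FROM pg_replication_slots;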

We don’t currently have a replica (although we are using multi-AZ); but this prompted us to realise that we still had the wal_level set to logical, after using DMS to upgrade from pg 10 to 11. This generates considerably more WAL than the next level down (replica).

After turning that off (and failing over), we triggered AV again, but were still seeing high WalWriteLock contention. Eventually, we found this 10-year-old breadcrumb on the pgsql-admin mailing list:

Is it vacuuming a table which was bulk loaded at some time in the past? If so, this can happen any time later (usually during busy periods when many transactions numbers are being assigned)

https://www.postgresql.org/message-id/4DBFF5AE020000250003D1D9%40gw.wicourts.gov

So it seems like this was a little treat left for us by DMS, which, combined with the extra WAL from logical, was enough to push us over the edge at a busy time.

Once that particular AV had managed to complete, the next one was back to normal.

The experience can be avoided by running VACUUM FREEZE after the data migration, and before failing over to the new database.
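
For example, per table (foo.bar being a placeholder), or just a bare VACUUM FREEZE for the whole database:

VACUUM (FREEZE, ANALYZE, VERBOSE) foo.bar;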

function get_raw_page(unknown, integer) does not exist

I was looking into HOT updates, and trying to get the page info:

db=# SELECT * FROM heap_page_items(get_raw_page('foo', 0));
ERROR:  function get_raw_page(unknown, integer) does not exist
LINE 1: SELECT * FROM heap_page_items(get_raw_page('foo', 0));
                                      ^
HINT:  No function matches the given name and argument types. You might need to add explicit type casts.

The documentation seemed pretty clear, and no doubt the experienced DBAs have spotted the unforced error…

The pageinspect module provides functions…

It’s a module, and needs to be loaded first:

CREATE EXTENSION pageinspect;

Deleting data in batches

We have some cron jobs to remove old data; but recently, as the amount of data increased, they have been causing IO spikes. The internet suggested the problem was caused by deleting everything in one transaction:

DELETE FROM foo
WHERE <condition>;

We found an example of chunking deletes in T-SQL, but porting the loop to PL/pgSQL proved… problematic.

It would be nice to simply write:

DELETE FROM foo
WHERE <condition>
LIMIT 100;

But that syntax doesn’t exist. The easiest thing seems to be using a subquery to find the ids of the rows to delete:

DELETE FROM foo
WHERE id = any(array(SELECT id FROM foo WHERE <condition> LIMIT 100));

And of course it’s useful to know how many rows were actually deleted (rather than the hopeful batch size):

WITH deleted AS (
    DELETE FROM foo
    WHERE id = any(array(SELECT id FROM foo WHERE <condition> limit 100)) 
    RETURNING id
)
SELECT count(*) FROM deleted;

It’s easier to do the loop in bash, if you can persuade psql to return that info:

while :
do
        VALUE=$(psql -qtA -d $DBNAME -c "WITH deleted AS...")
        echo "deleted $VALUE"
        if [ $VALUE -eq 0 ]
        then
                break
        fi
        sleep 1
done

There are two parameters to play with: the batch size, and the delay between loops. It’s pretty straightforward to identify how fast you can delete without incurring a noticeable IO penalty.

Just remember that you won’t necessarily get that disk space back: it’s just made available for re-use, unless you perform some compaction.
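
For example, VACUUM FULL rewrites the table and hands the space back to the OS, at the cost of an exclusive lock while it runs:

VACUUM FULL foo;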

Resetting all sequences in postgresql

It’s pretty simple to update the next value generated by a sequence, but what if you want to update them for every table?

In our case, we had been using DMS to import data, but none of the sequences were updated afterwards. So any attempt to insert a new row was doomed to failure.

To update one sequence you can call:

SELECT setval('foo.bar_id_seq', (select max(id) from foo.bar), true);

and you can get a list of tables pretty easily:

\dt *.*

but how do you put them together? My first attempt was using some vim fu (qq@q), until I realised I’d need to use a regex to capture the table name. And then I found some sequences that weren’t using the same name as the table anyway (consistency uber alles).

It’s also easy to get a list of sequences:

SELECT * FROM information_schema.sequences;

but how can you link them back to the table?

The solution is a function called pg_get_serial_sequence:

SELECT t.schemaname, t.tablename, pg_get_serial_sequence(t.schemaname || '.' || t.tablename, c.column_name)
FROM pg_tables t
JOIN information_schema.columns c ON c.table_schema = t.schemaname AND c.table_name = t.tablename
WHERE t.schemaname NOT IN ('pg_catalog', 'information_schema')
    AND pg_get_serial_sequence(t.schemaname || '.' || t.tablename, c.column_name) IS NOT NULL;

This returns the schema, table name, and sequence name for every (non-system) column backed by a sequence, which should be “trivial” to convert to a script updating the sequences (I considered doing that in SQL, but dynamic tables aren’t easy to do).
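
For what it’s worth, here’s a sketch of generating that script directly in SQL (pipe the output back through psql; tables with no rows would need a coalesce around the max):

SELECT format(
    'SELECT setval(%L, (SELECT max(%I) FROM %I.%I), true);',
    pg_get_serial_sequence(t.schemaname || '.' || t.tablename, c.column_name),
    c.column_name, t.schemaname, t.tablename
)
FROM pg_tables t
JOIN information_schema.columns c ON c.table_schema = t.schemaname AND c.table_name = t.tablename
WHERE t.schemaname NOT IN ('pg_catalog', 'information_schema')
    AND pg_get_serial_sequence(t.schemaname || '.' || t.tablename, c.column_name) IS NOT NULL;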

Streaming a csv from postgresql

If you want to build an endpoint to download a CSV that could contain a large number of rows, you want to use streams, so you don’t need to hold all the data in memory before writing it.

If you are already using the pg client, it has a nifty add-on for this purpose:

const { Client } = require('pg');
const QueryStream = require('pg-query-stream');
const csvWriter = require("csv-write-stream");

module.exports = function(connectionString) {
    this.handle = function(req, res) {
        var sql = "SELECT...";
        var args = [...];

        const client = new Client({connectionString});
        client.connect().then(() => {
            // QueryStream is backed by a cursor, so rows are fetched in batches
            // rather than loading the whole result set into memory first
            var stream = new QueryStream(sql, args);
            stream.on('end', () => {
                client.end();
            });
            var query = client.query(stream);

            var writer = csvWriter();
            res.contentType("text/csv");
            writer.pipe(res);

            query.pipe(writer);
        });
    };
};

If you need to transform the data, you can add another step:

...

const transform = require('stream-transform');

            ...

            var query = client.query(stream);

            var transformer = transform(r => ({
                "User ID": r.user_id,
                "Created": r.created.toISOString(),
                ...
            }));

            ...

            query.pipe(transformer).pipe(writer);