Deleting data in batches

We have some cron jobs to remove old data, but recently, as the amount of data has increased, they have been causing I/O spikes. The internet suggested the problem was caused by deleting everything in one transaction:

DELETE FROM foo
WHERE some condition;

We found an example of chunking deletes in T-SQL, but porting the loop to PL/pgSQL proved… problematic.

It would be nice to simply write:

DELETE FROM foo
WHERE <condition>
LIMIT 100;

But that syntax doesn’t exist. The easiest thing seems to be using a subquery to find the ids of the rows to delete:

DELETE FROM foo
WHERE id = any(array(SELECT id FROM foo WHERE <condition> LIMIT 100));

And of course it’s useful to know how many rows were actually deleted (rather than the hopeful batch size):

WITH deleted AS (
    DELETE FROM foo
    WHERE id = any(array(SELECT id FROM foo WHERE <condition> LIMIT 100))
    RETURNING id
)
SELECT count(*) FROM deleted;

It’s easier to do the loop in bash if you can persuade psql to return that count:

while :
do
        # -qtA: quiet, tuples-only, unaligned output — prints just the count
        VALUE=$(psql -qtA -d "$DBNAME" -c "WITH deleted AS...")
        echo "deleted $VALUE"
        if [ "$VALUE" -eq 0 ]
        then
                break
        fi
        sleep 1
done

There are two parameters to play with: the batch size, and the delay between iterations. It’s pretty straightforward to identify how fast you can delete without incurring a noticeable I/O penalty.

Just remember that you won’t necessarily get that disk space back: it’s just made available for re-use, unless you perform some compaction.
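
If you do want the space back, here’s a quick sketch of the options (the table name foo is just the one from the examples above):

-- how much space the table (plus indexes and toast) currently occupies
SELECT pg_size_pretty(pg_total_relation_size('foo'));

-- marks dead rows as re-usable, but doesn't shrink the file on disk
VACUUM foo;

-- rewrites the table and returns the space to the OS,
-- at the cost of an ACCESS EXCLUSIVE lock for the duration
VACUUM FULL foo;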

Resetting all sequences in postgresql

It’s pretty simple to update the next value generated by a sequence, but what if you want to do that for every table?

In our case, we had been using DMS to import data, but none of the sequences were updated afterwards. So any attempt to insert a new row was doomed to failure.

To update one sequence you can call:

SELECT setval('foo.bar_id_seq', (select max(id) from foo.bar), true);

and you can get a list of tables pretty easily:

\dt *.*

but how do you put them together? My first attempt used some vim fu (qq@q), until I realised I’d need a regex to capture the table name. And then I found some sequences that weren’t using the same name as the table anyway (consistency über alles).

It’s also easy to get a list of sequences:

SELECT * FROM information_schema.sequences;

but how can you link them back to the table?

The solution is a function called pg_get_serial_sequence:

select t.schemaname, t.tablename, pg_get_serial_sequence(t.schemaname || '.' || t.tablename, c.column_name)
from pg_tables t
join information_schema.columns c on c.table_schema = t.schemaname and c.table_name = t.tablename
where t.schemaname <> 'pg_catalog' and t.schemaname <> 'information_schema'
  and pg_get_serial_sequence(t.schemaname || '.' || t.tablename, c.column_name) is not null;

This returns the schema, table name, and sequence name for every (non-system) table, which should be “trivial” to convert into a script that updates the sequences (I considered doing that purely in SQL, but dynamic table names aren’t easy to deal with).
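
One approach, sketched here and not battle-tested, is to generate the setval calls with format() and run them with psql’s \gexec; the coalesce / is_called juggling is only there to cope with empty tables:

select format(
    'select setval(%L, coalesce(max(%I), 1), max(%I) is not null) from %I.%I',
    pg_get_serial_sequence(t.schemaname || '.' || t.tablename, c.column_name),
    c.column_name, c.column_name, t.schemaname, t.tablename)
from pg_tables t
join information_schema.columns c on c.table_schema = t.schemaname and c.table_name = t.tablename
where t.schemaname <> 'pg_catalog' and t.schemaname <> 'information_schema'
  and pg_get_serial_sequence(t.schemaname || '.' || t.tablename, c.column_name) is not null
\gexec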

Streaming a csv from postgresql

If you want to build an endpoint to download a csv that could contain a large number of rows, you’ll want to use streams, so you don’t need to hold all the data in memory before writing it.

If you are already using the pg client, it has a nifty add-on for this purpose:

const { Client } = require('pg');
const QueryStream = require('pg-query-stream');
const csvWriter = require("csv-write-stream");

module.exports = function(connectionString) {
    this.handle = function(req, res) {
        var sql = "SELECT...";
        var args = [...];

        const client = new Client({connectionString});
        client.connect().then(() => {
            // stream rows from the server instead of buffering the full result set
            var stream = new QueryStream(sql, args);
            stream.on('end', () => {
                // close the connection once all rows have been read
                client.end();
            });
            var query = client.query(stream);

            // encode each row as csv and pipe it straight into the response
            var writer = csvWriter();
            res.contentType("text/csv");
            writer.pipe(res);

            query.pipe(writer);
        });
    };
};

If you need to transform the data, you can add another step:

...

const transform = require('stream-transform');

            ...

            var query = client.query(stream);

            var transformer = transform(r => ({
                "User ID": r.user_id,
                "Created": r.created.toISOString(),
                ...
            }));

            ...

            query.pipe(transformer).pipe(writer);

Grokking postgresql logs with logstash

Logstash provides a grok pattern for postgresql logs. Unfortunately, it doesn’t seem to be compatible with our postgres version (9.4), and our messages were all tagged with “_grokparsefailure”.

Using the fantastic grok debugger, I was able to produce something that worked:

%{DATESTAMP:timestamp} %{TZ} %{DATA:user_id} %{GREEDYDATA:connection_id} %{DATA:level}:  %{GREEDYDATA:msg}
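
To try it out, the pattern slots straight into a grok filter in the logstash config (a minimal sketch; the field names are just the ones from the pattern above):

filter {
  grok {
    match => { "message" => "%{DATESTAMP:timestamp} %{TZ} %{DATA:user_id} %{GREEDYDATA:connection_id} %{DATA:level}:  %{GREEDYDATA:msg}" }
  }
}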

I’ve created an issue to track it.

Connecting to postgres using domain sockets and libpq

We recently started using Goose to run our migrations, and I wanted to connect to the Postgres instance on a local machine using a domain socket. The documentation states:

host
Name of host to connect to. If this begins with a slash, it specifies Unix-domain communication rather than TCP/IP communication; the value is the name of the directory in which the socket file is stored. The default behavior when host is not specified is to connect to a Unix-domain socket in /tmp (or whatever socket directory was specified when PostgreSQL was built). On machines without Unix-domain sockets, the default is to connect to localhost.

So I assumed that specifying a connection string without a host would “just work”:

db, err := sql.Open("postgres", "dbname=foo")

This doesn’t return an error (sql.Open doesn’t actually open a connection), but any attempt to use the connection resulted in a “bad connection” failure. For some reason I needed to specify the path to the socket (which wasn’t in /tmp on the Debian instance I was using!):

db, err := sql.Open("postgres", "host=/run/postgresql dbname=foo sslmode=disable")
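
Since sql.Open is lazy, it’s worth calling Ping immediately afterwards to surface connection problems up front. A minimal sketch, assuming the lib/pq driver (registered as "postgres") and the socket directory from above:

package main

import (
	"database/sql"
	"log"

	_ "github.com/lib/pq" // registers the "postgres" driver
)

func main() {
	// host is the directory containing the Unix socket, not the socket file itself
	db, err := sql.Open("postgres", "host=/run/postgresql dbname=foo sslmode=disable")
	if err != nil {
		log.Fatal(err)
	}
	defer db.Close()

	// sql.Open doesn't actually connect; Ping forces a real connection attempt
	if err := db.Ping(); err != nil {
		log.Fatal(err)
	}
	log.Println("connected over the domain socket")
}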