Warm indexes not moving

We have an ES cluster using the hot-warm architecture. The ILM policy was rolling the indexes and moving them to the warm phase, but they were stuck on the (expensive) hot nodes, while the warm nodes sat there with empty disks.
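A quick way to confirm shard placement is the cat shards API (the column list here is just a convenient subset):

```
GET /_cat/shards/foo?v&h=index,shard,prirep,state,node
```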

You can check exactly what is where using cat shards, and the metrics were correct: everything was still on the hot nodes. I decided to try and force a move, using cluster reroute:

POST /_cluster/reroute
{
  "commands": [
    { "move": {
        "index": "foo", "shard": 0,
        "from_node": "hotnode1", "to_node": "warmnode2"
    } }
  ]
}

And got told:

[NO(node does not match index setting [index.routing.allocation.require] filters [data:\"hot\"])]

On closer inspection of the index settings, I realised that although the ILM policy was adding the correct attributes to prefer a warm node:

     "routing": {
        "allocation": {
          "include": {
            "_tier_preference": "data_warm,data_hot"
          },
          "require": {
            "data": "hot"
          }
        }
      }

It wasn’t removing the existing attribute forcing it to use a hot node. It was (relatively) easy to fix that for the existing indexes:

PUT /foo/_settings
{
    "routing.allocation.require.data": null
}

Once the require attribute was removed, the indexes were relocated automatically. Unfortunately, I couldn’t find a way to do the same thing using ILM, other than explicitly flipping the require to warm:

            "warm": {
                "actions": {
                    "allocate" : {
                        "require" : {
                            "data": "warm"
                        }
                    }
                }
            }
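When an index seems stuck in ILM, the explain API shows exactly which phase, action, and step it is on:

```
GET /foo/_ilm/explain
```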

Testing ES ingest pipelines

If you are working with Elasticsearch, it’s useful to be able to test locally. Thanks to the magic of docker, that’s simpler than ever:

version: '3'
services:
  elasticsearch:
    image: docker.elastic.co/elasticsearch/elasticsearch:7.6.1
    ports:
      - "9200:9200"
      - "9300:9300"
    environment:
      - "discovery.type=single-node"
    volumes:
      - ./data:/usr/share/elasticsearch/data
  kibana:
    image: docker.elastic.co/kibana/kibana:7.6.1
    ports:
      - "5601:5601"

(I need to use a volume, because my root partition is tiny). With these containers running, you can set up filebeat (e.g. in vagrant), and start shipping logs. It’s then simple to test an ingest pipeline:

curl "http://localhost:9200/_ingest/pipeline/foo" -X "PUT" -d @ingest/foo.json -H 'content-type: application/json'
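You can also dry-run the pipeline against a sample document using the simulate API, before any real logs arrive (the doc body here is made up):

```
POST /_ingest/pipeline/foo/_simulate
{
  "docs": [
    { "_source": { "message": "an example log line" } }
  ]
}
```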

Or an index template:

curl "http://localhost:9200/_template/foo" -X "PUT" -d @index-template/foo.json -H 'content-type: application/json'

Or an ILM policy:

curl "http://localhost:9200/_ilm/policy/foo" -X "PUT" -d @ilm-policy/foo.json -H 'content-type: application/json'
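For reference, the ilm-policy/foo.json being uploaded might look something like this (the phases and actions are standard ILM; the thresholds are illustrative):

```
{
    "policy": {
        "phases": {
            "hot": {
                "actions": {
                    "rollover": { "max_size": "50gb", "max_age": "7d" }
                }
            },
            "delete": {
                "min_age": "30d",
                "actions": { "delete": {} }
            }
        }
    }
}
```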

If you want to play with APM too, you also need that server running:

  apm-server:
    image: docker.elastic.co/apm/apm-server:7.6.1
    ports:
      - "8200:8200"

Using ZooKeeper for locking

Postgresql provides a variety of ways to lock a row, but if you are looking to increase throughput, holding onto a valuable connection (even with a pooler) just for a lock isn’t ideal.

ZooKeeper is a popular distributed locking solution, and is relatively straightforward to run (particularly if you don’t mind risking a single node).

If you want a traditional lock, with a queue, you need a sequential node; but if you’re happy to bail out when the lock is already taken (equivalent to SELECT ... FOR UPDATE NOWAIT) then you only need a single ephemeral lock node.

The node-zookeeper-client lib mimics the Java API, but it’s simple to add a wrapper making it more idiomatic:

const {promisify} = require('util');
const zookeeper = require('node-zookeeper-client');

module.exports = function({ uri }) {
    this.connect = function() {
        return new Promise((resolve) => {
            var client = zookeeper.createClient(uri);
            client.once('connected', function () {
                resolve({
                    create: function(path) {
                        // ephemeral, so the lock vanishes if this client dies
                        return promisify(client.create).bind(client)(
                            path, null, zookeeper.ACL.OPEN_ACL_UNSAFE, zookeeper.CreateMode.EPHEMERAL);
                    },
                    close: client.close.bind(client),
                });
            });
            client.connect();
        });
    };
};

And use this to hold a lock, while awaiting a promise:

module.exports = function(zookeeper, errorCodes) {
    return async function withLock(id, cb) {
        const client = await zookeeper.connect();
        try {
            const nodeName = `/foo_${id}`;
            await client.create(nodeName);
            const res = await cb();
            return res;
        } catch (err) {
            if (err.name === "NODE_EXISTS") {
                // someone else holds the lock (errorCodes.locked is illustrative)
                throw errorCodes.locked;
            }
            throw err;
        } finally {
            client.close();
        }
    };
};

Jobs that create jobs

Over the last few years, there has been a push for more “* as code” with Jenkins configuration. You can now specify job config using a Jenkinsfile, allowing auditing and code reviews, as well as a backup.

Combined with the Job DSL plugin, this makes it possible to create a seed job (using another Jenkinsfile, naturally) that creates all the jobs for a specific project.

pipeline {
    agent any

    options {
        timestamps()
    }

    stages {
        stage('Clean') {
            steps {
                // e.g. cleanWs(), from the Workspace Cleanup plugin
                cleanWs()
            }
        }
        stage('Checkout') {
            steps {
                checkout scm
            }
        }
        stage('Job DSL') {
            steps {
                jobDsl targets: ['jobs/*.groovy', 'views/*.groovy'].join('\n')
            }
        }
    }
}

This will run all the groovy scripts in the jobs & views folders in this repo (once you’ve approved them).

For example:

pipelineJob("foo-main") {
    definition {
        cpsScm {
            scm {
                git {
                    remote {
                        github("examplecorp/foo", "ssh")
                    }
                }
            }
        }
    }
    properties {
        pipelineTriggers {
            triggers {
                cron { spec('H H * * *') }  // schedule is illustrative
            }
        }
    }
}

And a view, to put it in:

listView('foo') {
    jobs {
        regex('foo-.*')  // illustrative
    }
    columns {
        status()
        name()
    }
}

Deleting data in batches

We have some cron jobs to remove old data; but recently, as the amount of data increased, they have been causing IO spikes. The internet suggested the problem was caused by deleting everything in one transaction:

DELETE FROM foo
WHERE some condition;

We found an example of chunking deletes, in T-SQL, but porting the loop to PL/pgSQL proved… problematic.

It would be nice to simply write:

DELETE FROM foo
WHERE <condition>
LIMIT 100;

But that syntax doesn’t exist. The easiest thing seems to be using a CTE, to find the ids of the rows to delete:

DELETE FROM foo
WHERE id = any(array(SELECT id FROM foo WHERE <condition> LIMIT 100));

And of course it’s useful to know how many rows were actually deleted (rather than the hopeful batch size):

WITH deleted AS (
    DELETE FROM foo
    WHERE id = any(array(SELECT id FROM foo WHERE <condition> limit 100))
    RETURNING id
)
SELECT count(*) FROM deleted;

It’s easier to do the loop in bash, if you can persuade psql to return that info:

while :
do
        VALUE=$(psql -qtA -d $DBNAME -c "WITH deleted AS...")
        echo "deleted $VALUE"
        if [ $VALUE -eq 0 ]
        then
                break
        fi
        sleep 1
done
There are two parameters to play with: the batch size, and the delay between loops. It’s pretty straightforward to identify how fast you can delete, without incurring a noticeable IO penalty.

Just remember that you won’t necessarily get that disk space back: it’s just made available for re-use, unless you perform some compaction.
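Plain VACUUM only marks the space as reusable; actually returning it to the OS needs VACUUM FULL (or a tool like pg_repack), at the cost of an exclusive lock:

```sql
-- makes dead rows reusable, without shrinking the file
VACUUM foo;

-- rewrites the table and returns space to the OS; takes an ACCESS EXCLUSIVE lock
VACUUM FULL foo;
```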

ETIMEDOUT connecting to pgbouncer

We use pgbouncer as a connection pooler, and in one of our production environments (after a recent migration) we were getting some portion of connection attempts failing with ETIMEDOUT.

Our first assumption was that it was due to some limitation of our service provider’s internal network, but they assured us that they couldn’t see any failures; and when we looked at the other end, we couldn’t either.

So it seemed to be some limitation on the client host (e.g. hitting the file descriptor limit). We had a look at some netstat data, and added some datadog tcp metrics, but nothing stood out.

At this point, there seemed to be no other option than to use tcpdump and see if we could find a reason that the connection was rejected. We fired it up:

sudo tcpdump -i eth2 -w tcpdump.log

downloaded the output, and opened it up in wireshark. Following some helpful instructions we identified some likely packets.

At this point it was starting to look like we were suffering from ephemeral port exhaustion, so we decided to experiment with running pgbouncer on the app server instead, as that would reduce the number of open sockets between the hosts.

A resounding success! I’m sure it would also be possible to tune some linux tcp options, to the same effect, but this was acceptable for us (there’s only one app server in that env).

I’m not entirely sure why we were getting a time out, rather than EADDRNOTAVAIL, but that may be due to the client library we are using to connect.

Bootstrapping node in docker

It always seems to take me a few attempts to get this right, so I’m making a note here:

docker run -it -v $PWD:/app -w /app node:12-alpine npm init

You may need to chown the package.json after (the docker daemon runs as root).

You can use the same thing to add packages:

docker run -it -v $PWD:/app -w /app node:12-alpine npm i --save foo

Disk by id

We’ve been using an (openstack based) cloud provider, that can’t guarantee a stable device name for an attached volume.

This was causing problems when used in /etc/fstab; on reboot, if the device name was incorrect, the instance would hang.

It’s pretty straightforward to use the UUID instead, with ansible:

- name: Mount vol
  become: yes
  mount:
    path: "{{ mount_point }}"
    src: "UUID={{ ansible_devices[device_name].partitions[device_name + '1'].uuid }}"
    fstype: ext4
    state: mounted

but we still needed the device_name in group vars. Our provider explained that a stable id was provided, in /dev/disk/by-id, which could be used directly for most tasks:

- name: Create a new primary partition
  parted:
    device: "/dev/disk/by-id/{{ device_id }}"
    number: 1
    state: present
  become: yes

- name: Create ext4 filesystem on vol
  become: yes
  filesystem:
    fstype: ext4
    dev: "/dev/disk/by-id/{{ device_id }}-part1"

But how do you get from the id, to the device name?

$ ls /dev/disk/by-id/
virtio-c11c38e5-7021-48d2-a  virtio-c11c38e5-7021-48d2-a-part1

And the matching entries in the ansible facts:

"ansible_devices": {
        "vda": { ... },
        "vdb": { ... },
        "vdc": {
            "links": {
                "ids": [
                    "virtio-c11c38e5-7021-48d2-a"
                ],
                ...

This seemed like a job for json_query but, after a fruitless hour or two, I gave up and used this (slightly hacky) solution suggested on SO:

- name: Get device name
  set_fact:
    device_name: "{{ item.key }}"
  with_dict: "{{ ansible_devices }}"
  when: "(item.value.links.ids[0] | default()) == device_id"
  no_log: yes

Resetting all sequences in postgresql

It’s pretty simple to update the next value generated by a sequence, but what if you want to update them for every table?

In our case, we had been using DMS to import data, but none of the sequences were updated afterwards. So any attempt to insert a new row was doomed to failure.

To update one sequence you can call:

SELECT setval('foo.bar_id_seq', (select max(id) from foo.bar), true);

and you can get a list of tables pretty easily:

\dt *.*

but how do you put them together? My first attempt was using some vim fu (qq@q), until I realised I’d need to use a regex to capture the table name. And then I found some sequences that weren’t using the same name as the table anyway (consistency uber alles).

It’s also easy to get a list of sequences:

SELECT * FROM information_schema.sequences;

but how can you link them back to the table?

The solution is a function called pg_get_serial_sequence:

select t.schemaname, t.tablename, pg_get_serial_sequence(t.schemaname || '.' || t.tablename, c.column_name)
from pg_tables t
join information_schema.columns c on c.table_schema = t.schemaname and c.table_name = t.tablename
where t.schemaname <> 'pg_catalog' and t.schemaname <> 'information_schema' and pg_get_serial_sequence(t.schemaname || '.' || t.tablename, c.column_name) is not null;

This returns the schema, table name, and sequence name for every (non-system) table; which should be “trivial” to convert into a script that updates the sequences (I considered doing it all in SQL, but dynamic table names aren’t easy).
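psql can do the “trivial” part for you: generate one setval statement per row with format(), then execute each result with \gexec (a sketch; coalesce covers empty tables):

```sql
select format(
    'select setval(%L, (select coalesce(max(%I), 1) from %I.%I), true)',
    pg_get_serial_sequence(t.schemaname || '.' || t.tablename, c.column_name),
    c.column_name, t.schemaname, t.tablename)
from pg_tables t
join information_schema.columns c on c.table_schema = t.schemaname and c.table_name = t.tablename
where t.schemaname not in ('pg_catalog', 'information_schema')
  and pg_get_serial_sequence(t.schemaname || '.' || t.tablename, c.column_name) is not null
\gexec
```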

Streaming a csv from postgresql

If you want to build an endpoint to download a CSV that could contain a large number of rows, you want to use streams, so you don’t need to hold all the data in memory before writing it.

If you are already using the pg client, it has a nifty add-on for this purpose:

const { Client } = require('pg');
const QueryStream = require('pg-query-stream');
const csvWriter = require("csv-write-stream");

module.exports = function(connectionString) {
    this.handle = function(req, res) {
        var sql = "SELECT...";
        var args = [...];

        const client = new Client({connectionString});
        client.connect().then(() => {
            var stream = new QueryStream(sql, args);
            stream.on('end', () => client.end());
            var query = client.query(stream);

            var writer = csvWriter();
            query.pipe(writer).pipe(res);
        });
    };
};


If you need to transform the data, you can add another step:


const transform = require('stream-transform');

// ...then, inside the connect() callback, as before:
            var query = client.query(stream);

            var transformer = transform(r => ({
                "User ID": r.user_id,
                "Created": r.created.toISOString(),
            }));

            var writer = csvWriter();
            query.pipe(transformer).pipe(writer).pipe(res);