Set operations on a list of ids

I’ve been doing some (small to medium) data work recently, and when trying to identify missing data it’s often necessary to compare two lists of rows.

SQL is an ideal tool for this, but if you can’t do cross-database queries, then you need to resort to something more basic.

I was initially segmenting the data, and using a spreadsheet with conditional formatting to highlight the extra (or missing) rows:

=MATCH(A1, B:B, 0)

But once you’re comparing tens of thousands of rows, that loses appeal. At this point I discovered the comm utility:

$ comm -23 old.csv new.csv

This will print the lines that appear only in the old csv (note that comm requires both files to be sorted).
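The same check is easy to sketch in Python if you'd rather not shell out (`only_in_old` is an invented name; sets also sidestep comm's sorted-input requirement):

```python
def only_in_old(old_lines, new_lines):
    """Lines present only in the old file: what `comm -23 old.csv new.csv`
    prints, though comm additionally requires both inputs to be sorted."""
    # Sets ignore ordering and duplicates, unlike comm.
    return sorted(set(old_lines) - set(new_lines))

# e.g.
# with open("old.csv") as old, open("new.csv") as new:
#     print("\n".join(only_in_old(old.read().splitlines(),
#                                 new.read().splitlines())))
```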

Re-processing failed firehose batches

If a batch of records fails during the transformation step, they will be dumped in a folder named /processing-failed/YYYY/MM/DD/HH/ in your s3 bucket.

The file (possibly gzipped) will contain a line for each record, in this format:

    "errorMessage":"The Lambda function was successfully invoked but it returned an error result.",

The error message isn’t particularly informative, so you’ll need to check the lambda logs. Once you’ve fixed the lambda (or removed the offending record), there doesn’t seem to be any one-click way to re-process the batch.

But it’s relatively straightforward to script it, using python (or any other available sdk):

import base64
import boto3
import json
import sys

filename = sys.argv[1]
stream_name = sys.argv[2]  # the delivery stream to write the records back to

with open(filename) as f:
    all_records = [
        {"Data": base64.b64decode(json.loads(line)["rawData"])}
        for line in f
    ]

batch_size = 100
batches = [all_records[i:i + batch_size] for i in range(0, len(all_records), batch_size)]

client = boto3.client('firehose')

for batch in batches:
    response = client.put_record_batch(
        DeliveryStreamName=stream_name,
        Records=batch,
    )
    if response["FailedPutCount"] > 0:
        raise Exception("Bad batch")

If your records are quite small, you can probably increase the batch size (max 500, or 4MB).
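If you do increase the batch size, it’s worth guarding the byte limit as well as the record count (both caps come from the PutRecordBatch documentation; `chunk_records` is a hypothetical helper, not part of the script above):

```python
MAX_RECORDS = 500            # PutRecordBatch record limit
MAX_BYTES = 4 * 1024 * 1024  # PutRecordBatch size limit (4 MiB)

def chunk_records(records, max_records=MAX_RECORDS, max_bytes=MAX_BYTES):
    """Yield batches that stay under both Firehose limits."""
    batch, batch_bytes = [], 0
    for record in records:
        size = len(record["Data"])
        if batch and (len(batch) >= max_records or batch_bytes + size > max_bytes):
            yield batch
            batch, batch_bytes = [], 0
        batch.append(record)
        batch_bytes += size
    if batch:
        yield batch
```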

Doing gitops without Kubernetes

gitops is a hot topic with the “cloud native” crowd, but is it still relevant if you aren’t using k8s? In a word, yes.

While most examples are of kubernetes yaml (or helm charts), there’s nothing stopping you applying the same principles to your homegrown CD pipeline, and reaping the same benefits.

We are in the process of moving from systemd services to docker, running on our own hosts (ec2 instances). The easy thing to do would be to just pull latest every time, but it’s not ideal to have no control over which version of the software is installed where.

Instead, we have a git repo containing fragments of yaml, one for each service:

  image: >-
  gitCommit: somesha
  buildNumber: '123'
  env_vars:
    BAR: baz

The staging branch of this repo is updated by the CI build for each service, triggered by a push to main, once the new image has been pushed to ECR.

To deploy, we run an ansible playbook on each docker host. First, we load up all the yaml:

- name: Get service list
    repo: "{{ docker_services_repo_url }}"
    dest: "{{ docker_services_folder }}"
    version: "{{ docker_services_env | default('staging') }}"
  delegate_to: localhost
  run_once: yes
  register: docker_services_repo

We then generate an env file for each service:

- name: "Create env files"
  template: src=env_file dest=/etc/someorg/{{ docker_services[item].name }}
  become: yes
    app_name: "{{ docker_services[item].name }}"
    env_vars: "{{ docker_services[item].env_vars | default([]) }}"
  loop: "{{ docker_services.keys() }}"
  register: env_files

And finally run each container:

- name: "Pull & Start Services"
    name: "{{ docker_services[item].name }}"
    image: "{{ docker_services[item].image }}"
    state: "started"
    restart_policy: "always"
    recreate: "{{ env_files.results | selectattr('item', 'equalto', item) | map(attribute='changed') | first }}"
    pull: true
    init: true
    output_logs: true
    log_driver: "syslog"
      tag: someorg
      syslog-facility: local0
    env_file: "/etc/someorg/{{ docker_services[item].name }}"
    network_mode: "host"
  loop: "{{ docker_services.keys() }}"
  become: yes

If the env vars have changed, the container needs to be recreated. Otherwise, only the images that have changed will be restarted (we still remove the node from the LB first).
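The recreate expression is just asking “did the env file for this service change?”. The same lookup, sketched in Python with illustrative names mirroring the playbook:

```python
def should_recreate(env_file_results, service):
    """Equivalent of the Jinja chain:
    results | selectattr('item', 'equalto', item) | map(attribute='changed') | first"""
    return next(r["changed"] for r in env_file_results if r["item"] == service)
```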

This gives us an audit trail of which image has been deployed, and makes rollbacks easy (revert the commit).

If the staging deploy is successful (after some smoke tests run), another job creates a PR to merge the changes onto the production branch. When that is merged (after any necessary inspection), the same process repeats on the prod hosts.

Removing (almost) duplicates in Redshift

AWS Firehose guarantees “at least once” delivery, and Redshift doesn’t enforce uniqueness, which can result in duplicate rows. Or, if you are using an impure transform step (e.g. applying spot fx rates), in “almost duplicate” rows.

The consensus seems to be to use a temp table: remove all the duplicate rows, and insert them back just once, which is very effective. But if you have “almost duplicates”, you need something slightly different (using DISTINCT would result in all the rows being added to the temp table).

CREATE TEMP TABLE duplicated_foo(LIKE foo);
ALTER TABLE duplicated_foo ADD COLUMN row_number integer;

You need an extra column in the temp table, for the row number.

INSERT INTO duplicated_foo
WITH dupes AS (
    SELECT id, region
    FROM foo
    GROUP BY id, region
    HAVING COUNT(*) > 1
), matches AS (
    SELECT foo.*, row_number() over (partition by, foo.region)
    FROM foo
    JOIN dupes ON =
        AND dupes.region = foo.region
)
SELECT * FROM matches
WHERE row_number = 1;

We have a composite key, which complicates things further: this takes the first row that matches on both columns.
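The intent of the window function, sketched in Python: exactly one row survives per (id, region) pair, and it’s the first one seen (illustrative only, not the SQL Redshift runs):

```python
def dedupe(rows):
    """Keep one row per (id, region) composite key,
    like filtering on row_number() = 1."""
    seen = set()
    kept = []
    for row in rows:
        key = (row["id"], row["region"])
        if key not in seen:
            seen.add(key)
            kept.append(row)
    return kept
```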

ALTER TABLE duplicated_foo DROP COLUMN row_number;

You can then drop the extra column from the temp table.

DELETE FROM foo
USING duplicated_foo
    AND foo.region = duplicated_foo.region;

-- Insert back in the single copies
SELECT * FROM duplicated_foo;

Remove all duplicate rows (whatever that means to you), and copy back in the valid data.

Warm indexes not moving

We have an ES cluster, using the hot-warm architecture. The ILM policy was rolling the indexes, and moving them to the warm phase; but the indexes were stuck on the (expensive) hot nodes, and the warm nodes were sitting there, with empty disks.

You can check exactly what is where using the _cat/shards API, and the metrics were correct. I decided to try to force a move, using cluster reroute:

POST /_cluster/reroute
{
  "commands": [
    {
      "move": {
        "index": "foo", "shard": 0,
        "from_node": "hotnode1", "to_node": "warmnode2"
      }
    }
  ]
}

And got told:

[NO(node does not match index setting [index.routing.allocation.require] filters [data:\"hot\"])]

On closer inspection of the index settings, I realised that although the ILM policy was adding the correct attributes to prefer a warm node:

     "routing": {
        "allocation": {
          "include": {
            "_tier_preference": "data_warm,data_hot"
          "require": {
            "data": "hot"

It wasn’t removing the existing attribute forcing it to use a hot node. It was (relatively) easy to fix that for the existing indexes:

PUT /foo/_settings
{
  "": null
}

Once the require attribute was removed, the indexes were relocated automatically. Unfortunately, I couldn’t find a way to do the same thing using ILM, other than explicitly flipping the require to warm:

            "warm": {
                "actions": {
                    "allocate" : { 
                        "require" : { 
                            "data": "warm"

Testing ES ingest pipelines

If you are working with Elasticsearch, it’s useful to be able to test locally. Thanks to the magic of docker, that’s simpler than ever:

version: '3'
services:
  elasticsearch:
    image: docker.elastic.co/elasticsearch/elasticsearch:7.13.2 # match your prod version
    ports:
      - "9200:9200"
      - "9300:9300"
    environment:
      - "discovery.type=single-node"
    volumes:
      - ./data:/usr/share/elasticsearch/data
  kibana:
    image: docker.elastic.co/kibana/kibana:7.13.2 # match your prod version
    ports:
      - "5601:5601"

(I need to use a volume, because my root partition is tiny). With these containers running, you can set up filebeat (e.g. in vagrant), and start shipping logs. It’s then simple to test an ingest pipeline:

curl "http://localhost:9200/_ingest/pipeline/foo" -X "PUT" -d @ingest/foo.json -H 'content-type: application/json'
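The simulate API will also run sample documents through a stored pipeline, without filebeat involved at all. A sketch that builds the request body (the document contents here are invented):

```python
import json

def simulate_body(docs):
    """Request body for POST /_ingest/pipeline/foo/_simulate:
    each sample document goes under a "_source" key."""
    return json.dumps({"docs": [{"_source": doc} for doc in docs]})

print(simulate_body([{"message": "203.0.113.9 GET /health 200"}]))
```

The output can be piped straight into `curl "http://localhost:9200/_ingest/pipeline/foo/_simulate" -X "POST" -d @- -H 'content-type: application/json'`.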

Or an index template:

curl "http://localhost:9200/_template/foo" -X "PUT" -d @index-template/foo.json -H 'content-type: application/json'

Or an ILM policy:

curl "http://localhost:9200/_ilm/policy/foo" -X "PUT" -d @ilm-policy/foo.json -H 'content-type: application/json'

If you want to play with APM too, you also need that server running:

  apm-server:
    image: docker.elastic.co/apm/apm-server:7.13.2 # match your prod version
    ports:
      - "8200:8200"

Using ZooKeeper for locking

PostgreSQL provides a variety of ways to lock a row, but if you are looking to increase throughput, holding onto a valuable connection (even with a pooler) just for a lock isn’t ideal.

ZooKeeper is a popular distributed locking solution, and is relatively straightforward to run (particularly if you don’t mind risking a single node).

If you want a traditional lock, with a queue, you need ephemeral sequential nodes; but if you’re happy to bail out when the lock is already taken (equivalent to SELECT ... FOR UPDATE NOWAIT) then you only need a single ephemeral lock node.
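The bail-out semantics are easy to illustrate with an in-memory stand-in for the ZooKeeper namespace (no real client involved; all names here are hypothetical):

```python
class NodeExists(Exception):
    """Stand-in for ZooKeeper's NODE_EXISTS error."""

class FakeZk:
    """In-memory stand-in for a ZK namespace of ephemeral lock nodes."""
    def __init__(self):
        self.nodes = set()
    def create(self, path):
        if path in self.nodes:
            raise NodeExists(path)
        self.nodes.add(path)
    def delete(self, path):
        self.nodes.discard(path)

def with_lock(zk, lock_id, cb):
    """Run cb while holding /foo_<id>; raise if the lock is already taken."""
    node = f"/foo_{lock_id}"
    zk.create(node)          # fails fast, like NOWAIT
    try:
        return cb()
    finally:
        zk.delete(node)      # a real ephemeral node also vanishes on disconnect
```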

This client lib mimics the java API, but it’s simple to add a wrapper making it more idiomatic:

const {promisify} = require('util');
const zookeeper = require('node-zookeeper-client');

module.exports = function({ uri }) {
    this.connect = function() {
        return new Promise((resolve) => {
            var client = zookeeper.createClient(uri);
            client.once('connected', function () {
                resolve({
                    create: function(path) {
                        return promisify(client.create).bind(client)(
                            path, null, zookeeper.ACL.OPEN_ACL_UNSAFE, zookeeper.CreateMode.EPHEMERAL);
                    },
                    close: client.close.bind(client),
                });
            });
            client.connect();
        });
    };
};

And use this to hold a lock, while awaiting a promise:

module.exports = function(zookeeper, errorCodes) {
    return async function withLock(id, cb) {
        const client = await zookeeper.connect();
        try {
            const nodeName = `/foo_${id}`;
            await client.create(nodeName);
            const res = await cb();
            return res;
        } catch (err) {
            if ( === "NODE_EXISTS") {
                throw errorCodes.alreadyLocked; // map to a domain-specific error
            }
            throw err;
        } finally {
            client.close();
        }
    };
};

Jobs that create jobs

Over the last few years, there has been a push for more “* as code” with Jenkins configuration. You can now specify job config using a Jenkinsfile, allowing auditing and code reviews, as well as a backup.

Combined with the Job DSL plugin, this makes it possible to create a seed job (using another Jenkinsfile, naturally) that creates all the jobs for a specific project.

pipeline {
    agent any

    options {
        timestamps()
    }

    stages {
        stage('Clean') {
            steps {
                cleanWs() // e.g. wipe the workspace before checkout
            }
        }

        stage('Checkout') {
            steps {
                checkout scm
            }
        }

        stage('Job DSL') {
            steps {
                jobDsl targets: ['jobs/*.groovy', 'views/*.groovy'].join('\n')
            }
        }
    }
}

This will run all the groovy scripts in the jobs & views folders in this repo (once you’ve approved them).

For example:

pipelineJob("foo-main") {
    definition {
            scm {
                git {
                    remote {
                        github("examplecorp/foo", "ssh")
    properties {
        pipelineTriggers {
            triggers {
                cron {

And a view, to put it in:

listView('foo') {

    jobs {
        regex('foo-.*') // assuming a naming convention
    }

    columns {
        status()
        name()
        lastSuccess()
        lastFailure()
        buildButton()
    }
}

Deleting data in batches

We have some cron jobs that remove old data; but recently, as the amount of data increased, they have been causing IO spikes. The internet suggested the problem was caused by deleting everything in one transaction:

DELETE FROM foo
WHERE <condition>;

We found an example of chunking deletes, in T-SQL, but porting the loop to PL/pgSQL proved… problematic.

It would be nice to simply write:

DELETE FROM foo
WHERE <condition>
LIMIT 100;

But that syntax doesn’t exist. The easiest thing seems to be using a CTE, to find the ids of the rows to delete:

DELETE FROM foo
WHERE id = any(array(SELECT id FROM foo WHERE <condition> LIMIT 100));

And of course it’s useful to know how many rows were actually deleted (rather than the hopeful batch size):

WITH deleted AS (
    DELETE FROM foo
    WHERE id = any(array(SELECT id FROM foo WHERE <condition> LIMIT 100))
    RETURNING id
)
SELECT count(*) FROM deleted;

It’s easier to do the loop in bash, if you can persuade psql to return that info:

while :
do
        VALUE=$(psql -qtA -d $DBNAME -c "WITH deleted AS...")
        echo "deleted $VALUE"
        if [ $VALUE -eq 0 ]
        then
                break
        fi
        sleep 1
done

There are two parameters to play with: the batch size, and the delay between loops. It’s pretty straightforward to identify how fast you can delete without incurring a noticeable IO penalty.

Just remember that you won’t necessarily get that disk space back: it’s just made available for re-use, unless you perform some compaction.

ETIMEDOUT connecting to pgbouncer

We use pgbouncer as a connection pooler, and in one of our production environments (after a recent migration) we were getting some portion of connection attempts failing with ETIMEDOUT.

Our first assumption was that it was due to some limitation of our service provider’s internal network, but they assured us that they couldn’t see any failures; and when we looked at the other end, we couldn’t either.

So it seemed to be some limitation on the client host (e.g. hitting the file descriptor limit). We had a look at some netstat data, and added some datadog tcp metrics, but nothing stood out.

At this point, there seemed to be no other option than to use tcpdump and see if we could find a reason that the connection was rejected. We fired it up:

sudo tcpdump -i eth2 -w tcpdump.log

downloaded the output, and opened it up in wireshark. Following some helpful instructions we identified some likely packets.

At this point it was starting to look like we were suffering from ephemeral port exhaustion, so we decided to experiment with running pgbouncer on the app server instead, as that would reduce the number of open sockets between the hosts.
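A back-of-envelope check makes the diagnosis plausible: each outbound connection holds an ephemeral port for the TIME_WAIT period (60s on Linux) after it closes, which caps the sustainable rate of new connections to a single destination. With the default Linux port range that’s only a few hundred per second (numbers illustrative):

```python
def max_connection_rate(port_range=(32768, 60999), time_wait_secs=60):
    """Sustainable new-connections-per-second to a single (ip, port)
    destination before the ephemeral port range is exhausted,
    assuming every closed socket lingers in TIME_WAIT."""
    ports = port_range[1] - port_range[0] + 1
    return ports / time_wait_secs
```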

A resounding success! I’m sure it would also be possible to tune some linux tcp options, to the same effect, but this was acceptable for us (there’s only one app server in that env).

I’m not entirely sure why we were getting a time out, rather than EADDRNOTAVAIL, but that may be due to the client library we are using to connect.