Generating a histogram with R

If you have a csv with a list of values, and you want to see the distribution, R is an excellent choice.

First, you need to load the data:

data <- read.csv("data.csv", header=TRUE)

(for large datasets, faster readers such as data.table::fread are available)

You will also need to install ggplot2, if you haven’t already:

install.packages("ggplot2")

(you can install the entire tidyverse, but that seems to download most of the internet)

Finally, import the library, and generate the chart:

library(ggplot2)

ggplot(data, aes(x=num)) + geom_histogram(binwidth=.5)

Et voila!

Updating PRs when deployed

I wanted to add a comment on all the PRs in a changeset, e.g. when it is deployed to a specific environment.

Assuming you have a commit range, it’s relatively straightforward to get the list of commits it contains:

const res = await octokit.request({
    method: 'GET',
    url: '/repos/{owner}/{repo}/compare/{basehead}',
    owner: 'foo',
    repo: 'bar',
    basehead: `${process.env.GIT_PREVIOUS_SUCCESSFUL_COMMIT}...${process.env.GIT_COMMIT}`
});

Then, for each commit, you can search for a PR containing it:

const commits = res.data.commits.map(c => c.sha);
const results = await Promise.all(commits.map(sha => {
    return octokit.request({
        method: 'GET',
        url: '/search/issues',
        // scope the query to the repo, otherwise the sha could match a PR elsewhere
        q: `${sha} repo:foo/bar`,
    });
}));

Assuming you only want one comment per PR, you can use a Set to get a distinct list of numbers:

const prs = new Set();
results.forEach(r => {
    if (r.data.items.length) {
        prs.add(r.data.items[0].number);
    }
});
const prList = Array.from(prs);

And finally add a comment on each PR:

await Promise.all(prList.map(pr => {
    return octokit.request({
        method: 'POST',
        url: '/repos/{owner}/{repo}/issues/{issue_number}/comments',
        owner: 'foo',
        repo: 'bar',
        issue_number: pr,
        body: `something something: ${process.env.BUILD_URL}`
    });
}));

Or, if you’re feeling really brave, you can do the whole thing in one line of bash!

curl -s -u "$GH_USER:$GH_TOKEN" -H "Accept: application/vnd.github.v3+json" "https://api.github.com/repos/foo/bar/compare/$BASE_COMMIT...$HEAD_COMMIT" \
    | jq '.commits[] | .sha' \
    | xargs -I '{}' sh -c 'curl -s -u "$GH_USER:$GH_TOKEN" -H "Accept: application/vnd.github.v3+json" "https://api.github.com/search/issues?q={}" \
    | jq '\''.items[0] | .number'\''' \
    | sort \
    | uniq \
    | xargs -I '{}' sh -c 'curl -s -u "$GH_USER:$GH_TOKEN" -H "Accept: application/vnd.github.v3+json" "https://api.github.com/repos/foo/bar/issues/{}/comments" -d "{\"body\":\"something something: $BUILD_URL\"}" > /dev/null'

Set operations on a list of ids

I’ve been doing some (small to medium) data work recently, and identifying missing data often comes down to comparing two lists of rows.

SQL is an ideal tool for this, but if you can’t do cross-database queries, you need to fall back on something more basic.

I was initially segmenting the data, and using a spreadsheet with conditional formatting to highlight the extra (or missing) rows:

=MATCH(A1, $B:$B, 0)

But once you’re comparing tens of thousands of rows, that loses its appeal. At this point I discovered the comm utility:

$ comm -23 old.csv new.csv

This will print the lines that appear only in the old csv (note that comm expects both files to be sorted).
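
The same comparisons are easy to do with Python sets, if you’d rather script it; a rough sketch, assuming one id per line in each file:

# Load each file of ids into a set, then use set operations to compare them.
with open("old.csv") as f:
    old = set(line.strip() for line in f)
with open("new.csv") as f:
    new = set(line.strip() for line in f)

print("only in old:", old - new)
print("only in new:", new - old)
print("in both:", old & new)

Unlike comm, the files don’t need to be sorted first.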

Re-processing failed firehose batches

If a batch of records fails during the transformation step, the records will be dumped in a folder named /processing-failed/YYYY/MM/DD/HH/ in your S3 bucket.

The file (possibly gzipped) will contain a line for each record, in this format:

{
    "attemptsMade":4,
    "arrivalTimestamp":1616630407645,
    "errorCode":"Lambda.FunctionError",
    "errorMessage":"The Lambda function was successfully invoked but it returned an error result.",
    "attemptEndingTimestamp":1616630456597,
    "rawData":"someb64==",
    "lambdaArn":"arn:aws:lambda:region:account:function:function:$LATEST"
}

The error message isn’t particularly informative, so you’ll need to check the lambda logs. Once you’ve fixed the lambda (or removed the offending record), there doesn’t seem to be any one-click way to re-process the batch.

But it’s relatively straightforward to script it, using Python (or any other available SDK):

import base64
import boto3
import json
import sys

filename = sys.argv[1]

with open(filename) as f:
    all_records = list(map(lambda l: { "Data": base64.b64decode(json.loads(l)["rawData"]) }, f.readlines()))

batch_size = 100
batches = [all_records[i:i + batch_size] for i in range(0, len(all_records), batch_size)]

client = boto3.client('firehose')

for batch in batches:
    response = client.put_record_batch(
        DeliveryStreamName='some-firehose',
        Records=batch
    )
    if response["FailedPutCount"] > 0:
        print(response)
        raise Exception("Bad batch")

If your records are quite small, you can probably increase the batch size (the limit is 500 records, or 4 MiB, per call).
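
If you want to squeeze more into each call, here is a rough sketch of batching on payload size as well as record count; the 500 record / 4 MiB figures are the documented PutRecordBatch limits, and the headroom is an arbitrary safety margin:

# Yield batches that stay under both the record-count and payload-size limits.
MAX_RECORDS = 500
MAX_BYTES = 4 * 1024 * 1024 - 64 * 1024  # leave a little headroom under 4 MiB

def batch_records(records):
    batch, size = [], 0
    for record in records:
        record_size = len(record["Data"])
        if batch and (len(batch) >= MAX_RECORDS or size + record_size > MAX_BYTES):
            yield batch
            batch, size = [], 0
        batch.append(record)
        size += record_size
    if batch:
        yield batch

The generator can replace the fixed-size list comprehension above; the rest of the loop stays the same.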

Doing gitops without Kubernetes

gitops is a hot topic with the “cloud native” crowd, but is it still relevant if you aren’t using k8s? In a word, yes.

While most examples are of kubernetes yaml (or helm charts), there’s nothing stopping you from applying the same principles to your homegrown CD pipeline, and reaping the same benefits.

We are in the process of moving from systemd services to docker, running on our own hosts (ec2 instances). The easy thing to do would be to just pull latest every time, but it’s not ideal to have no control over which version of the software is installed where.

Instead, we have a git repo containing fragments of yaml, one for each service:

foo_service:
  image: >-
    1234.ecr.amazonaws.com/foo-service@sha256:somesha
  gitCommit: somesha
  buildNumber: '123'
  env_vars:
    BAR: baz

The staging branch of this repo is updated by the CI build for each service, triggered by a push to main, once the new image has been pushed to ECR.
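
The update itself is nothing fancy; a hypothetical sketch of that CI step in Python (the file name, keys and git commands are assumptions, mirroring the fragment above):

# Hypothetical CI step: bump the image digest in the service's fragment on the
# staging branch, then commit and push.
import subprocess
import sys

import yaml  # PyYAML

image, git_commit, build_number = sys.argv[1:4]

with open("foo_service.yaml") as f:
    doc = yaml.safe_load(f)

doc["foo_service"]["image"] = image
doc["foo_service"]["gitCommit"] = git_commit
doc["foo_service"]["buildNumber"] = build_number

with open("foo_service.yaml", "w") as f:
    yaml.safe_dump(doc, f)

subprocess.run(["git", "commit", "-am", f"foo-service build {build_number}"], check=True)
subprocess.run(["git", "push", "origin", "staging"], check=True)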

To deploy, we run an ansible playbook on each docker host. First, we load up all the yaml:

- name: Get service list
  git:
      repo: git@github.com:someorg/docker-services.git
      dest: "{{ docker_services_folder }}"
      version: "{{ docker_services_env | default('staging') }}"
  delegate_to: localhost
  run_once: yes
  register: docker_services_repo

We then generate an env file for each service:

- name: "Create env files"
  template: src=env_file dest=/etc/someorg/{{ docker_services[item].name }}
  become: yes
  vars:
    app_name: "{{ docker_services[item].name }}"
    ...
    env_vars: "{{ docker_services[item].env_vars | default([]) }}"
  loop: "{{ docker_services.keys() }}"
  register: env_files

And finally run each container:

- name: "Pull & Start Services"
  docker_container:
    name: "{{ docker_services[item].name }}"
    image: "{{ docker_services[item].image }}"
    state: "started"
    restart_policy: "always"
    recreate: "{{ env_files.results | selectattr('item', 'equalto', item) | map(attribute='changed') | first }}"
    pull: true
    init: true
    output_logs: true
    log_driver: "syslog"
    log_options:
      tag: someorg
      syslog-facility: local0
    env_file: "/etc/someorg/{{ docker_services[item].name }}"
    network_mode: "host"
  loop: "{{ docker_services.keys() }}"
  become: yes

If the env vars have changed, the container needs to be recreated. Otherwise, only the images that have changed will be restarted (we still remove the node from the LB first).

This gives us an audit trail of which image has been deployed, and makes rollbacks easy (revert the commit).

If the staging deploy is successful (after some smoke tests run), another job creates a PR to merge the changes onto the production branch. When that is merged (after any necessary inspection), the same process repeats on the prod hosts.

Removing (almost) duplicates in Redshift

AWS Firehose guarantees “at least once” delivery, and Redshift doesn’t enforce uniqueness, which can result in duplicate rows; or, if you are using an impure transform step (e.g. one that looks up spot fx rates), in “almost duplicate” rows.

The consensus seems to be to use a temp table: remove all the duplicated rows, and insert them back just once, which is very effective. But if you have “almost duplicates”, you need something slightly different (using DISTINCT would result in all the rows being added to the temp table).

CREATE TEMP TABLE duplicated_foo(LIKE foo);
ALTER TABLE duplicated_foo ADD COLUMN row_number integer;

You need an extra column in the temp table, for the row number.

INSERT INTO duplicated_foo
WITH dupes AS (
    SELECT id, region
    FROM foo
    GROUP BY id, region
    HAVING COUNT(*) > 1
), matches AS (
    SELECT foo.*, row_number() over (partition by foo.id, foo.region)
    FROM foo
    JOIN dupes ON dupes.id = foo.id
        AND dupes.region = foo.region
)
SELECT *
FROM matches
WHERE row_number = 1;

We have a composite key, which complicates things further: this takes the first row from each group that matches on both columns.

ALTER TABLE duplicated_foo DROP COLUMN row_number;

You can then drop the extra column from the temp table.

DELETE FROM foo
USING duplicated_foo
WHERE foo.id = duplicated_foo.id
    AND foo.region = duplicated_foo.region;

-- Insert back in the single copies
INSERT INTO foo
SELECT *
FROM duplicated_foo;

This removes all the duplicate rows (whatever that means to you) from the original table, and copies the valid data back in.

Warm indexes not moving

We have an ES cluster, using the hot-warm architecture. The ILM policy was rolling the indexes over, and moving them to the warm phase; but the indexes were stuck on the (expensive) hot nodes, and the warm nodes were sitting there with empty disks.

You can check exactly what is where using the cat shards API, and it confirmed the metrics were correct. I decided to try and force a move, using cluster reroute:

POST /_cluster/reroute
{
  "commands": [
    {
      "move": {
        "index": "foo", "shard": 0,
        "from_node": "hotnode1", "to_node": "warmnode2"
      }
    }
  ]
}

And got told:

[NO(node does not match index setting [index.routing.allocation.require] filters [data:\"hot\"])]

On closer inspection of the index settings, I realised that although the ILM policy was adding the correct attributes to prefer a warm node:

     "routing": {
        "allocation": {
          "include": {
            "_tier_preference": "data_warm,data_hot"
          },
          "require": {
            "data": "hot"
          }
        }
      }

It wasn’t removing the existing attribute that forced them onto a hot node. It was (relatively) easy to fix that for the existing indexes:

PUT /foo/_settings
{ 
    "routing.allocation.require.data": null
}

Once the require attribute was removed, the indexes were relocated automatically. Unfortunately, I couldn’t find a way to do the same thing using ILM, other than explicitly flipping the require to warm:

            "warm": {
                "actions": {
                    ...
                    "allocate" : { 
                        "require" : { 
                            "data": "warm"
                        }
                    }
                }
            },

Testing ES ingest pipelines

If you are working with ElasticSearch, it’s useful to be able to test locally. Thanks to the magic of docker, that’s simpler than ever:

version: '3' 
services:
  elasticsearch:
    image: docker.elastic.co/elasticsearch/elasticsearch:7.6.1
    ports:
      - "9200:9200"
      - "9300:9300"
    environment:
      - "discovery.type=single-node"
    volumes:
      - ./data:/usr/share/elasticsearch/data
  kibana:
    image: docker.elastic.co/kibana/kibana:7.6.1
    ports:
      - "5601:5601"

(I need to use a volume, because my root partition is tiny.) With these containers running, you can set up filebeat (e.g. in vagrant) and start shipping logs. It’s then simple to test an ingest pipeline:

curl "http://localhost:9200/_ingest/pipeline/foo" -X "PUT" -d @ingest/foo.json -H 'content-type: application/json'

Or an index template:

curl "http://localhost:9200/_template/foo" -X "PUT" -d @index-template/foo.json -H 'content-type: application/json'

Or an ILM policy:

curl "http://localhost:9200/_ilm/policy/foo" -X "PUT" -d @ilm-policy/foo.json -H 'content-type: application/json'

If you want to play with APM too, you also need that server running:

  apm:
    image: docker.elastic.co/apm/apm-server:7.6.1
    ports:
      - "8200:8200"

Using ZooKeeper for locking

Postgresql provides a variety of ways to lock a row, but if you are looking to increase throughput, holding onto a valuable connection (even with a pooler) just for a lock isn’t ideal.

ZooKeeper is a popular distributed locking solution, and is relatively straightforward to run (particularly if you don’t mind risking a single node).

If you want a traditional lock, with a queue, you need a sequence node; but if you’re happy to bail out when the lock is already taken (equivalent to SELECT ... FOR UPDATE NOWAIT) then you only need a single ephemeral lock node.

This client lib mimics the Java API, but it’s simple to add a wrapper to make it more idiomatic:

const {promisify} = require('util');
const zookeeper = require('node-zookeeper-client');

module.exports = function({ uri }) {
    this.connect = function() {
        return new Promise((resolve) => {
            var client = zookeeper.createClient(uri);
            client.once('connected', function () {
                resolve({
                    create: function(path) {
                        return promisify(client.create).bind(client)(path, null, zookeeper.ACL.OPEN_ACL_UNSAFE,
                            zookeeper.CreateMode.EPHEMERAL);
                    },

                    close: client.close.bind(client),
                });
            });
            client.connect();
        });
    };
};

And use this to hold a lock, while awaiting a promise:

module.exports = function(zookeeper, errorCodes) {
    return async function withLock(id, cb) {
        const client = await zookeeper.connect();
        try {
            const nodeName = `/foo_${id}`;
            await client.create(nodeName);
            const res = await cb();
            return res;
        } catch (err) {
            if (err.name === "NODE_EXISTS") {
                throw ...;
            }
            throw err;
        } finally {
            client.close();
        }
    };
};
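
If Python is more convenient, the same bail-out-if-taken pattern works via the kazoo client; a minimal sketch (the node path mirrors the example above, and the host is an assumption):

# Ephemeral-node "lock": create succeeds if nobody holds it, otherwise bail out.
from kazoo.client import KazooClient
from kazoo.exceptions import NodeExistsError

zk = KazooClient(hosts="127.0.0.1:2181")
zk.start()
try:
    zk.create("/foo_123", ephemeral=True)  # raises NodeExistsError if the lock is taken
    # ... do the work while holding the lock ...
except NodeExistsError:
    print("lock already taken, bailing out")
finally:
    zk.stop()  # the ephemeral node goes away when the session ends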

Jobs that create jobs

Over the last few years, there has been a push for more “* as code” with Jenkins configuration. You can now specify job config using a Jenkinsfile, which allows auditing and code reviews, and doubles as a backup.

Combined with the Job DSL plugin, this makes it possible to create a seed job (using another Jenkinsfile, naturally) that creates all the jobs for a specific project.

pipeline {
    agent any

    options {
        timestamps ()
    }

    stages {
        stage('Clean') {
            steps {
                deleteDir()
            }
        }

        stage('Checkout') {
            steps {
                checkout scm
            }
        }

        stage('Job DSL') {
            steps {
                jobDsl targets: ['jobs/*.groovy', 'views/*.groovy'].join('\n')
            }
        }
    }
}

This will run all the groovy scripts in the jobs & views folders in this repo (once you’ve approved them).

For example:

pipelineJob("foo-main") {
    definition {
        cpsScm{
            scm {
                git {
                    remote {
                        github("examplecorp/foo", "ssh")
                    }
                    branch("main")
                }
            }
            scriptPath("Jenkinsfile")
        }
    }
    properties {
        githubProjectUrl('https://github.com/examplecorp/foo')
        pipelineTriggers {
            triggers {
                cron {
                     spec('@daily')
                }
                githubPush()
            }
        }
    }
}

And a view, to put it in:

listView('foo') {
    description('')

    jobs {
        regex('foo-.*')
    }

    columns {
        status()
        weather()
        name()
        lastSuccess()
        lastFailure()
        lastDuration()
        buildButton()
    }
}