Zero downtime deployments with node cluster

The easiest way to do zero downtime deployments is to use multiple nodes behind a load balancer. Once a node has been removed from the rotation, you can fiddle with it to your heart’s content.

If you only have one box to play with, things aren’t so simple. One option is to push the complexity up to whatever you’re using to orchestrate deployments. You could do something like a blue/green deployment, with two full sets of processes behind nginx as a load balancer; but this felt liable to be very fragile.

I next looked at a process manager, like pm2; but it offered far more features than I needed, and didn’t play that well with systemd. It did, however, inspire me to go straight to the node.js cluster API, which has been available for some time and is now marked as “stable”.

It allows you to run multiple node processes that share a port, which is also useful if you are running on hardware with multiple CPUs. More importantly for us, it makes it possible to recycle those processes when the code has changed, without dropping any requests:

var cluster = require('cluster');

module.exports = function(start) {
    var env = process.env.NODE_ENV || 'local';
    var cwd = process.cwd();

    var numWorkers = process.env.NUM_WORKERS || 1;

    if (cluster.isMaster) {
        fork();

        cluster.on('exit', function(worker, code, signal) {
            // one for all, let systemd deal with it
            if (code !== 0) {
                process.exit(code);
            }
        });

        process.on('SIGHUP', function() {
            // need to chdir, if the old directory was deleted
            process.chdir(cwd);
            var oldWorkers = Object.keys(cluster.workers);
            // wait until at least one new worker is listening
            // before terminating the old workers
            cluster.once('listening', function(worker, address) {
                kill(oldWorkers);
            });
            fork();
        });
    } else {
        start(env);
    }

    function fork() {
        for (var i = 0; i < numWorkers; i++) {
            cluster.fork();
        }
    }

    function kill(workers) {
        if (workers.length) {
            var id = workers.pop();
            var worker = cluster.workers[id];
            worker.send('shutdown');
            worker.disconnect();
            kill(workers);
        }
    }
};

During a deployment, we simply replace the old code and send a signal to the “master” process (kill -HUP $PID), which causes it to spin up a new set of workers. As soon as one of those is ready and listening, it terminates the old workers.
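The workers themselves are expected to handle the “shutdown” message, finishing any in-flight requests before exiting. A minimal sketch of the start function passed in above, assuming a plain HTTP server (the port is just an example):

var http = require('http');

module.exports = function start(env) {
    var server = http.createServer(function(req, res) {
        res.end('hello from ' + env + '\n');
    });

    server.listen(process.env.PORT || 3000);

    // stop accepting new connections, let in-flight
    // requests finish, then exit cleanly
    process.on('message', function(msg) {
        if (msg === 'shutdown') {
            server.close(function() {
                process.exit(0);
            });
        }
    });
};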

Grokking postgresql logs with logstash

Logstash provides a grok pattern for postgresql logs. Unfortunately, it doesn’t seem to be compatible with our postgres version (9.4), and our messages were all tagged with “_grokparsefailure”.

Using the fantastic grok debugger, I was able to produce something that worked:

%{DATESTAMP:timestamp} %{TZ} %{DATA:user_id} %{GREEDYDATA:connection_id} %{DATA:level}:  %{GREEDYDATA:msg}

I’ve created an issue here, to track it.

apt update_cache with ansible

Installing 3rd party software, e.g. elasticsearch, sometimes involves adding an apt repo:

- name: Add apt repository
  apt_repository: repo='deb https://example.com/apt/example jessie main'

Once this has been added, it’s necessary to call apt-get update before the new software can be installed. It’s tempting to do so by adding update_cache=yes to the apt call:

- name: Install pkg
  apt: name=example update_cache=yes

But a better solution is to separate the two:

- name: Add apt repository
  apt_repository: repo='deb https://example.com/apt/example jessie main'
  register: apt_source_added

- name: Update cache
  apt: update_cache=yes
  when: apt_source_added|changed

- name: Install pkg
  apt: name=example

This ensures that the (time-consuming) update only happens the first time, when the new repo is added. It also makes it much clearer where the time is going if the build hangs.

EDIT: I completely forgot that it’s possible to add the update_cache attribute directly to the apt_repository call. Much simpler!

- name: Add apt repository
  apt_repository: repo='deb https://example.com/apt/example jessie main' update_cache=yes

- name: Install pkg
  apt: name=example

Converting complex js objects to xml

The xml node package offers “fast and simple Javascript-based XML generation”.

I had hoped that it would be as simple as:

var xml = require('xml');

var result = xml({
    FOO: {
        BAR: "something"
    }
});

But that only produced the top-level element:

<FOO />

After some RTFM, and a few false starts, it became clear that you need to represent the children as an array, with each child node as its own key/value pair:

xml({
    FOO: [
        { _attr: { abra: "cadabra" } },
        { BAR: "something" },
        { BAZ: "else" }
    ]
});

Which should result in XML like this:

<FOO abra="cadabra">
    <BAR>something</BAR>
    <BAZ>else</BAZ>
</FOO>
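If you’re starting from a plain nested object, you need a little glue to massage it into that shape. A rough sketch (it only handles plain values and nested objects, not attributes or arrays):

var xml = require('xml');

function toXmlShape(obj) {
    // turn { BAR: "something" } into [ { BAR: "something" } ],
    // recursing into nested objects
    return Object.keys(obj).map(function(key) {
        var value = obj[key];
        var node = {};
        node[key] = (value !== null && typeof value === 'object')
            ? toXmlShape(value)
            : value;
        return node;
    });
}

console.log(xml(toXmlShape({ FOO: { BAR: "something", BAZ: "else" } })));
// should print <FOO><BAR>something</BAR><BAZ>else</BAZ></FOO>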

Installing a Shopify app into a dev shop

If you’ve been following the instructions to build a Shopify “app”, you may reach the end and feel like you’ve been asked to draw an owl instead.

There are a few pointers here and there on the internet, but these are the steps I went through.

First, while logged in, request the permissions you want:

https://shop.myshopify.com/admin/oauth/authorize?client_id=app_api_key&scope=required_scopes

You’ll then be redirected to the URL you configured for your app. If you don’t want to go through the hassle of actually deploying something yet, fear not. You can take the URI you were redirected to:

https://app_uri/?code=auth_code&shop=shop.myshopify.com&signature=signature&timestamp=timestamp

And exchange the auth code for a re-usable token:

curl -d "client_id={app_id}&client_secret={app_secret}&code={auth_code}" https://{shop}.myshopify.com/admin/oauth/access_token

{"access_token":"{token}"}

That will grant you access, within the scopes that you requested:

curl -H "X-Shopify-Access-Token: {token}" -H "Content-Type: application/json" -d "{\"script_tag\":{\"event\":\"onload\",\"src\":\"https://foo.net/bar.js\"}}" https://{shop}.myshopify.com/admin/script_tags.json

Creating a Stream Analytics Query job programmatically

There are plenty of examples of creating Azure Stream Analytics Query jobs, but most of them use the wizard in the management portal.

I found this example, which has most of the info needed; but unfortunately, it authenticates by popping up a login form. Not ideal for a server application!

My next attempt was using a cert, but it turns out the new Azure Resource Management APIs will only work with Azure AD. I started working my way through this example:

            var authContext = new AuthenticationContext("https://login.microsoftonline.com/{tenantId}/oauth2/token");
            var clientCredential = new ClientCredential(clientId, appKey);

            var result = authContext.AcquireToken("https://management.core.windows.net/", clientCredential);

            var creds = new TokenCloudCredentials(subscriptionId, result.AccessToken);
            var client = new StreamAnalyticsManagementClient(creds);

I could acquire a token without any problem, but when I came to try and create an SA query job I got an auth error:

AuthorizationFailed: The client 'REDACTED' with object id 'REDACTED' does not have authorization to perform action 'Microsoft.StreamAnalytics/streamingjobs/write' over scope '/subscriptions/REDACTED/resourcegroups/REDACTED/providers/Microsoft.StreamAnalytics/streamingjobs/REDACTED'

With some help from SO, it became clear that my AD app didn’t have the necessary permissions. There currently isn’t a built-in role with sufficient authority; so we had to drop the hammer and use the “Contributor” role, which can do just about anything:

New-AzureRoleAssignment -ServicePrincipalName {azureADAppUri} -RoleDefinitionName Contributor -Scope /subscriptions/{subscriptionId}/resourcegroups/{resourceGroupId}/providers/Microsoft.StreamAnalytics/streamingjobs/*

With that in place, creating the query job is trivial:

            var jobCreateParameters = new JobCreateOrUpdateParameters
            {
                Job = new Job
                {
                    Name = streamAnalyticsJobName,
                    Location = "North Europe",
                    Properties = new JobProperties
                    {
                        EventsOutOfOrderPolicy = EventsOutOfOrderPolicy.Adjust,
                        Sku = new Sku
                        {
                            Name = "Standard"
                        }
                    }
                }
            };

            client.StreamingJobs.CreateOrUpdate(resourceGroupName, jobCreateParameters);

            Console.WriteLine("Created job");

            var jobInputCreateParameters = new InputCreateOrUpdateParameters {
                Input = new Input {
                    Name = streamAnalyticsInputName,
                    Properties = new StreamInputProperties {
                        Serialization = new JsonSerialization
                        {
                            Properties = new JsonSerializationProperties
                            {
                                Encoding = "UTF8"
                            }
                        },
                        DataSource = new EventHubStreamInputDataSource
                        {
                            Properties = new EventHubStreamInputDataSourceProperties
                            {
                                EventHubName = inputEventHubName,
                                ConsumerGroupName = inputConsumerGroup,
                                ServiceBusNamespace = ns,
                                SharedAccessPolicyName = "listen",
                                SharedAccessPolicyKey = inputEventHubListenKey
                            }
                        }
                    }
                }
            };

            client.Inputs.CreateOrUpdate(resourceGroupName, streamAnalyticsJobName, jobInputCreateParameters);

            Console.WriteLine("Created job input");


            client.Inputs.TestConnection(resourceGroupName, streamAnalyticsJobName, streamAnalyticsInputName);

            Console.WriteLine("Tested job input");

            var jobOutputCreateParameters = new OutputCreateOrUpdateParameters {
                Output = new Output {
                    Name = streamAnalyticsOutputName,
                    Properties = new OutputProperties {
                        Serialization = new JsonSerialization
                        {
                            Properties = new JsonSerializationProperties
                            {
                                Encoding = "UTF8",
                                Format = "LineSeparated"
                            }
                        },
                        DataSource = new EventHubOutputDataSource {
                            Properties = new EventHubOutputDataSourceProperties {
                                EventHubName = outputEventHubName,
                                ServiceBusNamespace = ns,
                                SharedAccessPolicyName = "send",
                                SharedAccessPolicyKey = outputEventHubSendKey
                            }
                        }
                    }
                }
            };

            client.Outputs.CreateOrUpdate(resourceGroupName, streamAnalyticsJobName, jobOutputCreateParameters);

            Console.WriteLine("Created job output");

            client.Outputs.TestConnection(resourceGroupName, streamAnalyticsJobName, streamAnalyticsOutputName);

            Console.WriteLine("Tested job output");

            var transformationCreateParameters = new TransformationCreateOrUpdateParameters {
                Transformation = new Transformation {
                    Name = streamAnalyticsTransformationName,
                    Properties = new TransformationProperties {
                        StreamingUnits = 1,
                        Query = query
                    }
                }
            };

            client.Transformations.CreateOrUpdate(resourceGroupName, streamAnalyticsJobName, transformationCreateParameters);

            Console.WriteLine("Created transformation");

            var jobStartParameters = new JobStartParameters
            {
                OutputStartMode = OutputStartMode.CustomTime,
                OutputStartTime = DateTime.UtcNow
            };

            client.StreamingJobs.Start(resourceGroupName, streamAnalyticsJobName, jobStartParameters);

            Console.WriteLine("Started job");

            Console.ReadLine();

The API is a little odd, in that it returns a Response object, but seems to throw if the request is not successful.

Property changed decorator using ES7

Watching a model for changes is a pretty common requirement, and you often end up with code like this (in ES6):

import { EventEmitter } from "events";

export default class Foo extends EventEmitter {
    get bar() {
        return this._bar;
    }

    set bar(value) {
        if (value !== this._bar) {
            this._bar = value;
            this.emit("barChanged");
        }
    }
}

Boilerplate code is always an excuse for metaprogramming! And this seemed like an ideal use case for the proposed ES7 decorators.

We’re using babeljs via babelify, so we needed to enable decorators in our gulpfile:

return browserify({
    ...
    transform: [babelify.configure({
        optional: ["es7.decorators"]
    })],
    ...
});

Once enabled, you can add the decorator:

export default class Foo extends EventEmitter {
    @propertyChanged
    get bar() {
        return this._bar;
    }

    set bar(value) {
        this._bar = value;
    }
}

function propertyChanged(target, name, descriptor) {
    let getter = descriptor.get, setter = descriptor.set;

    descriptor.set = function(value) {
        if (value !== getter.call(this)) {
            setter.call(this, value);
            this.emit(name + "Changed");
        }
    };

    return descriptor;
}
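Any consumer can then subscribe to the change event as usual:

let foo = new Foo();

foo.on("barChanged", () => console.log("bar is now", foo.bar));

foo.bar = 42;   // logs "bar is now 42"
foo.bar = 42;   // no change, so nothing is emitted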

Parsing json from syslog entries with logstash

A consequence of moving to Debian 8 (and hence systemd) is that all our log data now goes to syslog. So long, logrotate!

It does, however, require a change to the way we filter the logs once they’ve been aggregated:

filter {
    if [type] == "syslog" {
        grok {
            match => { "message" => "%{SYSLOGBASE} %{GREEDYDATA:syslog_message}" }
        }
    }

    if [program] == "foo" {
        json {
            source => "syslog_message"
        }
        mutate {
            convert => [ "level", "integer" ]
            remove_field => [ "hostname" ]
        }
        date {
            match => [ "time", "ISO8601" ]
        }
    }
}

First, we parse the syslog entry and put the free-form message into a property named “syslog_message”. We could overwrite the existing message, but keeping both makes it easier to investigate if anything goes wrong.

Then, if the “program” (set by the SyslogIdentifier in your systemd unit file) matches, we parse the message as json and tidy up a few fields.
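For reference, the application side just writes one JSON object per line to stdout, and systemd ships each line to syslog under that identifier. A rough sketch of a matching log call (the field names match the filter above; your logging library may differ):

var os = require('os');

console.log(JSON.stringify({
    time: new Date().toISOString(),     // matched by the date filter
    level: 30,                          // mutate ensures this is indexed as an integer
    hostname: os.hostname(),            // dropped by remove_field
    msg: "request handled"
}));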

Ansible & systemctl daemon-reload

(UPDATE 2: there’s also a systemd module now, which should provide a neater wrapper round these commands. Explicit daemon-reload is still required)

(UPDATE: there is now a PR open to add this functionality)

We recently migrated to Debian 8 which, by default, uses systemd. I can appreciate why some people have misgivings about it, but from my point of view it’s been a massive improvement.

Our unit files look like this now:

[Service]
ExecStart=/var/www/{{ app_name }}/app.js
Restart=always
StandardOutput=syslog
StandardError=syslog
SyslogIdentifier={{ app_name }}
User={{ app_name }}
Group={{ app_name }}
Environment=NODE_ENV={{ env }}
WorkingDirectory=/var/www/{{ app_name }}

[Install]
WantedBy=multi-user.target

Compare that to a 3-page init script using start-stop-daemon. And we no longer need a watchdog like monit.

We do our deployments using ansible, which already knows how to play nice with systemd. One thing is missing, though: if you change a unit file, you need to call systemctl daemon-reload before the changes will be picked up.

There’s a discussion underway as to whether ansible should take care of it. But for now, the easiest thing to do is add another handler:

- name: Install unit file
  sudo: true
  copy: src=foo.service dest=/lib/systemd/system/ owner=root mode=644
  notify:
    - reload systemd
    - restart foo

with a handler like this:

- name: reload systemd
  sudo: yes
  command: systemctl daemon-reload

UPDATE: if you need to restart the service later in the same play, you can flush the handlers to ensure daemon-reload has been called:

- meta: flush_handlers

Stopping a locust

Locust is a load testing tool that allows you to script your actions in python (much more pleasant than fiddling with JMeter).

I was investigating an issue that only happened under load (it turned out to be starvation of the db connection pool). The trouble was that, once it had happened, the logs filled up with errors that weren’t useful information. I wanted the load test to stop as soon as the first error occurred.

I couldn’t find anything in the documentation, but a quick nose in the source revealed the existence of a StopLocust exception:

from locust.exception import StopLocust

...

response = locust.client.post(uri, data=json.dumps(data), headers=headers)
if response.status_code != 200:
    raise StopLocust()

At any point, you can raise it, and that locust will stop. If you wanted to stop them all, you could set a flag and check it in your other tasks (not ideal, but the best I can offer).

Remember this is undocumented behaviour, and could change at any time, but it works in v0.7.2.