Returning errors when piping a stream

Most of the examples of piping a stream of data using expressjs look like this:

app.get('/video', function(req, res) {
    var cmd = "ffmpeg";
    var args = [...];
    var proc = spawn(cmd, args);
    res.contentType('video/mp4');
    proc.stdout.pipe(res);
});

Which is great for the happy path, but means any errors from the child proc are returned as a 200; and, in my case, cached. Not ideal.

I googled it pretty hard, and even asked on SO, with no joy. Eventually, I found this article, at which point I realised I’d been asking the wrong question!

The pipe method is part of the base library, nothing to do with express (obvious, in retrospect). And, as the documentation clearly states, it calls end when the readable stream ends.

So, the solution is to do that bit yourself:

proc.stdout.pipe(res, {end: false});
proc.on("error", err => {
    console.log("error from ffmpeg", err.stack);
    res.status(500).end();
}); 
proc.on("exit", code => {
    console.log("child proc exited", code);
    res.status(code === 200 ? 200 : 500).end();
});

Boom! (Just remember to handle all the cases, so end is always called).

Streaming video to iOS devices

It seems that neither iOS devices, nor Safari on OS X, support mp4; so if you’re trying to stream video, you need to provide another format.

The recommendation is to use HLS, which fortunately is supported by ffmpeg, you merely need to adjust your incantation:

app.get('/hls/video', function(req, res) {
    res.contentType('application/vnd.apple.mpegurl');
    var proc = ffmpeg();
    proc.stdout.pipe(res);
});
 
function ffmpeg() {
    var cmd = "ffmpeg";
    var filter = "some complex filter expr";
    var args = ["-i", "video1.mp4"];
    ...
    args.push(
        "-vcodec", "libx264",
        "-f", "hls",
        "-hls_time", "9",
        "-hls_list_size", "0",
        "-profile:v", "baseline",
        "-level", "3.0",
        "pipe:1"
    );
    return spawn(cmd, args);
}

I could probably use conneg to decide which format to return, rather than the uri, but I’m not convinced that my caching infrastructure (varnish and cloudfront now!) would handle that correctly.

“Trouble parsing json”

We use Bunyan in our node apps, for “structured logging”. The output json string is passed to syslog, by systemd, and then fed into ELK.

{
    "name":"foo-service",
    "hostname":"app-01",
    "pid":30988,
    "ip":"1.19.24.8",
    "requestId":"1c11f448-73f2-4efa-bc63-3de787618d49",
    "level":50,
    "err": {
        "message":"oh noes!"
    }
}

Unfortunately, if that string is longer than 2048 chars (usually a stacktrace, or html returned from a web service instead of json), then the json blob ends up split over 2 lines in syslog.

This causes ELK to barf when attempting to parse the broken lines (assuming you are parsing as json), and means you won’t see those errors in Kibana.

It is possible to detect the error parsing the error, by searching for the string “Trouble parsing json”, but that’s not really a solution.

I would prefer to see a truncated error, than have the current situation, but that means either wrapping or patching Bunyan itself.

Node.js and systemd-notify

Running a nodejs app as a systemd service is pretty simple. But the downside to this approach is that if the app dies straight away (e.g. a config file is malformed), then systemd remains unaware.

An alternative is to switch to using notify:

[Service]
Type=notify
ExecStart=/var/www/app_name/app.js
Restart=always
...

There are various ways to notify systemd that your service is ready; but the simplest is to use systemd-notify, a console wrapper around sd_notify.

this.start = function(port) {
    var deferred = Q.defer();

    server = app.listen(port, '127.0.0.1', function(error) {
        if (error) {
            deferred.reject(new Error(error));
        } else {
            logger.info('Listening on port %d', port);
            exec('systemd-notify --ready');
            deferred.resolve();
        }
        });

        return deferred.promise;
    };
};

If the process that calls systemd-notify is not the one in ExecStart (e.g. you are using cluster), you will also need to set NotifyAccess to “all”.

Now when you use systemctl to start your service, it will wait for the notification before deeming the start process complete.

Managing nodejs dependencies

The easiest way to “deploy” a node app is to clone the git repo on a server, and run npm install. There are a couple of disadvantages though: first, I don’t really like having to install git, and manage credentials for a private repo.

Second, installing the dependencies like that means you may get different versions of the modules you rely on than you were expecting. One of the tenets of a reliable build pipeline is ensuring that builds are repeatable, and that what you deploy matches what you tested.

There are a few alternatives: you could vendor in the node_modules folder, but this dirties the commit history, and increases the size of your repo. You could use npm shrinkwrap, which is the same concept as Bundler’s Gemfile.lock, a list of specific versions to install. This is definitely an improvement, but still leaves the risk of npm i failing during a deployment.

I’d prefer to only install the dependencies once, on the build server. This means I can run the tests, then tar up the folder and upload that to each environment in turn:

npm install
npm test
npm prune --production
npm dedupe
tar -czf app.tar.gz --exclude='*/coverage' app/

After running the tests, we prune the dependencies to remove mocha etc. We then dedupe to try and reduce the number of copies of shared modules, and finally create an archive of the app. This is output as an artifact of the build, and pulled in the by the deploy to staging build, output from that again, and finally pulled in by the deploy to production build.

Arrow functions returning an expression

One of the new features of ES6 that has made it into Node 4 is arrow functions. According to the documentation, there are 2 alternative syntaxes:

(param1, param2, …, paramN) => { statements }
(param1, param2, …, paramN) => expression
         // equivalent to:  => { return expression; }

Using the 2nd version works as expected for primitive values:

> [1,2,3].map(i => i);
[ 1, 2, 3 ]

but not when you return an object:

> [1,2,3].map(i => { id: i });
[ undefined, undefined, undefined ]

As was pointed out to me on SO, it’s impossible to tell the difference between a statement block and an expression consisting of an object literal.

The solution is to wrap the returned object in an extra set of parentheses:

> [1,2,3].map(i => ({ id: i }));
[ {id: 1}, {id: 2}, {id: 3} ]

Zero downtime deployments with node cluster

The easiest way to do zero downtime deployments is using multiple nodes behind a load balancer. Once removed from the rotation, you can fiddle with them to your heart’s content.

If you only have one box to play with, things aren’t so simple. One option is to push the complexity up to whatever you’re using to orchestrate deployments. You could do something like a blue/green deployment, with two full sets of processes behind nginx as a load balancer; but this felt liable to be very fragile.

I next started looking at a process manager, like pm2; but it seemed to offer far too many features I didn’t need, and didn’t play that well with systemd. It was inspiration though, for just going direct to the nodejs cluster API. It’s been available for some time, and is now marked as “stable”.

It allows you to run multiple node processes that share a port, which is also useful if you are running on hardware with multiple CPUs. Using the cluster API allows us to recycle the processes when the code has changed, without dropping any requests:

var cluster = require('cluster');

module.exports = function(start) {
    var env = process.env.NODE_ENV || 'local';
    var cwd = process.cwd();

    var numWorkers = process.env.NUM_WORKERS || 1;

    if (cluster.isMaster) {
        fork();

        cluster.on('exit', function(worker, code, signal) {
            // one for all, let systemd deal with it
            if (code !== 0) {
                process.exit(code);
            }
        });

        process.on('SIGHUP', function() {
            // need to chdir, if the old directory was deleted
            process.chdir(cwd);
            var oldWorkers = Object.keys(cluster.workers);
            // wait until at least one new worker is listening
            // before terminating the old workers
            cluster.once('listening', function(worker, address) {
                kill(oldWorkers);
            });
            fork();
        });
    } else {
        start(env);
    }

    function fork() {
        for (var i = 0; i < numWorkers; i++) {
            cluster.fork();
        }
    }

    function kill(workers) {
        if (workers.length) {
            var id = workers.pop();
            var worker = cluster.workers[id];
            worker.send('shutdown');
            worker.disconnect();
            kill(workers);
        }
    }
};

During a deployment, we simply replace the old code and send a signal to the “master” process (kill -HUP $PID); which causes it to spin up X new workers. As soon as one of those is ready and listening, it terminates the old workers.