Streaming a csv from postgresql

If you want to build an endpoint to download a csv that could contain a large number of rows, you should use streams, so you don’t have to hold all the data in memory before writing it.

If you are already using the pg client, it has a nifty add-on for this purpose:

const { Client } = require('pg');
const QueryStream = require('pg-query-stream');
const csvWriter = require('csv-write-stream');

module.exports = function(connectionString) {
    this.handle = function(req, res) {
        var sql = "SELECT...";
        var args = [...];

        const client = new Client({connectionString});
        client.connect().then(() => {
            // QueryStream emits each row as it arrives, rather than buffering the full result set
            var stream = new QueryStream(sql, args);
            stream.on('end', () => {
                client.end();
            });
            var query = client.query(stream);

            // csv-write-stream turns each row object into a line of csv
            var writer = csvWriter();
            res.contentType("text/csv");
            writer.pipe(res);

            query.pipe(writer);
        });
    };
};

If you need to transform the data, you can add another step:

...

const transform = require('stream-transform');

            ...

            var query = client.query(stream);

            var transformer = transform(r => ({
                "User ID": r.user_id,
                "Created": r.created.toISOString(),
                ...
            }));

            ...

            query.pipe(transformer).pipe(writer);
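
Putting the fragments together, the handler body ends up looking something like this (a sketch only; the column names are just placeholders):

client.connect().then(() => {
    var stream = new QueryStream(sql, args);
    stream.on('end', () => client.end());
    var query = client.query(stream);

    // map each db row onto the csv columns we want to expose
    var transformer = transform(r => ({
        "User ID": r.user_id,
        "Created": r.created.toISOString()
    }));

    var writer = csvWriter();
    res.contentType("text/csv");
    writer.pipe(res);

    query.pipe(transformer).pipe(writer);
});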

Matching partial request bodies with Nock

A recent update to Nock meant that partial matches of the request body would no longer match (one dev’s bug is another’s feature).

Their recommended solution is to supply a predicate, and do the check yourself. That’s simple when just checking one property, but I didn’t really want to re-invent the wheel.

Seeing as I’m already using sinon, that seemed like the obvious solution:

    var expected = {a: 1};

    nock("https://foo.com")
        .post("/bar", checkReqBody(expected))
        .reply(200, '{}');

    function checkReqBody(expected) {
        return reqBody => {
            expect(reqBody).to.equal(sinon.match(expected));
            return true;
        };
    }

This works fine in the success case; but if the match fails, throwing an error in that callback tears down the test runner process, rather than failing the test.

A bit of digging in the sinon (and sinon-chai) source revealed the right method to call:

    return reqBody => {
        return sinon.match(expected).test(reqBody);
    };

This is now equivalent to the previous behaviour: if the match fails, you get the generic nock error. I wanted to make more use of sinon’s output when the match failed, but never really got anywhere with it.
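
For completeness, the working interceptor ends up looking like this (the endpoint and expected body are just the placeholders from above):

    var expected = {a: 1};

    nock("https://foo.com")
        .post("/bar", checkReqBody(expected))
        .reply(200, '{}');

    function checkReqBody(expected) {
        // sinon.match builds a matcher; test() returns a boolean instead of throwing,
        // so a failed match just falls through to the usual nock "no match" error
        return reqBody => sinon.match(expected).test(reqBody);
    }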

Streaming HLS video (redux)

In a previous post, I suggested a method for streaming HLS video using ffmpeg. Unfortunately, although I could have sworn it worked at the time, based on what I now know, this seems unlikely.

As far as I can tell, there’s no way to use the hls muxer and pipe the output; certainly not in the way I needed to use it, as the playlist is updated over time.

All is not lost, though: the solution is to output the necessary files to disk:

app.get('/hls/video', function(req, res) {
    var file = "out.m3u8";
    var output = "videos/" + file;
    var stats;
    try {
        stats = fs.statSync(output);
    } catch (e) {
    }
    if (stats) {
        // the playlist already exists, no need to start another transcode
        return res.redirect(output);
    }
    var redirected = false;
    var watcher = fs.watch("videos/", (e, f) => {
        if (e === "rename" && f === file) {
            if (!res.finished) {
                res.redirect(output);
                redirected = true;
            }
            watcher.close();
        }
    });
    var proc = ffmpeg(output);
    proc.on("exit", code => {
        if (code === 0) {
            if (!redirected && !res.finished) {
                res.redirect(output);
            }
        } else {
            res.status(500).end();
        }
        // always close the watcher, or we leak one per failed request
        watcher.close();
    });
});
  
function ffmpeg(output) {
    var cmd = "ffmpeg";
    var filter = "some complex filter expr";
    var args = ["-i", "video1.mp4"];
    ...
    args.push(
        "-vcodec", "libx264",
        "-pix_fmt", "yuv420p",
        "-f", "hls",
        "-hls_playlist_type", "event",
        "-profile:v", "baseline",
        "-level", "3.0"
    );
    args.push(output);
    return spawn(cmd, args);
}

When a video is requested, we check whether the playlist already exists; if so, we redirect immediately. If not, we run the ffmpeg command and add a file watcher.

Once the playlist (m3u8) is created, we redirect. As this is a “live” stream (playlist type: event), the playlist will be updated as the new chunks become ready.

We need to take care not to leak the file watchers in the event of an error, though. The final piece is to serve the output artifacts as static files:

app.use("/videos", express.static("videos", {
    setHeaders: function(res, path, stat) {
        if (path.indexOf(".ts") > -1) {
            res.set("cache-control", "public, max-age=300");
        }
    }
}));

The ts chunks can be cached (upstream, by varnish & cloudfront), but we need to ensure the m3u8 isn’t, as it will be changing until the video is complete.
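
One way to make that explicit (a sketch; the exact header values depend on your caching setup) is to handle the playlist in the same setHeaders callback:

app.use("/videos", express.static("videos", {
    setHeaders: function(res, path, stat) {
        if (path.indexOf(".ts") > -1) {
            // chunks never change once written, so they are safe to cache
            res.set("cache-control", "public, max-age=300");
        } else if (path.indexOf(".m3u8") > -1) {
            // the playlist keeps changing until the transcode finishes
            res.set("cache-control", "no-cache");
        }
    }
}));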

Returning errors when piping a stream

Most of the examples of piping a stream of data using expressjs look like this:

app.get('/video', function(req, res) {
    var cmd = "ffmpeg";
    var args = [...];
    var proc = spawn(cmd, args);
    res.contentType('video/mp4');
    proc.stdout.pipe(res);
});

That’s great for the happy path, but it means any errors from the child proc are returned as a 200 and, in my case, cached. Not ideal.

I googled it pretty hard, and even asked on SO, with no joy. Eventually, I found this article, at which point I realised I’d been asking the wrong question!

The pipe method is part of the base library, nothing to do with express (obvious, in retrospect). And, as the documentation clearly states, it calls end when the readable stream ends.

So, the solution is to do that bit yourself:

proc.stdout.pipe(res, {end: false});
proc.on("error", err => {
    console.log("error from ffmpeg", err.stack);
    res.status(500).end();
});
proc.on("exit", code => {
    console.log("child proc exited", code);
    // a child process exits with 0 on success, not an http status
    res.status(code === 0 ? 200 : 500).end();
});

Boom! (Just remember to handle all the cases, so end is always called).

Streaming video to iOS devices

UPDATE: this is fake news, see my newer post for more info.

It seems that neither iOS devices nor Safari on OS X support mp4, so if you’re trying to stream video, you need to provide another format.

The recommendation is to use HLS, which fortunately is supported by ffmpeg; you merely need to adjust your incantation:

app.get('/hls/video', function(req, res) {
    res.contentType('application/vnd.apple.mpegurl');
    var proc = ffmpeg();
    proc.stdout.pipe(res);
});
 
function ffmpeg() {
    var cmd = "ffmpeg";
    var filter = "some complex filter expr";
    var args = ["-i", "video1.mp4"];
    ...
    args.push(
        "-vcodec", "libx264",
        "-f", "hls",
        "-hls_time", "9",
        "-hls_list_size", "0",
        "-profile:v", "baseline",
        "-level", "3.0",
        "pipe:1"
    );
    return spawn(cmd, args);
}

I could probably use content negotiation to decide which format to return, rather than the URI, but I’m not convinced that my caching infrastructure (varnish and cloudfront now!) would handle that correctly.
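
If I did try it, the sketch would look something like this (untested against the caching layers; mp4() is a hypothetical mp4 variant of the ffmpeg helper):

app.get('/video', function(req, res) {
    // req.accepts checks the Accept header against the types we can offer
    var type = req.accepts(['video/mp4', 'application/vnd.apple.mpegurl']);
    if (type === 'application/vnd.apple.mpegurl') {
        res.contentType('application/vnd.apple.mpegurl');
        ffmpeg().stdout.pipe(res);
    } else {
        res.contentType('video/mp4');
        mp4().stdout.pipe(res);     // hypothetical: spawns ffmpeg with mp4 output
    }
});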

“Trouble parsing json”

We use Bunyan in our node apps, for “structured logging”. The output json string is passed to syslog, by systemd, and then fed into ELK.

{
    "name":"foo-service",
    "hostname":"app-01",
    "pid":30988,
    "ip":"1.19.24.8",
    "requestId":"1c11f448-73f2-4efa-bc63-3de787618d49",
    "level":50,
    "err": {
        "message":"oh noes!"
    }
}

Unfortunately, if that string is longer than 2048 chars (usually a stacktrace, or html returned from a web service instead of json), then the json blob ends up split over 2 lines in syslog.

This causes ELK to barf when attempting to parse the broken lines (assuming you are parsing as json), and means you won’t see those errors in Kibana.

It is possible to detect the error parsing the error, by searching for the string “Trouble parsing json”, but that’s not really a solution.

I would prefer to see a truncated error rather than the current situation, but that means either wrapping or patching Bunyan itself.
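
The wrapping approach might look something like this (a sketch only; it only guards against oversized stack traces, and the length limit is an assumption):

var bunyan = require('bunyan');

var MAX_STACK = 2000;  // leave some headroom below the 2048 char limit

function truncatingErrSerializer(err) {
    // start from bunyan's standard err serializer, then trim the stack if needed
    var obj = bunyan.stdSerializers.err(err);
    if (obj && obj.stack && obj.stack.length > MAX_STACK) {
        obj.stack = obj.stack.substring(0, MAX_STACK) + "...[truncated]";
    }
    return obj;
}

var logger = bunyan.createLogger({
    name: "foo-service",
    serializers: { err: truncatingErrSerializer }
});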

Node.js and systemd-notify

Running a nodejs app as a systemd service is pretty simple. But the downside to this approach is that if the app dies straight away (e.g. a config file is malformed), then systemd remains unaware.

An alternative is to switch to using notify:

[Service]
Type=notify
ExecStart=/var/www/app_name/app.js
Restart=always
...

There are various ways to notify systemd that your service is ready; but the simplest is to use systemd-notify, a console wrapper around sd_notify.

this.start = function(port) {
    var deferred = Q.defer();

    server = app.listen(port, '127.0.0.1', function(error) {
        if (error) {
            deferred.reject(new Error(error));
        } else {
            logger.info('Listening on port %d', port);
            // tell systemd we are up and ready to serve requests
            exec('systemd-notify --ready');
            deferred.resolve();
        }
    });

    return deferred.promise;
};

If the process that calls systemd-notify is not the one in ExecStart (e.g. you are using cluster), you will also need to set NotifyAccess to “all”.
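
In the unit file, that’s one extra line in the [Service] section:

[Service]
Type=notify
NotifyAccess=all
ExecStart=/var/www/app_name/app.js
Restart=always
...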

Now when you use systemctl to start your service, it will wait for the notification before deeming the start process complete.

Managing nodejs dependencies

The easiest way to “deploy” a node app is to clone the git repo on a server, and run npm install. There are a couple of disadvantages though: first, I don’t really like having to install git, and manage credentials for a private repo.

Second, installing the dependencies like that means you may get different versions of the modules you rely on than you were expecting. One of the tenets of a reliable build pipeline is ensuring that builds are repeatable, and that what you deploy matches what you tested.

There are a few alternatives: you could vendor in the node_modules folder, but this dirties the commit history, and increases the size of your repo. You could use npm shrinkwrap, which is the same concept as Bundler’s Gemfile.lock, a list of specific versions to install. This is definitely an improvement, but still leaves the risk of npm i failing during a deployment.

I’d prefer to only install the dependencies once, on the build server. This means I can run the tests, then tar up the folder and upload that to each environment in turn:

npm install
npm test
npm prune --production
npm dedupe
tar -czf app.tar.gz --exclude='*/coverage' app/

After running the tests, we prune the dependencies to remove mocha etc. We then dedupe to try to reduce the number of copies of shared modules, and finally create an archive of the app. This is output as an artifact of the build, pulled in by the deploy-to-staging build, output again as an artifact of that, and finally pulled in by the deploy-to-production build.

Arrow functions returning an expression

One of the new features of ES6 that has made it into Node 4 is arrow functions. According to the documentation, there are 2 alternative syntaxes:

(param1, param2, …, paramN) => { statements }
(param1, param2, …, paramN) => expression
         // equivalent to:  => { return expression; }

Using the 2nd version works as expected for primitive values:

> [1,2,3].map(i => i);
[ 1, 2, 3 ]

but not when you return an object:

> [1,2,3].map(i => { id: i });
[ undefined, undefined, undefined ]

As was pointed out to me on SO, it’s impossible to tell the difference between a statement block and an expression consisting of an object literal.

The solution is to wrap the returned object in an extra set of parentheses:

> [1,2,3].map(i => ({ id: i }));
[ { id: 1 }, { id: 2 }, { id: 3 } ]