Handle event function

The main difference between gen_statem and the older gen_fsm is the addition of a new “handle event function” callback mode, which places fewer restrictions on the type of the state (it can be any term, not just an atom).

Converting an existing state machine is pretty easy. Change the callback mode, and move the existing state functions into one callback that takes the current state as an extra argument:

callback_mode() -> handle_event_function.

handle_event({call, From}, get_balance, open, #{balance:=Balance} = Data) ->
    {keep_state, Data, [{reply, From, Balance}]};

handle_event({call, From}, close, open, Data) ->
    {next_state, closed, Data, [{reply, From, closed}]};

handle_event({call, From}, {deposit, Amount}, open, #{balance:=Balance} = Data) when is_number(Amount) andalso Amount > 0 ->
    NewBalance = Balance + Amount,
    {keep_state, Data#{balance:=NewBalance}, [{reply, From, deposit_made}]};

handle_event({call, From}, {withdraw, Amount}, open, #{balance:=Balance} = Data) when is_number(Amount) andalso Amount > 0 andalso (Balance - Amount > 0) ->
    NewBalance = Balance - Amount,
    {keep_state, Data#{balance:=NewBalance}, [{reply, From, withdrawal_made}]};

handle_event({call, From}, reopen, closed, Data) ->
    {next_state, open, Data, [{reply, From, open}]}.

If you don’t need the extra flexibility, choosing between this style and the state functions version from the previous post is largely a matter of personal taste.

❤️ the State Machine

Having recently read a post about using state machines in JS, I decided to try implementing the same logic using the (relatively) new gen_statem.

As ever, the code is available here.

The initial example is a naive representation of a bank account:

[Figure: account-events-withself]

The first step is to fill out the gen_statem boilerplate:

-module(bank_statem).

-behaviour(gen_statem).

-export([init/1, callback_mode/0]).
-export([open/3]).

init([]) ->
    {ok, open, #{balance=>0}}.

callback_mode() -> state_functions.

open({call, From}, get_balance, #{balance:=Balance} = Data) ->
    {keep_state, Data, [{reply, From, Balance}]}.

We return an initial state of open, and a map containing the initial balance of 0.

I also added a get_balance call, so we can inspect the current data (which would normally be referred to as the “state” in a gen_server, confusingly).

The first state transition is to close an open account:

open({call, From}, close, Data) ->
    {next_state, closed, Data, [{reply, From, closed}]};

The function name tells you which state this can be called from (open), and the return value tells you that the state machine would transition to a new state (closed), as well as returning a value (the atom closed) to the caller.

Re-opening an account is pretty similar:

closed({call, From}, reopen, Data) ->
    {next_state, open, Data, [{reply, From, open}]}.

As close is only defined for the open state, and reopen is only defined for the closed state, any inappropriate calls will cause the process to crash.

Deposit & withdraw are a little different:

open({call, From}, {deposit, Amount}, #{balance:=Balance} = Data) when is_number(Amount) andalso Amount > 0 ->
    NewBalance = Balance + Amount,
    {keep_state, Data#{balance:=NewBalance}, [{reply, From, deposit_made}]};

open({call, From}, {withdraw, Amount}, #{balance:=Balance} = Data) when is_number(Amount) andalso Amount > 0 andalso (Balance - Amount > 0) ->
    NewBalance = Balance - Amount,
    {keep_state, Data#{balance:=NewBalance}, [{reply, From, withdrawal_made}]}.

Both retain the current state, but update the data (in this case, the balance). Guards validate the arguments; again, any other calls will cause an error.
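For comparison with the JS post that prompted all this, here is a rough sketch of the same transitions in plain JavaScript. It is only an analogue of the state_functions idea (one handler per state, returning the next state, the data and a reply), not a translation of that post's code; the names `states` and `call` and the shape of the event objects are made up for illustration.

```javascript
// Hypothetical JS analogue of the state_functions callback mode:
// one handler per state, each returning the next state, data and reply.
const states = {
    open(event, data) {
        switch (event.type) {
            case "get_balance":
                return { state: "open", data, reply: data.balance };
            case "close":
                return { state: "closed", data, reply: "closed" };
            case "deposit":
                // plays the role of the Erlang guard on deposit
                if (typeof event.amount === "number" && event.amount > 0) {
                    const balance = data.balance + event.amount;
                    return { state: "open", data: { ...data, balance }, reply: "deposit_made" };
                }
                break;
            case "withdraw":
                // the balance must stay above zero, as in the Erlang guard
                if (typeof event.amount === "number" && event.amount > 0 &&
                        data.balance - event.amount > 0) {
                    const balance = data.balance - event.amount;
                    return { state: "open", data: { ...data, balance }, reply: "withdrawal_made" };
                }
                break;
        }
        // no matching clause: the Erlang process would crash here
        throw new Error("unexpected event " + event.type + " in state open");
    },
    closed(event, data) {
        if (event.type === "reopen") {
            return { state: "open", data, reply: "open" };
        }
        throw new Error("unexpected event " + event.type + " in state closed");
    }
};

// Dispatch on the current state name, then store the new state and data.
function call(machine, event) {
    const next = states[machine.state](event, machine.data);
    machine.state = next.state;
    machine.data = next.data;
    return next.reply;
}
```

Starting from `{state: "open", data: {balance: 0}}`, a deposit followed by close leaves the machine in the closed state, where only reopen is accepted.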

This will seem pretty familiar if you’ve ever used gen_fsm. Next time, we’ll look at the new alternative callback mode: handle_event_function.

Matching partial request bodies with Nock

A recent update to Nock meant that partial matches of the request body would no longer match (one dev’s bug is another’s feature).

Their recommended solution is to supply a predicate, and do the check yourself. That’s simple when just checking one property, but I didn’t really want to re-invent the wheel.

Seeing as I’m already using sinon, that seemed like the obvious solution:

    var expected = {a: 1};

    nock("https://foo.com")
        .post("/bar", checkReqBody(expected))
        .reply(200, '{}');

    function checkReqBody(expected) {
        return reqBody => {
            expect(reqBody).to.equal(sinon.match(expected));
            return true;
        };
    }

This works fine in the success case; but if the match fails, throwing an error in that callback tears down the test runner process, rather than failing the test.

A bit of digging in the sinon (and sinon-chai) source revealed the right method to call:

    return reqBody => {
        return sinon.match(expected).test(reqBody);
    };

This is now equivalent to the previous behaviour: if the match fails, you get the generic nock error. I wanted to make more use of the sinon output when the match failed, but never really got anywhere with it.
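To make it concrete what the predicate is being asked to do, here is a minimal stand-in for a partial match on plain objects, using only Node's `util.isDeepStrictEqual`. The name `partialMatch` is hypothetical, and this is a sketch of the idea rather than a replacement for sinon.match, which handles many more cases (nested matchers, for one).

```javascript
const { isDeepStrictEqual } = require("util");

// Hypothetical stand-in for a partial match: every property in `expected`
// must be present and deeply equal in `actual`; any extra properties in
// `actual` are ignored.
function partialMatch(expected) {
    return actual =>
        actual !== null &&
        typeof actual === "object" &&
        Object.keys(expected).every(key =>
            isDeepStrictEqual(actual[key], expected[key]));
}
```

Like the sinon version, it returns a boolean rather than throwing, so a mismatch fails the test instead of the test runner.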

Using rebar3 with Docker

I prefer to avoid installing dev tools on my laptop. I used to create a separate Vagrant instance for each project, but Docker should provide a lighter-weight alternative.

Assuming you already have the Docker tooling installed, the first thing to do is download the base image:

docker pull erlang

Next, download the latest rebar3 binary:

curl -OL https://s3.amazonaws.com/rebar3/rebar3 && chmod +x ./rebar3

At this point, you’re ready to create the app skeleton:

docker run --name $APP_NAME -it --rm -v ${PWD}:/app -w /app erlang ./rebar3 new app $APP_NAME

Annoyingly, rebar assumes that it should create the folder for the project, so you’ll need to move all the generated files up a level (or put up with an extra subdir). Also, the generated files are owned by root, so you probably want to chown the entire dir.

If you’re going to use any rebar plugins, you probably want to put rebar’s cache dir somewhere that will survive container restarts. Add this to your rebar.config:

{global_rebar_dir, "/app/.cache"}.

And then you can compile your app:

docker run --name $APP_NAME -it --rm -v ${PWD}:/app -w /app -e REBAR_CACHE_DIR=/app erlang ./rebar3 compile

Streaming HLS video (redux)

In a previous post, I suggested a method for streaming HLS video using ffmpeg. Unfortunately, although I could have sworn it worked at the time, based on what I now know, this seems unlikely.

As far as I can tell, there’s no way you can use the hls muxer, and pipe the output; certainly not in the way I needed to use it, as the playlist is updated over time.

All is not lost, though. The solution is to output the necessary files to disk:

app.get('/hls/video', function(req, res) {
    var file = "out.m3u8";
    var output = "videos/" + file;
    var stats;
    try {
        stats = fs.statSync(output);
    } catch (e) {
        // the playlist doesn't exist yet
    }
    if (stats) {
        return res.redirect(output);
    }
    var redirected = false;
    // a "rename" event fires when the playlist file is created
    var watcher = fs.watch("videos/", (e, f) => {
        if (e === "rename" && f === file) {
            if (!res.finished) {
                res.redirect(output);
                redirected = true;
            }
            watcher.close();
        }
    });
    var proc = ffmpeg(output);
    proc.on("exit", code => {
        // always close the watcher, so it isn't leaked if ffmpeg fails
        watcher.close();
        if (redirected || res.finished) {
            return;
        }
        if (code === 0) {
            res.redirect(output);
        } else {
            res.status(500).end();
        }
    });
});
  
function ffmpeg(output) {
    var cmd = "ffmpeg";
    var filter = "some complex filter expr";
    var args = ["-i", "video1.mp4"];
    ...
    args.push(
        "-vcodec", "libx264",
        "-pix_fmt", "yuv420p",
        "-f", "hls",
        "-hls_playlist_type", "event",
        "-profile:v", "baseline",
        "-level", "3.0"
    );
    args.push(output);
    return spawn(cmd, args);
}

When a video is requested, we check whether the playlist already exists; if so, we redirect immediately. If not, we run the ffmpeg command and add a file watcher.

Once the playlist (m3u8) is created, we redirect. As this is a “live” stream (playlist type: event), the playlist will be updated as the new chunks become ready.

We need to take care not to leak the filewatchers though, in the event of an error. The final piece is to serve the output artifacts, as static files:
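The "don't leak the watcher" part is easier to get right if the bookkeeping lives in one place. The following is a minimal sketch of that, not the code from the app: `waitForFile` and its options are hypothetical names, the timeout is an extra that the route above doesn't have, and the `watch` option exists only so the helper can be exercised without touching the disk.

```javascript
const fs = require("fs");

// Hypothetical helper: call onReady once `file` appears in `dir`, or
// onTimeout if it never does. Whichever happens first, the watcher is
// closed and the timer cleared, and nothing fires twice.
// Returns a cancel function that does the same cleanup.
function waitForFile(dir, file, onReady, onTimeout, options = {}) {
    const timeoutMs = options.timeoutMs || 30000;  // assumed default
    const watch = options.watch || fs.watch;       // injectable for tests
    let settled = false;
    let timer = null;
    let watcher = null;

    const settle = callback => {
        if (settled) {
            return;
        }
        settled = true;
        clearTimeout(timer);
        if (watcher) {
            watcher.close();
        }
        callback();
    };

    watcher = watch(dir, (eventType, filename) => {
        // a "rename" event fires when a file is created in the directory
        if (eventType === "rename" && filename === file) {
            settle(onReady);
        }
    });
    timer = setTimeout(() => settle(onTimeout), timeoutMs);

    return () => settle(() => {});
}
```

In the route, onReady would do the redirect, and the cancel function would be called from the ffmpeg exit handler.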

app.use("/videos", express.static("videos", {
    setHeaders: function(res, path, stat) {
        if (path.endsWith(".ts")) {
            res.set("cache-control", "public, max-age=300");
        }
    }
}));

The ts chunks can be cached (upstream, by varnish & cloudfront), but we need to ensure the m3u8 isn’t, as it will be changing until the video is complete.
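If you want the playlist rule to be explicit rather than relying on defaults, the decision can be pulled out into a small function. This is a sketch under assumptions: `cacheControlFor` is a hypothetical name, and "no-cache" for the m3u8 is my choice of value, not something taken from the app.

```javascript
// Hypothetical helper: pick a cache-control value from the file extension.
// Returns undefined when no header should be set.
function cacheControlFor(path) {
    if (path.endsWith(".ts")) {
        // chunks never change once written, so they can be cached upstream
        return "public, max-age=300";
    }
    if (path.endsWith(".m3u8")) {
        // assumed value: the playlist keeps changing until the video is done
        return "no-cache";
    }
    return undefined;
}
```

Inside setHeaders, the result would be passed to res.set("cache-control", value) whenever it is defined.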

Returning errors when piping a stream

Most of the examples of piping a stream of data using expressjs look like this:

app.get('/video', function(req, res) {
    var cmd = "ffmpeg";
    var args = [...];
    var proc = spawn(cmd, args);
    res.contentType('video/mp4');
    proc.stdout.pipe(res);
});

Which is great for the happy path, but means any errors from the child proc are returned as a 200; and, in my case, cached. Not ideal.

I googled it pretty hard, and even asked on SO, with no joy. Eventually, I found this article, at which point I realised I’d been asking the wrong question!

The pipe method is part of the base library, nothing to do with express (obvious, in retrospect). And, as the documentation clearly states, it calls end when the readable stream ends.

So, the solution is to do that bit yourself:

proc.stdout.pipe(res, {end: false});
proc.on("error", err => {
    console.log("error from ffmpeg", err.stack);
    res.status(500).end();
}); 
proc.on("exit", code => {
    console.log("child proc exited", code);
    res.status(code === 0 ? 200 : 500).end();
});

Boom! (Just remember to handle all the cases, so end is always called).
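One way to make sure end is called exactly once is to funnel every outcome through a single function. A minimal sketch, with some assumptions: `pipeWithStatus` is a hypothetical name, and it listens for the child's "close" event (which Node emits after the stdio streams have closed) rather than "exit", so output still in the pipe isn't cut off.

```javascript
// Hypothetical helper: pipe a child process's stdout to a response and
// end the response exactly once, with a status based on the outcome.
function pipeWithStatus(proc, res) {
    let done = false;
    const finish = status => {
        if (done) {
            return;
        }
        done = true;
        // the status can only be changed before any data has been sent
        if (!res.headersSent) {
            res.statusCode = status;
        }
        res.end();
    };

    proc.stdout.pipe(res, { end: false });
    // "error" fires if the process could not be spawned at all
    proc.on("error", () => finish(500));
    // "close" fires once the process has ended and its streams are closed
    proc.on("close", code => finish(code === 0 ? 200 : 500));
}
```

Note the caveat in the comment: if ffmpeg has already written some output before failing, the 200 has gone out with the headers and can't be taken back.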

Streaming video to iOS devices

UPDATE: this is fake news, see my newer post for more info.

It seems that neither iOS devices, nor Safari on OS X, support mp4; so if you’re trying to stream video, you need to provide another format.

The recommendation is to use HLS, which fortunately is supported by ffmpeg; you merely need to adjust your incantation:

app.get('/hls/video', function(req, res) {
    res.contentType('application/vnd.apple.mpegurl');
    var proc = ffmpeg();
    proc.stdout.pipe(res);
});
 
function ffmpeg() {
    var cmd = "ffmpeg";
    var filter = "some complex filter expr";
    var args = ["-i", "video1.mp4"];
    ...
    args.push(
        "-vcodec", "libx264",
        "-f", "hls",
        "-hls_time", "9",
        "-hls_list_size", "0",
        "-profile:v", "baseline",
        "-level", "3.0",
        "pipe:1"
    );
    return spawn(cmd, args);
}

I could probably use conneg to decide which format to return, rather than the uri, but I’m not convinced that my caching infrastructure (varnish and cloudfront now!) would handle that correctly.

Streaming video from ffmpeg

ffmpeg is a fantastic tool for converting, concatenating, or otherwise fiddling with video content. If you can generate what you need in advance, then you can upload it to s3 (or some other CDN-like option); but sometimes you need to stream video on demand.

First, you need to get the ffmpeg incantation right:

ffmpeg -i video1.mp4 -i video2.mp4 ... \
       -filter_complex "something something" \
       -movflags "frag_keyframe+empty_moov" \
       -f mp4 pipe:1

Using pipe:1 sends the output to stdout (as documented here), and the movflags are needed to allow streaming in mp4 format.
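When the command is run from node, the same incantation becomes an array of arguments. A small sketch of building it for any number of inputs; `buildArgs` is a hypothetical name, and the flags are just the ones from the command above.

```javascript
// Hypothetical helper: build the ffmpeg argument list for a set of inputs.
function buildArgs(inputs, filter) {
    const args = [];
    // each input file gets its own -i flag, in order
    for (const input of inputs) {
        args.push("-i", input);
    }
    args.push(
        "-filter_complex", filter,
        // fragmented mp4, so the output can be streamed as it is written
        "-movflags", "frag_keyframe+empty_moov",
        "-f", "mp4",
        // write the result to stdout
        "pipe:1"
    );
    return args;
}
```

The result can be handed straight to spawn("ffmpeg", args), with no shell quoting to worry about.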

With this in place, it’s easy to pipe the output data to a browser using expressjs:

#!/usr/bin/env node
"use strict";

const express = require('express');
const { spawn } = require('child_process');

var app = express();

app.get('/video', function(req, res) {
    res.contentType('video/mp4');
    var proc = ffmpeg();
    proc.stdout.pipe(res);

    res.on("close", () => {
        proc.kill("SIGKILL");
    });
});

app.listen(4000);

function ffmpeg() {
    var cmd = "ffmpeg";
    var filter = "some complex filter expr";
    var args = ["-i", "video1.mp4"];
    ...
    args.push(
        "-filter_complex", filter,
        "-s", "1280x720",
        "-acodec", "aac",
        "-vcodec", "h264",
        "-movflags", "frag_keyframe+empty_moov",
        "-f", "mp4",
        "pipe:1"
    );
    return spawn(cmd, args);
}

I found I ended up with “zombie” ffmpeg processes, if the client connection had closed before the video ended (because it’s still trying to write data to the pipe?). There’s probably a neater way to solve that, but kill -9 is pretty effective!
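A slightly tidier variant, sketched here as an assumption rather than something I've run in production: only send the signal if the process is still alive, which a ChildProcess reports through its exitCode and signalCode properties (both null while it is running). The name `killOnClose` is hypothetical.

```javascript
// Hypothetical helper: when the client connection closes, stop the child
// process, but only if it hasn't already exited or been killed.
function killOnClose(res, proc, signal = "SIGKILL") {
    res.on("close", () => {
        const stillRunning = proc.exitCode === null && proc.signalCode === null;
        if (stillRunning) {
            proc.kill(signal);
        }
    });
}
```

The signal is a parameter, so a gentler one than SIGKILL can be tried first.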

At this point, you can stream some video; but this solution won’t scale: every request runs the ffmpeg command, which is pretty CPU-intensive. It would be relatively simple to cache the output, either in memory or on disk; or you could put the node process behind a caching proxy.

However, the best solution for our needs seemed to be using this as an “origin server” for Cloudfront. This means that only the first request (of each type) hits our server, and the response is cached for as long as necessary.

If you need to render different videos for every request, then you’ll just have to throw money at it!