Handle event function

The main difference between gen_statem and the older gen_fsm, is the addition of a new “handle event function” callback mode, with less restrictions on the state data type.

Converting an existing state machine, is pretty easy. Change the callback mode, and move the existing state functions into one callback:

callback_mode() -> handle_event_function.

handle_event({call, From}, get_balance, open, #{balance:=Balance} = Data) ->
    {keep_state, Data, [{reply, From, Balance}]};

handle_event({call, From}, close, open, Data) ->
    {next_state, closed, Data, [{reply, From, closed}]};

handle_event({call, From}, {deposit, Amount}, open, #{balance:=Balance} = Data) when is_number(Amount) andalso Amount > 0 ->
    NewBalance = Balance + Amount,
    {keep_state, Data#{balance:=NewBalance}, [{reply, From, deposit_made}]};

handle_event({call, From}, {withdraw, Amount}, open, #{balance:=Balance} = Data) when is_number(Amount) andalso (Balance - Amount > 0) ->
    NewBalance = Balance - Amount,
    {keep_state, Data#{balance:=NewBalance}, [{reply, From, withdrawal_made}]};

handle_event({call, From}, reopen, closed, Data) ->
    {next_state, open, Data, [{reply, From, open}]}.

Whether you prefer this style, or the previous version, seems like a matter of personal taste, if you don’t need the extra flexibility.

❤️ the State Machine

Having recently read a post about using state machines in JS, I decided to try implementing the same logic using the (relatively) new gen_statem.

As ever, the code is available here.

The initial example is a naive representation of a bank account:

account-events-withself

The first step is to fill out the gen_statem boilerplate:

-module(bank_statem).

-behaviour(gen_statem).

-export([init/1, callback_mode/0]).
-export([open/3]).

init([]) ->
    {ok, open, #{balance=>0}}.

callback_mode() -> state_functions.

open({call, From}, get_balance, #{balance:=Balance} = Data) ->
    {keep_state, Data, [{reply, From, Balance}]}.

We return an initial state of open, and a map containing the initial balance of 0.

I also added a get_balance call, so we can inspect the current data (which would normally be referred to as the “state” in a gen_server, confusingly).

The first state transition is to close an open account:

open({call, From}, close, Data) ->
    {next_state, closed, Data, [{reply, From, closed}]};

The function name tells you which state this can be called from (open), and the return value tells you that the state machine would transition to a new state (closed), as well as returning a value (the atom closed) to the caller.

Re-opening an account is pretty similar:

closed({call, From}, reopen, Data) ->
    {next_state, open, Data, [{reply, From, open}]}.

As close is only defined for the open state, and reopen is only defined for the closed state, any inappropriate calls will cause the process to crash.

Deposit & withdraw are a little different:

open({call, From}, {deposit, Amount}, #{balance:=Balance} = Data) when is_number(Amount) andalso Amount > 0 ->
    NewBalance = Balance + Amount,
    {keep_state, Data#{balance:=NewBalance}, [{reply, From, deposit_made}]};

open({call, From}, {withdraw, Amount}, #{balance:=Balance} = Data) when is_number(Amount) andalso (Balance - Amount > 0) ->
    NewBalance = Balance - Amount,
    {keep_state, Data#{balance:=NewBalance}, [{reply, From, withdrawal_made}]}.

In that they retain the current state, but the data (in this case, the balance) is updated. Guards have also been used to validate the arguments; again, any other calls will cause an error.

This will seem pretty familiar, if you’ve ever used gen_fsm. Next time, we’ll look at the new alternative callback mode: handle_event_function.

Matching partial request bodies with Nock

A recent update to Nock meant that partial matches of the request body would no longer match (one dev’s bug is another’s feature).

Their recommended solution is to supply a predicate, and do the check yourself. That’s simple when just checking one property, but I didn’t really want to re-invent the wheel.

Seeing as I’m already using sinon, that seemed like the obvious solution:

    var expected = {a: 1};

    nock("https://foo.com")
            .post("/bar", checkReqBody(expected))
            .reply(200, '{}');

        function checkReqBody(expected) {
            return reqBody => {
                expect(reqBody).to.equal(sinon.match(expected));
                return true;
            };
        }

This works fine in the success case; but if the match fails, throwing an error in that callback tears down the test runner process, rather than failing the test.

A bit of digging in the sinon (and sinon-chai) source revealed the right method to call:

    return reqBody => {
        return sinon.match(expected).test(reqBody);
    };

This is now equivalent to the previous behaviour, if the match fails you get the generic nock error. I wanted to make more use of the sinon output, when the match failed, but never really got anywhere with it.

Using rebar3 with Docker

I prefer to avoid installing dev tools on my laptop. I used to create a separate Vagrant instance for each project, but using Docker should provide a lighter weight alternative.

Assuming you already have the Docker tooling installed, the first thing to do is download the base container:

docker pull erlang

Next, download the latest rebar3 binary:

curl -OL https://s3.amazonaws.com/rebar3/rebar3 && chmod +x ./rebar3

At this point, you’re ready to create the app skeleton:

docker run --name $APP_NAME -it --rm -v ${PWD}:/app -w /app erlang ./rebar3 new app $APP_NAME

Annoyingly, rebar assumes that it should create the folder for the project, so you’ll need to move all the generated files up a level (or put up with an extra subdir). Also, the generated files are owned by root, so you probably want to chown the entire dir.

If you’re going to use any rebar plugins, you probably want to put it’s cache dir somewhere that will survive container restarts. Add this to your rebar.config:

{global_rebar_dir, "/app/.cache"}.

And then you can compile your app:

docker run --name $APP_NAME -it --rm -v ${PWD}:/app -w /app -e REBAR_CACHE_DIR=/app erlang ./rebar3 compile

Streaming HLS video (redux)

In a previous post, I suggested a method for streaming HLS video using ffmpeg. Unfortunately, although I could have sworn it worked at the time, based on what I now know, this seems unlikely.

As far as I can tell, there’s no way you can use the hls muxer, and pipe the output; certainly not in the way I needed to use it, as the playlist is updated over time.

All is not lost though, the solution is to output the necessary files to disk:

app.get('/hls/video', function(req, res) {
    var output = "videos/out.m3u8";
    var stats;
    try {
        stats = fs.statSync(output);
    } catch (e) {
    }
    if (stats) {
        return res.redirect(output);
    }
    var redirected = false;
    var watcher = fs.watch("videos/", (e, f) => {
        if (e === "rename" && f === file) {
            if (!res.finished) {
                res.redirect(output);
                redirected = true;
            }
            watcher.close();
        }
    });
    var proc = ffmpeg(output);
    proc.on("exit", code => {
        if (code === 0) {
            if (!redirected && !res.finished) {
                res.redirect(output);
            }
            watcher.close();
        } else {
            res.status(500).end();
        }
    });
});
  
function ffmpeg(output) {
    var cmd = "ffmpeg";
    var filter = "some complex filter expr";
    var args = ["-i", "video1.mp4"];
    ...
    args.push(
        "-vcodec", "libx264",
        "-pix_fmt", "yuv420p",
        "-f", "hls",
        "-hls_playlist_type", "event",
        "-profile:v", "baseline",
        "-level", "3.0"
    );
    args.push(output);
    return spawn(cmd, args);
}

When a video is requested, we check if the playlist already exists, if so we redirect immediately. If not, we run the ffmpeg command, and add a filewatcher.

Once the playlist (m3u8) is created, we redirect. As this is a “live” stream (playlist type: event), the playlist will be updated as the new chunks become ready.

We need to take care not to leak the filewatchers though, in the event of an error. The final piece is to serve the output artifacts, as static files:

app.use("/videos", express.static("videos", {
    setHeaders: function(res, path, stat) {
        if (path.indexOf(".ts") > -1) {
            res.set("cache-control", "public, max-age=300");
        }
    }
}));

The ts chunks can be cached (upstream, by varnish & cloudfront), but we need to ensure the m3u8 isn’t, as it will be changing until the video is complete.