Json, in a jiffy!

Working with JSON in Erlang has vastly improved since the last time I looked at it, particularly with the addition of maps to the language (in OTP 17).

-module(foo_handler).

-export([init/3]).
-export([allowed_methods/2, content_types_accepted/2]).
-export([foo/2]).

init(_Transport, _Req, []) ->
    {upgrade, protocol, cowboy_rest}.

allowed_methods(Req, State) ->
    {[<<"POST">>], Req, State}.

content_types_accepted(Req, State) ->
    {[
        {<<"application/json">>, foo}
    ], Req, State}.

foo(Req, State) ->
    {ok, Body, Req1} = cowboy_req:body(Req),
    Json = jiffy:decode(Body, [return_maps]),
    Bar = maps:get(<<"bar">>, Json),
    {ok, Res} = do_something(Bar),
    RespBody = jiffy:encode(#{result => Res}),
    {true, cowboy_req:set_resp_body(RespBody, Req1), State}.

Producer-consumer using Go channels

I needed to call a web service a specific number of times, and add the results to a file (order being unimportant). It might be possible to simply spawn a goroutine for every request, but a more elegant solution is to use the producer-consumer pattern:

package main

import (
	"io/ioutil"
	"net/http"
)

func main() {
	jobs := make(chan bool)
	results := make(chan string)
	done := make(chan bool)
	numberOfWorkers := 5
	numberOfJobs := 1000
	for i := 0; i < numberOfWorkers; i++ {
		go worker(jobs, results, done)
	}
	go func() {
		for i := 0; i < numberOfJobs; i++ {
			jobs <- true
		}
	}()
	go func() {
		count := 0
		for {
			result := <-results
			println(result)
			count++
			if count >= numberOfJobs {
				close(done) // close, rather than send, so that main and every worker are released
				return
			}
		}
	}()
	<-done
}

func worker(jobs chan bool, results chan string, done chan bool) {
	for {
		select {
		case <-jobs:
			res, err := getResult()
			if err != nil {
				panic(err)
			}
			results <- res
		case <-done:
			return
		}
	}
}

func getResult() (string, error) {
	resp, err := http.Get("http://localhost/foo")
	if err != nil {
		return "", err
	}
	defer resp.Body.Close()
	body, err := ioutil.ReadAll(resp.Body)
	if err != nil {
		return "", err
	}
	return string(body), nil
}

This solution uses three channels: one to specify the work still remaining (a bool in this case, but it could carry data to vary each request), one to return results from the workers, and one to signal that all the work is complete.

This turned out to be a more interesting problem than it looked. My first attempt used a WaitGroup, but I couldn’t get that to work. Debugging revealed an interesting gap in my mental model of how channels work: I hadn’t fully internalised the fact that pushing a message onto a Go channel blocks unless someone is ready to consume it. Hence the need to push work onto the jobs channel in a separate goroutine. I had it in my head that channels were more like an Erlang/F# mailbox, despite knowing that buffered channels were a thing.
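For example (a sketch, separate from the program above): with a buffered channel the producer loop could run inline, because sends only block once the buffer is full:

// Buffered: the first numberOfJobs sends succeed without a receiver,
// so no separate producer goroutine is needed.
jobs := make(chan bool, numberOfJobs)
for i := 0; i < numberOfJobs; i++ {
	jobs <- true
}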

The error handling could also be improved: currently, if any request fails, the program just panics.
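One option (just a sketch; errs is a hypothetical chan error that the collector would consume alongside results, counting both to know when all jobs are finished) is to report failures instead of panicking:

case <-jobs:
	res, err := getResult()
	if err != nil {
		errs <- err // let the collector decide how to handle the failure
		continue    // no result will be sent for this job
	}
	results <- res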

Application metadata file exists but is malformed

I was playing around with Cowboy recently. I had followed the Getting Started guide, but was hoping to avoid having to get into releases. Sadly, the easiest way to serve static files is from an application priv_dir.

I created a relx.config file, but when I ran make I got a cryptic error:

===> Starting relx build process ...
===> Resolving OTP Applications from directories:
          /vagrant/ebin
          /vagrant/deps
          /usr/local/lib/erlang/lib
===> Application metadata file exists but is malformed: /vagrant/ebin/my_app.app
===> Resolving available OTP Releases from directories:
          /vagrant/ebin
          /vagrant/deps
          /usr/local/lib/erlang/lib
Failed to solve release:
 Dependency my_app is specified as a dependency but is not reachable by the system.

I was pretty sure my app metadata file wasn’t malformed, but a quick dig through the source led me back to an ominous warning in the cowboy docs:

the modules line will be replaced with the list of modules during compilation; make sure to leave this line even if you do not use it directly

Adding an empty modules list was enough to build a release successfully:

{application, my_app, [
    {description, "My App"},
    ...
    {modules, []},
    ...
]}.
This time, the build succeeded:

===> Starting relx build process ...
===> Resolving OTP Applications from directories:
          /vagrant/ebin
          /vagrant/deps
          /usr/local/lib/erlang/lib
===> Resolving available OTP Releases from directories:
          /vagrant/ebin
          /vagrant/deps
          /usr/local/lib/erlang/lib
===> Resolved my_app-1
===> Including Erts from /usr/local/lib/erlang
===> release successfully created!

Mmm, curry

Currying and partial function application are two concepts from functional programming that sound far more complicated than they really are: currying turns a function of several arguments into a chain of single-argument functions, while partial application fixes some of a function’s arguments, yielding a function of the remaining ones.

One of the times I find myself wishing partial application were more convenient in JavaScript is when trying to escape callback hell by extracting functions:

var shared = new Shared();
doSomething(shared).then(function(result) {
    somethingElse(shared, result);
});

It would be nice to be able to go straight to this:

var shared = new Shared();
doSomething(shared).then(somethingElse);

Of course this wouldn’t work, as the somethingElse function expects two parameters instead of just one.

If JavaScript supported partial application in the way that the ML family of languages do, then we could say:

doSomething(shared).then(somethingElse(shared));

But that means something entirely different in JavaScript: the result of the function being executed with one argument would be passed in.

It’s relatively straightforward to write a curry function and use that, but there is an alternative: returning an appropriate function from the called function:

var shared = new Shared();
doSomething(shared).then(somethingElse(shared));

function somethingElse(shared) {
    return function(result) {
        // do something with shared & result
    };
}
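For comparison, here’s a minimal partial-application helper (a sketch; it assumes somethingElse is written as an ordinary two-parameter function, not the closure-returning version above):

function partial(fn) {
    var fixed = Array.prototype.slice.call(arguments, 1);
    return function() {
        var rest = Array.prototype.slice.call(arguments);
        return fn.apply(this, fixed.concat(rest));
    };
}

var shared = new Shared();
doSomething(shared).then(partial(somethingElse, shared));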

Piping two commands together in Go

The Go exec package makes it pretty easy to run an OS command and read the output; and there are a few examples of how to pipe the output of one command into another (e.g. lsof | wc -l):

lsof := exec.Command("lsof")
wc := exec.Command("wc", "-l")
outPipe, err := lsof.StdoutPipe()
if err != nil {
    return nil, err
}
if err := lsof.Start(); err != nil {
    return nil, err
}
wc.Stdin = outPipe
out, err := wc.Output()
if err != nil {
    return nil, err
}
return out, nil

There’s a problem with this code, though: the pipe will never be closed. If the caller is a long-lived process, it will leak file descriptors.

You don’t normally have to worry about this, as calling Run or Output (instead of Start) will take care of it; but we can’t use those here, as they’d close the pipe too early.

The problem is easily solved, either by closing the pipe yourself:

outPipe, err := lsof.StdoutPipe()
defer outPipe.Close()

or by calling Wait after the output from the second command has been received:

out, err := wc.Output()
if err != nil {
    return nil, err
}
err = lsof.Wait()
if err != nil {
    return nil, err
}
return out, nil
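Putting it all together (a sketch; it assumes lsof and wc are on the PATH, and countOpenFiles is just an illustrative name):

package main

import (
	"fmt"
	"os/exec"
)

func countOpenFiles() ([]byte, error) {
	lsof := exec.Command("lsof")
	wc := exec.Command("wc", "-l")

	outPipe, err := lsof.StdoutPipe()
	if err != nil {
		return nil, err
	}
	wc.Stdin = outPipe

	if err := lsof.Start(); err != nil {
		return nil, err
	}

	// Output waits for wc to exit, which happens once lsof
	// exits and the pipe reaches EOF.
	out, err := wc.Output()
	if err != nil {
		return nil, err
	}

	// Reap lsof and close the pipe, so nothing leaks.
	if err := lsof.Wait(); err != nil {
		return nil, err
	}
	return out, nil
}

func main() {
	out, err := countOpenFiles()
	if err != nil {
		panic(err)
	}
	fmt.Print(string(out))
}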

Using promises with Express.js

I was looking for a way to combine promises with Express. I found a few suggestions, like this middleware, but nothing that really fit what I wanted.

Ideally, I’d like to be able to just return a promise from a handler:

app.get("/foo", function(req, res) {
    return getSomethingAsync();
});

but I couldn’t see any way to achieve that without hacking on express itself. The best I could come up with was some middleware to add a method to the response:

module.exports = function() {
    return function(req, res, next) {
        res.promise = function(promise) {
            promise.then(function(result) {
                res.send(200, result);
            }).fail(function(err) {
                if (err.statusCode) {
                    res.send(err.statusCode, { error: err.message });
                } else {
                    res.send(500, { error: 'Unexpected error' });
                }
            }).done(); // done() rethrows any otherwise-unhandled rejection
        };
        next();
    };
};

which can be used like this:

app.get("/bar", function(req, res) {
    res.promise(getSomethingAsync());
});

Not the promised land

Promises are a much-touted solution to callback hell in Node.js. They can certainly help to clean up your code, taking you from this:

doSomething(function(err, res) {
    if (err) {
        return handleError(err);
    }

    doSomethingElse(res, function(err, res2) {
        if (err) {
            return handleError(err);
        }

        andThen();
    });
});

to this:

doSomething().then(function(res) {
    return doSomethingElse(res);
}).then(function() {
    andThen();
}).fail(handleError);

A definite improvement! However, there is a subtle difference between using callbacks and using promises: an unhandled error in callback code will blow up the event loop, but promises can silently swallow it (specifically, if you neither provide a fail handler nor call done).
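A contrived example, using Q-style promises as above: the first chain swallows the error entirely, while the second crashes the process as intended:

doSomething().then(function(res) {
    throw new Error("oops"); // a rejection is created, but nobody is listening
});

doSomething().then(function(res) {
    throw new Error("oops");
}).done(); // done() rethrows the rejection, crashing the process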

I want my software to fail fast (“let it crash”), rather than limping onwards. Unfortunately, this means that you need to be very careful when using promises, to ensure that you have covered all the possible cases. And it means you need a good understanding of how they work before starting to use them; not exactly the pit of success.

So, do the readability benefits outweigh the consequences? I think so, for now, but there’s definitely room for improvement. Maybe generators will be that silver bullet :)

Acking messages with node-amqp

The documentation for node-amqp is pretty good, but I couldn’t find a good example of how to ack messages once some work has been done:

var queueOpts = {
    durable: true,
    autoDelete: false
};
connection.queue('foo.bar', queueOpts, function (queue) {
    var subscriptionOpts = {
        ack: true,
        prefetchCount: 10
    };
    queue.subscribe(subscriptionOpts, function (message, headers, deliveryInfo, ack) {
        doWork(message).then(function() {
            ack.acknowledge();
        }).fail(function(err) {
            ack.reject(true); // requeue the message for another attempt
        });
    });
});

Remember, reject(true) just returns the message to the queue; so you’d probably want to track the number of attempts at processing it, and use a dead-letter queue, to avoid poison messages.
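For example (a sketch; it assumes node-amqp passes arguments through to queue.declare, and that a dead-letter exchange named foo.dlx has been set up separately):

var queueOpts = {
    durable: true,
    autoDelete: false,
    arguments: {
        'x-dead-letter-exchange': 'foo.dlx'
    }
};

Note that dead-lettering only applies to messages rejected with requeue set to false (ack.reject(false)); requeued messages just go back onto the original queue.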

Any messages not acked will be requeued when the connection is closed (you can check for them using the management plugin). Bear in mind that using acknowledgements only guarantees at least once delivery, so make sure your handlers are idempotent!

Building nginx with the push stream module on Debian

I’ve been trying out the push stream module for nginx recently. The instructions for building it are pretty clear, but I also wanted all the benefits of installing it as a package with apt-get.

I found this handy guide which outlines the general procedure. First, make sure that the build dependencies for the package are installed:

sudo apt-get build-dep nginx

Then download the package source:

apt-get source nginx

Add the new module to the source tree:

cd nginx-1.2.1/debian/modules
curl -L https://github.com/wandenberg/nginx-push-stream-module/tarball/master | sudo tar zx
mv wandenberg-nginx-push-stream-module-71c511d nginx-push-stream-module

Update the debian/rules file to include the module, and modify the version in debian/changelog.
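This is roughly what that looks like (a sketch, assuming the wheezy-era packaging layout, where debian/rules defines MODULESDIR and lists --add-module flags per flavour):

# debian/rules: add the module to the relevant flavour's configure flags
--add-module=$(MODULESDIR)/nginx-push-stream-module \

# debian/changelog: append a local version suffix, e.g. with
dch --local "-push" "Add nginx-push-stream-module"

Then build the package: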

dpkg-buildpackage -b

You can now install the required packages:

sudo dpkg -i nginx-common_1.2.1-2.2+wheezy2-push_all.deb nginx-full_1.2.1-2.2+wheezy2-push_amd64.deb nginx_1.2.1-2.2+wheezy2-push_all.deb

Or save them for use elsewhere. Once the module is installed, you need to update nginx.conf:

http {
    push_stream_shared_memory_size 256M;
}

And add a site:

server {
    listen 80;

    location /channels-stats {
        # activate channels statistics mode for this location
        push_stream_channels_statistics;

        # query string based channel id
        push_stream_channels_path               $arg_id;
    }

    location /pub {
        # activate publisher (admin) mode for this location
        push_stream_publisher admin;

        # query string based channel id
        push_stream_channels_path               $arg_id;
    }

    location ~ /sub/(.*) {
        # activate subscriber (streaming) mode for this location
        push_stream_subscriber;

        # positional channel path
        push_stream_channels_path                   $1;
    }
}

You can then test the pub/sub functionality using curl:

Subscribe:
curl -i -N -H "Connection: Upgrade" -H "Upgrade: websocket" -H "Host: localhost" -H "Origin: http://localhost" http://localhost/sub/channel1

Publish:
curl -s 'http://localhost/pub?id=channel1' -d '{"foo": "bar"}'

Spin wait in node.js

When writing acceptance tests, you often need to wait for something to happen. The easy thing to do is just wait for a certain period of time before checking, but that can make your tests slower than necessary. The accepted solution is to poll for whatever it is that you’re waiting for.

C# has a handy SpinWait.SpinUntil method, and I found this node module with similar behaviour. I needed it to work with promises though, so I ended up writing my own:

var DEFAULT_TIMEOUT = 5000;

function waitUntil (predicate, done, timeout, started) {
    timeout = defaultParameter(timeout, DEFAULT_TIMEOUT);
    started = defaultParameter(started, Date.now());

    predicate().done(function(res) {
        if (res) {
            return done();
        }

        if ((started + timeout) < Date.now()) {
            throw new Error("timed out");
        }

        setTimeout(function() {
            waitUntil(predicate, done, timeout, started);
        }, 100);
    });
}

function defaultParameter (parameter, defaultValue) {
    if (typeof(parameter) === 'undefined') {
        return defaultValue;
    }
    return parameter;
}

module.exports.waitUntil = waitUntil;

The first argument needs to be a predicate function that returns a promise; the second will be called once the predicate succeeds. You can also change the default timeout (5s). Usage looks something like this:

SpinWait.waitUntil(function() {
    return getAllTransactions(USER_ID).then(function(data) {
        return data.rows.length > 0;
    });
}, function() {
    getAllTransactions(USER_ID).done(function(data) {
        assert.equal(1, data.rows.length);
        var transaction = data.rows[0];
        assert.equal(1000, transaction.amount);
        done();
    });
});