Buffered channels in Erlang

Go has the handy concept of a buffered channel (similar to .NET’s blocking collections). It lets you throttle your code by blocking a sender that tries to add an item when the buffer is already full.

Erlang doesn’t offer anything similar out of the box (mailboxes will happily grow until the VM topples over), but it’s pretty easy to implement using a gen_server. You only need to handle two messages. The first adds an item:

init([Limit]) ->
    State = #{
        limit => Limit,
        count => 0,
        add_q => queue:new(),
        get_q => queue:new()
    },
    {ok, State}.

handle_call(add, From, State = #{count:=Count, limit:=Limit, add_q:=AddQ, get_q:=GetQ}) ->
    UpdatedCount = Count + 1,
    case UpdatedCount > Limit of
        false ->
            case queue:is_empty(GetQ) of
                true ->
                    %% Room available and nobody waiting: count the new item.
                    {reply, ok, State#{count => UpdatedCount}};
                _ ->
                    %% A getter is blocked: hand the item straight to it,
                    %% so the count stays the same.
                    {{value, Client}, UpdatedQ} = queue:out(GetQ),
                    gen_server:reply(Client, ok),
                    {reply, ok, State#{get_q => UpdatedQ}}
            end;
        _ ->
            %% Full: remember the caller and send no reply, which blocks it.
            {noreply, State#{add_q => queue:in(From, AddQ)}}
    end;

The process is initialised with a limit, the number of items that can be added to the queue before it will block. This collection is FIFO, but another data structure, e.g. a stack, could be used.

When an add message is received, the current size of the collection is checked. If adding another item would breach the limit, the caller’s From reference is added to the add wait queue and no reply is sent, so the caller stays blocked inside its call. Otherwise, the number of items is incremented (this version only keeps a count of items, rather than an actual list, but adding that would be trivial).

If a process is blocked waiting to get an item from the collection, it is unblocked straight away and the new item goes directly to it, so the count does not change. We also need to handle a get message:

handle_call(get, From, State = #{count:=Count, add_q:=AddQ, get_q:=GetQ}) ->
    case Count > 0 of
        true ->
            case queue:is_empty(AddQ) of
                true ->
                    %% Nobody is waiting to add: remove one item.
                    UpdatedCount = Count - 1,
                    {reply, ok, State#{count => UpdatedCount}};
                _ ->
                    %% An adder is blocked: its item takes the freed slot,
                    %% so the count stays the same.
                    {{value, Client}, UpdatedQ} = queue:out(AddQ),
                    gen_server:reply(Client, ok),
                    {reply, ok, State#{add_q => UpdatedQ}}
            end;
        _ ->
            %% Empty: remember the caller and send no reply, which blocks it.
            {noreply, State#{get_q => queue:in(From, GetQ)}}
    end;

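This is pretty much the opposite: check whether the collection is empty, block the caller if it is, and otherwise reply to the caller. If an adder was blocked on a full collection, it is unblocked at the same time, since the get has just freed a slot.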
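For reference, here is a minimal sketch of how the two handlers might look in a complete module that stores the items themselves rather than just a count. The module name buffered_queue, the start_link/2, add/2 and take/1 functions, and the {ok, Item} reply shape are all assumptions made for this example, not part of the code above.

```erlang
-module(buffered_queue).
-behaviour(gen_server).

%% Hypothetical client API (names are assumptions for this sketch).
-export([start_link/2, add/2, take/1]).
-export([init/1, handle_call/3, handle_cast/2]).

start_link(Name, Limit) when is_integer(Limit), Limit > 0 ->
    gen_server:start_link({local, Name}, ?MODULE, [Limit], []).

add(Server, Item) -> gen_server:call(Server, {add, Item}, infinity).
take(Server) -> gen_server:call(Server, get, infinity).

init([Limit]) ->
    {ok, #{limit => Limit, items => queue:new(),
           add_q => queue:new(), get_q => queue:new()}}.

handle_call({add, Item}, From, State) ->
    #{limit := Limit, items := Items, add_q := AddQ, get_q := GetQ} = State,
    case queue:out(GetQ) of
        {{value, Getter}, RestGetQ} ->
            %% A getter is blocked, so the buffer is empty: hand over directly.
            gen_server:reply(Getter, {ok, Item}),
            {reply, ok, State#{get_q := RestGetQ}};
        {empty, _} ->
            %% queue:len/1 is O(n); a separate counter would avoid that.
            case queue:len(Items) < Limit of
                true ->
                    {reply, ok, State#{items := queue:in(Item, Items)}};
                false ->
                    %% Full: park the caller together with its item.
                    {noreply, State#{add_q := queue:in({From, Item}, AddQ)}}
            end
    end;
handle_call(get, From, State) ->
    #{items := Items, add_q := AddQ, get_q := GetQ} = State,
    case queue:out(Items) of
        {{value, Item}, RestItems} ->
            %% Taking an item frees a slot: admit the oldest blocked adder.
            case queue:out(AddQ) of
                {{value, {Adder, Pending}}, RestAddQ} ->
                    gen_server:reply(Adder, ok),
                    NewItems = queue:in(Pending, RestItems),
                    {reply, {ok, Item},
                     State#{items := NewItems, add_q := RestAddQ}};
                {empty, _} ->
                    {reply, {ok, Item}, State#{items := RestItems}}
            end;
        {empty, _} ->
            %% Empty: park the caller until an item arrives.
            {noreply, State#{get_q := queue:in(From, GetQ)}}
    end.

handle_cast(_Msg, State) ->
    {noreply, State}.
```

The client function is called take/1 rather than get/1 because get/1 is an auto-imported BIF (the process dictionary lookup), and reusing the name invites confusion. With this sketch, buffered_queue:add(q, Item) blocks while the buffer is full and buffered_queue:take(q) blocks while it is empty.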

Once your collection is up & running (and supervised!), using it is as simple as calling any other gen_server:

ok = gen_server:call(queue_1, get, infinity),
do_something(),
ok = gen_server:call(queue_2, add, infinity),

I’ve set the timeout to infinity, but that might not be a good idea for production code: a blocked caller will wait forever if nothing ever arrives on the other side.
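If you would rather use a finite timeout, one option is a small wrapper that turns the timeout exit from gen_server:call/3 into an ordinary return value. This is only a sketch: the module name bq_client, the function call_with_timeout/3 and the {error, timeout} result are assumptions made for the example.

```erlang
-module(bq_client).
-export([call_with_timeout/3]).

%% Server and Request are whatever you would pass to gen_server:call/3,
%% e.g. call_with_timeout(queue_1, get, 5000).
call_with_timeout(Server, Request, TimeoutMs) ->
    try
        gen_server:call(Server, Request, TimeoutMs)
    catch
        %% gen_server:call/3 exits with {timeout, _} when no reply arrives
        %% within TimeoutMs milliseconds.
        exit:{timeout, _} ->
            {error, timeout}
    end.
```

One caveat with the handlers shown earlier: a caller that gives up is still sitting in the server’s wait queue, so a later add or get will be matched against it even though it is no longer listening. A more robust version would probably need some way to cancel a waiting request, for example by monitoring the callers.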

It would also be pretty simple to back the collection with disk storage, e.g. using a DETS table.
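As a rough illustration of the DETS idea, the sketch below keeps a FIFO on disk by storing each item under an increasing sequence number. Everything here is hypothetical (the module name dets_fifo, its functions and the {Table, Head, Tail} handle); it is not wired into the gen_server above and ignores error handling.

```erlang
-module(dets_fifo).
-export([open/1, push/2, pop/1, close/1]).

%% File is a path (string) to the table file; it is created if missing.
%% Returns a handle {Table, Head, Tail}: Head is the next sequence number
%% to pop, Tail the next one to push.
open(File) ->
    {ok, Tab} = dets:open_file(?MODULE, [{file, File}, {type, set}]),
    %% Recover the lowest and highest sequence numbers already on disk.
    Bounds = dets:foldl(
               fun({Seq, _Item}, none) -> {Seq, Seq};
                  ({Seq, _Item}, {Min, Max}) -> {min(Seq, Min), max(Seq, Max)}
               end, none, Tab),
    case Bounds of
        none -> {Tab, 0, 0};
        {Min, Max} -> {Tab, Min, Max + 1}
    end.

push({Tab, Head, Tail}, Item) ->
    ok = dets:insert(Tab, {Tail, Item}),
    {Tab, Head, Tail + 1}.

%% Head == Tail means there is nothing stored.
pop({_Tab, Same, Same} = Q) ->
    {empty, Q};
pop({Tab, Head, Tail}) ->
    [{Head, Item}] = dets:lookup(Tab, Head),
    ok = dets:delete(Tab, Head),
    {{value, Item}, {Tab, Head + 1, Tail}}.

close({Tab, _Head, _Tail}) ->
    dets:close(Tab).
```

Because the head and tail are recomputed from the table on open/1, items pushed before a restart can still be popped afterwards, which is the main reason to bother with disk storage in the first place.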
