Returning errors when piping a stream

Most of the examples of piping a stream of data using expressjs look like this:

app.get('/video', function(req, res) {
    var cmd = "ffmpeg";
    var args = [...];
    var proc = spawn(cmd, args);
    res.contentType('video/mp4');
    proc.stdout.pipe(res);
});

That works for the happy path, but it means any errors from the child process are returned as a 200; and, in my case, cached. Not ideal.

I googled it pretty hard, and even asked on SO, with no joy. Eventually, I found this article, at which point I realised I’d been asking the wrong question!

The pipe method is part of the core Node stream API, and has nothing to do with express (obvious, in retrospect). And, as the documentation clearly states, it calls end() on the destination when the readable stream ends.

So, the solution is to do that bit yourself:

proc.stdout.pipe(res, {end: false});
proc.on("error", err => {
    console.log("error from ffmpeg", err.stack);
    res.status(500).end();
}); 
proc.on("exit", code => {
    console.log("child proc exited", code);
    res.status(code === 0 ? 200 : 500).end();
});

Boom! (Just remember to handle all the cases, so end is always called).
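
Putting the two snippets together, the whole route ends up looking something like this (the ffmpeg arguments are elided, as before):

app.get('/video', function(req, res) {
    var cmd = "ffmpeg";
    var args = [/* ffmpeg arguments, elided as before */];
    var proc = spawn(cmd, args);
    res.contentType('video/mp4');

    // don't let pipe() end the response for us
    proc.stdout.pipe(res, {end: false});

    proc.on("error", err => {
        // e.g. ffmpeg isn't installed, or couldn't be spawned
        console.log("error from ffmpeg", err.stack);
        res.status(500).end();
    });

    proc.on("exit", code => {
        // a child process exits with code 0 on success
        console.log("child proc exited", code);
        res.status(code === 0 ? 200 : 500).end();
    });
});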

Streaming video to iOS devices

It seems that neither iOS devices nor Safari on OS X will play mp4 streamed this way; so if you’re trying to stream video to them, you need to provide another format.

The recommendation is to use HLS, which fortunately is supported by ffmpeg; you merely need to adjust the incantation:

app.get('/hls/video', function(req, res) {
    res.contentType('application/vnd.apple.mpegurl');
    var proc = ffmpeg();
    proc.stdout.pipe(res);
});
 
function ffmpeg() {
    var cmd = "ffmpeg";
    var filter = "some complex filter expr";
    var args = ["-i", "video1.mp4"];
    ...
    args.push(
        "-vcodec", "libx264",
        "-f", "hls",
        "-hls_time", "9",
        "-hls_list_size", "0",
        "-profile:v", "baseline",
        "-level", "3.0",
        "pipe:1"
    );
    return spawn(cmd, args);
}

I could probably use conneg to decide which format to return, rather than the URI, but I’m not convinced that my caching infrastructure (Varnish and CloudFront now!) would handle that correctly.
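
For what it’s worth, the conneg version might look something like this sketch, using express’s req.accepts (hlsProc and mp4Proc are hypothetical helpers that spawn ffmpeg with the appropriate arguments):

app.get('/video', function(req, res) {
    // let the Accept header choose the representation
    var type = req.accepts(['application/vnd.apple.mpegurl', 'video/mp4']);

    if (type === 'application/vnd.apple.mpegurl') {
        res.contentType('application/vnd.apple.mpegurl');
        hlsProc().stdout.pipe(res);   // hypothetical: spawns ffmpeg with the hls args
    } else if (type === 'video/mp4') {
        res.contentType('video/mp4');
        mp4Proc().stdout.pipe(res);   // hypothetical: spawns ffmpeg with the mp4 args
    } else {
        res.status(406).end();
    }
});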

Streaming video from ffmpeg

ffmpeg is a fantastic tool for converting, concatenating, or otherwise fiddling with video content. If you can generate what you need in advance, then you can upload it to S3 (or some other CDN-like option); but sometimes you need to stream video on demand.

First, you need to get the ffmpeg incantation right:

ffmpeg -i video1.mp4 -i video2.mp4 ... \
       -filter_complex "something something" \
       -movflags "frag_keyframe+empty_moov" \
       -f mp4 pipe:1

Using pipe:1 sends the output to stdout (as documented here), and the movflags are needed to allow streaming in mp4 format.

With this in place, it’s easy to pipe the output data to a browser using expressjs:

#!/usr/bin/env node
"use strict";

const express = require('express');
const { spawn } = require('child_process');

var app = express();

app.get('/video', function(req, res) {
    res.contentType('video/mp4');
    var proc = ffmpeg();
    proc.stdout.pipe(res);

    res.on("close", () => {
        proc.kill("SIGKILL");
    });
});

app.listen(4000);

function ffmpeg() {
    var cmd = "ffmpeg";
    var filter = "some complex filter expr";
    var args = ["-i", "video1.mp4"];
    ...
    args.push(
        "-filter_complex", filter,
        "-s", "1280x720",
        "-acodec", "aac",
        "-vcodec", "h264",
        "-movflags", "frag_keyframe+empty_moov",
        "-f", "mp4",
        "pipe:1"
    );
    return spawn(cmd, args);
}

I found I ended up with “zombie” ffmpeg processes if the client connection closed before the video ended (because ffmpeg is still trying to write data to the pipe?). There’s probably a neater way to solve that, but kill -9 is pretty effective!

At this point, you can stream some video; but this solution won’t scale: every request runs the ffmpeg command, which is pretty CPU intensive. It would be relatively simple to cache the output, either in memory or on disk; or you could put the node process behind a caching proxy.
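
As a rough illustration, a naive on-disk cache might look something like this (just a sketch: cacheDir and the cache key are made up, it ignores concurrent requests for the same video, and it will leave a partial file behind if the client disconnects early):

const fs = require('fs');
const path = require('path');

const cacheDir = '/tmp/video-cache';   // hypothetical location

app.get('/video', function(req, res) {
    res.contentType('video/mp4');

    var cached = path.join(cacheDir, 'video.mp4');   // made-up cache key
    if (fs.existsSync(cached)) {
        // serve the previously rendered output
        fs.createReadStream(cached).pipe(res);
        return;
    }

    var proc = ffmpeg();
    // a readable stream can be piped to more than one destination,
    // so write to the cache file and the response at the same time
    proc.stdout.pipe(fs.createWriteStream(cached));
    proc.stdout.pipe(res);

    res.on("close", () => {
        proc.kill("SIGKILL");
    });
});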

However, the best solution for our needs seemed to be using this as an “origin server” for Cloudfront. This means that only the first request (of each type) hits our server, and the response is cached for as long as necessary.
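
(If you go down this route, it’s also worth setting a suitable Cache-Control header on the response, e.g. res.set('Cache-Control', 'public, max-age=86400'), so Varnish and CloudFront know how long they’re allowed to keep it.)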

If you need to render different videos for every request, then you’ll just have to throw money at it!

SSH keys in Docker

One of the first stumbling blocks when using Docker is discovering that your SSH keys are on the wrong side of the hatchway.

A simple solution is just to copy them in when building the image (bearing in mind that COPY can only see files inside the build context, so the keys need to be copied in alongside the Dockerfile first):

COPY .ssh /root/.ssh

And this is reasonable, if the image is never going to leave your laptop. Any respectable tinfoil-hat-wearing neckbeard will look for an alternative, though.

An obvious next step is to mount the same folder as a volume:

SSH=~/.ssh
docker run -it --rm -v ${SSH}:/root/.ssh app npm install

Unfortunately, you’ll probably find at this point that the permissions are not correct inside the container, and that if you try and change them, they are hopelessly entangled with the permissions outside the container.

After much googling, and trying ideas from StackOverflow, the best solution I found was to use the same ssh-agent as the host:

SSH=~/.ssh
docker run -it --rm -v ${SSH}:/root/.ssh -v $SSH_AUTH_SOCK:/ssh-agent -e SSH_AUTH_SOCK=/ssh-agent app npm install

(I’m still mounting the .ssh folder, but only to recycle my known_hosts file).

Or, using compose:

version: '2'
services:
  app:
    build: .
    environment:
      - SSH_AUTH_SOCK=/ssh-agent
    volumes:
      - .:/app
      - ~/.ssh:/root/.ssh
      - $SSH_AUTH_SOCK:/ssh-agent
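
A quick way to confirm the agent really is visible inside the container is to list the loaded keys (assuming ssh-add is installed in the image):

docker-compose run app ssh-add -l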

Getting a rax image id

If you use the Rackspace API to create servers (e.g. via the ansible module), you need to know the id of the image you want to use.

It’s pretty easy to make the list request using curl, but the JSON is quite noisy and the results are paged; so I wrote a script:

#!/usr/bin/env python2

# https://developer.rackspace.com/docs/cloud-images/quickstart/

import os
import json
import urllib2
from urlparse import urlparse

username = os.environ['RAX_USERNAME']
api_key = os.environ['RAX_API_KEY']
region = os.environ['RAX_REGION']

url = 'https://identity.api.rackspacecloud.com/v2.0/tokens'
headers = {
    'Content-Type': 'application/json'
}
req_body = {
    'auth': {
        'RAX-KSKEY:apiKeyCredentials': {
            'username': username,
            'apiKey': api_key
        }
    }
}

data = json.dumps(req_body)
req = urllib2.Request(url, data, headers)
response = urllib2.urlopen(req)
res_body = json.loads(response.read())
access = res_body['access']
token = access['token']['id']
#print "Token: {0}".format(token)
endpoints = [s for s in access['serviceCatalog'] if s['name'] == 'cloudImages'][0]['endpoints']
endpoint = [e for e in endpoints if e['region'] == region][0]['publicURL']
#print "Endpoint: {0}".format(endpoint)
url = '{0}/images'.format(endpoint)
#print "Url: {0}".format(url)
images = []

while url is not None:
    headers = {
        'X-Auth-Token': token
    }
    req = urllib2.Request(url, headers=headers)
    response = urllib2.urlopen(req)
    res_body = json.loads(response.read())
    images = images + map(lambda i: {'id': i['id'], 'name': i['name']}, res_body['images'])

    if 'next' in res_body:
        u = urlparse(req.get_full_url())
        next_page = '{0}://{1}{2}'.format(u.scheme, u.netloc, res_body['next'])
        #print "Next: {0}".format(next_page)
        url = next_page
    else:
        url = None

for i in images:
    print "{0}: {1}".format(i['name'], i['id'])

UPDATE: the rax module now allows you to use a name (e.g. “Debian 8 (Jessie) (PVHVM)”), but this script can still be useful for listing other rax “things”.
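
For reference, an ansible task using the image name directly might look something like this sketch (the server name, flavor and key name are placeholders; credentials are assumed to come from the same RAX_* environment variables as the script above):

- name: Build a server from a named image
  rax:
    name: web01                     # placeholder server name
    image: "Debian 8 (Jessie) (PVHVM)"
    flavor: general1-1              # placeholder flavor
    key_name: my-key                # placeholder keypair
    region: "{{ lookup('env', 'RAX_REGION') }}"
    state: present
    wait: yes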

Game of Life in Elm (Pt 4)

In the last part we started a timer to update the game; now we need to implement the “business logic”.

To check that everything is working, we can start with a simple function that inverts the state of the board on every tick:

nextGeneration : Board -> Board
nextGeneration board =
    Array.indexedMap (\i r -> (nextGenRow i r board)) board

nextGenRow : Int -> Row -> Board -> Row
nextGenRow rowIndex row board =
    Array.indexedMap (\i c -> (nextGenCell i rowIndex c board)) row

nextGenCell : Int -> Int -> Cell -> Board -> Cell
nextGenCell cellIndex rowIndex cell board =
    case cell of
        Alive -> Dead
        Dead -> Alive

Satisfied that the board updates as expected, we can move on to implementing the real rules. Everything is conditional on the current state of the cell and the number of live neighbours it has:

nextGenCell : Cell -> Int -> Cell
nextGenCell cell liveNeighbours =
    case cell of
        Alive ->
            if liveNeighbours < 2 then
               Dead
            else if liveNeighbours > 3 then
               Dead
            else
               Alive
        Dead ->
            if liveNeighbours == 3 then
                Alive
            else
                Dead

We can count the neighbours for each cell:

liveNeighbours : Int -> Int -> Board -> Int
liveNeighbours rowIndex colIndex board =
    liveNeighboursAbove rowIndex colIndex board +
    liveNeighboursAdjacent rowIndex colIndex board +
    liveNeighboursBelow rowIndex colIndex board

liveNeighboursAbove : Int -> Int -> Board -> Int
liveNeighboursAbove rowIndex colIndex board =
    isAlive (rowIndex - 1) (colIndex - 1) board +
    isAlive (rowIndex - 1) (colIndex) board +
    isAlive (rowIndex - 1) (colIndex + 1) board

liveNeighboursAdjacent : Int -> Int -> Board -> Int
liveNeighboursAdjacent rowIndex colIndex board =
    isAlive rowIndex (colIndex - 1) board +
    isAlive rowIndex (colIndex + 1) board

liveNeighboursBelow : Int -> Int -> Board -> Int
liveNeighboursBelow rowIndex colIndex board =
    isAlive (rowIndex + 1) (colIndex - 1) board +
    isAlive (rowIndex + 1) (colIndex) board +
    isAlive (rowIndex + 1) (colIndex + 1) board

isAlive : Int -> Int -> Board -> Int
isAlive rowIndex colIndex board =
    case (getCell colIndex (getRow rowIndex board)) of
        Alive -> 1
        Dead -> 0

If a neighbour index falls off the edge of the grid (-1, or one past the end), we need to wrap around to the opposite side. We could do this by checking the index up front, but as the Array getters return a Maybe we can just handle the case where the index doesn’t exist:

getRow : Int -> Board -> Row
getRow i board =
    case Array.get i board of
        Just row -> row
        Nothing ->
            -- off the edge: wrap the index round to the other side
            case Array.get (i % Array.length board) board of
                Just row -> row
                Nothing -> Debug.crash "oops"

getCell : Int -> Row -> Cell
getCell i row =
    case Array.get i row of
        Just cell -> cell
        Nothing ->
            -- off the edge: wrap the index round to the other side
            case Array.get (i % Array.length row) row of
                Just cell -> cell
                Nothing -> Debug.crash "oops"

The Elm philosophy leans towards covering all edge cases, which is generally a Good Thing, but can become annoying e.g. when parsing json.

Now we can put all the pieces together with a map:

nextGeneration : Board -> Board
nextGeneration board =
    Array.indexedMap (\i r -> (nextGenRow i r board)) board

nextGenRow : Int -> Row -> Board -> Row
nextGenRow rowIndex row board =
    Array.indexedMap (\i c -> (nextGenCell c (liveNeighbours rowIndex i board))) row

And we finally have a working Game of Life!

Game of Life in Elm (Pt 3)

In Part 2 we seeded the board; next we need to start playing the game. We want to update on a timer, say once a second. This means we need to add a subscription to our program:

main =
    Html.program {
        init = init,
        view = view,
        update = update,
        subscriptions = subscriptions
    }

subscriptions : Model -> Sub Msg
subscriptions model =
      Time.every Time.second Tick

and handle those messages in our update function:

type Msg =
    NewBoard Board |
    Tick Time.Time

update : Msg -> Model -> (Model, Cmd Msg)
update msg model =
    case msg of
        Tick t ->
            (nextGeneration model, Cmd.none)
        NewBoard board ->
            (board, Cmd.none)

nextGeneration : Board -> Board
nextGeneration board =
    board

The heart of the game will be the nextGeneration function, which, given the current state of the board, should return the next generation according to the rules of the game.

At this point, I realised my life would probably be easier if I used Arrays for the model, rather than Lists. These are very different to what most people think of as an array, as they are immutable and implemented as “relaxed radix balanced trees”; but they have similar properties to the other sort of arrays.

type alias Row = Array.Array Cell
type alias Board = Array.Array Row

init =
    (Array.empty, Random.generate NewBoard (seedBoard))

Now we need to convert the random Lists of cells to Arrays:

update msg model =
    case msg of
        ...
        NewBoard cells ->
            (toBoard cells, Cmd.none)

toBoard : (List (List Cell)) -> Board
toBoard cells =
    Array.fromList (List.map toRow cells)

toRow : (List Cell) -> Row
toRow cells =
    Array.fromList cells
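
As an aside, those two helpers could be collapsed into a single point-free definition, if you prefer that style:

toBoard : List (List Cell) -> Board
toBoard =
    Array.fromList << List.map Array.fromList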

In the fourth and final part, we will implement the nextGeneration function.