It’s a Syn (Part 5)

Once a netsplit has healed, we need to re-sync the registered processes. When two nodes are reconnected, we add a discover message to the inbox of both nodes:

Reconnect(n) ==
        ...
        /\ inbox' = [o \in Nodes |-> CASE
            (o = n) -> Append(inbox[o], [action |-> "discover", from |-> other_node])
            [] (o = other_node) -> Append(inbox[o], [action |-> "discover", from |-> n])
            [] OTHER -> inbox[o]
        ]

Handling that message sends a copy of the local data back to the sender:

Discover(n) ==
    /\ Len(inbox[n]) > 0
    /\ LET message == Head(inbox[n])
        IN message.action = "discover"
        /\ inbox' = [o \in Nodes |-> CASE
            (o = n) -> Tail(inbox[o])
            [] (o = message.from) -> Append(inbox[o], [action |-> "ack_sync", local_data |-> locally_registered[n]])
            [] OTHER -> inbox[o]
        ]

which is then merged:

AckSync(n) ==
    /\ Len(inbox[n]) > 0
    /\ Head(inbox[n]).action = "ack_sync"
    /\ inbox' = [inbox EXCEPT![n] = Tail(inbox[n])]
    /\ LET message == Head(inbox[n])
        IN locally_registered' = [locally_registered EXCEPT![n] = locally_registered[n] \union message.local_data]

We also need to change the invariant, so that it is only checked once all messages have been processed, on all nodes:

AllRegistered ==
    \A n \in Nodes:
        (\A o \in Nodes: Len(inbox[o]) = 0) /\ visible_nodes[n] = AllOtherNodes(n) => locally_registered[n] = registered

If we run this, we get a failure:

...

State 9: <Next line 111, col 5 to line 120, col 19 of module syn>
/\ states = << "Register",
   "Disconnect",
   "Unregister",
   "Reconnect",
   "Discover",
   "SyncRegister",
   "Discover",
   "AckSync" >>
/\ next_val = 1
/\ inbox = (n1 :> <<>> @@ n2 :> <<[action |-> "ack_sync", local_data |-> {}]>>)
/\ locally_registered = (n1 :> {0} @@ n2 :> {0})
/\ registered = {}
/\ visible_nodes = (n1 :> {n2} @@ n2 :> {n1})

State 10: <Next line 111, col 5 to line 120, col 19 of module syn>
/\ states = << "Register",
   "Disconnect",
   "Unregister",
   "Reconnect",
   "Discover",
   "SyncRegister",
   "Discover",
   "AckSync",
   "AckSync" >>
/\ next_val = 1
/\ inbox = (n1 :> <<>> @@ n2 :> <<>>)
/\ locally_registered = (n1 :> {0} @@ n2 :> {0})
/\ registered = {}
/\ visible_nodes = (n1 :> {n2} @@ n2 :> {n1})

I’ll spare you the full trace, but we now have a phantom key: it was removed from one node during the partition, and then merged back in when the nodes re-synced. Again, the problem lies with my model being too simplistic. We will look into how to resolve this next time.
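To make the failure concrete, here is a rough Python replay of the same interleaving, outside TLC. It is a sketch, not the Syn implementation. The message tuples and the `run_partition_scenario` helper are my own (hypothetical) encoding of the model above, and merging is a plain set union, as in AckSync.

```python
from collections import deque


def run_partition_scenario():
    """Replay the counter-example: Register, Disconnect, Unregister, Reconnect, then the syncs."""
    # Hypothetical encoding of the model: a local set and a FIFO inbox per node.
    local = {"n1": set(), "n2": set()}
    inbox = {"n1": deque(), "n2": deque()}
    registered = set()  # what *should* be registered

    # n1 registers 0 while connected, so n2 gets a sync_register message.
    local["n1"].add(0)
    registered.add(0)
    inbox["n2"].append(("sync_register", 0))

    # Netsplit: n1 unregisters 0, but n2 isn't visible, so no message is sent.
    local["n1"].discard(0)
    registered.discard(0)

    # Reconnect: both nodes get a discover message from the other.
    inbox["n1"].append(("discover", "n2"))
    inbox["n2"].append(("discover", "n1"))

    def step(node):
        """Process the head of a node's inbox (SyncRegister / Discover / AckSync)."""
        action, payload = inbox[node].popleft()
        if action == "sync_register":
            local[node].add(payload)
        elif action == "discover":
            # Send a copy of our local data back to the sender.
            inbox[payload].append(("ack_sync", set(local[node])))
        elif action == "ack_sync":
            # Merge by union: a removal on one side can never win.
            local[node] |= payload

    # Discover(n1), SyncRegister(n2), Discover(n2), AckSync(n1), AckSync(n2)
    for node in ["n1", "n2", "n2", "n1", "n2"]:
        step(node)
    return local, registered, inbox
```

Both nodes end up with `{0}` and empty inboxes, while the ideal set is empty. That is the phantom key from the trace.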

It’s a Syn (Part 4)

We are starting to get to the business end. So how can we model a netsplit? The easiest thing seems to be to add a set for each node, listing the nodes that are visible from it:

AllOtherNodes(n) ==
    Nodes \ {n}

Init ==
    ...
    /\ visible_nodes = [n \in Nodes |-> AllOtherNodes(n)]

At the start, all nodes can see all other nodes. Now, when we register a new value:

Register(n) ==
    ...
    /\ inbox' = [o \in Nodes |->
                       IF o \in visible_nodes[n] THEN
                           Append(inbox[o], [action |-> "sync_register", name |-> next_val])
                       ELSE
                           inbox[o]
                   ]

we only send messages to the nodes that are still visible (same for unregister). And we add two new state transitions:

Next ==
    /\ \E n \in Nodes:
        ...
        \/ Disconnect(n)
        \/ Reconnect(n)

To disconnect:

Disconnect(n) ==
    /\ Cardinality(visible_nodes[n]) > 0
    /\ LET other_node == CHOOSE o \in visible_nodes[n]: TRUE
        IN visible_nodes' = [o \in Nodes |-> CASE
            (o = other_node) -> visible_nodes[o] \ {n}
            [] (o = n) -> visible_nodes[o] \ {other_node}
            [] OTHER -> visible_nodes[o]
        ]
    /\ UNCHANGED <<registered, locally_registered, inbox, next_val>>

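If other nodes are still visible, we pick one of them and remove it (on both sides). Note that CHOOSE is deterministic rather than random: it always returns the same element for a given set. With more than two nodes, some disconnections would therefore never be explored; an existential quantifier (\E o \in visible_nodes[n]: ...) would cover them all. Reconnection is the opposite: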

Reconnect(n) ==
    /\ Cardinality(AllOtherNodes(n) \ visible_nodes[n]) > 0
    /\ LET other_node == CHOOSE o \in (AllOtherNodes(n) \ visible_nodes[n]): TRUE
        IN visible_nodes' = [o \in Nodes |-> CASE
            (o = other_node) -> visible_nodes[o] \union {n}
            [] (o = n) -> visible_nodes[o] \union {other_node}
            [] OTHER -> visible_nodes[o]
        ]
    /\ UNCHANGED <<registered, locally_registered, inbox, next_val>>

We also need to update our invariant:

AllRegistered ==
    \A n \in Nodes:
        Len(inbox[n]) = 0 /\ visible_nodes[n] = AllOtherNodes(n) => locally_registered[n] = registered

so that it only compares the registered values once a netsplit has healed. If we run this model:

Error: Invariant AllRegistered is violated.
Error: The behavior up to this point is:
State 1: <Initial predicate>
/\ next_val = 0
/\ inbox = (n1 :> <<>> @@ n2 :> <<>>)
/\ locally_registered = (n1 :> {} @@ n2 :> {})
/\ registered = {}
/\ visible_nodes = (n1 :> {n2} @@ n2 :> {n1})

State 2: <Next line 78, col 5 to line 85, col 19 of module syn>
/\ next_val = 0
/\ inbox = (n1 :> <<>> @@ n2 :> <<>>)
/\ locally_registered = (n1 :> {} @@ n2 :> {})
/\ registered = {}
/\ visible_nodes = (n1 :> {} @@ n2 :> {})

State 3: <Next line 78, col 5 to line 85, col 19 of module syn>
/\ next_val = 1
/\ inbox = (n1 :> <<>> @@ n2 :> <<>>)
/\ locally_registered = (n1 :> {0} @@ n2 :> {})
/\ registered = {0}
/\ visible_nodes = (n1 :> {} @@ n2 :> {})

State 4: <Next line 78, col 5 to line 85, col 19 of module syn>
/\ next_val = 1
/\ inbox = (n1 :> <<>> @@ n2 :> <<>>)
/\ locally_registered = (n1 :> {0} @@ n2 :> {})
/\ registered = {0}
/\ visible_nodes = (n1 :> {n2} @@ n2 :> {n1})

it fails pretty quickly. But that is what we should expect, as new keys could have been added while the network was partitioned, and we aren’t doing anything to resolve that. And that is what we will be looking into, next time.
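A stripped-down Python version of this model shows the same failure. Register only notifies visible nodes, so a key added during a partition never reaches the other side. The dict-of-sets layout and the function names are my own assumptions, not part of the spec.

```python
def new_state(nodes):
    """Every node starts with an empty registry and inbox, and can see every other node."""
    return {
        "local": {n: set() for n in nodes},
        "inbox": {n: [] for n in nodes},
        "visible": {n: set(nodes) - {n} for n in nodes},
        "registered": set(),
    }


def disconnect(state, n, other):
    """Remove the link on both sides, like Disconnect(n)."""
    state["visible"][n].discard(other)
    state["visible"][other].discard(n)


def reconnect(state, n, other):
    """Restore the link on both sides, like Reconnect(n)."""
    state["visible"][n].add(other)
    state["visible"][other].add(n)


def register(state, n, value):
    """Register locally, then only notify the nodes that are currently visible."""
    state["local"][n].add(value)
    state["registered"].add(value)
    for o in state["visible"][n]:
        state["inbox"][o].append(("sync_register", value))


def all_registered(state):
    """The invariant: only checked for nodes with an empty inbox and a healed partition."""
    nodes = set(state["local"])
    return all(
        state["local"][n] == state["registered"]
        for n in nodes
        if not state["inbox"][n] and state["visible"][n] == nodes - {n}
    )


# The same steps as states 2-4 of the trace above.
state = new_state(["n1", "n2"])
disconnect(state, "n1", "n2")
register(state, "n1", 0)
reconnect(state, "n1", "n2")
print(all_registered(state))  # False: n2 never heard about 0
```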

It’s a Syn (Part 3)

I said last time that waiting for all messages to be processed before performing any new actions was unrealistic, so let’s remove those two pre-conditions:

diff --git a/syn.tla b/syn.tla
index 5bab535..d75d5d3 100644
--- a/syn.tla
+++ b/syn.tla
@@ -15,7 +15,6 @@ Init ==
     /\ removed = 0
 
 Register(n) ==
-    /\ \A o \in Nodes: Len(inbox[o]) = 0
     /\ next_val < MaxValues
     /\ registered' = [registered EXCEPT![n] = registered[n] \union {next_val}]
     /\ next_val' = next_val + 1
@@ -34,7 +33,6 @@ ItemToRemove(n) ==
     CHOOSE r \in registered[n]: TRUE
 
 Unregister(n) ==
-    /\ \A o \in Nodes: Len(inbox[o]) = 0
     /\ Cardinality(registered[n]) > 0
     /\ LET item_to_remove == ItemToRemove(n)
         IN registered' = [registered EXCEPT![n] = registered[n] \ {item_to_remove}]

Now if we run the model checker, we have a problem:

Error: Invariant AllRegistered is violated.
Error: The behavior up to this point is:
State 1: <Initial predicate>
/\ next_val = 0
/\ inbox = (n1 :> <<>> @@ n2 :> <<>>)
/\ added = 0
/\ removed = 0
/\ registered = (n1 :> {} @@ n2 :> {})

State 2: <Next line 55, col 5 to line 60, col 19 of module syn>
/\ next_val = 1
/\ inbox = (n1 :> <<>> @@ n2 :> <<[action |-> "sync_register", name |-> 0]>>)
/\ added = 1
/\ removed = 0
/\ registered = (n1 :> {0} @@ n2 :> {})

State 3: <Next line 55, col 5 to line 60, col 19 of module syn>
/\ next_val = 1
/\ inbox = ( n1 :> <<>> @@
  n2 :>
      << [action |-> "sync_register", name |-> 0],
         [action |-> "sync_unregister", name |-> 0] >> )
/\ added = 1
/\ removed = 1
/\ registered = (n1 :> {} @@ n2 :> {})

State 4: <Next line 55, col 5 to line 60, col 19 of module syn>
/\ next_val = 1
/\ inbox = (n1 :> <<>> @@ n2 :> <<[action |-> "sync_unregister", name |-> 0]>>)
/\ added = 1
/\ removed = 1
/\ registered = (n1 :> {} @@ n2 :> {0})

State 5: <Next line 55, col 5 to line 60, col 19 of module syn>
/\ next_val = 1
/\ inbox = ( n1 :> <<[action |-> "sync_unregister", name |-> 0]>> @@
  n2 :> <<[action |-> "sync_unregister", name |-> 0]>> )
/\ added = 1
/\ removed = 2
/\ registered = (n1 :> {} @@ n2 :> {})

State 6: <Next line 55, col 5 to line 60, col 19 of module syn>
/\ next_val = 1
/\ inbox = (n1 :> <<>> @@ n2 :> <<[action |-> "sync_unregister", name |-> 0]>>)
/\ added = 1
/\ removed = 2
/\ registered = (n1 :> {} @@ n2 :> {})

In this scenario, a key (0) was added on node 1 and then immediately removed. But node 2 still had both messages waiting, so it also added the key, and then removed it itself before getting to the unregister message from node 1. This is getting more interesting, but the problem lies with our model, not the protocol.

It’s generally considered bad form to update the registry for another process (and this can actually be enforced); but the real problem here is that even though removing an item that doesn’t exist from a set is a no-op, we are double counting. So we think that we have added 1 item, and removed 2; and therefore the size of the set should be -1, which is never going to happen.

Assuming we want to allow the non-strict behaviour, we could probably solve this with some clever conditional updates of the counters; but I would prefer to track a “platonic ideal” of what should be registered, and compare the actual state to that.

Init ==
    ...
    /\ registered = {}
    /\ locally_registered = [n \in Nodes |-> {}]

Register(n) ==
    ...
    /\ registered' = registered \union {next_val}
    /\ locally_registered' = [locally_registered EXCEPT![n] = locally_registered[n] \union {next_val}]

AllRegistered ==
    \A n \in Nodes:
        Len(inbox[n]) = 0 => locally_registered[n] = registered
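Replaying the counter-example in Python shows why the set-based ideal works where the counters don't. Removing a missing key is a no-op on the set, but the counter still goes up. The helper name is my own, and so is the compressed interleaving, which drains all remaining messages at the end; both simplify the trace above.

```python
def replay_part3_trace():
    """Apply the counter-example's steps to per-node sets, the old counters, and the ideal set."""
    local = {"n1": set(), "n2": set()}
    registered = set()  # the "platonic ideal"
    added = removed = 0  # the old counters

    # Register(n1): add 0 locally; n2 gets sync_register.
    local["n1"].add(0)
    registered.add(0)
    added += 1
    # Unregister(n1): remove 0 locally; n2 gets sync_unregister.
    local["n1"].discard(0)
    registered.discard(0)
    removed += 1
    # SyncRegister(n2): n2 finally adds 0 ...
    local["n2"].add(0)
    # ... then Unregister(n2) removes it again, and tells n1.
    local["n2"].discard(0)
    registered.discard(0)  # a no-op on the set ...
    removed += 1  # ... but the counter still increments
    # Draining the remaining sync_unregister messages is a no-op on both sets.
    local["n1"].discard(0)
    local["n2"].discard(0)
    return local, registered, added, removed
```

With the counters, every node "should" hold -1 keys. With the ideal set, every node matches `registered`.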

This does remove the previous error, but even with just 5 keys, we are generating far more states than before:

Model checking completed. No error has been found.
    ...
1782413 states generated, 357188 distinct states found, 0 states left on queue.
The depth of the complete state graph search is 26.

This means running the model will be slower, but should also mean it’s more likely to find some interesting concurrent executions.

Next stop, netsplits!

It’s a Syn (Part 2)

To make things more interesting, we need to take the simple model, and run it with multiple concurrent nodes:

CONSTANTS
Nodes = {n1, n2}

Just two, to start with. The model state needs to become more complex:

Init ==
    /\ inbox = [n \in Nodes |-> <<>>]
    /\ registered = [n \in Nodes |-> {}]
    /\ next_val = 0
    /\ added = 0
    /\ removed = 0

Each node now has its own set of registered values; and, this being an Erlang system, it makes sense to model the inbox for messages (a sequence, for each node). In this case, a node is really a process (or gen_server).

Next ==
    /\ \E n \in Nodes:
        \/ Register(n)
        \/ SyncRegister(n)
        \/ Unregister(n)
        \/ SyncUnregister(n)
        \/ Complete

There are more options for the next state: we now pick a node from the set of all nodes, and apply an action to that. Also, register & unregister have been split in two (async).

Register(n) ==
    /\ \A o \in Nodes: Len(inbox[o]) = 0
    /\ next_val < MaxValues
    /\ registered' = [registered EXCEPT![n] = registered[n] \union {next_val}]
    /\ next_val' = next_val + 1
    /\ added' = added + 1
    /\ inbox' = [o \in Nodes |-> IF o = n THEN inbox[o] ELSE Append(inbox[o], [action |-> "sync_register", name |-> next_val])]
    /\ UNCHANGED <<removed>>

First we update the local registry, then broadcast a message to all other nodes, to do the same.

SyncRegister(n) ==
    /\ Len(inbox[n]) > 0
    /\ Head(inbox[n]).action = "sync_register"
    /\ registered' = [registered EXCEPT![n] = registered[n] \union {Head(inbox[n]).name}]
    /\ inbox' = [inbox EXCEPT![n] = Tail(inbox[n])]
    /\ UNCHANGED <<removed, added, next_val>>

To make things simpler, I decided that a pre-condition for register would be that all nodes have empty inboxes (i.e. any previous messages have already been processed). This obviously isn’t realistic, and the constraint will be removed later.

ItemToRemove(n) ==
    CHOOSE r \in registered[n]: TRUE

Unregister(n) ==
    /\ \A o \in Nodes: Len(inbox[o]) = 0
    /\ Cardinality(registered[n]) > 0
    /\ LET item_to_remove == ItemToRemove(n)
        IN registered' = [registered EXCEPT![n] = registered[n] \ {item_to_remove}]
        /\ inbox' = [o \in Nodes |-> IF o = n THEN inbox[o] ELSE Append(inbox[o], [action |-> "sync_unregister", name |-> item_to_remove])]
    /\ removed' = removed + 1
    /\ UNCHANGED <<added, next_val>>

SyncUnregister(n) ==
    /\ Len(inbox[n]) > 0
    /\ Head(inbox[n]).action = "sync_unregister"
    /\ registered' = [registered EXCEPT![n] = registered[n] \ {Head(inbox[n]).name}]
    /\ inbox' = [inbox EXCEPT![n] = Tail(inbox[n])]
    /\ UNCHANGED <<removed, added, next_val>>

Unsurprisingly, unregister is still a mirror image of the same steps. We maintain the same invariant:

AllRegistered ==
    \A n \in Nodes:
        Len(inbox[n]) = 0 => Cardinality(registered[n]) = (added - removed)

but evaluated on each node, individually; and we don’t bother checking until all messages have been processed (as the counters obviously won’t match).
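To get a feel for what TLC is doing, here is a small breadth-first search over a Python encoding of the two-node model. It runs with and without the empty-inbox pre-condition, which corresponds to this post vs the next one. It is a sketch under my own assumptions. `MAX_VALUES` is set to 2 to keep it quick. Unregister also tries every key rather than a single CHOOSE pick, so it explores at least the behaviours the spec does.

```python
from collections import deque

NODES = ("n1", "n2")
MAX_VALUES = 2  # assumption: small, so the search finishes instantly


def _replace(items, index, value):
    """Return a copy of the tuple with one element swapped out."""
    return tuple(value if j == index else x for j, x in enumerate(items))


def _broadcast(inboxes, sender, msg):
    """Append msg to every inbox except the sender's."""
    return tuple(q if j == sender else q + (msg,) for j, q in enumerate(inboxes))


def successors(state, wait_for_empty_inboxes):
    inboxes, regs, next_val, added, removed = state
    can_start = not wait_for_empty_inboxes or all(not q for q in inboxes)
    for i in range(len(NODES)):
        # Register(n)
        if can_start and next_val < MAX_VALUES:
            yield (_broadcast(inboxes, i, ("sync_register", next_val)),
                   _replace(regs, i, regs[i] | {next_val}),
                   next_val + 1, added + 1, removed)
        # Unregister(n): try every registered key
        if can_start:
            for item in regs[i]:
                yield (_broadcast(inboxes, i, ("sync_unregister", item)),
                       _replace(regs, i, regs[i] - {item}),
                       next_val, added, removed + 1)
        # SyncRegister(n) / SyncUnregister(n): consume the head of the inbox
        if inboxes[i]:
            (action, name), rest = inboxes[i][0], inboxes[i][1:]
            reg = regs[i] | {name} if action == "sync_register" else regs[i] - {name}
            yield (_replace(inboxes, i, rest), _replace(regs, i, reg),
                   next_val, added, removed)


def all_registered(state):
    """AllRegistered: only checked on nodes whose inbox is empty."""
    inboxes, regs, _, added, removed = state
    return all(len(r) == added - removed for q, r in zip(inboxes, regs) if not q)


def find_violation(wait_for_empty_inboxes):
    """BFS from Init; return the first state that breaks the invariant, or None."""
    init = (((),) * len(NODES), (frozenset(),) * len(NODES), 0, 0, 0)
    seen, queue = {init}, deque([init])
    while queue:
        state = queue.popleft()
        if not all_registered(state):
            return state
        for nxt in successors(state, wait_for_empty_inboxes):
            if nxt not in seen:
                seen.add(nxt)
                queue.append(nxt)
    return None
```

`find_violation(True)` finds nothing, while `find_violation(False)` turns up a double-counting state like the one in the next post.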

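And now we also check a temporal property: that every node’s inbox is eventually empty. (Strictly speaking, <> is satisfied as soon as the inbox is empty in any state, which includes the initial one. Checking that it ends up, and stays, empty would need <>[] and a fairness condition.)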

PROPERTIES
AllMessagesProcessed

AllMessagesProcessed ==
    \A n \in Nodes:
        <>(Len(inbox[n]) = 0)

Again, we can run this with the model checker:

Starting...
Implied-temporal checking--satisfiability problem has 2 branches.
Computing initial states...
Finished computing initial states: 1 distinct state generated at 2024-01-05 13:15:50.
Progress(41) at 2024-01-05 13:15:50: 543 states generated, 286 distinct states found, 0 states left on queue.
Checking 2 branches of temporal properties for the complete state space with 572 total distinct states at (2024-01-05 13:15:50)
Finished checking temporal properties in 00s at 2024-01-05 13:15:50
Model checking completed. No error has been found.
  Estimates of the probability that TLC did not check all reachable states
  because two distinct states had the same fingerprint:
  calculated (optimistic):  val = 4.0E-15
543 states generated, 286 distinct states found, 0 states left on queue.
The depth of the complete state graph search is 41.

And the number of possible states has ballooned from the simple model, but still no errors!

We can also try registering a larger number of keys, or using more nodes, with the same result; but the search space, and runtime, can increase dramatically (this is the downside to exhaustive checking, vs example-based testing or even generative random testing).

It’s a Syn (Part 1)

I was looking at the Syn process registry recently, and thought it might be interesting to try modelling its netsplit recovery protocol (eventual consistency) in TLA+.

It’s always worth starting with something so simple that it can’t be wrong. In this case I decided that scopes were out of scope, as was metadata, and I would first try to build a model for a single node.

Init ==
    /\ registered = {}
    /\ next_val = 0
    /\ added = 0
    /\ removed = 0

We have an empty set for the registered values, a serial for the next item to be added (using integers as “keys”), and two counters for the number of items added & removed.

Next ==
    \/ Register
    \/ Unregister
    \/ Complete

The next state is a choice of registering a value, unregistering a value; or reaching the end (an arbitrary limit).

Register ==
    /\ next_val < 5
    /\ registered' = registered \union {next_val}
    /\ next_val' = next_val + 1
    /\ added' = added + 1
    /\ UNCHANGED <<removed>>

If we haven’t hit that limit yet (pre-condition), then we add the next value to the set of registered values, and increment both next_val and the added counter.

ItemToRemove ==
    CHOOSE r \in registered: TRUE

Unregister ==
    /\ Cardinality(registered) > 0
    /\ registered' = registered \ {ItemToRemove}
    /\ removed' = removed + 1
    /\ UNCHANGED <<added, next_val>>

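Unregistration is pretty similar. The pre-condition this time is that the set is not empty, and obviously we remove a key instead. (CHOOSE isn’t random, though: it always returns the same element for a given set, so TLC only explores one removal per state.)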

Complete ==
    /\ next_val = 5
    /\ UNCHANGED <<added, next_val, registered, removed>>

Finally, once 5 values have been added (no matter how many were removed), we give up.

INVARIANTS
AllRegistered

AllRegistered == Cardinality(registered) = (added - removed)

And after every step, we ensure that the number of items in the set is equal to those added minus those removed.

If we run the model:

$ docker run --rm -it -v $PWD:/app -w /app openjdk:14-jdk-alpine java -XX:+UseParallelGC -cp tla2tools.jar tlc2.TLC -config syn.cfg -workers auto -cleanup syn.tla
TLC2 Version 2.18 of Day Month 20?? (rev: cab6f13)
...
Computing initial states...
Finished computing initial states: 1 distinct state generated at 2024-01-05 12:23:27.
Model checking completed. No error has been found.
  Estimates of the probability that TLC did not check all reachable states
  because two distinct states had the same fingerprint:
  calculated (optimistic):  val = 1.8E-17
37 states generated, 21 distinct states found, 0 states left on queue.
The depth of the complete state graph search is 11.

No errors are found. Not a huge surprise, but this gives us a good foundation to build something more realistic.
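The single-node model is small enough to enumerate with a few lines of Python. This sketch mirrors Init and Next, and uses `min()` in place of CHOOSE. That is an assumption about which element gets picked; any fixed choice would do for checking the invariant. With that choice it reports 37 states generated and 21 distinct, the same numbers as the TLC run above.

```python
from collections import deque

MAX_VALUES = 5


def successors(state):
    """Yield the Register, Unregister and Complete successors of a state."""
    registered, next_val, added, removed = state
    if next_val < MAX_VALUES:  # Register
        yield (registered | {next_val}, next_val + 1, added + 1, removed)
    if registered:  # Unregister, with min() standing in for CHOOSE
        item = min(registered)
        yield (registered - {item}, next_val, added, removed + 1)
    if next_val == MAX_VALUES:  # Complete: a stuttering step
        yield state


def explore():
    """Breadth-first search from Init, checking AllRegistered in every state."""
    init = (frozenset(), 0, 0, 0)
    seen, queue = {init}, deque([init])
    generated = 1  # the initial state counts as generated
    while queue:
        state = queue.popleft()
        registered, _, added, removed = state
        if len(registered) != added - removed:
            raise AssertionError(f"AllRegistered violated in {state}")
        for nxt in successors(state):
            generated += 1
            if nxt not in seen:
                seen.add(nxt)
                queue.append(nxt)
    return generated, len(seen)


print(explore())  # (37, 21)
```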

CQRS Bookings (Part 2)

Last time, we looked at taking a booking. Now we want to persist that, over a server restart. We are going to do something very simple, using DETS; obviously for a more realistic scenario you might want to store your data in an RDBMS, or Kafka, or even a distributed ledger (Raft/Paxos).

First, we need to open the file, during init:

{ok, _Name} = dets:open_file(bookings, []),

We can then save the command, after validation:

handle_call({book_room, Cmd}, _From, #{available_rooms:=AvailableRooms, version:=Version} = State) ->
     ...
    NewVersion = Version + 1,
    ok = dets:insert(bookings, {NewVersion, Cmd}),
    ...

Now, if the process dies and is respawned, we can resurrect the state by replaying all the saved commands:

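%% ets:fun2ms/1 needs the ms_transform parse transform, included at the top of the module:
-include_lib("stdlib/include/ms_transform.hrl").

init(unused) ->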
     ...
    MatchSpec = ets:fun2ms(fun({N,Cmd}) when N >= 0 -> {N, Cmd} end),
    ExistingBookings = lists:sort(fun({A,_}, {B,_}) -> A =< B end, dets:select(bookings, MatchSpec)),
    NewBookings = lists:foldl(fun({_, Cmd}, B) -> add_new_booking(Cmd, B) end, maps:get(bookings, State), ExistingBookings),

This isn’t exactly “event sourcing“, but something more akin to the Write Ahead Log (WAL) that databases use. We could just load all the history (which is basically what is happening right now), but the match spec:

[{{'$1','$2'},[{'>','$1',{const,0}}],[{{'$1','$2'}}]}]

will come in handy. We can now stop & start the server, and as long as the file on disk remains, any history will survive. To make things more efficient, we can save a snapshot of the state, at regular intervals; and only replay any events newer than that:

init(unused) ->
    {ok, _} = dets:open_file(snapshot, []),
    State = case dets:lookup(snapshot, latest) of
        [] ->
            new_state();
        [{latest, S}] ->
            S
    end,
    LatestVersion = maps:get(version, State),
    MatchSpec = ets:fun2ms(fun({N,Cmd}) when N > LatestVersion -> {N, Cmd} end),
    ...
    erlang:send_after(60 * 1000, self(), save_snapshot),
    ...

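handle_info(save_snapshot, State) ->
    ok = dets:insert(snapshot, {latest, State}),
    %% re-arm the timer, so a snapshot is taken every minute (not just once)
    erlang:send_after(60 * 1000, self(), save_snapshot),
    {noreply, State};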

This isn’t without risk. The main benefit of the “let it crash” philosophy is that any bad state is blown away, so we would need to take into account that we could attempt to replay a poison message. This is particularly true if, e.g., the actual hotel availability came from an external system, where actions might not be idempotent. We might need to discard some commands, or even perform a compensating action.
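The snapshot + replay logic can be sketched in Python. Start from the latest snapshot, if there is one, and fold in only the commands with a newer version, in version order. The dict layout, `apply_booking` and `rebuild` are hypothetical stand-ins for the gen_server state and init/1 above.

```python
def apply_booking(state, version, cmd):
    """Add one booking command to the client's list, like add_new_booking/2 (hypothetical)."""
    client = cmd[0]
    bookings = dict(state["bookings"])
    bookings[client] = [cmd] + bookings.get(client, [])
    return {"version": version, "bookings": bookings}


def rebuild(log, snapshot=None):
    """Start from a snapshot (or an empty state), then replay newer commands in version order."""
    state = snapshot if snapshot is not None else {"version": 0, "bookings": {}}
    for version in sorted(v for v in log if v > state["version"]):
        state = apply_booking(state, version, log[version])
    return state


log = {
    1: (1, 3, "101", (2023, 12, 1), (2023, 12, 2)),
    2: (2, 3, "102", (2023, 12, 1), (2023, 12, 2)),
    3: (1, 3, "102", (2023, 12, 2), (2023, 12, 3)),
}
snapshot = rebuild({v: c for v, c in log.items() if v <= 2})
# Replaying from the snapshot gives the same state as replaying everything.
print(rebuild(log, snapshot) == rebuild(log))  # True
```

A command that fails during replay would still need handling here: skip it, or compensate for it.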

CQRS Bookings (Part 1)

I recently came across the CQRS Bookings kata, and thought it might be a good chance to experiment. The actual kata description isn’t very detailed, but there is a good example repo (in C#).

You first need some data structure to define what rooms are available & when (in a more realistic example, this would come from some external source, e.g. a third party API). I decided to start with something relatively simple: a map from hotel id to a map from date to a list of available rooms (each room being another map).

3 => #{
   {2023, 12, 1} => [
                     #{
                       id => <<"101">>,
                       prices => #{
                                   "EUR" => #{
                                              1 => 109,
                                              2 => 140
                                             }
                                  }
                      },
                     #{
                       id => <<"102">>,
                       prices => #{
                                   "EUR" => #{
                                              1 => 109,
                                              2 => 140
                                             }
                                  }
                      },
                      ...

The rooms have two prices, depending on the number of occupants.

We can then have a first stab at making a booking:

gen_server:call(cqrs_booking, {book_room, {1, 2, <<"101">>, {2023, 12, 1}, {2023, 12, 2}}}).

handle_call({book_room, Cmd}, _From, #{available_rooms:=AvailableRooms, bookings:=Bookings} = State) ->
    {Client, Hotel, Room, CheckIn, _CheckOut} = Cmd,
    AvailableRoomsForHotel = maps:get(Hotel, AvailableRooms),
    AvailableRoomsForDay = maps:get(CheckIn, AvailableRoomsForHotel),
    [_RoomInfo] = lists:filter(fun(R) -> maps:get(id, R) == Room end, AvailableRoomsForDay),
    NewBookings = case maps:is_key(Client, Bookings) of
        true ->
            BookingsForClient = maps:get(Client, Bookings),
            maps:update(Client, [Cmd | BookingsForClient], Bookings);
        false ->
            maps:put(Client, [Cmd], Bookings)
    end,
    {reply, ok, State#{bookings:=NewBookings}};

First, we look up the hotel by id, then the day (using the check-in date). This would crash the process, if the key did not exist. If the room is available, then the booking is added to another map (part of the gen_server state). I’m not removing the room from the available list, which would obviously be a problem if you were actually running a hotel; but that’s not really the part I’m interested in.
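Here is the same validation step as a Python sketch, using nested dicts for the hotel → date → rooms structure. As in the Erlang version, a missing hotel or date simply raises: a KeyError stands in for the process crashing. Anything other than exactly one matching room is rejected. The function name and the plain-string room ids are my own choices.

```python
def book_room(available_rooms, bookings, cmd):
    """Validate a booking against the availability map, then prepend it to the client's bookings."""
    client, hotel, room, check_in, _check_out = cmd
    # A missing hotel or day raises KeyError, much like maps:get/2 crashing the process.
    rooms_for_day = available_rooms[hotel][check_in]
    matches = [r for r in rooms_for_day if r["id"] == room]
    if len(matches) != 1:  # the Erlang code would fail to match [_RoomInfo]
        raise ValueError(f"room {room!r} is not available on {check_in}")
    new_bookings = dict(bookings)
    new_bookings[client] = [cmd] + bookings.get(client, [])
    return new_bookings


available = {
    3: {
        (2023, 12, 1): [
            {"id": "101", "prices": {"EUR": {1: 109, 2: 140}}},
            {"id": "102", "prices": {"EUR": {1: 109, 2: 140}}},
        ]
    }
}
cmd = (1, 3, "101", (2023, 12, 1), (2023, 12, 2))
print(book_room(available, {}, cmd))  # {1: [(1, 3, '101', (2023, 12, 1), (2023, 12, 2))]}
```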

To make things a bit more CQRS, we can now split the validation of the command, from the actual processing. This obviously introduces a whole new assortment of trade-offs, but deciding whether that is worthwhile for a particular use case is out of the scope of this blog post.

self() ! {new_booking, Cmd},

This is simply sending a new message to the same process, to be handled later:

handle_info({new_booking, {Client, _Hotel, _Room, _CheckIn, _CheckOut} = Cmd}, #{bookings:=Bookings} = State) ->
    NewBookings = case maps:is_key(Client, Bookings) of
        true ->
            BookingsForClient = maps:get(Client, Bookings),
            maps:update(Client, [Cmd | BookingsForClient], Bookings);
        false ->
            maps:put(Client, [Cmd], Bookings)
    end,
    {noreply, State#{bookings:=NewBookings}};

This works; but as of OTP 21, there is a more elegant way to handle it:

{reply, ok, State, {continue, {new_booking, Cmd}}};

...

handle_continue({new_booking, {Client, _Hotel, _Room, _CheckIn, _CheckOut} = Cmd}, #{bookings:=Bookings} = State) ->
    ...

All well and good, so far; but if the server crashes, we lose everything. Next time, we’ll look at persisting some data.

Putting an ECS Service behind an ALB, using Ansible

Buzzword bingo. If you want to set up an ECS Service (on Fargate), fronted by an ALB (and an API Gateway, and a WAF, and CloudFront – to tick all the boxes), there’s an Ansible module for you.

Assuming you already have a task definition, creating a service is pretty straightforward:

- name: Create service
  ecs_service:
    name: ...
    cluster: ...
    task_definition: ...
    network_configuration:
      assign_public_ip: no
      security_groups: ...
      subnets: [...]
    launch_type: FARGATE
    desired_count: ...
    load_balancers: ...

The important bit is the `load_balancers` section. The Ansible docs aren’t very informative on this point, but that just means you need to switch to the AWS docs. You need a list, containing a dict per LB:

    load_balancers:
      -
        targetGroupArn: arn:aws:elasticloadbalancing:region:accountId:targetgroup/serviceName/id
        containerName: ...
        containerPort: 8080

With the arn of the target group that the tasks should be added to/removed from, the name of the container (in case your task definition contains more than one), and the port that traffic should be forwarded to.

Replicating all tables, logically

pglogical is an extremely useful extension, allowing forms of replication not supported by WAL shipping; but the downside is that you have to select which tables are included.

If you are using it to perform a major version upgrade, for example, the answer is EVERYTHING (just about).

You can write a script, listing the tables; but once you’ve done that a few times, you start to think about automating it, to ensure that any new/updated tables are included correctly.

It’s relatively straightforward to get a list of all tables from PG:

SELECT table_schema, table_name
FROM information_schema.tables
WHERE table_type = 'BASE TABLE'  -- only real tables, not views
    AND table_schema NOT IN ('information_schema', 'pglogical', 'public', 'pg_catalog', ...)
    AND NOT (table_schema = '...' AND (table_name = '...' OR table_name = '...' OR table_name = '...'))
    AND NOT (table_schema = '...' AND table_name = '...')
ORDER BY 1, 2;

This excludes anything that causes problems (e.g. ref data added by migrations), and gives you a pipe-separated list:

    table_schema    |          table_name          
--------------------+------------------------------
 foo              | bar

which you can plonk into a Google Sheet (other cloud-based applications are available).

And finally, you need a horrifying spreadsheet formula:

=CONCATENATE("SELECT pglogical.replication_set_add_table('", IF(ISBLANK(C2), "default", C2), "', '", A2, ".", B2, "', true, null, ", IF(ISBLANK(D2),"null", CONCATENATE("'", D2, "'")), ");")

Glorious!

(You don’t need all the escaped quotes, if your tables/schemas have sensible names; and you can ignore col D, if you aren’t filtering any data)
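If a spreadsheet feels too horrifying, the same statements can be generated from the (schema, table) rows with a little Python. This is a sketch. It only doubles any single quotes inside the literals, so odd names don't break the SQL. It also treats the replication set and row filter (columns C and D in the formula) as optional arguments.

```python
def quote_literal(value):
    """Render a SQL string literal, doubling any embedded single quotes."""
    return "'" + value.replace("'", "''") + "'"


def add_table_sql(schema, table, replication_set=None, row_filter=None):
    """Build the replication_set_add_table call for one (schema, table) row."""
    set_name = quote_literal(replication_set or "default")
    relation = quote_literal(f"{schema}.{table}")
    filter_sql = quote_literal(row_filter) if row_filter else "null"
    return (
        "SELECT pglogical.replication_set_add_table("
        f"{set_name}, {relation}, true, null, {filter_sql});"
    )


rows = [("foo", "bar")]  # e.g. parsed from the psql output above
for schema, table in rows:
    print(add_table_sql(schema, table))
# SELECT pglogical.replication_set_add_table('default', 'foo.bar', true, null, null);
```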

And then something similar, for sequences:

SELECT sequence_schema, sequence_name
FROM information_schema.sequences
WHERE sequence_schema NOT IN ('public', ...)
ORDER BY 1, 2;

And:

=CONCATENATE("SELECT pglogical.replication_set_add_sequence('default', '", A2, ".", B2, "');")

Terraforming an RDS Proxy

We have been using pgbouncer as a connection pooler, and have been very happy with it; but some recent load testing has shown that it will need to grow with throughput. And it is also a single point of failure, unless you build some sort of load balanced cluster.

While that is possible, RDS Proxy offers the dream of autoscaling & resilience delivered to your door. We looked into it, when first released, and you had to use IAM authentication; but now native auth is an option.

I was following the getting started guide, but I wanted to use tf rather than the cli.

First, I needed a security group, allowing access to the pg port:

resource "aws_security_group" "rds_proxy_sg" {
    name = "rds-proxy-sg"
    vpc_id = var.vpc_id

    ingress {
        cidr_blocks = [ 
            var.aws_cidr
        ]   
        from_port = 5432
        to_port = 5432
        protocol = "tcp"
    }
    egress {
        from_port = 0 
        to_port  = 0 
        protocol = "-1"
        cidr_blocks = [ "0.0.0.0/0" ]
    }
}

And a role/policy to allow the proxy to get creds from secrets manager:

resource "aws_iam_role" "rds_proxy_role" {
  name = "RdsProxySecrets"

  assume_role_policy = <<EOF
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Action": "sts:AssumeRole",
      "Principal": {
        "Service": "rds.amazonaws.com"
      },
      "Effect": "Allow"
    }
  ]
}
EOF
}

resource "aws_iam_policy" "rds_proxy_policy" {
  name        = "RdsProxySecrets"
  path        = "/"

  policy = <<EOF
{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": "secretsmanager:GetSecretValue",
            "Resource": [
                "arn:aws:secretsmanager:eu-west-2:***:secret:*"
            ]
        },
        {
            "Effect": "Allow",
            "Action": "kms:Decrypt",
            "Resource": "arn:aws:kms:eu-west-2:****:key/***",
            "Condition": {
                "StringEquals": {
                    "kms:ViaService": "secretsmanager.eu-west-2.amazonaws.com"
                }
            }
        }
    ]
}
EOF
}

resource "aws_iam_policy_attachment" "rds_proxy_policy_attachment" {
  name       = "RdsProxySecrets"
  roles      = [aws_iam_role.rds_proxy_role.name]
  policy_arn = aws_iam_policy.rds_proxy_policy.arn
}

I don’t have any other secrets in there, so I used a wildcard instead of listing them all individually.

Then the proxy itself:

resource "aws_db_proxy" "rds_proxy" {
    name                   = "rds-proxy"
    debug_logging          = false
    engine_family          = "POSTGRESQL"
    idle_client_timeout    = 1800
    require_tls            = true 
    role_arn               = aws_iam_role.rds_proxy_role.arn
    vpc_security_group_ids = [aws_security_group.rds_proxy_sg.id]
    vpc_subnet_ids         = var.private_subnets
    ...

which needs the role arn, the sg id, and the subnets (output from a dependency, in my case).

And you also need the auth block, for each login:

    auth {
        username = "foo"
        secret_arn = "arn:aws:secretsmanager:eu-west-2:***:secret:foo"
        iam_auth   = "DISABLED"
        client_password_auth_type = "POSTGRES_SCRAM_SHA_256"
    }

Unfortunately, when I ran this, I got an error:

An error occurred (InvalidParameterValue) when calling the CreateDBProxy operation: UserName must not be set in UserAuthConfig when SECRETS AuthScheme is used.

Given that “SECRETS” is the only auth scheme available, this was a bit confusing; but the username is included in the secret json (more on this later), so I guess it is not needed. With that property removed, the proxy could be created.

You also need a target, i.e. which db to call:

resource "aws_db_proxy_target" "rds_proxy_target" {
    db_instance_identifier = var.rds_instance
    db_proxy_name          = aws_db_proxy.rds_proxy.name
    target_group_name      = "default"
}

I am relying on the “default” target group being named that, rather than creating a new one.

At this point, I thought I was golden, but when I tried to connect:

$ psql -h rds-proxy.proxy-***.eu-west-2.rds.amazonaws.com -d foo -U foo
psql: error: FATAL:  This RDS proxy has no credentials for the role foo. Check the credentials for this role and try again.

I was disappoint.

I turned on “enhanced logging”, and had a look in CloudWatch:

Credentials couldn't be retrieved. The AWS Secrets Manager secret with the ARN "arn:aws:secretsmanager:eu-west-2:***:secret:foo" has an incorrect secret format or has empty username. Format the secret like this: 
{
    "username": "",
    "password": ""
}

I could see the creds in Secrets Manager, and it looked like the correct format:

{'username': 'foo', 'password': '...'}

but clearly something was wrong: those single quotes mean it isn’t valid JSON.

I had created the secrets using ansible, because the db passwords were in group vars (this creates a chicken & egg problem with the tf, but I’ll ignore that for now):

- name: Create creds in secrets manager
  local_action:
    module: community.aws.secretsmanager_secret
    name: "creds-{{ item }}"
    state: present
    secret_type: 'string'
    secret: >
      {"username":"{{ item }}","password":"{{ hostvars[groups['pgbouncer'][0]][item | regex_replace('-', '_') + '_db_password'] }}"}
  loop: "{{ db_accounts }}"

but it seems that I needed to use json_secret instead.
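The difference is easy to see in Python. `str()` on a dict gives the single-quoted form that ended up in Secrets Manager, while `json.dumps` gives the JSON that RDS Proxy is looking for. Presumably the templated value was turned into a dict somewhere along the way and then stringified, though that is an assumption.

```python
import json

creds = {"username": "foo", "password": "..."}

as_repr = str(creds)         # Python repr: single quotes
as_json = json.dumps(creds)  # proper JSON: double quotes


def is_valid_json(text):
    """True if the text parses as JSON."""
    try:
        json.loads(text)
    except json.JSONDecodeError:
        return False
    return True


print(as_repr)                 # {'username': 'foo', 'password': '...'}
print(as_json)                 # {"username": "foo", "password": "..."}
print(is_valid_json(as_repr))  # False
print(is_valid_json(as_json))  # True
```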

$ psql -h rds-proxy.proxy-***.eu-west-2.rds.amazonaws.com -d foo -U foo
Password for user foo: 
psql (13.11 (Debian 13.11-0+deb11u1), server 13.10)
SSL connection (protocol: TLSv1.3, cipher: TLS_AES_256_GCM_SHA384, bits: 256, compression: off)
Type "help" for help.

foo=> 

Success! 🌈

Now I need to throw some locusts at it, and see if this was worth the effort.