Add newlines to Firehose transform lambda

If you want to use a lambda to transform data being sent to a Firehose, the recommended “blueprint” looks something like this:

import base64

def lambda_handler(event, context):
    output = []

    for record in event['records']:
        payload = base64.b64decode(record['data']).decode('utf-8')

        # Do custom processing on the payload here

        output_record = {
            'recordId': record['recordId'],
            'result': 'Ok',
            'data': base64.b64encode(payload.encode('utf-8')).decode('utf-8')
        }
        output.append(output_record)

    return {'records': output}

which works perfectly, unless you want to later query the records stored in S3 using Athena. Then you discover that because the serialized json is not newline separated, you’re up shit creek.

The secret is to append the newline after processing (here data is the processed payload, and the json module needs importing):

output_record = {
    'recordId': record['recordId'],
    'result': 'Ok',
    'data': base64.b64encode(json.dumps(data).encode('utf-8') + b'\n').decode('utf-8')
}
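Putting the two together, a minimal sketch of the corrected handler (assuming the incoming payload is itself json, and leaving the processing step as a stub):

```python
import base64
import json

def lambda_handler(event, context):
    output = []

    for record in event['records']:
        payload = json.loads(base64.b64decode(record['data']))

        # Do custom processing on the payload here

        # Re-serialize with a trailing newline, so Athena can split records
        data = json.dumps(payload).encode('utf-8') + b'\n'

        output.append({
            'recordId': record['recordId'],
            'result': 'Ok',
            'data': base64.b64encode(data).decode('utf-8')
        })

    return {'records': output}
```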

Flattening json arrays with Athena

If you have an S3 bucket chock full of newline separated json blobs (e.g. from a firehose ingest pipeline), you may want to query said files using a SQL-like prompt. This is the promise of Athena.

If your json blobs are very similar, you can specify the schema when creating the table, and parsing is taken care of for you. The rest of the time, you are in the land of “semi-structured data”.

CREATE EXTERNAL TABLE `foo_202510`(
  `id` bigint COMMENT 'from deserializer',
  `j` string COMMENT 'from deserializer')
ROW FORMAT SERDE 
  'org.openx.data.jsonserde.JsonSerDe' 
WITH SERDEPROPERTIES ( 
  'case.insensitive'='TRUE', 
  'dots.in.keys'='FALSE', 
  'ignore.malformed.json'='FALSE', 
  'mapping'='TRUE') 
STORED AS INPUTFORMAT 
  'org.apache.hadoop.mapred.TextInputFormat' 
OUTPUTFORMAT 
  'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
LOCATION
  's3://some-bucket/2025/10/'
TBLPROPERTIES (
  'classification'='json', 
  'transient_lastDdlTime'=...)

The json files in the S3 bucket are organised by YYYY/MM/DD, which allows me to partition the data and only scan what I am interested in (as there are many GBs of it). Assuming each json blob looks like this:

{ "id": 124232, "j": { ... } }

I am loading the id into one column, and the nested object (as a string) into another. I can then run a query against that table:

SELECT
    id,
    json_extract(j, '$.abc')
FROM "foo_202510"
LIMIT 1

A more complicated scenario is when the nested json contains an array that I wish to flatten, so that there is a row in the output per item in the array:

WITH data AS (
    SELECT
        id,
        cast(json_extract(j, '$.a') as array(json)) arr
    FROM "foo_202510"
    WHERE ...
)
SELECT
    id,
    json_extract(a, '$.b') b
FROM data
CROSS JOIN UNNEST(arr) AS t(a)

We start with a CTE that extracts the array and casts it to the correct type. We can then perform a cross join against this temp table, to get a row per array item.
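To make the flattening concrete, here is the same shape of transformation sketched in Python, with made-up rows (this is purely illustrative; nothing Athena actually runs):

```python
import json

# One row per input record; 'j' holds the nested json as a string
rows = [
    {'id': 1, 'j': '{"a": [{"b": "x"}, {"b": "y"}]}'},
    {'id': 2, 'j': '{"a": [{"b": "z"}]}'},
]

# The CTE: extract the array from each row
data = [(row['id'], json.loads(row['j'])['a']) for row in rows]

# The CROSS JOIN UNNEST: one output row per array item
flattened = [(id_, item['b']) for id_, arr in data for item in arr]
```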

Using SSM with Ansible

In the before time, to access EC2 instances on a private subnet, you would need to have SSH running on a bastion/jump host in a public subnet to tunnel through.

A more modern alternative is to install the SSM Agent (or use an OS that comes with it already set up). You can then use the AWS console, or CLI, to create a session – tunneling through AWS infrastructure, and using their authentication. This allows you to use short-lived credentials, and provides an audit trail – without needing to ship ssh logs somewhere secure.

$ AWS_PROFILE=... aws ssm start-session --target 'i-123...'
$

For ansible to achieve the same trick, you need to update your inventory:

plugin: aws_ec2
regions: ...
...
compose:
  ansible_host: instance_id
  ansible_connection: '"amazon.aws.aws_ssm"'
  ansible_aws_ssm_bucket_name: '"..."'

You still use the same plugin, but need to add a few more details. The ansible_host needs to be the EC2 instance id (rather than an IP address), and you obviously no longer need the ssh ProxyCommand. The connection type is SSM (and the nested quotes are needed), and finally you need an S3 bucket (used by ansible to upload/download the python scripts to run on the target node – which would previously have been SCPed, I think).

You should now be able to list the available inventory, as before:

AWS_REGION=... AWS_PROFILE=... ansible-inventory -i inventory.aws_ec2.yml --graph

Once this is working, the transition is pretty seamless. The only real downside is that running a playbook is noticeably slower (~2x) 🐌

Batch closing PRs

If you find yourself in the situation where a golem has opened many, many PRs and you need to close them, you can use the web UI – but only 25 at a time.

A better option is to use the GH CLI. Once you’ve done the auth dance, you can list the PRs for a repo, and just return the number (not the id, which is a different thing):

$ gh pr list --json number -L 1
[
  {
    "number": 123
  }
]

You can then add a jq filter, to extract the number from the json blob:

$ gh pr list --json number -q '.[] | .number' -L 1
123

And finally, send each line to a close cmd, using xargs:

$ gh pr list --json number -q '.[] | .number' -L 500 | xargs -I % sh -c 'gh pr close %'
✓ Closed pull request foo/bar#123 (...)
...

Cross-region pull for a lambda function

Having got x-acct pull working for my lambda function in staging, I had foolishly assumed that running the same CDK script for prod would be easy (as that is the same account where the ECR repo lives).

Instead, the build was failing with a confusing message:

14:52:31  MyStack |  4/11 | 1:52:29 PM | CREATE_FAILED        | AWS::Lambda::Function                       | MyLambda (MyLambda...) Resource handler returned message: "Source image ACCOUNT_ID.dkr.ecr.REGION.amazonaws.com/REPO:latest is not valid. Provide a valid source image. (Service: Lambda, Status Code: 400, Request ID: ...) (SDK Attempt Count: 1)" (RequestToken: ..., HandlerErrorCode: InvalidRequest)

Obviously, the ECR uri is valid, or it wouldn’t be working in the other account. I assumed it was permissions related, but the permissions I had added for x-acct seemed to be a superset of the permissions necessary within the same account.

When I tried to create the lambda in the console, a slightly more useful error was returned. It seems that Lambda is unable to pull from ECR in another region (even though Fargate has no trouble). The easiest solution is to enable replication.

You can do this in the CloudFormation that creates the repo:

Resources:
    RepositoryReplicationConfig:
        Type: AWS::ECR::ReplicationConfiguration
        Properties:
            ReplicationConfiguration: 
                Rules:
                    - Destinations:
                        - Region: ...
                          RegistryId: ... (account id)

Cross-account pull for a Lambda function

I have been trying to set up a Lambda function, using the CDK, that uses a docker image from a different account (because reasons). It felt like I was stuck in a chicken & egg situation: the IAM role to be used was created by the stack, which then failed (because it couldn’t pull the image) and rolled back, deleting the role.

I tried using a wildcard for the principal:

                Statement:
                -
                    Sid: AllowCrossAccountPull
                    Effect: Allow
                    Principal:
                        AWS: "arn:aws:iam::$ACCOUNT_ID:role/$STACK-LambdaServiceRole*"

but that was rejected:

Resource handler returned message: "Invalid parameter at 'PolicyText' failed to satisfy constraint: 'Principal not found'

After some digging around in the CDK source code, I was able to create the role first; and set up the cross account permissions before creating the lambda. But I was still getting the same error:

Resource handler returned message: "Lambda does not have permission to access the ECR image. Check the ECR permissions. (Service: Lambda, Status Code: 403, ...

At this point, I did what I should have done originally, and actually read the docs. It turns out that, just as Fargate tasks use one role to start the task and another to execute it, so does Lambda – but in this case, one of the roles is played by the service itself.

After a bit more flopping around, I finally had something that worked 🥳

                Statement:
                -
                    Sid: CrossAccountPermission
                    Effect: Allow
                    Principal:
                        AWS: "arn:aws:iam::$ACCOUNT_ID:root"
                    Action:
                        - "ecr:BatchGetImage"
                        - "ecr:GetDownloadUrlForLayer"
                -
                    Sid: LambdaECRImageCrossAccountRetrievalPolicy
                    Effect: Allow
                    Principal:
                        Service: "lambda.amazonaws.com"
                    Action:
                        - "ecr:BatchGetImage"
                        - "ecr:GetDownloadUrlForLayer"

API Gateway & Origin IP

I was in the process of adding an API Gateway between Cloudfront and an existing ALB, when we discovered that valid requests were being rejected.

After some digging, we realised that API Gateway was using the more modern Forwarded header, rather than the usual XFF headers. It’s relatively straightforward to parse the format, or find a library that will:

const parse = require('forwarded-parse');

module.exports = function() {
    return function(req, res, next) {
        if (req.headers.forwarded) {
            const forwarded = parse(req.headers.forwarded).map(f => f.for);
            Object.defineProperties(req, {
                'ip': {
                    configurable: true,
                    enumerable: true,
                    get() {
                        return forwarded[0];
                    },
                },
                'ips': {
                    configurable: true,
                    enumerable: true,
                    get() {
                        return forwarded;
                    },
                },
            });
        }

        next();
    };
};
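For the curious, the Forwarded header format (RFC 7239) is simple enough to pick apart by hand; a rough Python sketch that ignores quoted strings and other edge cases:

```python
def forwarded_for(header):
    """Extract the 'for' values from a Forwarded header.

    Each comma-separated element describes one hop; within an
    element, semicolon-separated key=value pairs carry the details.
    """
    ips = []
    for element in header.split(','):
        for pair in element.split(';'):
            key, _, value = pair.strip().partition('=')
            if key.lower() == 'for':
                ips.append(value)
    return ips
```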

With this middleware plugged in, the routes with IP filtering were behaving correctly again. However, if you want to run the same app in an env with just an ALB (e.g. as you roll out the API Gateway), then you have a problem.

Because the ALB does not know about that header, it will quite happily let a spoofed version pass through. And there doesn’t seem to be an easy way to detect that you are behind an API Gateway (there is a Via header, but the ALB doesn’t overwrite that either):

curl "https://..." -H 'content-type: application/json' -d '{...}' -H 'forwarded: by=...;for=...,for=...;host=...;proto=https' -H 'via: HTTP/1.1 AmazonAPIGateway'

In the end, we were forced to use an env var (“feature flag”), to only enable that middleware in envs where we knew it was safe.

Fetch tags when cloning repo

I recently moved a freestyle Jenkins job to a (multibranch) pipeline, and discovered that there was a subtle change of behaviour in the checkout being made. Before:

git fetch --tags --force --progress -- git@github.com:*** +refs/heads/*:refs/remotes/origin/* # timeout=10

And after:

git fetch --no-tags --force --progress -- https://github.com/*** +refs/heads/master:refs/remotes/origin/master # timeout=10

The important difference here is the --no-tags. This is probably a sensible default for most use cases (i.e. a faster clone), but this job was using semantic-release, which needs the tags.

It’s relatively simple to fix this in the UI: you can add an “advanced clone behaviours” block, and tick a box.

But this job is created using the Job DSL, and I couldn’t see any easy way to add that to the branchSource. In the end, I changed the checkout step from:

stage('Checkout') {
    steps {
        checkout scm
    }
}

to:

checkout scmGit(
    branches: scm.branches,
    extensions: [cloneOption(noTags: false, reference: '', shallow: false)],
    userRemoteConfigs: scm.userRemoteConfigs
)

It’s a Syn (Part 13)

It took a few attempts, but I managed to come up with a merge implementation that worked for both AckSync and SyncRegister, with more than 2 nodes:

Merge(left, right) ==
    LET to_keep == {name \in DOMAIN left : (name \notin DOMAIN right \/ left[name] > right[name])}
    IN [name \in to_keep |-> left[name]]

RECURSIVE Flatten(_, _, _)

Flatten(keys, struct, acc) ==
    IF keys = {} THEN acc
    ELSE
        LET k == CHOOSE k \in keys: TRUE
        IN Flatten(keys \ {k}, struct, acc @@ struct[k])

MergeRegistries(local, remote, remote_node) ==
    LET all_registered == Flatten(DOMAIN local, local, << >>)
    IN [r \in DOMAIN local |-> CASE
        (r = remote_node) -> local[r] @@ Merge(remote, all_registered)
        [] OTHER -> Merge(local[r], remote)
    ]

SyncRegister(n) ==
    ...
    l == MergeRegistries(locally_registered[n], [r \in {message.name} |-> message.time], message.from)

AckSync(n) ==
    ...
    l == MergeRegistries(locally_registered[n], message.local_data, message.from)
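For intuition, the Merge rule above can be mirrored in Python, treating a registry as a name → timestamp dict (this is just a sketch, not the actual spec or implementation):

```python
def merge(left, right):
    # Keep names in left that right doesn't know about,
    # or that left has seen more recently
    return {name: time for name, time in left.items()
            if name not in right or time > right[name]}
```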

I can now run simulation mode (with 5 nodes & more keys) for hours, without failure. I have no idea how much of the search space has been covered though; I suspect not a lot.

And on that note, I think I’m done with this topic 🎉

It’s a Syn (Part 12)

Up till now, we have been using the default behaviour of TLC: model checking mode. But there is another tool in our quiver (mixing my metaphors there), simulation mode!

This is more like the way property testing tooling works: rather than a breadth-first search of the problem space, a random walk is generated. As we have discovered, the time taken to run an exhaustive model check rapidly expands, as the model becomes more complicated.

If you have added constraints to prevent the model running forever (e.g. in our case, the disconnection limit, and not re-using registered names), you can remove those now. When you run the spec:

$ docker run --rm -it -v $PWD:/app -w /app amazoncorretto:8-alpine3.17-jdk java -cp tla2tools.jar tlc2.TLC -simulate syn.tla
TLC2 Version 2.18 of 20 March 2023 (rev: 3ea3222)
Running Random Simulation with seed -7079380368092791143 with 1 worker on 8 cores with 3488MB heap and 64MB offheap memory (Linux 6.7.2-arch1-1 amd64, Amazon.com Inc. 1.8.0_392 x86_64).
...

it gives you the seed, so you can run the same tests again after fixing any issues. TLC will run forever in this mode, if there aren’t any failures, but we are not so lucky:

Error: Invariant ThereCanBeOnlyOne is violated.
Error: The behavior up to this point is:

...

State 34: <Next line 290, col 5 to line 303, col 22 of module syn>
/\ disconnections = 6
/\ locally_registered = ( n1 :> (n1 :> (a :> 2 @@ b :> 26) @@ n2 :> <<>> @@ n3 :> <<>>) @@
  n2 :> (n1 :> <<>> @@ n2 :> (a :> 22) @@ n3 :> <<>>) @@
  n3 :> (n1 :> (a :> 2) @@ n2 :> (a :> 22) @@ n3 :> << >>) )
/\ time = 33
/\ states = << <<"Register", n1, a>>,
   <<"Register", n2, a>>,
   <<"RegisterOrUpdateOnNode", n1, a>>,
   <<"Disconnect", n2, n1>>,
   <<"Register", n1, b>>,
   <<"Disconnect", n2, n3>>,
   <<"Register", n2, b>>,
   <<"SyncRegister", n3, a>>,
   <<"Register", n1, c>>,
   <<"Reconnect", n3, n2>>,
   <<"Unregister", n1, a>>,
   <<"Register", n2, c>>,
   <<"Disconnect", n2, n3>>,
   <<"Down", n3, n2>>,
   <<"Reconnect", n3, n2>>,
   <<"Register", n1, d>>,
   <<"Unregister", n1, a>>,
   <<"Unregister", n1, a>>,
   <<"Reconnect", n1, n2>>,
   <<"Disconnect", n2, n1>>,
   <<"Reconnect", n2, n1>>,
   <<"Disconnect", n2, n1>>,
   <<"RegisterOrUpdateOnNode", n2, a>>,
   <<"Register", n1, e>>,
   <<"Down", n1, n2>>,
   <<"Discover", n3, n2>>,
   <<"RegisterOrUpdateOnNode", n1, b>>,
   <<"Down", n3, n2>>,
   <<"Register", n1, f>>,
   <<"Disconnect", n1, n3>>,
   <<"Register", n3, b>>,
   <<"Discover", n3, n2>>,
   <<"SyncRegister", n3, a>> >>
/\ visible_nodes = (n1 :> {} @@ n2 :> {n3} @@ n3 :> {n2})
/\ registered = (a :> 2 @@ b :> 1 @@ c :> 0 @@ d :> 0 @@ e :> 0 @@ f :> 0 @@ g :> 0 @@ h :> 0)
/\ inbox = (n1 :> <<[action |-> "register_or_update_on_node", name |-> c], [action |-> "unregister_on_node", name |-> a], [action |-> "register_or_update_on_node", name |-> d], [action |-> "unregister_on_node", name |-> a], [action |-> "unregister_on_node", name |-> a], [action |-> "discover", from |-> n2], [action |-> "DOWN", from |-> n2], [action |-> "discover", from |-> n2], [action |-> "DOWN", from |-> n2], [action |-> "register_or_update_on_node", name |-> e], [action |-> "register_or_update_on_node", name |-> f], [action |-> "DOWN", from |-> n3]>> @@ n2 :> <<[time |-> 2, action |-> "sync_register", name |-> a, from |-> n1], [action |-> "DOWN", from |-> n1], [action |-> "DOWN", from |-> n3], [action |-> "register_or_update_on_node", name |-> b], [action |-> "discover", from |-> n3], [action |-> "register_or_update_on_node", name |-> c], [action |-> "DOWN", from |-> n3], [action |-> "discover", from |-> n3], [action |-> "discover", from |-> n1], [action |-> "DOWN", from |-> n1], [action |-> "discover", from |-> n1], [action |-> "DOWN", from |-> n1], [action |-> "ack_sync", from |-> n3, local_data |-> << >>], [action |-> "ack_sync", from |-> n3, local_data |-> << >>]>> @@ n3 :> <<[time |-> 26, action |-> "sync_register", name |-> b, from |-> n1], [action |-> "DOWN", from |-> n1], [action |-> "register_or_update_on_node", name |-> b]>>)
/\ names = (n1 :> {a, g, h} @@ n2 :> {d, e, f, g, h} @@ n3 :> {a, c, d, e, f, g, h})

The stack trace is much deeper now (34 transitions), and with 3 nodes it is much harder to understand what has gone wrong. It looks like an issue with my conflict resolution handling though: I am only checking for conflicts between the current node and the sender. This works fine with a two node cluster, but not with 3 (or more).

I could probably change that check to \in RegisteredOnThisNode(n), but I would need to know which node had registered it, to resolve the conflict; and whatever change I make will need to be reproduced in AckSync too. It might be time to bite the bullet, and factor out the conflict resolution (yet more recursion).