Add newlines to Firehose transform lambda

If you want to use a lambda to transform data being sent to a Firehose, the recommended “blueprint” looks something like this:

import base64

def lambda_handler(event, context):
    output = []

    for record in event['records']:
        payload = base64.b64decode(record['data']).decode('utf-8')

        # Do custom processing on the payload here

        output_record = {
            'recordId': record['recordId'],
            'result': 'Ok',
            'data': base64.b64encode(payload.encode('utf-8')).decode('utf-8')
        }
        output.append(output_record)

    return {'records': output}

which works perfectly, unless you want to later query the records stored in S3 using Athena. Then you discover that because the serialized json is not newline separated, you’re up shit creek.

The secret is to add the newline, after processing:

# `data` is the processed payload (the parsed json), re-serialized with a trailing newline
r = {
    'recordId': record['recordId'],
    'result': 'Ok',
    'data': base64.b64encode(json.dumps(data).encode('utf-8') + b'\n').decode('utf-8')
}
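
Putting it together, a minimal sketch of the full handler with the newline fix applied (assuming each record’s payload is a json object):

import base64
import json

def lambda_handler(event, context):
    output = []

    for record in event['records']:
        # Decode and parse the incoming record
        payload = base64.b64decode(record['data']).decode('utf-8')
        data = json.loads(payload)

        # Do custom processing on `data` here

        output.append({
            'recordId': record['recordId'],
            'result': 'Ok',
            # Re-serialize with a trailing newline, so Athena can split records
            'data': base64.b64encode((json.dumps(data) + '\n').encode('utf-8')).decode('utf-8')
        })

    return {'records': output}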

Flattening json arrays, with Athena

If you have an S3 bucket chock full of newline separated json blobs (e.g. from a firehose ingest pipeline), you may want to query said files using a SQL-like prompt. This is the promise of Athena.

If your json blobs are very similar, you can specify the schema when creating the table, and parsing is taken care of for you. The rest of the time, you are in the land of “semi-structured data”.

CREATE EXTERNAL TABLE `foo_202510`(
  `id` bigint COMMENT 'from deserializer',
  `j` string COMMENT 'from deserializer')
ROW FORMAT SERDE 
  'org.openx.data.jsonserde.JsonSerDe' 
WITH SERDEPROPERTIES ( 
  'case.insensitive'='TRUE', 
  'dots.in.keys'='FALSE', 
  'ignore.malformed.json'='FALSE', 
  'mapping'='TRUE') 
STORED AS INPUTFORMAT 
  'org.apache.hadoop.mapred.TextInputFormat' 
OUTPUTFORMAT 
  'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
LOCATION
  's3://some-bucket/2025/10/'
TBLPROPERTIES (
  'classification'='json', 
  'transient_lastDdlTime'=...)

The json files in the S3 bucket are organised by YYYY/MM/DD, which allows me to partition the data, and only scan what I am interested in (as it is many GBs). Assuming each json blob looks like this:

{ "id": 124232, "j": { ... } }

I am loading the id into one col, and the nested object (as a string) into another. I can then run a query, against that table:

SELECT
    id,
    json_extract(j, '$.abc')
FROM "foo_202510"
LIMIT 1

A more complicated scenario is that the nested json contains an array that I wish to flatten, so I have a row in the output per item in the array:

WITH data AS (
    SELECT
        id,
        cast(json_extract(j, '$.a') as array(json)) arr
    FROM "foo_202510"
    WHERE ...
)
SELECT
    id,
    json_extract(a, '$.b') b
FROM data
CROSS JOIN UNNEST(arr) AS t(a)

We start with a CTE, extracting the array – and casting it to the correct type. We can then perform a cross join on this temp table, to get a row per array item.
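
If you want to run these queries from code, rather than the console, a rough sketch using boto3’s Athena client (the database name and results location are placeholders):

import time
import boto3

athena = boto3.client('athena')

# Kick off the query; database name and results location are placeholders
query_id = athena.start_query_execution(
    QueryString='SELECT id, json_extract(j, \'$.abc\') FROM "foo_202510" LIMIT 1',
    QueryExecutionContext={'Database': 'my_database'},
    ResultConfiguration={'OutputLocation': 's3://my-athena-results/'},
)['QueryExecutionId']

# Poll until the query has finished
while True:
    state = athena.get_query_execution(QueryExecutionId=query_id)['QueryExecution']['Status']['State']
    if state in ('SUCCEEDED', 'FAILED', 'CANCELLED'):
        break
    time.sleep(1)

rows = athena.get_query_results(QueryExecutionId=query_id)['ResultSet']['Rows']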

Using SSM with Ansible

In the before time, to access EC2 instances on a private subnet, you would need to have SSH running on a bastion/jump host in a public subnet to tunnel through.

A more modern alternative is to install the SSM Agent (or use an OS that comes with it already set up). You can then use the AWS console, or CLI, to create a session – tunneling through AWS infrastructure, and using their authentication. This allows you to use short lived credentials, and provides an audit trail – without needing to ship ssh logs to somewhere secure.

$ AWS_PROFILE=... aws ssm start-session --target 'i-123...'
$
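
Before wiring up ansible, it can be useful to check which instances have actually registered with the SSM agent; a small boto3 sketch:

import boto3

ssm = boto3.client('ssm')

# List the instances registered with SSM, i.e. valid session targets
paginator = ssm.get_paginator('describe_instance_information')
for page in paginator.paginate():
    for info in page['InstanceInformationList']:
        print(info['InstanceId'], info['PingStatus'])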

For ansible to achieve the same trick, you need to update your inventory:

plugin: aws_ec2
regions: ...
...
compose:
  ansible_host: instance_id
  ansible_connection: '"amazon.aws.aws_ssm"'
  ansible_aws_ssm_bucket_name: '"..."'

You still use the same plugin, but need to add a few more details. The ansible_host needs to be the EC2 instance id (rather than an IP address), and you obviously no longer need the ssh ProxyCommand. The connection type is SSM (and the nested quotes are needed), and finally you need an S3 bucket, which ansible uses to upload/download the python scripts that run on the target node (these would previously have been SCPed, I think).

You should now be able to list the available inventory, as before:

AWS_REGION=... AWS_PROFILE=... ansible-inventory -i inventory.aws_ec2.yml --graph

Once this is working, the transition is pretty seamless. The only real downside is that running a playbook is noticeably slower (~2x) 🐌

Cross-region pull for a lambda function

Having got x-acct pull working for my lambda function in staging, I had foolishly assumed that running the same CDK script for prod would be easy (as that is the same account where the ECR lives).

Instead the build was failing, with a confusing message:

14:52:31  MyStack |  4/11 | 1:52:29 PM | CREATE_FAILED        | AWS::Lambda::Function                       | MyLambda (MyLambda...) Resource handler returned message: "Source image ACCOUNT_ID.dkr.ecr.REGION.amazonaws.com/REPO:latest is not valid. Provide a valid source image. (Service: Lambda, Status Code: 400, Request ID: ...) (SDK Attempt Count: 1)" (RequestToken: ..., HandlerErrorCode: InvalidRequest)

Obviously, the ECR uri is valid, or it wouldn’t be working in the other account. I assumed it was permissions related, but the permissions I had added for x-acct seemed to be a superset of the permissions necessary within the same account.

When I tried to create the lambda in the console, a slightly more useful error was returned. It seems that Lambda is unable to pull from ECR in another region (even though Fargate has no trouble). The easiest solution to this is to enable replication.

You can do this in the cloudformation to create the repo:

Resources:
    RepositoryReplicationConfig:
        Type: AWS::ECR::ReplicationConfiguration
        Properties:
            ReplicationConfiguration: 
                Rules:
                    - Destinations:
                        - Region: ...
                          RegistryId: ... (account id)
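
Or, if the repo is managed with the CDK, the same replication config can be expressed with the low-level Cfn construct; a rough sketch in Python (the stack name, region and account id are placeholders):

from aws_cdk import Stack, aws_ecr as ecr
from constructs import Construct

class EcrReplicationStack(Stack):
    def __init__(self, scope: Construct, id: str, **kwargs) -> None:
        super().__init__(scope, id, **kwargs)

        # Replicate images in this registry to the destination region/account
        ecr.CfnReplicationConfiguration(
            self, 'ReplicationConfig',
            replication_configuration=ecr.CfnReplicationConfiguration.ReplicationConfigurationProperty(
                rules=[ecr.CfnReplicationConfiguration.ReplicationRuleProperty(
                    destinations=[ecr.CfnReplicationConfiguration.ReplicationDestinationProperty(
                        region='eu-west-2',          # destination region (placeholder)
                        registry_id='123456789012',  # destination account id (placeholder)
                    )],
                )],
            ),
        )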

Cross-account pull for a Lambda function

I have been trying to set up a Lambda function, using the CDK, that uses a docker image from a different account (because reasons). It felt like I was stuck in a chicken & egg situation, where the IAM role to be used was created by the stack, which then failed (because it couldn’t pull the image) and rolled back, deleting the role.

I tried using a wildcard, for the principal:

                Statement:
                -
                    Sid: AllowCrossAccountPull
                    Effect: Allow
                    Principal:
                        AWS: "arn:aws:iam::$ACCOUNT_ID:role/$STACK-LambdaServiceRole*"

but that was rejected:

Resource handler returned message: "Invalid parameter at 'PolicyText' failed to satisfy constraint: 'Principal not found'

After some digging around in the CDK source code, I was able to create the role first; and set up the cross account permissions before creating the lambda. But I was still getting the same error:

Resource handler returned message: "Lambda does not have permission to access the ECR image. Check the ECR permissions. (Service: Lambda, Status Code: 403, ...

At this point, I did what I should have done originally, and actually read the docs. It turns out that, just as Fargate tasks use one role to start the task and another to execute it, so does Lambda; but in this case, one of the roles is played by the service itself.

After a bit more flopping around, I finally had something that worked 🥳

                Statement:
                -
                    Sid: CrossAccountPermission
                    Effect: Allow
                    Principal:
                        AWS: "arn:aws:iam::$ACCOUNT_ID:root"
                    Action:
                        - "ecr:BatchGetImage"
                        - "ecr:GetDownloadUrlForLayer"
                -
                    Sid: LambdaECRImageCrossAccountRetrievalPolicy
                    Effect: Allow
                    Principal:
                        Service: "lambda.amazonaws.com"
                    Action:
                        - "ecr:BatchGetImage"
                        - "ecr:GetDownloadUrlForLayer"

API Gateway & Origin IP

I was in the process of adding an API Gateway between CloudFront and an existing ALB, when we discovered that valid requests were being rejected.

After some digging, we realised that API Gateway was using the more modern Forwarded header, rather than the usual X-Forwarded-For (XFF) headers. It’s relatively straightforward to parse the format, or find a library that will:

const parse = require('forwarded-parse');

// Express middleware: when a Forwarded header is present, override req.ip
// and req.ips with the addresses parsed from it.
module.exports = function() {
    return function(req, res, next) {
        if (req.headers.forwarded) {
            // Keep just the "for" elements, in the order they appear
            const forwarded = parse(req.headers.forwarded).map(f => f.for);
            Object.defineProperties(req, {
                'ip': {
                    configurable: true,
                    enumerable: true,
                    get() {
                        return forwarded[0];
                    },
                },
                'ips': {
                    configurable: true,
                    enumerable: true,
                    get() {
                        return forwarded;
                    },
                },
            });
        }

        next();
    };
};

With this middleware plugged in, the routes with IP filtering were behaving correctly again. However… if you want to run the same app in an env with just an ALB (e.g. as you roll out the API Gateway), then you have a problem.

Because the ALB does not know about that header, it will quite happily let you pass a spoofed version through. And there doesn’t seem to be an easy way to detect that you are behind an API Gateway (there is a Via header, but the ALB doesn’t overwrite that either).

curl "https://..." -H 'content-type: application/json' -d '{...}' -H 'forwarded: by=...;for=...,for=...;host=...;proto=https' -H 'via: HTTP/1.1 AmazonAPIGateway'

In the end, we were forced to use an env var (“feature flag”), to only enable that middleware in envs where we knew it was safe.

Putting an ECS Service behind an ALB, using Ansible

Buzzword bingo. If you want to set up an ECS Service (on Fargate), fronted by an ALB (and an API Gateway, and a WAF, and CloudFront – to tick all the boxes), there’s an ansible task for you.

Assuming you already have a task definition, creating a service is pretty straightforward:

- name: Create service
  ecs_service:
    name: ...
    cluster: ...
    task_definition: ...
    network_configuration:
      assign_public_ip: no
      security_groups: ...
      subnets: [...]
    launch_type: FARGATE
    desired_count: ...
    load_balancers: ...

The important bit is the `load_balancers` section. The ansible docs aren’t very informative on this point, which just means you need to switch to the AWS docs. You need a list, containing a dict per LB:

    load_balancers:
      -
        targetGroupArn: arn:aws:elasticloadbalancing:region:accountId:targetgroup/serviceName/id
        containerName: ...
        containerPort: 8080

With the arn of the target group that the tasks should be added to/removed from, the name of the container (in case your task definition contains more than one), and the port that traffic should be forwarded to.
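
If it helps to see the same structure in another form, here is a rough boto3 sketch of the equivalent create_service call (all names and ARNs are placeholders):

import boto3

ecs = boto3.client('ecs')

# The equivalent boto3 call, showing where the load balancer dict fits
ecs.create_service(
    cluster='my-cluster',
    serviceName='my-service',
    taskDefinition='my-task:1',
    desiredCount=2,
    launchType='FARGATE',
    networkConfiguration={
        'awsvpcConfiguration': {
            'subnets': ['subnet-aaa', 'subnet-bbb'],
            'securityGroups': ['sg-ccc'],
            'assignPublicIp': 'DISABLED',
        }
    },
    loadBalancers=[{
        'targetGroupArn': 'arn:aws:elasticloadbalancing:region:accountId:targetgroup/serviceName/id',
        'containerName': 'my-container',
        'containerPort': 8080,
    }],
)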

Terraforming an RDS Proxy

We have been using pgbouncer as a connection pooler, and have been very happy with it; but some recent load testing has shown that it will need to scale as throughput grows. And it is also a single point of failure, unless you build some sort of load balanced cluster.

While that is possible, RDS Proxy offers the dream of autoscaling & resilience delivered to your door. We looked into it when it was first released, and you had to use IAM authentication; but now native auth is an option.

I was following the getting started guide, but I wanted to use tf rather than the cli.

First, I needed a security group, allowing access to the pg port:

resource "aws_security_group" "rds_proxy_sg" {
    name = "rds-proxy-sg"
    vpc_id = var.vpc_id

    ingress {
        cidr_blocks = [ 
            var.aws_cidr
        ]   
        from_port = 5432
        to_port = 5432
        protocol = "tcp"
    }
    egress {
        from_port = 0 
        to_port  = 0 
        protocol = "-1"
        cidr_blocks = [ "0.0.0.0/0" ]
    }
}

And a role/policy to allow the proxy to get creds from secrets manager:

resource "aws_iam_role" "rds_proxy_role" {
  name = "RdsProxySecrets"

  assume_role_policy = <<EOF
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Action": "sts:AssumeRole",
      "Principal": {
        "Service": "rds.amazonaws.com"
      },
      "Effect": "Allow"
    }
  ]
}
EOF
}

resource "aws_iam_policy" "rds_proxy_policy" {
  name        = "RdsProxySecrets"
  path        = "/"

  policy = <<EOF
{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": "secretsmanager:GetSecretValue",
            "Resource": [
                "arn:aws:secretsmanager:eu-west-2:***:secret:*"
            ]
        },
        {
            "Effect": "Allow",
            "Action": "kms:Decrypt",
            "Resource": "arn:aws:kms:eu-west-2:****:key/***",
            "Condition": {
                "StringEquals": {
                    "kms:ViaService": "secretsmanager.eu-west-2.amazonaws.com"
                }
            }
        }
    ]
}
EOF
}

resource "aws_iam_policy_attachment" "rds_proxy_policy_attachment" {
  name       = "RdsProxySecrets"
  roles      = [aws_iam_role.rds_proxy_role.name]
  policy_arn = aws_iam_policy.rds_proxy_policy.arn
}

I don’t have any other secrets in there, so I used a wildcard instead of listing them all individually.

Then the proxy itself:

resource "aws_db_proxy" "rds_proxy" {
    name                   = "rds-proxy"
    debug_logging          = false
    engine_family          = "POSTGRESQL"
    idle_client_timeout    = 1800
    require_tls            = true 
    role_arn               = aws_iam_role.rds_proxy_role.arn
    vpc_security_group_ids = [aws_security_group.rds_proxy_sg.id]
    vpc_subnet_ids         = var.private_subnets
    ...

which needs the role arn, the sg id, and the subnets (output from a dependency, in my case).

And you also need the auth block, for each login:

    auth {
        username = "foo"
        secret_arn = "arn:aws:secretsmanager:eu-west-2:***:secret:foo"
        iam_auth   = "DISABLED"
        client_password_auth_type = "POSTGRES_SCRAM_SHA_256"
    }

Unfortunately, when I ran this, I got an error:

An error occurred (InvalidParameterValue) when calling the CreateDBProxy operation: UserName must not be set in UserAuthConfig when SECRETS AuthScheme is used.

Given that “SECRETS” is the only auth scheme available, this was a bit confusing; but the username is included in the secret json (more on this later), so I guess it is not needed. With that property removed, the proxy could be created.

You also need a target, i.e. which db to call:

resource "aws_db_proxy_target" "rds_proxy_target" {
    db_instance_identifier = var.rds_instance
    db_proxy_name          = aws_db_proxy.rds_proxy.name
    target_group_name      = "default"
}

I am relying on the “default” target group being named that, rather than creating a new one.

At this point, I thought I was golden, but when I tried to connect:

$ psql -h rds-proxy.proxy-***.eu-west-2.rds.amazonaws.com -d foo -U foo
psql: error: FATAL:  This RDS proxy has no credentials for the role foo. Check the credentials for this role and try again.

I was disappoint.

I turned on “enhanced logging”, and had a look in cloudwatch:

Credentials couldn't be retrieved. The AWS Secrets Manager secret with the ARN "arn:aws:secretsmanager:eu-west-2:***:secret:foo" has an incorrect secret format or has empty username. Format the secret like this: 
{
    "username": "",
    "password": ""
}

I could see the creds in Secrets Manager, and it looked like the correct format:

{'username': 'foo', 'password': '...'}

but clearly something was wrong: the single quotes suggest the value was stored as a python dict repr, rather than valid json.

I had created the secrets using ansible, because the db passwords were in group vars (this creates a chicken & egg problem with the tf, but I’ll ignore that for now):

- name: Create creds in secrets manager
  local_action:
    module: community.aws.secretsmanager_secret
    name: "creds-{{ item }}"
    state: present
    secret_type: 'string'
    secret: >
      {"username":"{{ item }}","password":"{{ hostvars[groups['pgbouncer'][0]][item | regex_replace('-', '_') + '_db_password'] }}"}
  loop: "{{ db_accounts }}"

but it seems that I needed to use json_secret instead.
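
A quick way to sanity-check the stored value is to pull it back and try parsing it as json; a small boto3 sketch (the secret name is a placeholder):

import json
import boto3

sm = boto3.client('secretsmanager')

# Pull the secret back and make sure it parses as json, with the keys
# RDS Proxy expects
value = sm.get_secret_value(SecretId='creds-foo')['SecretString']
creds = json.loads(value)  # fails if the value is a python repr rather than json
assert {'username', 'password'} <= creds.keys()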

$ psql -h rds-proxy.proxy-***.eu-west-2.rds.amazonaws.com -d foo -U foo
Password for user foo: 
psql (13.11 (Debian 13.11-0+deb11u1), server 13.10)
SSL connection (protocol: TLSv1.3, cipher: TLS_AES_256_GCM_SHA384, bits: 256, compression: off)
Type "help" for help.

foo=> 

Success! 🌈

Now I need to throw some locusts at it, and see if this was worth the effort.

Converting an AWS Lambda to an ECS Task

I have a python script (removing duplicates from a Redshift database) that currently runs as a Lambda, triggered by EventBridge. Unfortunately, it has been known to take longer than 15 mins, the max timeout for a lambda function.

The alternative seems to be running it as an ECS Task (on Fargate). I was uploading the lambda function as a zip; but it’s pretty easy to convert that to a Docker image, and push to ECR:

FROM python:3

COPY requirements.txt  .
RUN  pip3 install -r requirements.txt

COPY app.py .

CMD [ "python", "-c", "import app; app.foo(None, None)" ]

I took the easy way out, and just set the CMD to the existing lambda entrypoint. You then need an ECS cluster (or you may be able to use the default cluster):

docker run --rm -it -v ~/.aws:/root/.aws -v $PWD:/data -w /data -e AWS_PROFILE amazon/aws-cli ecs create-cluster --cluster-name foo

And an IAM role to execute the task:

...  iam create-role --role-name FooExecution --assume-role-policy-document file://aws/ecs/TrustPolicy.json

With a trust policy for ECS to assume the role:

{
    "Version": "2012-10-17",
    "Statement": [{
        "Action": "sts:AssumeRole",
        "Principal": {
            "Service": "ecs-tasks.amazonaws.com"
        },
        "Effect": "Allow"
    }]
}

And permissions to pull the image from ECR:

... iam put-role-policy --role-name FooExecution --policy-name ECR --policy-document file://aws/iam/ECR.json
{
    "Version": "2012-10-17",
    "Statement": [{
        "Effect": "Allow",
        "Action": [
            "ecr:BatchCheckLayerAvailability",
            "ecr:BatchGetImage",
            "ecr:GetDownloadUrlForLayer"
        ],
        "Resource": [
            "arn:aws:ecr:$region:$account:repository/$repo"
        ]
    }, {
        "Effect": "Allow",
        "Action": [
            "ecr:GetAuthorizationToken"
        ],
        "Resource": [
            "*"
        ]
    }]
}

If you want logs in CloudWatch, you also need a policy allowing that:

{
    "Version": "2012-10-17",
    "Statement": [{
        "Effect": "Allow",
        "Action": [
            "logs:CreateLogGroup",
            "logs:CreateLogStream",
            "logs:PutLogEvents"
        ],
        "Resource": [
            "*"
        ]
    }]
}

That’s enough to run the task, but if you need to use any AWS API in the task (e.g. boto3), then you need another role (the task role, as opposed to the execution role):

{
    "Version": "2012-10-17",
    "Statement": [{
        "Effect": "Allow",
        "Action": [
            "redshift:GetClusterCredentials"
        ],
        "Resource": [
            "arn:aws:redshift:$region:$account:dbuser:$cluster/$user",
            "arn:aws:redshift:$region:$account:dbname:$cluster/$db"
        ]
    }]
}
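
For context, that permission is used to fetch temporary Redshift credentials; a boto3 sketch with placeholder identifiers (assuming that is how the script authenticates):

import boto3

redshift = boto3.client('redshift')

# Fetch temporary credentials, which is what the policy above allows
creds = redshift.get_cluster_credentials(
    DbUser='foo',
    DbName='foo',
    ClusterIdentifier='my-cluster',
    AutoCreate=False,
)
# creds['DbUser'] and creds['DbPassword'] can then be passed to a pg driver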

If you want logs, you need a log group:

... logs create-log-group --log-group-name /aws/fargate/foo
... logs put-retention-policy --log-group-name /aws/fargate/foo --retention-in-days 7

Phooooo… nearly there. Now you need a task definition:

... ecs register-task-definition --family foo --cpu 256 --memory 512 --network-mode awsvpc --requires-compatibilities FARGATE --execution-role-arn arn:aws:iam::$account:role/$executionRole --task-role-arn arn:aws:iam::$account:role/$taskRole --container-definitions "[{\"name\":\"foo\",\"image\":\"$image:latest\",\"logConfiguration\":{\"logDriver\":\"awslogs\",\"options\":{\"awslogs-region\":\"$region\",\"awslogs-group\":\"/aws/fargate/foo\",\"awslogs-stream-prefix\":\"foo\"}}}]"

And you should be in a position to test that the task can run, without the cron trigger:

... ecs run-task --launch-type FARGATE --cluster foo --task-definition foo:1 --network-configuration "awsvpcConfiguration={subnets=['foo',...],securityGroups=['sg-...'],assignPublicIp='ENABLED'}" --count 1

You need a public IP to pull the ECR image (unless you want to jump through some hoops).

If that went well, you can proceed to set up the cron trigger (EventBridge):

... events put-rule --name foo --schedule-expression 'cron(0 4 * * ? *)'

You need yet another role, for EventBridge to run the task (and pass the execution & task roles):

...iam create-role --role-name FooEvents --assume-role-policy-document file://aws/events/TrustPolicy.json
...iam put-role-policy --role-name FooEvents --policy-name Ecs --policy-document file://aws/iam/Ecs.json

This time the trust policy needs to be for EventBridge:

{
    "Version": "2012-10-17",
    "Statement": [{
        "Action": "sts:AssumeRole",
        "Principal": {
            "Service": "events.amazonaws.com"
        },
        "Effect": "Allow"
    }]
}

And the permissions for the target:

{
    "Version": "2012-10-17",
    "Statement": [{
        "Effect": "Allow",
        "Action": [
            "iam:PassRole"
        ],
        "Resource": [
            "arn:aws:iam::$account:role/FooExecution",
            "arn:aws:iam::$account:role/FooTask"
        ]
    }, {
        "Effect": "Allow",
        "Action": [
            "ecs:RunTask"
        ],
        "Resource": "*",
        "Condition": {
            "ArnEquals": {
                "ecs:cluster": "arn:aws:ecs:$region:$account:cluster/foo"
            }
        }
    }]
}

And finally, you need a target for the rule:

...events put-targets --rule foo --targets file://aws/events/Targets.json
[{
    "Id": "1",
    "Arn": "arn:aws:ecs:$region:$account:cluster/foo",
    "RoleArn": "arn:aws:iam::$account:role/FooEvents",
    "EcsParameters": {
        "TaskDefinitionArn": "arn:aws:ecs:$region:$account:task-definition/foo",
        "LaunchType": "FARGATE",
        "NetworkConfiguration": {
            "awsvpcConfiguration": {
                "Subnets": ["subnet-***",...],
                "SecurityGroups": ["sg-***"],
                "AssignPublicIp": "ENABLED"
            }
        }
    }
}]

I included all 3 subnets from the default VPC, and the default SG.

That was a lot! But hopefully you now have a working cron job, that can take as long as it wants to complete.

Only upload changed files to (a different) s3 bucket

We have a PR build that uploads the generated (html) output to a public s3 bucket, so you can check the results before merging. This is useful, but the output has grown over time, and is now ~6GB; so the job takes a long time to run, and uploads a lot of unnecessary files.

I recently switched the trunk build to use sync from the AWS CLI (rather than s3cmd), which was noticeably faster; so I thought I’d try using the --dryrun feature, to generate a diff against the production bucket.

docker run --rm -v $PWD:/app -w /app -e AWS_ACCESS_KEY_ID -e AWS_SECRET_ACCESS_KEY amazon/aws-cli s3 sync output/ s3://foo-prod --dryrun --size-only

Unfortunately, there are no machine-readable output options for that command, so we need to get our awk on. My first attempt was to generate a cp command for each line:

docker run ... | awk '{sub(/output\//, ""); sub(/&/, "\\\\&"); print "docker run --rm -v $PWD:/app -w /app -e AWS_ACCESS_KEY_ID -e AWS_SECRET_ACCESS_KEY amazon/aws-cli s3 cp output/"$3" s3://foo-pr/"ENVIRON["GIT_COMMIT"]"/"$3}'

Once you’re satisfied the incantation looks correct, you can pipe the whole lot to bash:

docker run ... | awk ... | bash

With this working locally, it seemed simple to just run that command as a pipeline step. It was not. Trying to escape the combination of quotes in groovy proved fruitless, and in the end I just threw in a bash script, and called that from the Jenkinsfile.

While this solved one problem, it created another: the build now took nearly twice as long to run, presumably due to copying files one at a time. I was considering using the SDK, when I realised I could just copy the needed files locally, and sync that folder instead.

mkdir changed
docker run --rm -v $PWD:/app -w /app -e AWS_ACCESS_KEY_ID -e AWS_SECRET_ACCESS_KEY amazon/aws-cli s3 sync output/ s3://foo-prod --dryrun --size-only | awk '{sub(/&/, "\\\\&"); print "cp "$3" changed/"}' | bash
docker run --rm -v $PWD:/app -w /app -e AWS_ACCESS_KEY_ID -e AWS_SECRET_ACCESS_KEY amazon/aws-cli s3 sync changed/ s3://foo-pr/$GIT_COMMIT --size-only

Finally, a build that is both quick(er), and uploads only the changed files!