Add newlines to Firehose transform lambda

If you want to use a lambda to transform data being sent to a Firehose, the recommended “blueprint” looks something like this:

import base64

def lambda_handler(event, context):
    output = []

    for record in event['records']:
        payload = base64.b64decode(record['data']).decode('utf-8')

        # Do custom processing on the payload here

        output_record = {
            'recordId': record['recordId'],
            'result': 'Ok',
            'data': base64.b64encode(payload.encode('utf-8')).decode('utf-8')
        }
        output.append(output_record)

    return {'records': output}

which works perfectly, unless you want to later query the records stored in S3 using Athena. Then you discover that, because the serialized json is not newline-separated, you’re up shit creek.
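Firehose concatenates the transformed records back to back when it writes a batch to S3, so the delivered object ends up looking something like this (illustrative):

{"id": 1}{"id": 2}{"id": 3}

Athena’s json SerDe expects exactly one object per line, and can’t split these apart.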

The secret is to add a newline to each record’s data, after processing:

output_record = {
    'recordId': record['recordId'],
    'result': 'Ok',
    # data is the processed payload (a dict); appending b'\n' keeps the
    # serialized records newline-delimited in S3
    'data': base64.b64encode(json.dumps(data).encode() + b'\n').decode('utf-8')
}
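Putting the two together, a minimal sketch of the whole handler (assuming each incoming record is itself a serialized json object):

import base64
import json

def lambda_handler(event, context):
    output = []

    for record in event['records']:
        # Firehose hands us base64-encoded bytes
        payload = base64.b64decode(record['data']).decode('utf-8')
        data = json.loads(payload)

        # Do custom processing on `data` here

        output.append({
            'recordId': record['recordId'],
            'result': 'Ok',
            # trailing newline keeps the objects in S3 newline-delimited
            'data': base64.b64encode(json.dumps(data).encode() + b'\n').decode('utf-8')
        })

    return {'records': output}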

Flattening json arrays with Athena

If you have an S3 bucket chock full of newline-separated json blobs (e.g. from a Firehose ingest pipeline), you may want to query said files using a SQL-like interface. This is the promise of Athena.

If your json blobs are very similar, you can specify the schema when creating the table, and parsing is taken care of for you. The rest of the time, you are in the land of “semi-structured data”.
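For the fully-specified case, the DDL can describe the nested object directly; a sketch, where the table name and struct fields are hypothetical:

-- hypothetical table; the struct mirrors whatever shape your blobs share
CREATE EXTERNAL TABLE `foo_typed`(
  `id` bigint,
  `j` struct<abc:string, a:array<struct<b:string>>>)
ROW FORMAT SERDE
  'org.openx.data.jsonserde.JsonSerDe'
LOCATION
  's3://some-bucket/2025/10/'

My blobs are not that predictable, so I keep the nested object as a plain string and let each query pull out what it needs: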

CREATE EXTERNAL TABLE `foo_202510`(
  `id` bigint COMMENT 'from deserializer',
  `j` string COMMENT 'from deserializer')
ROW FORMAT SERDE 
  'org.openx.data.jsonserde.JsonSerDe' 
WITH SERDEPROPERTIES ( 
  'case.insensitive'='TRUE', 
  'dots.in.keys'='FALSE', 
  'ignore.malformed.json'='FALSE', 
  'mapping'='TRUE') 
STORED AS INPUTFORMAT 
  'org.apache.hadoop.mapred.TextInputFormat' 
OUTPUTFORMAT 
  'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
LOCATION
  's3://some-bucket/2025/10/'
TBLPROPERTIES (
  'classification'='json', 
  'transient_lastDdlTime'=...)

The json files in the S3 bucket are organised by YYYY/MM/DD, which allows me to partition the data and only scan what I am interested in (as it is many GBs). Assuming each json blob looks like this:

{ "id": 124232, "j": { ... } }

I am loading the id into one column, and the nested object (as a string) into another. I can then run a query against that table:

SELECT
    id,
    json_extract(j, '$.abc')
FROM "foo_202510"
LIMIT 1
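json_extract returns a json value; when the field is a scalar, json_extract_scalar returns it as a varchar instead, which is handier for filtering or grouping. Something like this (assuming $.abc holds a string):

SELECT
    id,
    json_extract_scalar(j, '$.abc') abc
FROM "foo_202510"
WHERE json_extract_scalar(j, '$.abc') = 'some-value'
LIMIT 10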

A more complicated scenario is when the nested json contains an array that I wish to flatten, so that I have a row in the output per item in the array:

WITH data AS (
    SELECT
        id,
        cast(json_extract(j, '$.a') as array(json)) arr
    FROM "foo_202510"
    WHERE ...
)
SELECT
    id,
    json_extract(a, '$.b') b
FROM data
CROSS JOIN UNNEST(arr) AS t(a)

We start with a CTE, extracting the array and casting it to the correct type. We can then cross join that temp table with the unnested array, to get a row per array item.
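If you also care where each item sat in the original array, UNNEST supports WITH ORDINALITY; the same query, sketched with an extra index column:

WITH data AS (
    SELECT
        id,
        cast(json_extract(j, '$.a') as array(json)) arr
    FROM "foo_202510"
    WHERE ...
)
SELECT
    id,
    idx,
    json_extract(a, '$.b') b
FROM data
CROSS JOIN UNNEST(arr) WITH ORDINALITY AS t(a, idx)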