If you want to use a Lambda function to transform data sent to a Kinesis Data Firehose delivery stream, the recommended “blueprint” looks something like this:
```python
import base64


def lambda_handler(event, context):
    output = []

    for record in event['records']:
        payload = base64.b64decode(record['data']).decode('utf-8')

        # Do custom processing on the payload here

        output_record = {
            'recordId': record['recordId'],
            'result': 'Ok',
            'data': base64.b64encode(payload.encode('utf-8')).decode('utf-8')
        }
        output.append(output_record)

    return {'records': output}
```
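This works perfectly, unless you later want to query the records stored in S3 using Athena. Then you discover that because the serialized JSON records are not newline-separated, you’re up shit creek: Firehose writes the records back to back into each S3 object, and Athena’s JSON SerDe expects one record per line.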
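To see the problem, here is a small local sketch that mimics what ends up in S3. It assumes, as a simplification, that Firehose just concatenates the transformed records with no separator; `blueprint_transform`, `simulate_s3_object` and the sample event are hypothetical names used only for illustration.

```python
import base64
import json


def blueprint_transform(event):
    """Pass-through transform in the style of the blueprint: no newline is added."""
    output = []
    for record in event['records']:
        payload = base64.b64decode(record['data']).decode('utf-8')
        output.append({
            'recordId': record['recordId'],
            'result': 'Ok',
            'data': base64.b64encode(payload.encode('utf-8')).decode('utf-8'),
        })
    return {'records': output}


def simulate_s3_object(response):
    """Hypothetical helper: join the transformed records with no separator, as they land in S3."""
    return ''.join(
        base64.b64decode(r['data']).decode('utf-8') for r in response['records']
    )


# Hypothetical test event holding two JSON records
event = {
    'records': [
        {'recordId': str(i),
         'data': base64.b64encode(json.dumps({'id': i}).encode('utf-8')).decode('utf-8')}
        for i in (1, 2)
    ]
}

body = simulate_s3_object(blueprint_transform(event))
print(body)               # {"id": 1}{"id": 2}
print(body.splitlines())  # ['{"id": 1}{"id": 2}'], i.e. one "line" holding two objects
```

A line-oriented reader sees a single line containing two JSON objects, which is not a valid JSON record.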
The secret is to append a newline to each record after processing:
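```python
import base64
import json

# data is the processed payload, e.g. data = json.loads(payload)
r = {
    'recordId': record['recordId'],
    'result': 'Ok',
    # json.dumps() output contains no raw newlines, so b'\n' cleanly ends each record
    'data': base64.b64encode(json.dumps(data).encode('utf-8') + b'\n').decode('utf-8')
}
```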
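Putting the two pieces together, here is a minimal sketch of a complete transform, assuming each incoming record is a single JSON object. The `processed` field is a hypothetical stand-in for your own processing, and joining the decoded records locally is only an approximation of how Firehose writes them to S3.

```python
import base64
import json


def lambda_handler(event, context):
    output = []
    for record in event['records']:
        payload = base64.b64decode(record['data']).decode('utf-8')
        data = json.loads(payload)

        # Custom processing goes here; this sketch adds a hypothetical field.
        data['processed'] = True

        # json.dumps (default settings) emits a single line; the trailing
        # newline makes the S3 object newline-delimited JSON.
        encoded = json.dumps(data).encode('utf-8') + b'\n'
        output.append({
            'recordId': record['recordId'],
            'result': 'Ok',
            'data': base64.b64encode(encoded).decode('utf-8'),
        })
    return {'records': output}


# Hypothetical test event; context is unused, so None is fine here.
event = {
    'records': [
        {'recordId': str(i),
         'data': base64.b64encode(json.dumps({'id': i}).encode('utf-8')).decode('utf-8')}
        for i in (1, 2)
    ]
}
response = lambda_handler(event, None)

# Concatenate the records as they would land in S3, then read them back line by line.
body = ''.join(base64.b64decode(r['data']).decode('utf-8') for r in response['records'])
rows = [json.loads(line) for line in body.splitlines()]
print(rows)  # [{'id': 1, 'processed': True}, {'id': 2, 'processed': True}]
```

Because every record now ends in `\n`, each line of the object parses on its own, which is the one-record-per-line layout Athena expects.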