Re-processing failed Firehose batches

If a batch of records fails during the transformation step, the records will be dumped in a folder named /processing-failed/YYYY/MM/DD/HH/ in your S3 bucket.

The file (possibly gzipped) will contain a line for each record, in this format:

{
    "attemptsMade":4,
    "arrivalTimestamp":1616630407645,
    "errorCode":"Lambda.FunctionError",
    "errorMessage":"The Lambda function was successfully invoked but it returned an error result.",
    "attemptEndingTimestamp":1616630456597,
    "rawData":"someb64==",
    "lambdaArn":"arn:aws:lambda:region:account:function:function:$LATEST"
}
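To see what actually failed, decode the rawData field, which holds the original record base64-encoded. A minimal sketch, assuming the file has been downloaded locally as failures.gz (use plain open if it isn’t gzipped):

import base64
import gzip
import json

# failures.gz is a placeholder for a file fetched from the
# processing-failed prefix; "rt" gives decoded text lines
with gzip.open("failures.gz", "rt") as f:
    for line in f:
        entry = json.loads(line)
        # Print the error code and the start of the original payload
        print(entry["errorCode"], base64.b64decode(entry["rawData"])[:80])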

The error message isn’t particularly informative, so you’ll need to check the Lambda function’s CloudWatch logs. Once you’ve fixed the Lambda (or removed the offending record), there doesn’t seem to be any one-click way to re-process the batch.
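To pull recent errors out of the logs without clicking through the console, a minimal boto3 sketch (it assumes the standard /aws/lambda/<function-name> log group convention; my-transform-function is a made-up name):

import time

import boto3

logs = boto3.client("logs")

# Lambda logs live in /aws/lambda/<function-name>;
# "my-transform-function" is a placeholder
response = logs.filter_log_events(
    logGroupName="/aws/lambda/my-transform-function",
    filterPattern="ERROR",
    startTime=int((time.time() - 3600) * 1000),  # last hour, in milliseconds
)
for event in response["events"]:
    print(event["message"], end="")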

But re-processing is relatively straightforward to script, using Python (or any other available SDK):

import base64
import json
import sys

import boto3

# Path to a downloaded (and decompressed) failure file
filename = sys.argv[1]

# Each line is a JSON object like the one above; PutRecordBatch wants
# the original bytes back, so decode the "rawData" field
with open(filename) as f:
    all_records = [
        {"Data": base64.b64decode(json.loads(line)["rawData"])}
        for line in f
        if line.strip()  # skip any trailing blank line
    ]

batch_size = 100
batches = [all_records[i:i + batch_size] for i in range(0, len(all_records), batch_size)]

client = boto3.client("firehose")

for batch in batches:
    response = client.put_record_batch(
        DeliveryStreamName="some-firehose",
        Records=batch,
    )
    # A non-zero FailedPutCount means some records were rejected;
    # stop and inspect rather than silently dropping them
    if response["FailedPutCount"] > 0:
        print(response)
        raise Exception("Bad batch")
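Run it with the failure file as its only argument, e.g. python reprocess.py some-failure-file (reprocess.py being whatever you save the script as).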

If your records are quite small, you can probably increase the batch size (the API allows up to 500 records or 4 MiB per call).
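To get close to those limits without tripping them, you could batch on byte size as well as record count. A rough sketch (it approximates the request size by summing raw payload bytes, which undercounts slightly versus the API’s own accounting):

def batch_by_size(records, max_records=500, max_bytes=4 * 1024 * 1024):
    # Yield batches that stay under both PutRecordBatch limits;
    # request size is approximated as the sum of raw payload bytes
    batch, size = [], 0
    for record in records:
        record_size = len(record["Data"])
        # Close the current batch if adding this record would break either limit
        if batch and (len(batch) >= max_records or size + record_size > max_bytes):
            yield batch
            batch, size = [], 0
        batch.append(record)
        size += record_size
    if batch:
        yield batch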