Re-processing failed Firehose batches

If a batch of records fails during the transformation step, Firehose writes it to a folder named /processing-failed/YYYY/MM/DD/HH/ in your S3 bucket.

The file (possibly gzipped) will contain a line for each record, in this format:

    "errorMessage":"The Lambda function was successfully invoked but it returned an error result.",
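Since the file may or may not be gzipped, it can help to detect that before parsing. This is a minimal sketch, not part of the original script: `read_failed_lines` is a hypothetical helper, and it assumes each line is a complete JSON object (the sample line above is only a fragment of one).

```python
import gzip
import json


def read_failed_lines(path):
    """Yield one parsed JSON object per non-empty line, gzipped or not."""
    # gzip files start with the two magic bytes 0x1f 0x8b.
    with open(path, "rb") as probe:
        is_gzip = probe.read(2) == b"\x1f\x8b"
    opener = gzip.open if is_gzip else open
    with opener(path, "rt", encoding="utf-8") as f:
        for line in f:
            if line.strip():  # skip blank lines
                yield json.loads(line)
```

Each yielded object should then expose the fields of the failure record, such as errorMessage and the base64-encoded rawData that the script further down decodes.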

The error message isn’t particularly informative, so you’ll need to check the Lambda logs. Once you’ve fixed the Lambda (or removed the offending record), there doesn’t seem to be any one-click way to re-process the batch.

But it’s relatively straightforward to script it, using Python (or any other available SDK):

import base64
import boto3
import json
import sys

filename = sys.argv[1]

# Each line is a JSON object; the original record is base64-encoded in "rawData".
with open(filename) as f:
    all_records = [{"Data": base64.b64decode(json.loads(line)["rawData"])} for line in f if line.strip()]

batch_size = 100
batches = [all_records[i:i + batch_size] for i in range(0, len(all_records), batch_size)]

client = boto3.client('firehose')

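for batch in batches:
    response = client.put_record_batch(
        DeliveryStreamName="my-delivery-stream",  # placeholder: use your own stream name
        Records=batch,
    )
    # put_record_batch can fail for some records without raising, so check the count.
    if response["FailedPutCount"] > 0:
        raise Exception("Bad batch")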
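Raising on any failure is the simplest option. As an alternative, here is a rough sketch of picking out only the records that failed, so they can be sent again. It relies on the `RequestResponses` list in the `put_record_batch` response, which has one entry per submitted record in the same order, with an `ErrorCode` key on the entries that failed. `failed_records` is a hypothetical helper name.

```python
def failed_records(batch, response):
    """Return the records from batch whose response entry carries an ErrorCode."""
    return [
        record
        for record, result in zip(batch, response["RequestResponses"])
        if "ErrorCode" in result
    ]
```

If you go this way, cap the number of retries and pause between attempts, so that a record which can never succeed doesn't loop forever.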

If your records are quite small, you can probably increase the batch size (a single put_record_batch call accepts up to 500 records or 4MB in total).
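If record sizes vary, a fixed batch size is a guess. The sketch below cuts a batch whenever either limit would be exceeded. `make_batches` is a hypothetical helper, and reading the 4MB limit as 4 * 1024 * 1024 bytes of record data is an assumption.

```python
MAX_RECORDS = 500            # per-call record limit
MAX_BYTES = 4 * 1024 * 1024  # per-call size limit (assumed to mean 4 MiB)


def make_batches(records, max_records=MAX_RECORDS, max_bytes=MAX_BYTES):
    """Yield lists of records that stay within both the count and byte limits."""
    batch, size = [], 0
    for record in records:
        record_size = len(record["Data"])
        # Start a new batch if adding this record would break either limit.
        if batch and (len(batch) >= max_records or size + record_size > max_bytes):
            yield batch
            batch, size = [], 0
        batch.append(record)
        size += record_size
    if batch:
        yield batch
```

In the script above, `batches = list(make_batches(all_records))` would replace the fixed-size slicing. A single record larger than the byte limit still ends up in a batch of its own, and the API would reject it.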
