amazon-web-services, amazon-kinesis, amazon-kinesis-firehose

AWS Firehose newline Character


I've read a lot of similar questions about adding newline characters to Firehose records, but they all involve adding the newline at the source. I don't have access to the source: a third party is piping data to our Kinesis instance, so I cannot add the \n there.

I've tried doing a Firehose data transformation using the following code:

'use strict';
console.log('Loading function');

exports.handler = (event, context, callback) => {
    /* Process the list of records and transform them */
    const output = [];
    event.records.forEach((record) => {
        const results = {
        /* This transformation is the "identity" transformation, the data is left intact */
            recordId: record.recordId,
            result: record.data.event_type === 'alert' ? 'Dropped' : 'Ok',
            data: record.data + '\n'
        };
        output.push(results);
    });
    console.log(`Processing completed.  Successful records ${output.length}.`);
    callback(null, { records: output });
};

but the newline is still lost. I've also tried JSON.stringify(record.data) + '\n' but then I get an Invalid output structure error.


Solution

  • Try decoding record.data, adding a newline, and then encoding it again as base64.

    This is Python, but the idea is the same:

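    import base64

    def lambda_handler(event, context):
        output = []
        for record in event['records']:
            # record['data'] is base64 text; decode it to a str first
            payload = base64.b64decode(record['data']).decode('utf-8')
            # Do custom processing on the payload here
            payload = payload + '\n'
            output_record = {
                'recordId': record['recordId'],
                'result': 'Ok',
                # Re-encode to base64 and return a str, not bytes
                'data': base64.b64encode(payload.encode('utf-8')).decode('utf-8')
            }
            output.append(output_record)
        return {'records': output}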
    

    From the comment of @Matt Westlake:

    For those looking for the Node.js answer, it's

    const data = JSON.parse(Buffer.from(record.data, 'base64').toString('utf8'));
    

    and

    Buffer.from(JSON.stringify(data) + '\n').toString('base64')
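
Putting the pieces together, a complete Node.js transformation handler might look like the sketch below. It is an illustration rather than a tested production function. The helper name appendNewline is made up for this example, and the check that skips payloads already ending in a newline is an added design choice, not something from the original answer. It also skips JSON.parse, on the assumption that the payload only needs a trailing newline and may not always be valid JSON. The code in the question most likely fails because record.data is a base64 string there: record.data.event_type is undefined on a string, and a '\n' appended to base64 text is not part of the decoded payload.

```javascript
'use strict';

// Hypothetical helper: decode one Firehose record, append '\n', re-encode.
function appendNewline(record) {
    // record.data arrives as a base64 string, not as a parsed object
    const text = Buffer.from(record.data, 'base64').toString('utf8');
    // Assumption for this sketch: do not add a second newline if one is present
    const withNewline = text.endsWith('\n') ? text : text + '\n';
    return {
        recordId: record.recordId,  // must match the incoming recordId
        result: 'Ok',
        data: Buffer.from(withNewline, 'utf8').toString('base64'),
    };
}

// Lambda entry point: transform every record in the batch
const handler = async (event) => ({
    records: event.records.map(appendNewline),
});

exports.handler = handler;
```

If you do need to inspect fields such as event_type, parse the decoded text with JSON.parse inside the helper and set result to 'Dropped' for the records you want to filter out.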