I have an optimized (normalized) JSON string coming into my Lambda function. It is forwarded to Firehose and from there to Elasticsearch. My plan is to use a Kinesis Data Firehose transformation Lambda (see https://docs.aws.amazon.com/firehose/latest/dev/data-transformation.html) to denormalize the JSON so that proper Elasticsearch entries come out of it. This question is about whether that is feasible and how to do it.
This is my basic AWS setup: Lambda -> putRecord -> Firehose (with transformation Lambda) -> Elasticsearch.
My question is: Is it possible for Firehose or the transformation Lambda to create multiple Elasticsearch entries out of one record? I'll try to visualize the scenario with some pseudo code:
lambda.js
const AWS = require('aws-sdk');
const firehose = new AWS.Firehose();

exports.handler = async function (event) {
  // 1. get inputDoc from request, which contains multiple es_documents
  // 2. attach timestamp as property
  // result:
  const inputDoc = {
    request_logged_at: '2017-02-07T15:13:01.39256Z',
    es_documents: [
      {
        foo: 'bar'
      },
      {
        foo: 'baz'
      }
    ]
  };
  const firehoseParams = {
    DeliveryStreamName: 'Delivery-Stream',
    Record: {
      Data: JSON.stringify(inputDoc)
    }
  };
  await firehose.putRecord(firehoseParams).promise();
  return { statusCode: 201 };
};
firehose/transform_lambda.js
exports.handler = async (event) => {
  // Pseudo code: only the first record of the batch is handled here
  const record = event.records[0];
  // record.data is base64 encoded; decode and parse it before reading properties
  const myDoc = JSON.parse(Buffer.from(record.data, 'base64').toString('utf8'));
  // Denormalize request_logged_at into the single documents, so every document in Elastic knows when it got logged
  const docsToElastic = myDoc.es_documents.map(doc => {
    return { ...doc, request_logged_at: myDoc.request_logged_at };
  });
  // This is the main point: How to return this array back to Firehose so Elastic creates multiple table entries?
  // My first guess is the following, as I've seen this syntax in other places
  // (see first "Note" on this page[https://docs.aws.amazon.com/firehose/latest/dev/record-format-conversion.html])
  const result = docsToElastic.reduce((accumulator, doc) => {
    return accumulator + JSON.stringify(doc);
  }, '');
  // result: '{"foo":"bar","request_logged_at":"2017-02-07T15:13:01.39256Z"}{"foo":"baz","request_logged_at":"2017-02-07T15:13:01.39256Z"}'
  const payload = Buffer.from(result, 'utf8').toString('base64');
  // Firehose expects the transformed records wrapped in a `records` array
  return {
    records: [{
      recordId: record.recordId,
      result: 'Ok',
      data: payload
    }]
  };
};
Can anyone share their knowledge about this use case? Will this work?
BTW: I know I could do the denormalization in the first Lambda and use firehose.putRecordBatch(), but the first Lambda already has a lot of tasks, and it's also a matter of separating concerns.
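For comparison, the putRecordBatch() alternative mentioned here could look roughly like the sketch below. It only builds the request parameters, one Firehose record per Elasticsearch document; the helper name buildBatchParams is made up for illustration, and the commented call assumes an AWS SDK v2 client named firehose.

```javascript
// Hypothetical helper: build putRecordBatch params with one record per document.
function buildBatchParams(streamName, inputDoc) {
  return {
    DeliveryStreamName: streamName,
    Records: inputDoc.es_documents.map((doc) => ({
      // Copy the shared timestamp into every single document
      Data: JSON.stringify({ ...doc, request_logged_at: inputDoc.request_logged_at }),
    })),
  };
}

// Usage (assumes `firehose` is an AWS SDK v2 Firehose client):
// await firehose.putRecordBatch(buildBatchParams('Delivery-Stream', inputDoc)).promise();
```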
I solved it myself. These hints from the docs gave me an idea:
It [Firehose] then generates an Elasticsearch bulk request to index multiple records to your Elasticsearch cluster. Make sure that your record is UTF-8 encoded and flattened to a single-line JSON object before you send it to Kinesis Data Firehose.
So under the hood, Firehose performs an Elasticsearch bulk request. It probably sends something like this:
POST _bulk
{ "index" : { "_index" : "my-index" } }
[FirehoseRecord.Data]
{ "index" : { "_index" : "my-index" } }
[FirehoseRecord.Data]
{ "index" : { "_index" : "my-index" } }
[FirehoseRecord.Data]
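To make that guess concrete, here is a small model of it. This is not Firehose's actual implementation, only a sketch under the assumption that it prefixes each record's data with one action line; buildBulkBody is a hypothetical name.

```javascript
// Hypothetical model of the assumed behavior: one action line per Firehose record.
function buildBulkBody(indexName, recordDataList) {
  const actionLine = JSON.stringify({ index: { _index: indexName } });
  return recordDataList
    .map((data) => actionLine + '\n' + data.trimEnd() + '\n')
    .join('');
}

// A single record whose data already contains an extra action line
// ends up as two index operations in the bulk body.
const exampleBody = buildBulkBody('my-index', [
  '{"foo":"bar"}\n{"index":{"_index":"my-index"}}\n{"foo":"baz"}\n',
]);
console.log(exampleBody);
```

If the model holds, anything placed inside a record's data is passed through to the bulk body as it is, including additional action lines.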
So what I did was modify one FirehoseRecord.Data to contain multiple lines with my documents, each separated by the { "index" : ...} object:
const result = docsToElastic.reduce((accumulator, log, currentIndex) => {
  return accumulator +
    // Skip the index line for the first document because Firehose adds that one itself
    (currentIndex === 0 ? '' : '{ "index" : { "_index" : "my-index"} }' + '\n') +
    JSON.stringify(log) + '\n';
}, '');
Output:
{"foo":"bar","request_logged_at":"2017-02-07T15:13:01.39256Z"}
{ "index" : { "_index" : "my-index" } }
{"foo":"bar","request_logged_at":"2017-02-07T15:13:01.39256Z"}
{ "index" : { "_index" : "my-index" } }
{"foo":"bar","request_logged_at":"2017-02-07T15:13:01.39256Z"}
Pay attention to the missing first { "index" : ...} object. This is because Firehose itself adds the first { "index" : ...} before the whole record.
This is tested and working right now.