
Insert newlines between AWS Kinesis Firehose records when using the AWS SDK for Ruby


I have an AWS Kinesis Data Firehose delivery stream that sends data to Redshift via S3. I'd like newlines to appear between records sent using put_record_batch. Currently my code looks like this:

records = [{ id: 1, value: "foo" }, { id: 2, value: "bar" }]
Aws::Firehose::Client.new(
  region: "us-east-1"
).put_record_batch({
  delivery_stream_name: "my_firehose",
  records: records
})

The records that end up in S3 look like this:

{"id":1,"value":"foo"}{"id":2,"value":"bar"}

I would like for the S3 files to instead look like this:

{"id":1,"value":"foo"}
{"id":2,"value":"bar"}

This will make it easier to manually parse the files when necessary (for example, if we need to debug why data isn't making it from S3 to Redshift).
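Once each record sits on its own line, a file can be read back one record per line. A minimal sketch of that reading side, assuming the S3 object's body has already been downloaded into a string (`parse_ndjson` is a hypothetical helper, not part of the AWS SDK):

```ruby
require "json"

# Hypothetical helper: splits a newline-delimited Firehose object body
# into individual records. Blank lines (such as the trailing newline
# after the last record) are skipped.
def parse_ndjson(body)
  body.each_line
      .map(&:strip)
      .reject(&:empty?)
      .map { |line| JSON.parse(line) }
end

body = %({"id":1,"value":"foo"}\n{"id":2,"value":"bar"}\n)
records = parse_ndjson(body)
records.map { |r| r["id"] } # => [1, 2]
```

Without the newlines, the concatenated `{"id":1,"value":"foo"}{"id":2,"value":"bar"}` form is not valid JSON as a whole, so `JSON.parse` would reject it; that is why the delimiter matters.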

With put_record the fix is simple: convert the record to JSON and append a newline:

record = { id: 1, value: "foo" }
Aws::Firehose::Client.new(
  region: "us-east-1"
).put_record({
  delivery_stream_name: "my_firehose",
  data: record.to_json << "\n"
})

I tried to do something similar with put_record_batch:

records = [{ id: 1, value: "foo" }, { id: 2, value: "bar" }]
json_records = records.map { |record| record.to_json << "\n" }
Aws::Firehose::Client.new(
  region: "us-east-1"
).put_record_batch({
  delivery_stream_name: "my_firehose",
  records: json_records
})

But this resulted in the error:

ArgumentError: parameter validator found 2 errors:
  - expected params[:records][0] to be a hash, got value "{\"id\":1,\"value\":\"foo\"}\n" (class: String) instead.
  - expected params[:records][1] to be a hash, got value "{\"id\":2,\"value\":\"bar\"}\n" (class: String) instead.
from /mnt/istore/apps/my_app/shared/bundle/ruby/2.7.0/gems/aws-sdk-core-3.89.1/lib/aws-sdk-core/param_validator.rb:33:in `validate!'

So it seems that each element of records has to be a hash, not a string.

The documentation for put_record_batch says:

Kinesis Data Firehose buffers records before delivering them to the destination. To disambiguate the data blobs at the destination, a common solution is to use delimiters in the data, such as a newline (\n) or some other character unique within the data. This allows the consumer application to parse individual data items when reading the data from the destination.

How do I do this?

I'm using version 1.26.0 of the aws-sdk-firehose gem.


Solution

  • The problem was that I was leaving off the :data key when using put_record_batch. Each entry in records has to be a hash of the form { data: "..." }, not a bare string. This works:

    require "aws-sdk-firehose"
    require "json"

    records = [{ id: 1, value: "foo" }, { id: 2, value: "bar" }]
    json_records = records.map do |record|
      # Previously this line was `record.to_json << "\n"`
      { data: record.to_json << "\n" }
    end
    Aws::Firehose::Client.new(
      region: "us-east-1"
    ).put_record_batch({
      delivery_stream_name: "my_firehose",
      records: json_records
    })
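For larger inputs the same { data: ... } wrapping can be combined with batching, since PutRecordBatch accepts at most 500 records (and at most 4 MiB) per call. A minimal sketch, assuming the rows are small enough that the 4 MiB limit is never reached (only the record count is enforced here); `firehose_batches` and `MAX_RECORDS_PER_BATCH` are hypothetical names, not part of the SDK:

```ruby
require "json"

# PutRecordBatch accepts at most 500 records per call. It also caps each
# call at 4 MiB, which this sketch does NOT check.
MAX_RECORDS_PER_BATCH = 500

# Hypothetical helper: wraps each row as { data: "<json>\n" }, the shape
# put_record_batch expects, and splits the result into batches that stay
# within the per-call record limit.
def firehose_batches(rows, batch_size: MAX_RECORDS_PER_BATCH)
  rows.map { |row| { data: "#{row.to_json}\n" } }
      .each_slice(batch_size)
      .to_a
end

# Usage with a real client (not executed here):
#
#   client = Aws::Firehose::Client.new(region: "us-east-1")
#   firehose_batches(rows).each do |batch|
#     resp = client.put_record_batch(delivery_stream_name: "my_firehose",
#                                    records: batch)
#     # A nonzero resp.failed_put_count means some records were rejected
#     # and need to be resent.
#   end
```

String interpolation (`"#{row.to_json}\n"`) builds a fresh string rather than appending to the one returned by `to_json`; both produce the same data, so this is only a style choice.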