amazon-kinesis-firehose

JSON data exceeds AWS Kinesis Firehose put_record limit, is there a workaround?


I'm fetching streaming data from an API and sending the raw data to an S3 bucket via Kinesis Firehose. Occasionally the data size exceeds the limit of what I can send through Firehose, so I get the following error:

botocore.exceptions.ClientError: An error occurred (ValidationException) when calling the PutRecord operation: 1 validation error detected: Value at 'record.data' failed to satisfy constraint: Member must have length less than or equal to 1024000

What is the best way to work around this so I end up with something resembling the original structure? I was thinking some sort of buffering/chunking, or should I just write to a file and push it directly into S3?


Solution

  • I figured it out; I found the following statement in the API docs:

    Kinesis Data Firehose buffers records before delivering them to the destination. To disambiguate the data blobs at the destination, a common solution is to use delimiters in the data, such as a newline (\n) or some other character unique within the data. This allows the consumer application to parse individual data items when reading the data from the destination.

    So I realized I can send the JSON string in chunks of 1,000 kB or less and end the last chunk with '\n' to close the buffer, ensuring the original complete data structure arrives intact.
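    Since Firehose concatenates the buffered records into a single object at the destination, a consumer can split on the newline delimiter to recover each original JSON document. A minimal sketch of that parsing for an S3 destination, with a hypothetical bucket and key:

    import json

    import boto3

    s3 = boto3.client('s3')

    # Bucket and key are hypothetical placeholders
    obj = s3.get_object(Bucket='my-firehose-bucket', Key='delivered-object')
    body = obj['Body'].read().decode('utf-8')

    # Each '\n'-terminated span is one complete JSON document, even if
    # it was sent to Firehose as multiple chunked records
    documents = [json.loads(line) for line in body.split('\n') if line]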

    I then implemented the function below, which checks the size of the JSON string: if it's within the limit, it sends the whole payload as a single record; if not, it sends the data in chunks via put_record_batch().

    def send_to_firehose(json_data: str, data_name: str, verbose=False):
        # Note: len() counts characters, not bytes, so this assumes the
        # JSON string is ASCII (one byte per character)
        if len(json_data) > 1024000:
            # Send json_data in chunks of 1,000,000 bytes or less; only the
            # final chunk is newline-terminated, so the chunks reassemble
            # into the original document at the destination
            start = 0
            end = 1000000
            chunk_batch = list()
            while True:
                chunk_batch.append({'Data': json_data[start:end]})
                start = end
                end += 1000000
                if end >= len(json_data):
                    # Last chunk: slicing past the end of a string is safe
                    # in Python, so no bounds check is needed
                    chunk_batch.append({'Data': json_data[start:] + '\n'})
                    firehose_batch(
                        client=AWS_FIREHOSE_CLIENT, data_name=data_name,
                        records=chunk_batch, verbose=verbose
                    )
                    break
        else:
            # Data fits in a single record; terminate it with '\n' so the
            # consumer can split records at the destination
            record = {'Data': json_data + '\n'}
            firehose_put(
                client=AWS_FIREHOSE_CLIENT, data_name=data_name,
                record=record, verbose=verbose
            )
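
    The helpers firehose_put() and firehose_batch() aren't shown in the post; a minimal sketch of what they might look like, assuming data_name is the Firehose delivery stream name (boto3 requires a bytes-like Data blob, hence the encode()):

    import boto3

    AWS_FIREHOSE_CLIENT = boto3.client('firehose')

    def firehose_put(client, data_name: str, record: dict, verbose=False):
        # Deliver a single record to the stream
        response = client.put_record(
            DeliveryStreamName=data_name,
            Record={'Data': record['Data'].encode('utf-8')}
        )
        if verbose:
            print(response)

    def firehose_batch(client, data_name: str, records: list, verbose=False):
        # PutRecordBatch does not raise on per-record failures, so
        # check FailedPutCount in the response
        response = client.put_record_batch(
            DeliveryStreamName=data_name,
            Records=[{'Data': r['Data'].encode('utf-8')} for r in records]
        )
        if verbose:
            print(f"FailedPutCount: {response['FailedPutCount']}")

    Keep in mind that PutRecordBatch accepts at most 500 records and 4 MiB per call, so payloads much larger than 4 MB would need to be split across multiple batch calls.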