I'm fetching streaming data from an API and then sending the raw data to an S3 bucket via Kinesis Firehose. Occasionally, the data size exceeds the limit for a single Firehose record, so I get the following error:
botocore.exceptions.ClientError: An error occurred (ValidationException) when calling the PutRecord operation: 1 validation error detected: Value at 'record.data' failed to satisfy constraint: Member must have length less than or equal to 1024000
What is the best way to work around this so I end up with something resembling the original structure? I was thinking of some sort of buffering/chunking, or should I just write to a file and push it directly into S3?
I figured it out. I found the following statement in the API docs:
Kinesis Data Firehose buffers records before delivering them to the destination. To disambiguate the data blobs at the destination, a common solution is to use delimiters in the data, such as a newline (\n) or some other character unique within the data. This allows the consumer application to parse individual data items when reading the data from the destination.
So I realized I can send the JSON string in chunks of at most 1,000 KB and end only the last chunk with '\n'. The chunks are written to the destination back to back, so the single trailing newline marks where the complete original data structure ends.
I then implemented the function below, which checks the size of the JSON string. If it is within the size limit, the whole string is sent as a single record; otherwise it is sent in chunks via put_record_batch().
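To illustrate why one trailing newline is enough, here is a minimal sketch of the consumer side. It assumes the chunks land in the S3 object back to back and in order, and that the JSON is serialized compactly (no raw newlines inside a document). The function name `parse_delivered_object` and the sample documents are hypothetical, made up for this example.

```python
import json


def parse_delivered_object(body: bytes) -> list:
    """Split a delivered object on newlines and decode each JSON document."""
    return [json.loads(line) for line in body.split(b"\n") if line.strip()]


# Simulate two documents; the first is sent as three chunks.
doc_a = json.dumps({"id": 1, "values": list(range(10))}).encode("utf-8")
doc_b = json.dumps({"id": 2, "note": "line one\nline two"}).encode("utf-8")
chunks = [doc_a[:10], doc_a[10:20], doc_a[20:] + b"\n", doc_b + b"\n"]

# Assumption: the record payloads are concatenated as-is at the destination.
delivered = b"".join(chunks)
records = parse_delivered_object(delivered)
```

Note that a newline inside a JSON string value is escaped by `json.dumps`, so it does not collide with the delimiter.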
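# AWS_FIREHOSE_CLIENT, firehose_put() and firehose_batch() are defined
# elsewhere (not shown here).
MAX_RECORD_BYTES = 1024000  # per-record limit taken from the error message
CHUNK_BYTES = 1000000       # chunk size, kept safely below the limit


def send_to_firehose(json_data: str, data_name: str, verbose=False):
    # Measure and slice bytes, not characters: the limit applies to the
    # encoded payload, and the trailing '\n' counts towards it.
    payload = json_data.encode('utf-8') + b'\n'
    if len(payload) > MAX_RECORD_BYTES:
        # Split into chunks of CHUNK_BYTES or less; only the last chunk
        # ends with '\n', which closes the original structure.
        chunk_batch = [
            {'Data': payload[start:start + CHUNK_BYTES]}
            for start in range(0, len(payload), CHUNK_BYTES)
        ]
        firehose_batch(
            client=AWS_FIREHOSE_CLIENT, data_name=data_name,
            records=chunk_batch, verbose=verbose
        )
    else:
        firehose_put(
            client=AWS_FIREHOSE_CLIENT, data_name=data_name,
            record={'Data': payload}, verbose=verbose
        )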
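One thing the function above does not handle is a payload so large that all its chunks no longer fit into a single put_record_batch() call. As far as I know, a batch is limited to 500 records and about 4 MB per request; treat those numbers as assumptions and check the current Firehose quotas. The following is a rough sketch of how the chunks could be grouped into several batches. The helper names `chunk_payload` and `group_batches` and the stream name are hypothetical.

```python
from typing import Iterator, List

CHUNK_BYTES = 1_000_000             # per-record chunk size, below 1,024,000
MAX_BATCH_RECORDS = 500             # assumed PutRecordBatch record limit
MAX_BATCH_BYTES = 4 * 1024 * 1024   # assumed PutRecordBatch request size limit


def chunk_payload(json_data: str, chunk_bytes: int = CHUNK_BYTES) -> List[bytes]:
    """Encode the JSON string, append the closing newline, slice into chunks."""
    payload = json_data.encode("utf-8") + b"\n"
    return [payload[i:i + chunk_bytes] for i in range(0, len(payload), chunk_bytes)]


def group_batches(
    chunks: List[bytes],
    max_records: int = MAX_BATCH_RECORDS,
    max_bytes: int = MAX_BATCH_BYTES,
) -> Iterator[List[dict]]:
    """Yield lists of {'Data': ...} records that stay within the batch limits."""
    batch, size = [], 0
    for chunk in chunks:
        # Start a new batch when adding this chunk would break a limit.
        if batch and (len(batch) >= max_records or size + len(chunk) > max_bytes):
            yield batch
            batch, size = [], 0
        batch.append({"Data": chunk})
        size += len(chunk)
    if batch:
        yield batch


# Usage with boto3 (not executed here; "my-stream" is a placeholder):
# client = boto3.client("firehose")
# for batch in group_batches(chunk_payload(json_data)):
#     response = client.put_record_batch(
#         DeliveryStreamName="my-stream", Records=batch
#     )
#     if response["FailedPutCount"]:
#         ...  # inspect response["RequestResponses"] and retry failed records
```

Checking `FailedPutCount` matters here because a partially failed batch would leave a gap in the reassembled JSON.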