java, amazon-kinesis, amazon-kinesis-firehose, amazon-kinesis-kpl

Streaming data from Kinesis to S3 fails with Illegal Character that KPL itself writes


I have a relatively straightforward use case:

  1. Read Avro data from a Kafka topic
  2. Use KPL (v0.14.12) to send this data to Kinesis Data Streams
  3. Use Kinesis Firehose to transform this data into Parquet and transfer it to S3.

The Kafka topic was written to by a Kafka Streams application using the following producer configuration:

private void addAwsGlueSpecificProperties(Map<String, Object> props) {
    props.put(AWSSchemaRegistryConstants.AWS_REGION, "eu-central-1");
    props.put(AWSSchemaRegistryConstants.DATA_FORMAT, DataFormat.AVRO.name());
    props.put(AWSSchemaRegistryConstants.SCHEMA_AUTO_REGISTRATION_SETTING, true);
    props.put(AWSSchemaRegistryConstants.REGISTRY_NAME, "Kinesis_Schema_Registry");
    props.put(AWSSchemaRegistryConstants.COMPRESSION_TYPE, AWSSchemaRegistryConstants.COMPRESSION.ZLIB.name());
    props.put(DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
    props.put(DEFAULT_VALUE_SERDE_CLASS_CONFIG, GlueSchemaRegistryKafkaStreamsSerde.class.getName());
}
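
For context, a minimal sketch of how such a map is typically fed into the Kafka Streams configuration; the application id, bootstrap servers and buildTopology() below are placeholders of mine, not part of the original setup:

Map<String, Object> props = new HashMap<>();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "avro-event-producer"); // placeholder
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");   // placeholder
addAwsGlueSpecificProperties(props);

Properties streamsProps = new Properties();
streamsProps.putAll(props);
KafkaStreams streams = new KafkaStreams(buildTopology(), streamsProps); // buildTopology() is hypothetical
streams.start();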

Most notably, I've set SCHEMA_AUTO_REGISTRATION_SETTING to true to try and rule out problems with my schema definition. The auto-registration itself worked without any issues.

I have a very simple loop running for test purposes, which performs steps 1 and 2 above. It looks as follows:

KinesisProducer kinesisProducer = new KinesisProducer(getKinesisConfig());
try (final KafkaConsumer<String, AvroEvent> consumer = new KafkaConsumer<>(properties)) {
    consumer.subscribe(Collections.singletonList(TOPIC));
    while (true) {
        log.info("Polling...");
        final ConsumerRecords<String, AvroEvent> records = consumer.poll(Duration.ofMillis(100));
        for (final ConsumerRecord<String, AvroEvent> record : records) {
            final String key = record.key();
            final AvroEvent value = record.value();

            ListenableFuture<UserRecordResult> request = kinesisProducer.addUserRecord("my-data-stream", key, randomExplicitHashKey(), value.toByteBuffer(), gsrSchema);

            Futures.addCallback(request, CALLBACK, executor);
        }
        Thread.sleep(Duration.ofSeconds(10).toMillis());
    }
}

The callback just does a bit of logging on success/failure.
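
CALLBACK, executor and gsrSchema are not shown above; a minimal sketch of what they might look like (the executor choice, schema name and the use of AvroEvent.getClassSchema() are assumptions of mine):

// Logging callback for KPL results (com.google.common.util.concurrent.FutureCallback)
private static final FutureCallback<UserRecordResult> CALLBACK = new FutureCallback<UserRecordResult>() {
    @Override
    public void onSuccess(UserRecordResult result) {
        log.info("Put record into shard {} with sequence number {}", result.getShardId(), result.getSequenceNumber());
    }

    @Override
    public void onFailure(Throwable t) {
        log.error("Failed to put record", t);
    }
};

// Executor for the callbacks and the GSR schema handed to addUserRecord
// (com.amazonaws.services.schemaregistry.common.Schema)
private static final Executor executor = Executors.newSingleThreadExecutor();
private static final Schema gsrSchema =
        new Schema(AvroEvent.getClassSchema().toString(), DataFormat.AVRO.name(), "AvroEvent"); // schema name is an assumption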

My Kinesis Config looks as follows:

private static KinesisProducerConfiguration getKinesisConfig() {
    KinesisProducerConfiguration config = new KinesisProducerConfiguration();
    GlueSchemaRegistryConfiguration schemaRegistryConfiguration = getGlueSchemaRegistryConfiguration();
    config.setGlueSchemaRegistryConfiguration(schemaRegistryConfiguration);
    config.setRegion("eu-central-1");
    config.setCredentialsProvider(new DefaultAWSCredentialsProviderChain());
    config.setMaxConnections(2);
    config.setThreadingModel(KinesisProducerConfiguration.ThreadingModel.POOLED);
    config.setThreadPoolSize(2);
    config.setRateLimit(100L);
    return config;
}

private static GlueSchemaRegistryConfiguration getGlueSchemaRegistryConfiguration() {
    GlueSchemaRegistryConfiguration gsrConfig = new GlueSchemaRegistryConfiguration("eu-central-1");
    gsrConfig.setAvroRecordType(AvroRecordType.GENERIC_RECORD); // have also tried SPECIFIC_RECORD
    gsrConfig.setRegistryName("Kinesis_Schema_Registry");
    gsrConfig.setCompressionType(AWSSchemaRegistryConstants.COMPRESSION.ZLIB);
    return gsrConfig;
}

This setup allows me to read Specific Avro records from Kafka and send them to Kinesis. I have also verified that the correct schema version ID is queried from GSR by my code. However, when my data gets to Firehose, I receive only the following error message for all my records (one per record):

{
  "attemptsMade": 1,
  "arrivalTimestamp": 1659622848304,
  "lastErrorCode": "DataFormatConversion.ParseError",
  "lastErrorMessage": "Encountered malformed JSON. Illegal character ((CTRL-CHAR, code 3)): only regular white space (\\r, \\n, \\t) is allowed between tokens\n at [Source: com.fasterxml.jackson.databind.util.ByteBufferBackedInputStream@6252e7eb; line: 1, column: 2]",
  "attemptEndingTimestamp": 1659623152452,
  "rawData": "<base64EncodedData>",
  "sequenceNumber": "<seqNum>",
  "dataCatalogTable": {
    "databaseName": "<Glue database name>",
    "tableName": "<Glue table name>",
    "region": "eu-central-1",
    "versionId": "LATEST",
    "roleArn": "<arn>"
  }
}

Unfortunately I can't post the entirety of the data as it is sensitive. However, the relevant part is that it always starts with the above control character that is causing the problem:

0x03 0x05 <schemaVersionId> <data>
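
Assuming the standard Glue Schema Registry wire format (1 header byte, 1 compression byte, a 16-byte schema version UUID, then the payload), the prefix can be inspected like this when decoding the base64 rawData from the error record above; this is just a debugging sketch of mine:

byte[] raw = Base64.getDecoder().decode(rawDataFromErrorRecord);
ByteBuffer buf = ByteBuffer.wrap(raw);
byte headerVersion = buf.get(); // 0x03 = AWSSchemaRegistryConstants.HEADER_VERSION_BYTE
byte compression = buf.get();   // 0x05 = ZLIB-compressed, 0x00 = uncompressed
UUID schemaVersionId = new UUID(buf.getLong(), buf.getLong()); // next 16 bytes
// buf now points at the (possibly ZLIB-compressed) Avro payload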

My original data does not contain these control characters. After some debugging, I've found that KPL explicitly adds these bytes to the beginning of a UserRecord. In com.amazonaws.services.schemaregistry.serializers.SerializationDataEncoder#write:

public byte[] write(final byte[] objectBytes, UUID schemaVersionId) {
    byte[] bytes;
    try (ByteArrayOutputStream out = new ByteArrayOutputStream()) {          
        
        writeHeaderVersionBytes(out);
        writeCompressionBytes(out);
        writeSchemaVersionId(out, schemaVersionId);

        boolean shouldCompress = this.compressionHandler != null;
        bytes = writeToExistingStream(out, shouldCompress ? compressData(objectBytes) : objectBytes);

    } catch (Exception e) {
        throw new AWSSchemaRegistryException(e.getMessage(), e);
    }

    return bytes;
}

Here, writeHeaderVersionBytes(out) and writeCompressionBytes(out) write the header-version byte and the compression byte, respectively, to the front of the stream:

// byte HEADER_VERSION_BYTE = (byte) 3;
private void writeHeaderVersionBytes(ByteArrayOutputStream out) {
    out.write(AWSSchemaRegistryConstants.HEADER_VERSION_BYTE);
}

// byte COMPRESSION_BYTE = (byte) 5
// byte COMPRESSION_DEFAULT_BYTE = (byte) 0
private void writeCompressionBytes(ByteArrayOutputStream out) {
    out.write(compressionHandler != null ? AWSSchemaRegistryConstants.COMPRESSION_BYTE
            : AWSSchemaRegistryConstants.COMPRESSION_DEFAULT_BYTE);
}

Why is Kinesis unable to parse a message that is produced by the library that is supposed to be best suited for writing to it? What am I missing?


Solution

  • I've finally figured out the problem and it's quite dumb.

    What it boils down to is that the Firehose transformer that converts data to Parquet expects a pure JSON payload. It expects records in the form:

    {"itemId": 1, "itemName": "someItem"}{"itemId": 2, "itemName": "otherItem"}
    

    It seemingly does not accept the same data in any other format. This means that Avro-compatible JSON (where the above itemId would look like "itemId": {"long": 1}), or e.g. binary Avro data, is not compatible with the Kinesis Firehose Parquet transformer, regardless of the fact that my schema definition in the Glue Schema Registry is explicitly registered as being in Avro format.
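
    For illustration, assuming itemId is declared as a nullable union (["null", "long"]) in the Avro schema, the Avro-style JSON encoding of the same record would be:

    {"itemId": {"long": 1}, "itemName": "someItem"}

    and it is this wrapped form (or binary Avro) that the converter rejects.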

    In addition, the Firehose Parquet transformer requires the use of a Glue table. Creating this table from an imported Avro schema simply does not work (see this answer), so it had to be created manually. Luckily, even though Firehose can't use the table that is based on an existing schema, the table definition was the same (with the exception of the SerDe it needs to use), so it was relatively easy to fix.

    To sum up, to get the above code to work I had to:

    1. Create a Glue table for the schema manually (you can use the first table created from the existing schema as a template for creating this second table, but you can't have Firehose link to the first table)

    2. Change the above code:

    kinesisProducer.addUserRecord("my-data-stream", key, randomExplicitHashKey(), value.toByteBuffer(), gsrSchema);
    

    to:

    ByteBuffer data = ByteBuffer.wrap(value.toString().getBytes(StandardCharsets.UTF_8));
    kinesisProducer.addUserRecord("my-data-stream", key, randomExplicitHashKey(), data);
    

    Note that I am now using the overloaded addUserRecord function that does not include a Schema parameter, which internally invokes the previous function with a null schema parameter. This prevents the KPL from encoding my payload; instead, the 'plain' JSON is sent over to KDS as-is.

    This is contrary to the only AWS Docs example that I could find on the topic, which is likely meant for a Firehose stream that does not convert the data prior to sending it to its destination.

    I can't quite understand the reasons for all these undocumented limitations, and it was a pain to debug, seeing how neither the KPL functions nor KDS explicitly mentions anywhere (that I could find) that this is the expected behaviour. I don't feel it's worth opening an issue/PR over at the KPL repo, seeing how Amazon doesn't seem to care much about maintaining it...

    I'll probably switch over to the plain Kinesis Client + Kinesis Aggregation for a more robust solution in the future, but hey, at least it works.
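
    For reference, a minimal sketch of what that plain-SDK alternative might look like (AWS SDK for Java v1, no aggregation; stream name and region as above, the rest is assumed):

    AmazonKinesis kinesis = AmazonKinesisClientBuilder.standard()
            .withRegion("eu-central-1")
            .withCredentials(new DefaultAWSCredentialsProviderChain())
            .build();

    // Same plain-JSON payload as above, sent without the KPL/GSR framing
    PutRecordRequest putRequest = new PutRecordRequest()
            .withStreamName("my-data-stream")
            .withPartitionKey(key)
            .withData(ByteBuffer.wrap(value.toString().getBytes(StandardCharsets.UTF_8)));
    kinesis.putRecord(putRequest);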