Tags: apache-kafka, aws-glue, apache-kafka-connect, avro

org.apache.kafka.connect.errors.DataException: Converting byte[] to Kafka Connect data failed due to serialization error


I am trying to configure the Confluent S3 Sink Connector to consume messages from a topic in my MSK cluster and write them as Parquet files to S3. This requires a schema, e.g. from a Confluent Schema Registry, but that would also mean running Confluent Platform and the Confluent CLI, so I saw that it is possible to use the AWS Glue Schema Registry instead: https://aws.amazon.com/blogs/big-data/build-an-end-to-end-change-data-capture-with-amazon-msk-connect-and-aws-glue-schema-registry/

The problem is that I always get the following exception in the logs:

Error converting message value in topic 'events' partition 0 at offset 2012214 and timestamp 1697576912985: Converting byte[] to Kafka Connect data failed due to serialization error

I created a Glue Schema Registry in AWS named testregistry1 (see the CLI sketch after the configuration below) and then tried the following connector configuration:

"connector.class" = "io.confluent.connect.s3.S3SinkConnector"
"key.converter"                                 = "org.apache.kafka.connect.storage.StringConverter"
"key.converter.schemas.enable"                  = "true"
"key.converter.avroRecordType"                  = "GENERIC_RECORD"
"key.converter.region"                          = "eu-central-1"
"key.converter.registry.name"                   = "testregistry1"
"key.converter.schemaAutoRegistrationEnabled"   = "true"
"value.converter"                               = "com.amazonaws.services.schemaregistry.kafkaconnect.AWSKafkaAvroConverter"
"value.converter.schemas.enable"                = "true"
"value.converter.avroRecordType"                = "GENERIC_RECORD"
"value.converter.region"                        = "eu-central-1"
"value.converter.registry.name"                 = "testregistry1"
"value.converter.schemaAutoRegistrationEnabled" = "true"    

# S3
"topics"               = "events"
"flush.size"           = 10
"s3.region"            = eu-central-1
"s3.bucket.name"       = "my_bucket"
"s3.part.size"         = 26214400
"storage.class"        = "io.confluent.connect.s3.storage.S3Storage"
"format.class"         = "io.confluent.connect.s3.format.parquet.ParquetFormat"
"partitioner.class"    = "io.confluent.connect.storage.partitioner.DefaultPartitioner"
"schema.compatibility" = "NONE"

# Authorization
"security.protocol"                                           = "SASL_SSL"
"sasl.mechanism"                                              = "AWS_MSK_IAM"
"sasl.jaas.config"                                            = "software.amazon.msk.auth.iam.IAMLoginModule required;"
"sasl.client.callback.handler.class"                          = "software.amazon.msk.auth.iam.IAMClientCallbackHandler"
"confluent.topic.consumer.security.protocol"                  = "SASL_SSL"
"confluent.topic.consumer.sasl.mechanism"                     = "AWS_MSK_IAM"
"confluent.topic.consumer.sasl.jaas.config"                   = "software.amazon.msk.auth.iam.IAMLoginModule required;"
"confluent.topic.consumer.sasl.client.callback.handler.class" = "software.amazon.msk.auth.iam.IAMClientCallbackHandler"
"confluent.topic.producer.security.protocol"                  = "SASL_SSL"
"confluent.topic.producer.sasl.mechanism"                     = "AWS_MSK_IAM"
"confluent.topic.producer.sasl.jaas.config"                   = "software.amazon.msk.auth.iam.IAMLoginModule required;"
"confluent.topic.producer.sasl.client.callback.handler.class" = "software.amazon.msk.auth.iam.IAMClientCallbackHandler"

I also tried setting AWSKafkaAvroConverter as the key.converter, but I get the same error.

Complete logs:

2023-10-18T12:55:24.000+02:00   [Worker-0b0153b89034ed426] [2023-10-18 10:55:24,825] ERROR Executing stage 'VALUE_CONVERTER' with class 'com.amazonaws.services.schemaregistry.kafkaconnect.AWSKafkaAvroConverter', where consumed record is {topic='events', partition=6, offset=2316078, timestamp=1697626522632, timestampType=CreateTime}. (org.apache.kafka.connect.runtime.errors.LogReporter:66)

2023-10-18T12:55:24.000+02:00   [Worker-0b0153b89034ed426] org.apache.kafka.connect.errors.DataException: Converting byte[] to Kafka Connect data failed due to serialization error:

2023-10-18T12:55:24.000+02:00   [Worker-0b0153b89034ed426] at com.amazonaws.services.schemaregistry.kafkaconnect.AWSKafkaAvroConverter.toConnectData(AWSKafkaAvroConverter.java:118)

2023-10-18T12:55:24.000+02:00   [Worker-0b0153b89034ed426] at org.apache.kafka.connect.storage.Converter.toConnectData(Converter.java:87)

2023-10-18T12:55:24.000+02:00   [Worker-0b0153b89034ed426] at org.apache.kafka.connect.runtime.WorkerSinkTask.convertValue(WorkerSinkTask.java:545)

2023-10-18T12:55:24.000+02:00   [Worker-0b0153b89034ed426] at org.apache.kafka.connect.runtime.WorkerSinkTask.lambda$convertAndTransformRecord$1(WorkerSinkTask.java:501)

2023-10-18T12:55:24.000+02:00   [Worker-0b0153b89034ed426] at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:156)

2023-10-18T12:55:24.000+02:00   [Worker-0b0153b89034ed426] at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:190)

2023-10-18T12:55:24.000+02:00   [Worker-0b0153b89034ed426] at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:132)

2023-10-18T12:55:24.000+02:00   [Worker-0b0153b89034ed426] at org.apache.kafka.connect.runtime.WorkerSinkTask.convertAndTransformRecord(WorkerSinkTask.java:501)

2023-10-18T12:55:24.000+02:00   [Worker-0b0153b89034ed426] at org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:478)

2023-10-18T12:55:24.000+02:00   [Worker-0b0153b89034ed426] at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:328)

2023-10-18T12:55:24.000+02:00   [Worker-0b0153b89034ed426] at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:232)

2023-10-18T12:55:24.000+02:00   [Worker-0b0153b89034ed426] at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:201)

2023-10-18T12:55:24.000+02:00   [Worker-0b0153b89034ed426] at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:189)

2023-10-18T12:55:24.000+02:00   [Worker-0b0153b89034ed426] at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:238)

2023-10-18T12:55:24.000+02:00   [Worker-0b0153b89034ed426] at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)

2023-10-18T12:55:24.000+02:00   [Worker-0b0153b89034ed426] at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)

2023-10-18T12:55:24.000+02:00   [Worker-0b0153b89034ed426] at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)

2023-10-18T12:55:24.000+02:00   [Worker-0b0153b89034ed426] at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)

2023-10-18T12:55:24.000+02:00   [Worker-0b0153b89034ed426] at java.base/java.lang.Thread.run(Thread.java:829)

2023-10-18T12:55:24.000+02:00   [Worker-0b0153b89034ed426] Caused by: com.amazonaws.services.schemaregistry.exception.AWSSchemaRegistryException: Didn't find secondary deserializer.

2023-10-18T12:55:24.000+02:00   [Worker-0b0153b89034ed426] at com.amazonaws.services.schemaregistry.deserializers.SecondaryDeserializer.deserialize(SecondaryDeserializer.java:65)

2023-10-18T12:55:24.000+02:00   [Worker-0b0153b89034ed426] at com.amazonaws.services.schemaregistry.deserializers.avro.AWSKafkaAvroDeserializer.deserializeByHeaderVersionByte(AWSKafkaAvroDeserializer.java:150)

2023-10-18T12:55:24.000+02:00   [Worker-0b0153b89034ed426] at com.amazonaws.services.schemaregistry.deserializers.avro.AWSKafkaAvroDeserializer.deserialize(AWSKafkaAvroDeserializer.java:114)

2023-10-18T12:55:24.000+02:00   [Worker-0b0153b89034ed426] at com.amazonaws.services.schemaregistry.kafkaconnect.AWSKafkaAvroConverter.toConnectData(AWSKafkaAvroConverter.java:116)

2023-10-18T12:55:24.000+02:00   [Worker-0b0153b89034ed426] ... 18 more

2023-10-18T12:55:24.000+02:00   [Worker-0b0153b89034ed426] [2023-10-18 10:55:24,825] ERROR Error converting message value in topic 'events' partition 6 at offset 2316079 and timestamp 1697626522815: Converting byte[] to Kafka Connect data failed due to serialization error: (org.apache.kafka.connect.runtime.WorkerSinkTask:547)

(followed by the same DataException stack trace and "Didn't find secondary deserializer." cause as above)

Update:

I have replaced the Glue Schema Registry with a Confluent Schema Registry running on an EC2 instance. As a test, I registered the example schema from the official documentation (https://docs.confluent.io/platform/current/schema-registry/develop/api.html#example-requests-format-and-valid-json) with:

curl -X POST http://localhost:8081/subjects/test/versions -H 'Content-Type: application/json' -H 'Accept: application/vnd.schemaregistry.v1+json, application/vnd.schemaregistry+json, application/json' -d '{"schema":"{\"type\": \"record\",\"name\": \"test\",\"fields\":[{\"type\": \"string\",\"name\": \"field1\"},{\"type\": \"int\",\"name\": \"field2\"}]}"}'
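
The registry returns the new schema id, and the registration can be double-checked with (assuming the registry is reachable on localhost:8081, as above):

curl http://localhost:8081/subjects/test/versions   # should list the registered version(s), e.g. [1]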

The schema gets created, but in my connector, after setting the following properties:

"key.converter"                       = "org.apache.kafka.connect.storage.StringConverter"
"value.converter"                     = "io.confluent.connect.json.JsonSchemaConverter"
"value.converter.schemas.enable"      = false
"value.converter.avroRecordType"      = "GENERIC_RECORD"
"value.converter.dataFormat"          = "JSON"
"value.converter.schema.registry.url" = "http://<ip>:8081"

I am still getting the Converting byte[] to Kafka Connect data error, but this time caused by the magic byte:

2023-10-25T16:39:35.000+02:00   [Worker-00b8ad4cf64746b30] Caused by: org.apache.kafka.common.errors.SerializationException: Error deserializing JSON message for id -1

2023-10-25T16:39:35.000+02:00   [Worker-00b8ad4cf64746b30] Caused by: org.apache.kafka.common.errors.SerializationException: Unknown magic byte!

The message that I am trying to send is quite simple; with ./kafka-console-producer.sh I send the following to my topic:

{"field1":"test4578_01","field2":1}

Solution

  • The message that I am trying to send is quite simple, with ./kafka-console-producer.sh

    This isn't JSONSchema, it's just plain JSON, so it doesn't interact with any Registry-associated Converter classes; that is why you see "Unknown magic byte!": the data isn't serialized in the wire format the converter expects.

    ParquetFormat requires a structured event with a schema (specifically, it uses Avro to build Parquet files via the parquet-avro Java package).

    https://www.confluent.io/blog/kafka-connect-deep-dive-converters-serialization-explained/

    I personally haven't tested JSON->Parquet, only Avro->JSON and Avro->Parquet. As the blog above shows, with org.apache.kafka.connect.json.JsonConverter and schemas.enable=true (the default), the JSON events need to look like {"schema": ..., "payload": ...}; otherwise you need to use the kafka-json-schema-console-producer CLI command from Confluent Platform together with io.confluent.connect.json.JsonSchemaConverter and schema.registry.url. Sketches of both approaches, and of the Avro route, follow below.
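
    A minimal sketch of the first option, assuming the same events topic and a placeholder broker address: embed the schema in each JSON record and keep the plain JsonConverter on the connector.

    ./kafka-console-producer.sh --bootstrap-server <broker>:9092 --topic events   # <broker> is a placeholder
    {"schema":{"type":"struct","fields":[{"field":"field1","type":"string"},{"field":"field2","type":"int32"}]},"payload":{"field1":"test4578_01","field2":1}}

    "value.converter"                = "org.apache.kafka.connect.json.JsonConverter"
    "value.converter.schemas.enable" = "true"

    A sketch of the second option, assuming the Confluent Schema Registry from the update is reachable at http://<ip>:8081: produce with the schema-aware console producer so records are written in the registry wire format (magic byte + schema id + payload), and point JsonSchemaConverter at the registry.

    kafka-json-schema-console-producer --bootstrap-server <broker>:9092 --topic events \
      --property schema.registry.url=http://<ip>:8081 \
      --property value.schema='{"type":"object","properties":{"field1":{"type":"string"},"field2":{"type":"integer"}}}'
    {"field1":"test4578_01","field2":1}

    "value.converter"                     = "io.confluent.connect.json.JsonSchemaConverter"
    "value.converter.schema.registry.url" = "http://<ip>:8081"

    The Avro route that I have tested works the same way (again a sketch, reusing the test schema registered earlier):

    kafka-avro-console-producer --bootstrap-server <broker>:9092 --topic events \
      --property schema.registry.url=http://<ip>:8081 \
      --property value.schema='{"type":"record","name":"test","fields":[{"name":"field1","type":"string"},{"name":"field2","type":"int"}]}'
    {"field1":"test4578_01","field2":1}

    "value.converter"                     = "io.confluent.connect.avro.AvroConverter"
    "value.converter.schema.registry.url" = "http://<ip>:8081"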