I am trying to configure the Confluent S3 Sink Connector to consume messages from a topic in my MSK cluster and write them as Parquet files to S3. A schema is required, for example from a Confluent Schema Registry, but that would mean running Confluent Platform and the Confluent CLI, so I saw that it is possible to use the AWS Glue Schema Registry instead: https://aws.amazon.com/blogs/big-data/build-an-end-to-end-change-data-capture-with-amazon-msk-connect-and-aws-glue-schema-registry/
The problem is that I always get the following exception in the logs:
Error converting message value in topic 'events' partition 0 at offset 2012214 and timestamp 1697576912985: Converting byte[] to Kafka Connect data failed due to serialization error
I created a Glue Schema Registry in AWS named testregistry1 and then tried the following configuration:
"connector.class" = "io.confluent.connect.s3.S3SinkConnector"
"key.converter" = "org.apache.kafka.connect.storage.StringConverter"
"key.converter.schemas.enable" = "true"
"key.converter.avroRecordType" = "GENERIC_RECORD"
"key.converter.region" = "eu-central-1"
"key.converter.registry.name" = "testregistry1"
"key.converter.schemaAutoRegistrationEnabled" = "true"
"value.converter" = "com.amazonaws.services.schemaregistry.kafkaconnect.AWSKafkaAvroConverter"
"value.converter.schemas.enable" = "true"
"value.converter.avroRecordType" = "GENERIC_RECORD"
"value.converter.region" = "eu-central-1"
"value.converter.registry.name" = "testregistry1"
"value.converter.schemaAutoRegistrationEnabled" = "true"
# S3
"topics" = "events"
"flush.size" = 10
"s3.region" = eu-central-1
"s3.bucket.name" = "my_bucket"
"s3.part.size" = 26214400
"storage.class" = "io.confluent.connect.s3.storage.S3Storage"
"format.class" = "io.confluent.connect.s3.format.parquet.ParquetFormat"
"partitioner.class" = "io.confluent.connect.storage.partitioner.DefaultPartitioner"
"schema.compatibility" = "NONE"
# Authorization
"security.protocol" = "SASL_SSL"
"sasl.mechanism" = "AWS_MSK_IAM"
"sasl.jaas.config" = "software.amazon.msk.auth.iam.IAMLoginModule required;"
"sasl.client.callback.handler.class" = "software.amazon.msk.auth.iam.IAMClientCallbackHandler"
"confluent.topic.consumer.security.protocol" = "SASL_SSL"
"confluent.topic.consumer.sasl.mechanism" = "AWS_MSK_IAM"
"confluent.topic.consumer.sasl.jaas.config" = "software.amazon.msk.auth.iam.IAMLoginModule required;"
"confluent.topic.consumer.sasl.client.callback.handler.class" = "software.amazon.msk.auth.iam.IAMClientCallbackHandler"
"confluent.topic.producer.security.protocol" = "SASL_SSL"
"confluent.topic.producer.sasl.mechanism" = "AWS_MSK_IAM"
"confluent.topic.producer.sasl.jaas.config" = "software.amazon.msk.auth.iam.IAMLoginModule required;"
"confluent.topic.producer.sasl.client.callback.handler.class" = "software.amazon.msk.auth.iam.IAMClientCallbackHandler"
I also tried setting the AWSKafkaAvroConverter as the key.converter, but I get the same error.
Complete logs:
2023-10-18T12:55:24.000+02:00 [Worker-0b0153b89034ed426] [2023-10-18 10:55:24,825] ERROR Executing stage 'VALUE_CONVERTER' with class 'com.amazonaws.services.schemaregistry.kafkaconnect.AWSKafkaAvroConverter', where consumed record is {topic='events', partition=6, offset=2316078, timestamp=1697626522632, timestampType=CreateTime}. (org.apache.kafka.connect.runtime.errors.LogReporter:66)
2023-10-18T12:55:24.000+02:00 [Worker-0b0153b89034ed426] org.apache.kafka.connect.errors.DataException: Converting byte[] to Kafka Connect data failed due to serialization error:
2023-10-18T12:55:24.000+02:00 [Worker-0b0153b89034ed426] at com.amazonaws.services.schemaregistry.kafkaconnect.AWSKafkaAvroConverter.toConnectData(AWSKafkaAvroConverter.java:118)
2023-10-18T12:55:24.000+02:00 [Worker-0b0153b89034ed426] at org.apache.kafka.connect.storage.Converter.toConnectData(Converter.java:87)
2023-10-18T12:55:24.000+02:00 [Worker-0b0153b89034ed426] at org.apache.kafka.connect.runtime.WorkerSinkTask.convertValue(WorkerSinkTask.java:545)
2023-10-18T12:55:24.000+02:00 [Worker-0b0153b89034ed426] at org.apache.kafka.connect.runtime.WorkerSinkTask.lambda$convertAndTransformRecord$1(WorkerSinkTask.java:501)
2023-10-18T12:55:24.000+02:00 [Worker-0b0153b89034ed426] at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:156)
2023-10-18T12:55:24.000+02:00 [Worker-0b0153b89034ed426] at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:190)
2023-10-18T12:55:24.000+02:00 [Worker-0b0153b89034ed426] at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:132)
2023-10-18T12:55:24.000+02:00 [Worker-0b0153b89034ed426] at org.apache.kafka.connect.runtime.WorkerSinkTask.convertAndTransformRecord(WorkerSinkTask.java:501)
2023-10-18T12:55:24.000+02:00 [Worker-0b0153b89034ed426] at org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:478)
2023-10-18T12:55:24.000+02:00 [Worker-0b0153b89034ed426] at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:328)
2023-10-18T12:55:24.000+02:00 [Worker-0b0153b89034ed426] at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:232)
2023-10-18T12:55:24.000+02:00 [Worker-0b0153b89034ed426] at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:201)
2023-10-18T12:55:24.000+02:00 [Worker-0b0153b89034ed426] at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:189)
2023-10-18T12:55:24.000+02:00 [Worker-0b0153b89034ed426] at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:238)
2023-10-18T12:55:24.000+02:00 [Worker-0b0153b89034ed426] at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
2023-10-18T12:55:24.000+02:00 [Worker-0b0153b89034ed426] at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
2023-10-18T12:55:24.000+02:00 [Worker-0b0153b89034ed426] at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
2023-10-18T12:55:24.000+02:00 [Worker-0b0153b89034ed426] at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
2023-10-18T12:55:24.000+02:00 [Worker-0b0153b89034ed426] at java.base/java.lang.Thread.run(Thread.java:829)
2023-10-18T12:55:24.000+02:00 [Worker-0b0153b89034ed426] Caused by: com.amazonaws.services.schemaregistry.exception.AWSSchemaRegistryException: Didn't find secondary deserializer.
2023-10-18T12:55:24.000+02:00 [Worker-0b0153b89034ed426] at com.amazonaws.services.schemaregistry.deserializers.SecondaryDeserializer.deserialize(SecondaryDeserializer.java:65)
2023-10-18T12:55:24.000+02:00 [Worker-0b0153b89034ed426] at com.amazonaws.services.schemaregistry.deserializers.avro.AWSKafkaAvroDeserializer.deserializeByHeaderVersionByte(AWSKafkaAvroDeserializer.java:150)
2023-10-18T12:55:24.000+02:00 [Worker-0b0153b89034ed426] at com.amazonaws.services.schemaregistry.deserializers.avro.AWSKafkaAvroDeserializer.deserialize(AWSKafkaAvroDeserializer.java:114)
2023-10-18T12:55:24.000+02:00 [Worker-0b0153b89034ed426] at com.amazonaws.services.schemaregistry.kafkaconnect.AWSKafkaAvroConverter.toConnectData(AWSKafkaAvroConverter.java:116)
2023-10-18T12:55:24.000+02:00 [Worker-0b0153b89034ed426] ... 18 more
2023-10-18T12:55:24.000+02:00 [Worker-0b0153b89034ed426] [2023-10-18 10:55:24,825] ERROR Error converting message value in topic 'events' partition 6 at offset 2316079 and timestamp 1697626522815: Converting byte[] to Kafka Connect data failed due to serialization error: (org.apache.kafka.connect.runtime.WorkerSinkTask:547)
2023-10-18T12:55:24.000+02:00 [Worker-0b0153b89034ed426] org.apache.kafka.connect.errors.DataException: Converting byte[] to Kafka Connect data failed due to serialization error:
2023-10-18T12:55:24.000+02:00 [Worker-0b0153b89034ed426] at com.amazonaws.services.schemaregistry.kafkaconnect.AWSKafkaAvroConverter.toConnectData(AWSKafkaAvroConverter.java:118)
2023-10-18T12:55:24.000+02:00 [Worker-0b0153b89034ed426] at org.apache.kafka.connect.storage.Converter.toConnectData(Converter.java:87)
2023-10-18T12:55:24.000+02:00 [Worker-0b0153b89034ed426] at org.apache.kafka.connect.runtime.WorkerSinkTask.convertValue(WorkerSinkTask.java:545)
2023-10-18T12:55:24.000+02:00 [Worker-0b0153b89034ed426] at org.apache.kafka.connect.runtime.WorkerSinkTask.lambda$convertAndTransformRecord$1(WorkerSinkTask.java:501)
2023-10-18T12:55:24.000+02:00 [Worker-0b0153b89034ed426] at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:156)
2023-10-18T12:55:24.000+02:00 [Worker-0b0153b89034ed426] at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:190)
2023-10-18T12:55:24.000+02:00 [Worker-0b0153b89034ed426] at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:132)
2023-10-18T12:55:24.000+02:00 [Worker-0b0153b89034ed426] at org.apache.kafka.connect.runtime.WorkerSinkTask.convertAndTransformRecord(WorkerSinkTask.java:501)
2023-10-18T12:55:24.000+02:00 [Worker-0b0153b89034ed426] at org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:478)
2023-10-18T12:55:24.000+02:00 [Worker-0b0153b89034ed426] at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:328)
2023-10-18T12:55:24.000+02:00 [Worker-0b0153b89034ed426] at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:232)
2023-10-18T12:55:24.000+02:00 [Worker-0b0153b89034ed426] at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:201)
2023-10-18T12:55:24.000+02:00 [Worker-0b0153b89034ed426] at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:189)
2023-10-18T12:55:24.000+02:00 [Worker-0b0153b89034ed426] at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:238)
2023-10-18T12:55:24.000+02:00 [Worker-0b0153b89034ed426] at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
2023-10-18T12:55:24.000+02:00 [Worker-0b0153b89034ed426] at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
2023-10-18T12:55:24.000+02:00 [Worker-0b0153b89034ed426] at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
2023-10-18T12:55:24.000+02:00 [Worker-0b0153b89034ed426] at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
2023-10-18T12:55:24.000+02:00 [Worker-0b0153b89034ed426] at java.base/java.lang.Thread.run(Thread.java:829)
2023-10-18T12:55:24.000+02:00 [Worker-0b0153b89034ed426] Caused by: com.amazonaws.services.schemaregistry.exception.AWSSchemaRegistryException: Didn't find secondary deserializer.
2023-10-18T12:55:24.000+02:00 [Worker-0b0153b89034ed426] at com.amazonaws.services.schemaregistry.deserializers.SecondaryDeserializer.deserialize(SecondaryDeserializer.java:65)
2023-10-18T12:55:24.000+02:00 [Worker-0b0153b89034ed426] at com.amazonaws.services.schemaregistry.deserializers.avro.AWSKafkaAvroDeserializer.deserializeByHeaderVersionByte(AWSKafkaAvroDeserializer.java:150)
2023-10-18T12:55:24.000+02:00 [Worker-0b0153b89034ed426] at com.amazonaws.services.schemaregistry.deserializers.avro.AWSKafkaAvroDeserializer.deserialize(AWSKafkaAvroDeserializer.java:114)
2023-10-18T12:55:24.000+02:00 [Worker-0b0153b89034ed426] at com.amazonaws.services.schemaregistry.kafkaconnect.AWSKafkaAvroConverter.toConnectData(AWSKafkaAvroConverter.java:116)
2023-10-18T12:55:24.000+02:00 [Worker-0b0153b89034ed426] ... 18 more
Update:
I have replaced the Glue Schema Registry with a Confluent Schema Registry running on an EC2 instance. As an example, I registered the schema from the official documentation: https://docs.confluent.io/platform/current/schema-registry/develop/api.html#example-requests-format-and-valid-json with
curl -X POST http://localhost:8081/subjects/test/versions -H 'Content-Type: application/json' -H 'Accept: application/vnd.schemaregistry.v1+json, application/vnd.schemaregistry+json, application/json' -d '{"schema":"{\"type\": \"record\",\"name\": \"test\",\"fields\":[{\"type\": \"string\",\"name\": \"field1\"},{\"type\": \"int\",\"name\": \"field2\"}]}"}'
The schema gets created, but in my connector, after setting the following properties:
"key.converter" = "org.apache.kafka.connect.storage.StringConverter"
"value.converter" = "io.confluent.connect.json.JsonSchemaConverter"
"value.converter.schemas.enable" = false
"value.converter.avroRecordType" = "GENERIC_RECORD"
"value.converter.dataFormat" = "JSON"
"value.converter.schema.registry.url" = "http://<ip>:8081"
I am still getting the Converting byte[] to Kafka Connect data error, but this time it is caused by the magic byte:
2023-10-25T16:39:35.000+02:00 [Worker-00b8ad4cf64746b30] Caused by: org.apache.kafka.common.errors.SerializationException: Error deserializing JSON message for id -1
2023-10-25T16:39:35.000+02:00 [Worker-00b8ad4cf64746b30] Caused by: org.apache.kafka.common.errors.SerializationException: Unknown magic byte!
The message that I am trying to send is quite simple; with ./kafka-console-producer.sh I send the following to my topic:
{"field1":"test4578_01","field2":1}
"The message that I am trying to send is quite simple, with ./kafka-console-producer.sh"
This isn't JSON Schema data, it's just plain JSON, so it doesn't interact with any Registry-associated Converter classes; that is why you see "Unknown magic byte!": the data isn't serialized in the format the converter expects.
ParquetFormat requires a structured event with a schema (specifically, it uses Avro to build Parquet files via the parquet-avro Java package).
https://www.confluent.io/blog/kafka-connect-deep-dive-converters-serialization-explained/
I personally haven't tested JSON->Parquet, only Avro->JSON and Avro->Parquet, but you can see in the above blog that the JSON events need to look like {"schema": ..., "payload": ...} when used with org.apache.kafka...JsonConverter and the schemas.enable=true property (the default). Otherwise, you need to use the kafka-json-schema-console-producer CLI command from Confluent Platform together with io.confluent...JsonSchemaConverter and schema.registry.url.
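For reference, a minimal sketch of the first option for the record in the question, assuming the connector were switched to "value.converter" = "org.apache.kafka.connect.json.JsonConverter" with "value.converter.schemas.enable" = true (the Connect field types below are my assumption based on the sample values):

{
  "schema": {
    "type": "struct",
    "name": "test",
    "fields": [
      { "field": "field1", "type": "string", "optional": false },
      { "field": "field2", "type": "int32", "optional": false }
    ]
  },
  "payload": { "field1": "test4578_01", "field2": 1 }
}

With that envelope, JsonConverter builds a schema'd Struct, which is the kind of structured record ParquetFormat needs, and no Schema Registry is involved in this variant.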
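And a rough sketch of the second option, producing with the Confluent CLI so that the bytes carry the magic byte and schema ID that io.confluent.connect.json.JsonSchemaConverter expects. The broker address and registry URL are placeholders, additional SASL client properties would be needed for an IAM-secured MSK cluster, and exact flag names can vary between Confluent Platform versions:

kafka-json-schema-console-producer \
  --bootstrap-server <broker>:9092 \
  --topic events \
  --property schema.registry.url=http://<ip>:8081 \
  --property value.schema='{"type":"object","properties":{"field1":{"type":"string"},"field2":{"type":"integer"}},"required":["field1","field2"]}'
# then type each record as plain JSON on stdin:
{"field1":"test4578_01","field2":1}

On the connector side you would keep "value.converter" = "io.confluent.connect.json.JsonSchemaConverter" and "value.converter.schema.registry.url"; the Glue-specific avroRecordType and dataFormat properties shouldn't be needed there.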