Tags: apache-kafka, apache-kafka-connect, debezium

AWS MSK + S3 Sink - JSON Messages getting stored as Structs instead of JSON


I'm doing some Postgres CDC work using Debezium, AWS MSK + MSK Connect, and an S3 sink. My working configurations for both Debezium and the S3 sink are below. All the permissions and connectivity are already set up.

The database changes are getting captured and stored in S3 as JSON files, but inside the files the messages appear as Structs (see below).

[Screenshot: S3 file contents with the messages stored as Structs]

I'm expecting the messages to look like this (proper JSON format).

[Screenshot: expected messages in proper JSON format]

Debezium Config

connector.class=io.debezium.connector.postgresql.PostgresConnector
database.user=user
database.dbname=test_db
database.server.id=5400
tasks.max=1
transforms=unwrap,dropTopicPrefix
database.server.name=dev_postgres
schema.include.list=test
transforms.dropTopicPrefix.regex=dev_postgres.test.(.*)
database.port=5432
plugin.name=pgoutput
include.schema.changes=true
transforms.dropTopicPrefix.replacement=$1
database.hostname=host
database.password=xxx
transforms.unwrap.type=io.debezium.transforms.ExtractNewRecordState
transforms.dropTopicPrefix.type=org.apache.kafka.connect.transforms.RegexRouter

S3 Config

connector.class=io.confluent.connect.s3.S3SinkConnector
s3.region=us-east-1
flush.size=65536
schema.compatibility=NONE
tasks.max=1
topics=reservation,reservation2
timezone=UTC
format.class=io.confluent.connect.s3.format.json.JsonFormat
partitioner.class=io.confluent.connect.storage.partitioner.DefaultPartitioner
schema.generator.class=io.confluent.connect.storage.hive.schema.DefaultSchemaGenerator
storage.class=io.confluent.connect.s3.storage.S3Storage
key.converter=org.apache.kafka.connect.storage.StringConverter
s3.bucket.name=bucket
rotate.schedule.interval.ms=60000

I'm using a RegexRouter transform (dropTopicPrefix) in Debezium to strip the prefix, so the topic names are just the table names.
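The dropTopicPrefix settings above rename a topic only when the whole topic name matches the regex, substituting the captured table name. Below is a minimal sketch of that matching logic using only java.util.regex. It reflects my understanding of how RegexRouter behaves rather than its actual source, and the class name TopicRename and the topic some_other_topic are hypothetical.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class TopicRename {
    // Same values as transforms.dropTopicPrefix.regex and .replacement in the Debezium config.
    private static final Pattern REGEX = Pattern.compile("dev_postgres.test.(.*)");
    private static final String REPLACEMENT = "$1";

    // Rename only when the entire topic name matches; otherwise keep the original name.
    static String route(String topic) {
        Matcher m = REGEX.matcher(topic);
        return m.matches() ? m.replaceFirst(REPLACEMENT) : topic;
    }

    public static void main(String[] args) {
        System.out.println(route("dev_postgres.test.reservation"));  // reservation
        System.out.println(route("dev_postgres.test.reservation2")); // reservation2
        System.out.println(route("some_other_topic"));               // some_other_topic (hypothetical, unchanged)
    }
}
```

The unescaped dots in the pattern match any character, not just a literal dot, which is harmless for these topic names.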

The only other thing to note is that you have to define a default worker configuration in MSK, so I've been using this one. I'm not sure whether it's affecting this or causing the problem.

[Screenshot: MSK default worker configuration]

I suspect I've misconfigured a key or value converter somewhere, which is why the messages aren't being stored as JSON, but I can't figure out where. If anyone can take a look and point me in the right direction, I'd appreciate it!


Solution

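  • You need to use JsonConverter (org.apache.kafka.connect.json.JsonConverter) rather than StringConverter for the record values in the sink. Debezium should use it as well, so either set it as the default in the worker properties or override it in each connector, whichever you prefer.

    Otherwise, StringConverter just calls toString() on the Connect Struct objects. If Debezium itself is using StringConverter, those Struct{...} strings are what get written to Kafka, and the sink can only copy them into S3 as-is.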
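A minimal sketch of the converter settings described above, assuming you want plain JSON in S3 without Kafka Connect's embedded schema. JsonConverter and the schemas.enable option are standard Kafka Connect; where you put these lines (each connector, or the MSK worker configuration as a default) is up to you.

```properties
# Add to BOTH the Debezium connector config and the S3 sink connector config,
# or put the same lines in the MSK worker configuration to make them the default.
value.converter=org.apache.kafka.connect.json.JsonConverter
# Plain JSON without the {"schema": ..., "payload": ...} envelope; must match on both sides.
value.converter.schemas.enable=false
```

Whichever value you choose for value.converter.schemas.enable, use the same one for Debezium and the sink: if Debezium writes with true and the sink reads with false, the schema/payload envelope ends up in S3, and the reverse mismatch makes the sink fail. Records already in the topics keep the format they were written in, so only new changes will show up as proper JSON.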