apache-kafka apache-kafka-connect debezium

Debezium Outbox Pattern property transforms.outbox.table.expand.json.payload not working


I'm implementing the outbox pattern with the Debezium PostgreSQL connector, building on the official documentation: https://debezium.io/documentation/reference/stable/transformations/outbox-event-router.html

Everything works fine, except that the property "transforms.outbox.table.expand.json.payload: true" has no effect.

Using the following database record (SQL insert):

INSERT INTO public.outbox_event_entity (id, event_id, "key", payload, topic, "type") VALUES(0, 'e09d6355-8e7c-4055-936c-4f997423925e', '1', '{"key":"value"}'::jsonb, 'topic', 'NEW_EVENT');

The produced record's payload contains a string of escaped JSON instead of a real JSON object:

"{\"key\": \"value\"}"

I'm using this configuration:

spec:
  class: io.debezium.connector.postgresql.PostgresConnector
  config:
    value.converter: org.apache.kafka.connect.json.JsonConverter
    table.include.list: public.outbox_event_entity
    transforms.outbox.type: io.debezium.transforms.outbox.EventRouter
    publication.autocreate.mode: FILTERED
    plugin.name: pgoutput
    transforms: outbox
    transforms.outbox.table.fields.additional.placement: 'type:header,event_id:header,timestamp_created:header'
    value.converter.schemas.enable: false
    transforms.outbox.table.field.event.key: id
    topic: topic
    key.converter: org.apache.kafka.connect.json.JsonConverter
    transforms.outbox.route.by.field: topic
    transforms.outbox.table.expand.json.payload: true
    connector.class: io.debezium.connector.postgresql.PostgresConnector
    include.schema.changes: false
    file: /opt/kafka/LICENSE
    key.converter.schemas.enable: false
    [database properties omitted]

Can anyone spot my mistake?

Best regards, Andy


Solution

  • I ran into the same issue and found a solution using a different value converter. For example, my previous output into Kafka looked like this:

    "{\"id\": 3}"
    

    But after updating the connector config with:

    "value.converter": "org.apache.kafka.connect.storage.StringConverter"

    My output of the same payload looks like this:

    {"id": 3}
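
The difference between the two outputs above can be illustrated with a small Python sketch. It is only an analogy, not Kafka Connect code: it assumes the payload reaches the converter as a plain string (that is, the JSON expansion did not take effect), and the function names `json_converter_like` and `string_converter_like` are hypothetical stand-ins for the two converters. A JSON serializer that receives a string emits a quoted, escaped JSON string literal, whereas a string serializer writes the text unchanged.

```python
import json

# Payload as text, as it would come out of the outbox table's jsonb column.
payload_text = '{"id": 3}'


def json_converter_like(value: str) -> bytes:
    """Hypothetical stand-in for a JSON serializer given a plain string.

    A string value is encoded as a JSON string literal, so the inner
    quotes are escaped and the whole text is wrapped in quotes.
    """
    return json.dumps(value).encode("utf-8")


def string_converter_like(value: str) -> bytes:
    """Hypothetical stand-in for a string serializer: bytes are written as-is."""
    return value.encode("utf-8")


escaped = json_converter_like(payload_text)
raw = string_converter_like(payload_text)

print(escaped.decode("utf-8"))  # "{\"id\": 3}"
print(raw.decode("utf-8"))      # {"id": 3}

# A consumer has to decode the escaped form twice to reach the object,
# while the raw form parses directly.
assert json.loads(json.loads(escaped)) == json.loads(raw)
```

If this reading applies, switching the value converter does not make the transform expand the payload; it only avoids the second round of JSON encoding, so consumers can parse the message value directly.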