Tags: avro, apache-kafka-streams, apache-kafka-connect, debezium

Unable to read data from the Kafka topic produced by the Debezium Postgres connector


I started the Kafka Connect worker using the following command:

./bin/connect-standalone etc/schema-registry/connect-avro-standalone.properties etc/kafka-connect-postgres/connect-postgres.properties 

The serialization properties in connect-avro-standalone.properties are:

key.converter=io.confluent.connect.avro.AvroConverter
key.converter.schema.registry.url=http://localhost:8081
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://localhost:8081

internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false

I've created a Java backend with Kafka Streams that listens to this topic, and it does receive a message from Postgres for each insert/update/delete.
But the data arrives in some unknown encoding, so I can't read it correctly.
Here is the relevant code snippet:

properties.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG,
        Serdes.String().getClass().getName());
properties.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG,
        Serdes.ByteArray().getClass().getName());

StreamsBuilder streamsBuilder = new StreamsBuilder();

final Serde<String> stringSerde = Serdes.String();
final Serde<byte[]> byteSerde = Serdes.ByteArray();

streamsBuilder.stream(Pattern.compile(getTopic()), Consumed.with(stringSerde, byteSerde))
        .mapValues(data -> {
            System.out.println("->" + new String(data));
            return data;
        });

I'm confused about what I need to change and where: in the Avro converter properties on the Kafka Connect side, or in the Java code?


Solution

  • Your Kafka Connect config here means that the messages on the Kafka topic will be Avro serialised:

    value.converter=io.confluent.connect.avro.AvroConverter
    

    Which means that you need to deserialise using Avro in your Streams app. See here for more details: https://docs.confluent.io/current/streams/developer-guide/datatypes.html#avro
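
    For example, here is a minimal sketch of the Streams app using Confluent's GenericAvroSerde (from the kafka-streams-avro-serde artifact) instead of Serdes.String()/Serdes.ByteArray(). Note that your key.converter is also the AvroConverter, so the key needs an Avro serde too. The application id, bootstrap server, topic pattern, and the access to the Debezium "after" field are illustrative assumptions; adjust them to your setup:

    import io.confluent.kafka.streams.serdes.avro.GenericAvroSerde;
    import org.apache.avro.generic.GenericRecord;
    import org.apache.kafka.streams.KafkaStreams;
    import org.apache.kafka.streams.StreamsBuilder;
    import org.apache.kafka.streams.StreamsConfig;
    import org.apache.kafka.streams.kstream.Consumed;

    import java.util.Collections;
    import java.util.Map;
    import java.util.Properties;
    import java.util.regex.Pattern;

    public class DebeziumAvroStream {

        public static void main(String[] args) {
            Properties props = new Properties();
            props.put(StreamsConfig.APPLICATION_ID_CONFIG, "debezium-avro-reader"); // hypothetical app id
            props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");    // hypothetical broker

            // The Avro serdes need the same Schema Registry URL that the
            // AvroConverter in your Connect worker is configured with.
            Map<String, String> serdeConfig =
                    Collections.singletonMap("schema.registry.url", "http://localhost:8081");

            // Both key and value were written by the AvroConverter, so both need an Avro serde.
            GenericAvroSerde keySerde = new GenericAvroSerde();
            keySerde.configure(serdeConfig, true);    // true  = serde is used for record keys
            GenericAvroSerde valueSerde = new GenericAvroSerde();
            valueSerde.configure(serdeConfig, false); // false = serde is used for record values

            StreamsBuilder builder = new StreamsBuilder();
            builder.stream(Pattern.compile("dbserver1\\..*"),   // hypothetical Debezium topic pattern
                           Consumed.with(keySerde, valueSerde))
                   .foreach((key, value) -> {
                       // value is a GenericRecord; Debezium change events usually
                       // carry "before"/"after" structs (unless an unwrap SMT is applied).
                       GenericRecord changeEvent = value;
                       System.out.println("after -> " + changeEvent.get("after"));
                   });

            KafkaStreams streams = new KafkaStreams(builder.build(), props);
            streams.start();
            Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
        }
    }

    If you prefer strongly-typed records, SpecificAvroSerde works the same way; either way, the key point is that the Streams serdes deserialise with Avro against the same Schema Registry that the Connect converters registered the schemas in.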