Search code examples
apache-kafkaapache-kafka-streamsapache-kafka-connect

How do we deserialize value/key from Struct data?


We use io.confluent.connect.sftp.SftpCsvSourceConnector to read csv file from sftp location and push the message into kafka.... but the message in kafka topic dont come as a csv (string, separated comma), but the comme in specific key-value format Struct : ex Struct{branchBaseCurrCode=EUR, Country=CA} ...

I use Kafka-Streams.... how do I deserialize this ? what config should I use? what java object does it deserialize to? Can I deserialize this straight to my POJO knowing that I my attributes are exactly the same as the schema?

Can the SftpCsvSourceConnector writes directly the in json instead of Struct ?


Solution

  • Configure your connector to use an appropriate Converter, such as Avro, Protobuf, etc. You can even use plain JSON if you want.

    E.g.:

    "value.converter":"org.apache.kafka.connect.json.JsonConverter", 
    

    See https://www.confluent.io/blog/kafka-connect-deep-dive-converters-serialization-explained