We use io.confluent.connect.sftp.SftpCsvSourceConnector to read a CSV file from an SFTP location and push the messages into Kafka. However, the messages in the Kafka topic do not arrive as CSV (a comma-separated string); they arrive in a key-value Struct format, e.g. Struct{branchBaseCurrCode=EUR, Country=CA} ...
I use Kafka Streams. How do I deserialize this? What config should I use? What Java object does it deserialize to? Can I deserialize this straight to my POJO, given that its attributes exactly match the schema?
Can the SftpCsvSourceConnector write directly in JSON instead of Struct?
Configure your connector to use an appropriate Converter, such as Avro or Protobuf. You can even use plain JSON if you want.
E.g.:
"value.converter":"org.apache.kafka.connect.json.JsonConverter",
See https://www.confluent.io/blog/kafka-connect-deep-dive-converters-serialization-explained
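To expand on the one-line example above, here is a rough sketch of the converter-related part of the connector config. It is only a fragment: the SFTP connection and file settings your connector already has are left out, and setting value.converter.schemas.enable to false is my assumption about what you want (a bare JSON object rather than JSON wrapped in a schema/payload envelope, which is what JsonConverter produces when that property is left at its default of true).

```json
"connector.class": "io.confluent.connect.sftp.SftpCsvSourceConnector",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter.schemas.enable": "false"
```

With this in place, each record value should be written as a JSON object along the lines of {"branchBaseCurrCode":"EUR","Country":"CA"}, which a JSON Serde in Kafka Streams can map onto a POJO with matching field names.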