Tags: apache-kafka, avro, apache-kafka-streams, confluent-platform

KStream issue reading a payload with before and after fields in Avro format


I am developing a KStreams application on the Confluent Kafka platform. The data on the source topic is in the nested Avro format shown below:

{
   "type":"record",
   "namespace":"xyz",
   "table":"abc",
   "op_type":{
      "string":"D"
   },
   "op_ts":{
      "string":"2020-05-16 09:03:25.000462"
   },
   "pos":{
      "string":"00000000000000010722"
   },
   "before":{
      "fields":[
         {
            "column1":"value"
         },
         {
            "column2":"value"
         },
         {
            "column3":"value"
         }
      ]
   },
   "after":{
      "fields":[
         {
            "column1":"value"
         },
         {
            "column2":"value"
         },
         {
            "column3":"value"
         }
      ]
   }
}

I want to filter records on op_type = "D" so that deleted records are routed to a separate Kafka topic.

I am facing an issue writing the schema for the output topic; I get a deserialization error: for required row, found array.

I have created the POJO classes using the Apache avro-maven-plugin.

for "before tag" I mentioned type as array and passing object of before class to it. Same for "After tag" I mentioned type as array and passing object of after to it. Rightnow I can not use KSQL here as service is not available in my project.

I need a solution for handling this nested schema, or any other way to filter deleted records from the source topic into another Kafka topic.


Solution

  • To resolve the above issue, I used the generic Avro serde to process the data with Kafka Streams.

    With the specific Avro serde we cannot filter on the nested data, but the generic Avro serde lets us read a particular field as a string and filter on it. I used that approach and it resolved the issue. A sketch is shown below.
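
    A minimal sketch of that approach, assuming Confluent's GenericAvroSerde and placeholder broker, Schema Registry, and topic names (source-topic, deleted-records-topic -- adjust to your setup):

    import java.util.Collections;
    import java.util.Properties;

    import org.apache.avro.generic.GenericRecord;
    import org.apache.kafka.common.serialization.Serdes;
    import org.apache.kafka.streams.KafkaStreams;
    import org.apache.kafka.streams.StreamsBuilder;
    import org.apache.kafka.streams.StreamsConfig;
    import org.apache.kafka.streams.kstream.Consumed;
    import org.apache.kafka.streams.kstream.KStream;
    import org.apache.kafka.streams.kstream.Produced;

    import io.confluent.kafka.streams.serdes.avro.GenericAvroSerde;

    public class DeletedRecordFilter {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put(StreamsConfig.APPLICATION_ID_CONFIG, "deleted-record-filter");
            // Placeholder broker address -- replace with your cluster
            props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

            // The generic serde only needs the Schema Registry URL (placeholder here);
            // it deserializes each record with whatever writer schema it carries
            GenericAvroSerde valueSerde = new GenericAvroSerde();
            valueSerde.configure(
                Collections.singletonMap("schema.registry.url", "http://localhost:8081"),
                false); // false = configure as a value serde

            StreamsBuilder builder = new StreamsBuilder();
            KStream<String, GenericRecord> source = builder.stream(
                "source-topic", Consumed.with(Serdes.String(), valueSerde));

            // op_type is a nullable-string union, so compare its string form;
            // records that are not deletes are simply dropped by the filter
            source.filter((key, value) ->
                    value != null
                        && value.get("op_type") != null
                        && "D".equals(value.get("op_type").toString()))
                  .to("deleted-records-topic", Produced.with(Serdes.String(), valueSerde));

            new KafkaStreams(builder.build(), props).start();
        }
    }

    Because GenericRecord exposes fields by name at runtime, the nested before/after structure never has to be mapped onto generated POJOs just to inspect op_type, which sidesteps the schema mismatch entirely.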