apache-kafka apache-kafka-streams

How to push selected fields from the key and value of Topic1 to Topic2 using Kafka Streams


I am using KStream and would like to extract and push only a subset of the fields from the key and value from Topic1 to Topic2.

For example, if the value of my message contains fields like id, name, address, phoneno, ..., I would like to push only id, name and address to a new topic. This is similar to the whitelist option of Kafka Connect's ReplaceField transform, but I would like to do it with KStream.

I am able to do this for a single field, but not sure how to prepare the new value for multiple fields (resulting in a GenericRecord).


Solution

  • Use the KStream.map function, which gives you access to both the key and the value, and return a new key-value pair from it.

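    stream.map((k, v) -> {
        // `schema` is an Avro Schema containing only the fields to keep; define it beforehand
        GenericRecord r = new GenericData.Record(schema);
        r.put("name", v.getName());  // for example; repeat for each field you want to keep
        return KeyValue.pair(k, r);
    }).to("topic2");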
    

    If you are not using an Avro serde or another generic serde, you would also need to define your own serde class for the new value type.
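
    For several fields at once, one option is to let the target schema drive the copy. The following is only a sketch under assumptions: the class FieldProjection, the schema name PersonSubset, the namespace, and the string field types are all hypothetical, and the incoming value is assumed to be an Avro GenericRecord that contains every field named in the target schema.

```java
import org.apache.avro.Schema;
import org.apache.avro.SchemaBuilder;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;

// Hypothetical helper; not part of Kafka Streams or Avro.
final class FieldProjection {

    // Example target schema holding only the fields to keep.
    // Record name, namespace, and field types are assumptions for illustration.
    static final Schema SUBSET_SCHEMA = SchemaBuilder.record("PersonSubset")
            .namespace("example")
            .fields()
            .requiredString("id")
            .requiredString("name")
            .requiredString("address")
            .endRecord();

    // Builds a new record with the target schema and copies each of its
    // fields from the source record by name. Every field in `target` is
    // assumed to exist in `source` with a compatible type.
    static GenericRecord project(GenericRecord source, Schema target) {
        GenericRecord result = new GenericData.Record(target);
        for (Schema.Field field : target.getFields()) {
            result.put(field.name(), source.get(field.name()));
        }
        return result;
    }
}
```

    Inside the topology this could be called as stream.map((k, v) -> KeyValue.pair(k, FieldProjection.project(v, FieldProjection.SUBSET_SCHEMA))).to("topic2"). If the key is also a GenericRecord, the same helper could be applied to k with a separate key schema; if the key stays unchanged, mapValues is sufficient.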