@StreamListener("notification-input-channel")
@SendTo("notification-output-channel")
public KStream<String, Notification> process(KStream<String, PosInvoice> input) {
    KStream<String, Notification> notificationStream = input
            // keep only invoices from Prime customers
            .filter((k, v) -> v.getCustomerType().equals("Prime"))
            // replace each invoice value with a notification; the key is untouched
            .mapValues(v -> recordBuilder.getNotification(v));
    return notificationStream;
}
First of all, I am totally new to Kafka. My question: in the input KStream I am getting the value as a String, and Kafka then converts it into a PosInvoice object. But in notificationStream, what is the purpose of the String key? And what is the purpose of returning a KStream object? As far as I know, a KStream is needed when I have to read from one topic and write to another at the same time. Can you please explain the code's data flow?
You're never modifying the keys, so it doesn't really matter what their values are: filter and mapValues pass each key through unchanged. The keys are used to partition the output written to the @SendTo topic. You have also defined a String key Serde somewhere, which is what is used to read the keys from the notification-input-channel topic.
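To make the "keys pass through unchanged" point concrete, here is a minimal plain-Java sketch of the same data flow. It deliberately does not use the Kafka Streams API; it only imitates what filter followed by mapValues does to a list of key/value records. The Invoice and Notice classes, the "store-N" keys, and the message text are hypothetical stand-ins for PosInvoice, Notification, and whatever recordBuilder.getNotification produces.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

public class KeyPreservingFlow {

    // Hypothetical stand-in for PosInvoice: only the fields this sketch needs.
    static final class Invoice {
        final String customerType;
        final double total;

        Invoice(String customerType, double total) {
            this.customerType = customerType;
            this.total = total;
        }
    }

    // Hypothetical stand-in for Notification.
    static final class Notice {
        final String message;

        Notice(String message) {
            this.message = message;
        }
    }

    // Imitates filter(...) followed by mapValues(...): a record is either
    // dropped or has its value replaced, and its key is copied through as-is.
    static List<Map.Entry<String, Notice>> process(List<Map.Entry<String, Invoice>> input) {
        List<Map.Entry<String, Notice>> output = new ArrayList<>();
        for (Map.Entry<String, Invoice> record : input) {
            Invoice invoice = record.getValue();
            if (!"Prime".equals(invoice.customerType)) {
                continue; // filter step: non-Prime records never reach the output
            }
            // mapValues step: new value, same key
            Notice notice = new Notice("Invoice total: " + invoice.total);
            output.add(new SimpleEntry<>(record.getKey(), notice));
        }
        return output;
    }

    public static void main(String[] args) {
        List<Map.Entry<String, Invoice>> input = new ArrayList<>();
        input.add(new SimpleEntry<>("store-1", new Invoice("Prime", 120.0)));
        input.add(new SimpleEntry<>("store-2", new Invoice("Regular", 45.5)));
        input.add(new SimpleEntry<>("store-3", new Invoice("Prime", 80.0)));

        for (Map.Entry<String, Notice> record : process(input)) {
            System.out.println(record.getKey() + " -> " + record.getValue().message);
        }
    }
}
```

Running main prints only the store-1 and store-3 records, each still carrying the key it arrived with. In the real application the same thing happens record by record as messages arrive, rather than over a finished list.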
A KStream is returned because that is what the method signature expects; Spring's lifecycle hooks build the returned stream into a Kafka Streams topology, and @SendTo tells it which output to write that stream to.