I have two KStreams of Avro SpecificRecords that I want to join. I can independently process both the streams but I can't understand the code that I can use to join the two. Here is my code so far:
KStream<String, RecordOne> recOneStream = streamsBuilder.stream(recOneTopic, Consumed.with(Serdes.String(), recOneSpecificSerde));
KStream<String, RecordTwo> recTwoStream = streamsBuilder.stream(recTwoTopic, Consumed.with(Serdes.String(), recTwoSpecificSerde));
// change key to match key of rec two
KStream<String, RecordOne> recOneChangedKeyStream = recOneStream.selectKey((k, v) -> v.getKeyValue().toString());
// folowing works
recOneChangedKeyStream.peek((k, v) -> System.out.println("Key : " + k + " Value : " + v)); // output is as expected here
// trying to make following work?
KStream<String, JoinedRecord> joinedRecord = recOneChangedKeyStream.join(recTwoStream, (recOn, recTwo) -> {
JoinedRecord jr = new JoinedRecord();
jr.setFieldOne...
return jr;
},
JoinWindows.of(Duration.ofSeconds(60)),
// if I add following line the code breaks at compile time, if I don't add it then it breaks at runtime
Joined.with(Serdes.String(), recOneSpecificSerde, recTwoSpecificSerde)
);
So the problem is in the Joined.with. I think I have followed the example correctly: confluent-example as it is also using the Joined.with in a similar way that I have. However, in my case I see the following exception in the IDE:
Cannot resolve method 'join(org.apache.kafka.streams.kstream.KStream<java.lang.String, RecordTwo>, <lambda expression>, org.apache.kafka.streams.kstream.JoinWindows, org.apache.kafka.streams.kstream.Joined<K,V,VO>)'
The confluent version installed on my machine is confluent-7.2.2.tar.gz
As seen in the screenshot below the documentation states that Joined.with should be used. But after reading through the error message in the IDE and then going on the GitHub examples I noticed that this document is not updated as the required argument is of type StreamJoined
instead of Joined