I am trying to join two streams using the following code. Given I join on everything (the ValueJoinerWithKey
just returns a dummy String "foo"), shouldn't there be some output on the console? I confirmed both topics have data and can be printed to the console, but the join does nothing. Any ideas?
Update:
I think it's because my elements do not have a key:
key:null value:{"sessionId": "92abe296", "eventTime": "2023-03-14 01:09:42", "foo": "bar"}
Code
package content.streams
import org.apache.kafka.common.serialization.Serdes
import org.apache.kafka.streams.{KafkaStreams, StreamsBuilder, StreamsConfig}
import org.apache.kafka.streams.kstream.{JoinWindows, KStream, Printed, ValueJoinerWithKey}
import java.time.Duration
import java.util.Properties
/**
 * Minimal Kafka Streams application that performs a windowed inner join of
 * two input topics and prints every join result to standard out.
 *
 * A stream-stream join matches records by key, and records whose key is null
 * are ignored by the join. The input records here are produced without a key,
 * so both streams are re-keyed with `selectKey` before they are joined.
 */
object KafkaStreams {

  /** Streams configuration: application id, broker address and String serdes for keys and values. */
  private val config: Properties = {
    val props = new Properties()
    props.put(StreamsConfig.APPLICATION_ID_CONFIG, "kafka-streams-test")
    props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9094")
    props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass)
    props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass)
    props
  }

  /** Names of the topics used by this application. */
  private object Topics {
    val streamInputA = "streamA"
    val streamInputB = "streamB"
    val streamOut = "streamOut"
  }

  /**
   * Key assigned to every record so that any record of one stream can match
   * any record of the other ("join on everything" within the window).
   *
   * NOTE: a single constant key routes all records to one partition. For a
   * real workload derive the key from the payload instead (for example the
   * `sessionId` field of the JSON value).
   */
  private val JoinKey = "all"

  /** Replaces the (null) record key with [[JoinKey]]. */
  private val constantKey =
    new org.apache.kafka.streams.kstream.KeyValueMapper[String, String, String] {
      override def apply(key: String, value: String): String = JoinKey
    }

  // Never reassigned, so a val is sufficient.
  private val builder = new StreamsBuilder()

  // Re-key both inputs: without a non-null key the join silently emits nothing.
  private val streamA: KStream[String, String] =
    builder.stream[String, String](Topics.streamInputA).selectKey[String](constantKey)
  private val streamB: KStream[String, String] =
    builder.stream[String, String](Topics.streamInputB).selectKey[String](constantKey)

  // works
  // streamA.print(Printed.toSysOut())
  // streamB.print(Printed.toSysOut())
  // works as well
  // streamA.to(Topics.streamOut)
  // streamB.to(Topics.streamOut)

  /** Records join when their timestamps are at most 10 seconds apart; no grace period. */
  private val joinWindow = JoinWindows.ofTimeDifferenceWithNoGrace(Duration.ofSeconds(10))

  /** Joiner that ignores both values and always yields the dummy result "foo". */
  private val valueJoinerWithKey = new ValueJoinerWithKey[String, String, String, String] {
    override def apply(key: String, value1: String, value2: String): String = {
      "foo" // return "foo" as the join result
    }
  }

  streamA.join(
    streamB,
    valueJoinerWithKey,
    joinWindow
  ).print(Printed.toSysOut())

  private val streams: KafkaStreams = new KafkaStreams(builder.build(), config)

  /**
   * Starts the topology and registers a JVM shutdown hook that closes it.
   *
   * @param args command-line arguments (unused)
   */
  def main(args: Array[String]): Unit = {
    // Close the streams instance on JVM shutdown so its resources are released.
    sys.addShutdownHook {
      streams.close()
    }
    streams.start()
  }
}
I think it's because my elements do not have a key:
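Yes, keys are required to join: a stream-stream join matches records by key, so records with a null key are ignored and never produce output. But you can populate a key for each stream using either the map or the selectKey operator before executing the join
operation.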