
Kafka Streams join produces no results (Scala)


I am trying to join two streams using the following code. Since the ValueJoinerWithKey just returns a dummy String "foo" for every pair, I expected everything to be joined, so shouldn't there be some output on the console? I confirmed that both topics have data and can be printed to the console, but the join produces nothing. Any ideas?

Update:

I think it's because my elements do not have a key:

key:null value:{"sessionId": "92abe296", "eventTime": "2023-03-14 01:09:42", "foo": "bar"}
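The record above has key:null. A simplified in-memory model of a windowed inner join shows why that yields no output: two records match only when their keys are equal and non-null and their timestamps are within the join window. Rec and JoinModel are hypothetical names for this illustration, not Kafka Streams classes, and the model mirrors only the matching rule, not the real implementation. In Kafka Streams the timestamps are, by default, the record timestamps stored in Kafka, not the eventTime field inside the JSON value.

```scala
// Hypothetical, simplified model of a windowed inner stream-stream join.
// It mirrors the matching rule only; it is not how Kafka Streams implements joins.
final case class Rec(key: String, timestampMs: Long, value: String) // key may be null

object JoinModel {
  def innerJoin(left: Seq[Rec], right: Seq[Rec], windowMs: Long)(
      joiner: (String, String, String) => String
  ): Seq[(String, String)] =
    for {
      l <- left
      k <- Option(l.key).toSeq // a null key can never match anything
      r <- right
      if r.key == k // keys must be equal (null-safe comparison)
      if math.abs(l.timestampMs - r.timestampMs) <= windowMs // within the window
    } yield k -> joiner(k, l.value, r.value)
}
```

With null keys on both sides, as in the record above, the result is empty no matter what the joiner returns or how wide the window is.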

Code

package content.streams

import org.apache.kafka.common.serialization.Serdes
import org.apache.kafka.streams.{KafkaStreams, StreamsBuilder, StreamsConfig}
import org.apache.kafka.streams.kstream.{JoinWindows, KStream, Printed, ValueJoinerWithKey}

import java.time.Duration
import java.util.Properties

object KafkaStreams {
  private val config: Properties = {
    val p = new Properties()
    p.put(StreamsConfig.APPLICATION_ID_CONFIG, "kafka-streams-test")
    p.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9094")
    p.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass)
    p.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass)
    p
  }

  private object Topics {
    val streamInputA = "streamA"
    val streamInputB = "streamB"
    val streamOut = "streamOut"
  }

  private val builder = new StreamsBuilder()
  private val streamA: KStream[String, String] = builder.stream[String, String](Topics.streamInputA)
  private val streamB: KStream[String, String] = builder.stream[String, String](Topics.streamInputB)

//  works
//  streamA.print(Printed.toSysOut())
//  streamB.print(Printed.toSysOut())

//  works as well
//  streamA.to(Topics.streamOut)
//  streamB.to(Topics.streamOut)


  private val joinWindow = JoinWindows.ofTimeDifferenceWithNoGrace(Duration.ofSeconds(10))

  private val valueJoinerWithKey = new ValueJoinerWithKey[String, String, String, String] {
    override def apply(key: String, value1: String, value2: String): String = {
      "foo"  // return "foo" as the join result
    }
  }

  streamA.join(
    streamB,
    valueJoinerWithKey,
    joinWindow
  ).print(Printed.toSysOut())  // nothing is printed

  private val streams: KafkaStreams = new KafkaStreams(builder.build(), config)

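  def main(args: Array[String]): Unit = {
    // Close the Streams instance cleanly on Ctrl+C / JVM shutdown
    sys.addShutdownHook(streams.close())
    streams.start()
  }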
}

Solution

  • I think it's because my elements do not have a key:

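    Yes, keys are required to join. A stream-stream join only matches records whose keys are equal (and whose timestamps fall within the join window), and an inner join drops records with a null key. You can populate a key for each stream using either the map or the selectKey operator before executing the join.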
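A minimal sketch of the selectKey approach, assuming both topics carry a sessionId field in their JSON value and that this is the intended join key. RekeyedJoin, sessionIdOf and build are hypothetical names, and the regex extraction is a stand-in for a proper JSON parser. It uses the same Java DSL classes as the question's code.

```scala
import org.apache.kafka.streams.StreamsBuilder
import org.apache.kafka.streams.kstream.{JoinWindows, KStream, KeyValueMapper, Printed, ValueJoinerWithKey}

import java.time.Duration

// Hypothetical helper object; names and the regex approach are illustrative assumptions.
object RekeyedJoin {
  // Assumes a flat JSON value containing a string field "sessionId".
  // A regex keeps the sketch dependency-free; a JSON library is more robust.
  private val SessionIdPattern = """sessionId"\s*:\s*"([^"]*)""".r

  // Returns the sessionId, or null if the value or field is missing
  // (records that still end up with a null key are dropped by the inner join).
  def sessionIdOf(json: String): String =
    Option(json).flatMap(SessionIdPattern.findFirstMatchIn(_)).map(_.group(1)).orNull

  // Uses the sessionId from the value as the new record key.
  private val bySessionId = new KeyValueMapper[String, String, String] {
    override def apply(key: String, value: String): String = sessionIdOf(value)
  }

  def build(builder: StreamsBuilder, topicA: String, topicB: String): Unit = {
    // selectKey replaces the null key; because the key changed, Kafka Streams
    // repartitions both streams by the new key before joining them.
    val streamA: KStream[String, String] =
      builder.stream[String, String](topicA).selectKey[String](bySessionId)
    val streamB: KStream[String, String] =
      builder.stream[String, String](topicB).selectKey[String](bySessionId)

    val joiner = new ValueJoinerWithKey[String, String, String, String] {
      override def apply(key: String, left: String, right: String): String = "foo"
    }

    streamA
      .join(streamB, joiner, JoinWindows.ofTimeDifferenceWithNoGrace(Duration.ofSeconds(10)))
      .print(Printed.toSysOut())
  }
}
```

In the question's code, calling RekeyedJoin.build(builder, Topics.streamInputA, Topics.streamInputB) before builder.build() would take the place of the unkeyed join. map would work as well if the value also needs reshaping; selectKey is enough when only the key changes.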