Adding a name to source processor of Kafka streams app results in serialization exception


I'm trying to name my source processor using the Consumed.as() method (full code below):

val usersOrdersStreams: KStream[UserId, Order] = builder
  .stream[UserId, Order](ordersByUserTopic)(Consumed.as("topic-name"))

However, when I run the application I get the following exception:

org.apache.kafka.common.config.ConfigException: Please specify a value serde or set one through StreamsConfig#DEFAULT_VALUE_SERDE_CLASS_CONFIG

When I looked at the definition of .as() I saw this:

 public static <K, V> Consumed<K, V> as(final String processorName) {
   return new Consumed<>(null, null, null, null, processorName);
 }

So I guessed the issue was that the key/value serdes were set to null. I tried to solve it by adding a call to withValueSerde():

val orderSerde = ...
val usersOrdersStreams: KStream[UserId, Order] = builder
  .stream[UserId, Order](ordersByUserTopic)(Consumed.as("topic-name").withValueSerde(orderSerde))

But I got the same error. What am I doing wrong?

Note: if I remove the Consumed.as() part, the code works and no exception is thrown.

The full code follows (some imports were removed for readability):


import org.apache.kafka.common.serialization.Serde
import org.apache.kafka.streams.kstream.{GlobalKTable, JoinWindows, TimeWindows, Windowed}
import org.apache.kafka.streams.scala.ImplicitConversions._
import org.apache.kafka.streams.scala.serialization.Serdes
import org.apache.kafka.streams.scala.serialization.Serdes._
import scala.concurrent.duration._

object KafkaStreamsApp {

  implicit def serde[A >: Null : Decoder : Encoder]: Serde[A] = {
    val serializer = (a: A) => a.asJson.noSpaces.getBytes
    val deserializer = (aAsBytes: Array[Byte]) => {
      val aAsString = new String(aAsBytes)
      val aOrError = decode[A](aAsString)
      aOrError match {
        case Right(a) => Option(a)
        case Left(error) =>
          Option.empty
      }
    }
    Serdes.fromFn[A](serializer, deserializer)
  }
  implicit val orderSerde: Serde[Order] = serde[Order]

  // Topics
  final val ordersByUserTopic = "orders-by-user"
  final val filterOrders = "filter-low-orders"
  final val applyMapValues = "mapValues-apply-discount"
  final val payedOrdersTopic = "filtered-orders"

  type UserId = String
  case class Order(user: UserId, amount: Double)

  val builder = new StreamsBuilder

  val usersOrdersStreams: KStream[UserId, Order] =
    builder.stream[UserId, Order](ordersByUserTopic)(Consumed.as("vvv").withValueSerde(orderSerde))

  def paidOrdersTopology(): Unit = {
    usersOrdersStreams
      .filter((_, v) => v.amount > 1000.0, named = Named.as(filterOrders))
      .mapValues(v => v.copy(amount = v.amount * 0.85), named = Named.as(applyMapValues))
      .to(payedOrdersTopic)
  }

  def main(args: Array[String]): Unit = {
    val props = new Properties
    props.put(StreamsConfig.APPLICATION_ID_CONFIG, "orders-application")
    props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
    props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.stringSerde.getClass)

    paidOrdersTopology()

    val topology: Topology = builder.build()

    println(topology.describe())

    val application: KafkaStreams = new KafkaStreams(topology, props)
    application.start()
  }
}

Solution

  • After some digging I found the issue: the key serde was missing. The following code sets only the value serde, so the resulting Consumed object has a null key serde:

    val orderSerde = ...
    val usersOrdersStreams: KStream[UserId, Order] = builder
      .stream[UserId, Order](ordersByUserTopic)(Consumed.as("topic-name").withValueSerde(orderSerde))
    

    When I added the key serde as well:

    val orderSerde = ...
    val consumed = Consumed.as("topic-name")
        .withKeySerde(Serdes.stringSerde) // Add the previously missing key serde
        .withValueSerde(orderSerde)
    val usersOrdersStreams: KStream[UserId, Order] =
      builder.stream[UserId, Order](ordersByUserTopic)(consumed)
    

    The code started working.

    The only thing I'm not sure about is why the error said the value serde was missing when it was the key serde that was missing.
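
A variant of the same fix, as a minimal sketch rather than a definitive recommendation: build the Consumed with both serdes first through the Java class's static `with(keySerde, valueSerde)` factory, then attach the processor name with `withName`, so neither serde starts out as null. The object name `NamedSourceSketch`, the processor name "orders-source", and the use of plain String values in place of Order are assumptions made only to keep the example self-contained.

```scala
import org.apache.kafka.common.serialization.Serde
import org.apache.kafka.streams.Topology
import org.apache.kafka.streams.kstream.{Consumed => ConsumedJ}
import org.apache.kafka.streams.scala.StreamsBuilder
import org.apache.kafka.streams.scala.serialization.Serdes

object NamedSourceSketch {
  // Assumption: String values stand in for Order so no JSON serde is needed here.
  val keySerde: Serde[String] = Serdes.stringSerde
  val valueSerde: Serde[String] = Serdes.stringSerde

  // Both serdes are supplied up front; withName only attaches the processor name.
  val consumed: ConsumedJ[String, String] =
    ConsumedJ.`with`(keySerde, valueSerde).withName("orders-source")

  def buildTopology(): Topology = {
    val builder = new StreamsBuilder
    // Pass the Consumed explicitly instead of relying on the implicit conversion.
    builder.stream[String, String]("orders-by-user")(consumed)
    builder.build()
  }

  def main(args: Array[String]): Unit =
    // The description should list the source node under the chosen name.
    println(buildTopology().describe())
}
```

In the original application, the same shape would apply with `Serdes.stringSerde` and `orderSerde` as the two arguments.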