I'm trying to name my source processor using the Consumed.as() method (full code below):
val usersOrdersStreams: KStream[UserId, Order] = builder
  .stream[UserId, Order](ordersByUserTopic)(Consumed.as("topic-name"))
However, when I run the application I get the following exception:
org.apache.kafka.common.config.ConfigException: Please specify a value serde or set one through StreamsConfig#DEFAULT_VALUE_SERDE_CLASS_CONFIG
When I looked at the definition of .as() I saw this:
public static <K, V> Consumed<K, V> as(final String processorName) {
    return new Consumed<>(null, null, null, null, processorName);
}
So I guessed the issue was that the key/value serdes were set to null.
I tried to solve it by adding a call to withValueSerde():
val orderSerde = ...
val usersOrdersStreams: KStream[UserId, Order] = builder
  .stream[UserId, Order](ordersByUserTopic)(Consumed.as("topic-name").withValueSerde(orderSerde))
But I got the same error. What am I doing wrong?
Note: if I remove the Consumed.as() part, the code works and no exception is thrown.
The full code follows (some imports were removed for readability):
import org.apache.kafka.common.serialization.Serde
import org.apache.kafka.streams.kstream.{GlobalKTable, JoinWindows, TimeWindows, Windowed}
import org.apache.kafka.streams.scala.ImplicitConversions._
import org.apache.kafka.streams.scala.serialization.Serdes
import org.apache.kafka.streams.scala.serialization.Serdes._
import scala.concurrent.duration._
object KafkaStreamsApp {
  implicit def serde[A >: Null : Decoder : Encoder]: Serde[A] = {
    val serializer = (a: A) => a.asJson.noSpaces.getBytes
    val deserializer = (aAsBytes: Array[Byte]) => {
      val aAsString = new String(aAsBytes)
      val aOrError = decode[A](aAsString)
      aOrError match {
        case Right(a) => Option(a)
        case Left(error) =>
          Option.empty
      }
    }
    Serdes.fromFn[A](serializer, deserializer)
  }

  implicit val orderSerde: Serde[Order] = serde[Order]

  // Topics
  final val ordersByUserTopic = "orders-by-user"
  final val filterOrders = "filter-low-orders"
  final val applyMapValues = "mapValues-apply-discount"
  final val payedOrdersTopic = "filtered-orders"

  type UserId = String
  case class Order(user: UserId, amount: Double)

  val builder = new StreamsBuilder
  val usersOrdersStreams: KStream[UserId, Order] =
    builder.stream[UserId, Order](ordersByUserTopic)(Consumed.as("vvv").withValueSerde(orderSerde))

  def paidOrdersTopology(): Unit = {
    usersOrdersStreams
      .filter((_, v) => v.amount > 1000.0, named = Named.as(filterOrders))
      .mapValues(v => v.copy(amount = v.amount * 0.85), named = Named.as(applyMapValues))
      .to(payedOrdersTopic)
  }

  def main(args: Array[String]): Unit = {
    val props = new Properties
    props.put(StreamsConfig.APPLICATION_ID_CONFIG, "orders-application")
    props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
    props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.stringSerde.getClass)
    paidOrdersTopology()
    val topology: Topology = builder.build()
    println(topology.describe())
    val application: KafkaStreams = new KafkaStreams(topology, props)
    application.start()
  }
}
So, after some digging I managed to find the issue: the key serde was missing. The following code sets only the value serde, which creates a Consumed object with a null key serde:
val orderSerde = ...
val usersOrdersStreams: KStream[UserId, Order] = builder
  .stream[UserId, Order](ordersByUserTopic)(Consumed.as("topic-name").withValueSerde(orderSerde))
When I added the key serde as well:
val orderSerde = ...
val consumed = Consumed.as("topic-name")
  .withKeySerde(Serdes.stringSerde) // Missing key serde
  .withValueSerde(orderSerde)
val usersOrdersStreams: KStream[UserId, Order] =
  builder.stream[UserId, Order](ordersByUserTopic)(consumed)
The code started working.
The only thing I'm not sure about is why the error thrown stated that value serde was missing, when it's the key serde that's missing.
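For what it's worth, a possible alternative to starting from Consumed.as() and filling in each serde by hand is to start from the Scala wrapper's Consumed.`with`, which takes both serdes as implicit parameters, and then attach the processor name with withName(). The following is only a minimal sketch under some assumptions: it uses plain String values instead of Order so that it stays self-contained, the object name and the "orders-source" processor name are made up for illustration, and it assumes kafka-streams-scala is on the classpath. It only builds and prints the topology, so no broker is needed.

```scala
import org.apache.kafka.streams.Topology
import org.apache.kafka.streams.kstream.{Consumed => ConsumedJ}
import org.apache.kafka.streams.scala.StreamsBuilder
import org.apache.kafka.streams.scala.kstream.{Consumed, KStream}
import org.apache.kafka.streams.scala.serialization.Serdes._

object NamedSourceSketch {
  // Hypothetical names, used only for this illustration.
  val ordersTopic = "orders-by-user"
  val sourceName  = "orders-source"

  // Consumed.`with` resolves both serdes implicitly (here the String serdes
  // imported from Serdes._), so neither is left as null.
  val base: ConsumedJ[String, String] = Consumed.`with`[String, String]

  // withName returns a Consumed that keeps the serdes and adds the processor name.
  val consumed: ConsumedJ[String, String] = base.withName(sourceName)

  val builder = new StreamsBuilder
  val orders: KStream[String, String] =
    builder.stream[String, String](ordersTopic)(consumed)

  val topology: Topology = builder.build()

  def main(args: Array[String]): Unit =
    // The source node in the printed description should carry the custom name.
    println(topology.describe())
}
```

With the Order type from the question, the same pattern would presumably be Consumed.`with`[UserId, Order] with the implicit orderSerde in scope, but I have not verified that against the exact setup above.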