Search code examples
scalaapache-kafkaavroapache-kafka-streamsjsonschema

Why do I get this compilation error: "could not find implicit value for kstream.Consumed" and how could I fix it?


We are having these dependencies:

libraryDependencies += "org.apache.kafka"       %% "kafka-streams-scala"         % kafkaVersion
libraryDependencies += "io.confluent"           % "kafka-streams-avro-serde"     % confluentVersion
libraryDependencies += "io.confluent"           % "kafka-schema-registry-client" % confluentVersion
libraryDependencies += "ch.qos.logback"         % "logback-classic"              % "1.2.3"
libraryDependencies += "com.typesafe"           % "config"                       % "1.4.0"
libraryDependencies += "com.sksamuel.avro4s"    %% "avro4s-core"                 % "3.0.4"

We use a code generator to generate Scala case classes out of AVRO schema files. One such generated case class has, as one of its fields, an Either value. In AVRO schema this is expressed with type=[t1,t2] so the generation seems to be decent, that is a sum type: can be type t1 or type t2.

The question becomes what is missing on the deserialization path from topic to case class (binary -> Avro Map -> case class).

Basically I am getting this error currently:

could not find implicit value for parameter consumed: org.apache.kafka.streams.scala.kstream.Consumed[String, custom.UserEvent]
[error]       .stream[String, UserEvent]("schma.avsc")

The first thought was kafka-streams-avro-serde, but it may be that this library only ensure the Serde[GenericRecord] for AVRO Map, not for case classes. So one of the other dependencies is helping with the AVRO GenericRecord to case classes mapping and back. We also have some hand written code that generates case classes out of schemas, that seems to work directly with spray json.

I'm thinking that in the (binary <-> Avro GenericRecord <-> case class instance) transformations, there is a gap, and it could be the fact that in the case class there is an Either field?

I'm taking a path now to try to create a Serde[UserEvent] instance. So that in my understanding would involve converting between UserEvent and AVRO GenericRecord, similar to Map, and then between AVRO Record and binary - which is likely covered by the kafka-streams-avro-serde dependency, like there should be a Serde[GenericRecord] or similar.

Imports wise, we have this to import implicits:


import org.apache.kafka.common.serialization.Serde
import org.apache.kafka.streams.Topology
import org.apache.kafka.streams.scala.ImplicitConversions._
import org.apache.kafka.streams.scala.Serdes
import org.apache.kafka.streams.scala.Serdes._
import org.apache.kafka.streams.scala.kstream.Consumed

Solution

  • In fact an import was missing. Now it works to compile. Here are the imports:

    import org.apache.kafka.streams.Topology
    import org.apache.kafka.streams.scala.ImplicitConversions._
    import org.apache.kafka.streams.scala.Serdes._