Search code examples
Tags: scala, apache-kafka, fs2, cats-effect, embedded-kafka

How to read from embedded-kafka with fs2-kafka


I am using fs2-kafka to read from embedded-kafka.

I create the embedded Kafka using withRunningKafkaOnFoundPort, create a topic and publish a few messages. However, when I try to read them back with fs2-kafka I get a NullPointerException. I have isolated a test case and the code is below.

Here is my code:

import cats.effect._
import cats.implicits._
import cats.effect.implicits._
import fs2.Stream
import fs2.kafka.{AutoOffsetReset, ConsumerSettings, KafkaConsumer, consumerStream}
import net.manub.embeddedkafka.{EmbeddedKafka, EmbeddedKafkaConfig}
import org.scalatest.{BeforeAndAfterAll, FunSuite}

import scala.concurrent.ExecutionContext

// Minimal reproduction of the NullPointerException described above.
// The accepted answer below identifies three problems with this test;
// they are flagged inline where they occur.
class KafkaSuite extends FunSuite with EmbeddedKafka {

  // PROBLEM (per the answer): fs2-kafka cannot run on a single-thread
  // executor — a thread pool such as ExecutionContext.global is needed.
  val singleThreadExecutor = ExecutionContext.fromExecutor((task: Runnable) => task.run())
  implicit val contextShift = IO.contextShift(singleThreadExecutor)
  implicit val timer = IO.timer(singleThreadExecutor)

  val topic = "example"
  val partition = 0
  val clientId = "client"

  test("works") {
    // Port 0 lets embedded-kafka pick free ports; the actual ports are
    // exposed through `actualConfig` below.
    val userDefinedConfig = EmbeddedKafkaConfig(kafkaPort = 0, zooKeeperPort = 0)

    // PROBLEM (per the answer): the broker's lifetime should be managed as a
    // cats-effect Resource so it is guaranteed to outlive the IO evaluation.
    withRunningKafkaOnFoundPort(userDefinedConfig) { implicit actualConfig =>
      createCustomTopic(topic)
      // publishStringMessageToKafka publishes values only — the record KEY
      // is null, which is what triggers the NPE below.
      publishStringMessageToKafka(topic, "example-message1")
      publishStringMessageToKafka(topic, "example-message2")
      publishStringMessageToKafka(topic, "example-message3")
      publishStringMessageToKafka(topic, "example-message4")

      val broker = s"localhost:${actualConfig.kafkaPort}"

      // PROBLEM (per the answer): the key type is String, but the published
      // records carry a null key, so the String key deserializer throws
      // NullPointerException (see Deserializer frames in the stack trace).
      // Using Unit as the key type avoids deserializing the key.
      val consumerSettings = ConsumerSettings[IO, String, String]
        .withAutoOffsetReset(AutoOffsetReset.Earliest)
        .withBootstrapServers(broker)
        .withGroupId("group")
        .withClientId(clientId)

      val r = consumerStream[IO].using(consumerSettings)
        .evalTap(_.subscribeTo(topic))
        .evalTap(_.seekToBeginning)
        .flatMap { consumer =>
          // take(1): stop after the first record instead of polling forever.
          consumer.stream.take(1)
        }
        .compile
        .toList

      val res = r.unsafeRunSync()
      Console.println(res)
      assert(res.size == 1)
    }
  }

}

build.sbt:

// sbt build for the minimal fs2-kafka / embedded-kafka reproduction.
name := "test"

version := "0.1"

scalaVersion := "2.12.6"


libraryDependencies ++= Seq(
  // %% appends the Scala binary-version suffix (_2.12) automatically,
  // keeping it in sync with scalaVersion — consistent with the other
  // cross-built dependencies below. `Test` is the typed configuration
  // constant equivalent to the string "test".
  "org.scalatest" %% "scalatest" % "3.1.2" % Test,
  // slf4j backend so Kafka's logging has somewhere to go at runtime.
  "org.slf4j" % "slf4j-simple" % "1.7.25",
  "com.github.fd4s" %% "fs2-kafka" % "1.0.0",
  "io.github.embeddedkafka" %% "embedded-kafka" % "2.4.1.1" % Test
)

And here is the stack trace:

java.lang.NullPointerException was thrown.
java.lang.NullPointerException
    at java.lang.String.<init>(String.java:515)
    at fs2.kafka.Deserializer$.$anonfun$string$1(Deserializer.scala:208)
    at fs2.kafka.Deserializer$.$anonfun$lift$1(Deserializer.scala:184)
    at fs2.kafka.Deserializer$$anon$1.deserialize(Deserializer.scala:133)
    at fs2.kafka.ConsumerRecord$.deserializeFromBytes(ConsumerRecord.scala:166)
    at fs2.kafka.ConsumerRecord$.fromJava(ConsumerRecord.scala:177)
    at fs2.kafka.internal.KafkaConsumerActor.$anonfun$records$2(KafkaConsumerActor.scala:378)
    at cats.data.NonEmptyVectorInstances$$anon$1.traverse(NonEmptyVector.scala:300)
    at cats.data.NonEmptyVectorInstances$$anon$1.traverse(NonEmptyVector.scala:245)
    at cats.Traverse$Ops.traverse(Traverse.scala:19)
    at cats.Traverse$Ops.traverse$(Traverse.scala:19)
    at cats.Traverse$ToTraverseOps$$anon$2.traverse(Traverse.scala:19)
    at fs2.kafka.internal.KafkaConsumerActor.$anonfun$records$1(KafkaConsumerActor.scala:376)
    at cats.instances.VectorInstances$$anon$1.$anonfun$traverse$2(vector.scala:80)
    at cats.instances.VectorInstances$$anon$1.loop$2(vector.scala:43)
    at cats.instances.VectorInstances$$anon$1.$anonfun$foldRight$2(vector.scala:44)
    at cats.Eval$.advance(Eval.scala:271)
    at cats.Eval$.loop$1(Eval.scala:350)
    at cats.Eval$.cats$Eval$$evaluate(Eval.scala:368)
    at cats.Eval$Defer.value(Eval.scala:257)
    at cats.instances.VectorInstances$$anon$1.traverse(vector.scala:79)
    at cats.instances.VectorInstances$$anon$1.traverse(vector.scala:15)
    at cats.Traverse$Ops.traverse(Traverse.scala:19)
    at cats.Traverse$Ops.traverse$(Traverse.scala:19)
    at cats.Traverse$ToTraverseOps$$anon$2.traverse(Traverse.scala:19)
    at fs2.kafka.internal.KafkaConsumerActor.records(KafkaConsumerActor.scala:373)
    at fs2.kafka.internal.KafkaConsumerActor.$anonfun$poll$2(KafkaConsumerActor.scala:405)
    at cats.effect.internals.IORunLoop$.liftedTree1$1(IORunLoop.scala:95)
    at cats.effect.internals.IORunLoop$.cats$effect$internals$IORunLoop$$loop(IORunLoop.scala:95)
    at cats.effect.internals.IORunLoop$.startCancelable(IORunLoop.scala:41)
    at cats.effect.internals.IOBracket$BracketStart.run(IOBracket.scala:86)
    at cats.effect.internals.Trampoline.cats$effect$internals$Trampoline$$immediateLoop(Trampoline.scala:70)
    at cats.effect.internals.Trampoline.startLoop(Trampoline.scala:36)
    at cats.effect.internals.TrampolineEC$JVMTrampoline.super$startLoop(TrampolineEC.scala:93)
    at cats.effect.internals.TrampolineEC$JVMTrampoline.$anonfun$startLoop$1(TrampolineEC.scala:93)
    at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:12)
    at scala.concurrent.BlockContext$.withBlockContext(BlockContext.scala:81)
    at cats.effect.internals.TrampolineEC$JVMTrampoline.startLoop(TrampolineEC.scala:93)
    at cats.effect.internals.Trampoline.execute(Trampoline.scala:43)
    at cats.effect.internals.TrampolineEC.execute(TrampolineEC.scala:44)
    at cats.effect.internals.IOBracket$BracketStart.apply(IOBracket.scala:72)
    at cats.effect.internals.IOBracket$BracketStart.apply(IOBracket.scala:52)
    at cats.effect.internals.IORunLoop$.cats$effect$internals$IORunLoop$$loop(IORunLoop.scala:136)
    at cats.effect.internals.IORunLoop$RestartCallback.signal(IORunLoop.scala:355)
    at cats.effect.internals.IORunLoop$RestartCallback.apply(IORunLoop.scala:376)
    at cats.effect.internals.IORunLoop$RestartCallback.apply(IORunLoop.scala:316)
    at cats.effect.internals.IOShift$Tick.run(IOShift.scala:36)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)

Solution

  • Turns out the problem is that the key type in ConsumerSettings[IO, String, String] is String, but embedded-kafka writes a null key, so deserializing the key fails with a NullPointerException. Setting the key type to Unit solves the problem with the exception.

    Another problem is that withRunningKafkaOnFoundPort finishes before the evaluation of the IO starts. To keep Kafka running, you need to make a Resource from embedded-kafka and wrap the IO in it.

    val embeddedKafka = Resource.make(IO(EmbeddedKafka.start()))((kafka) => IO(kafka.stop(true)))
    

    The next problem is that fs2-kafka cannot work with a single-thread executor, so you have to provide it with an execution context backed by a thread pool (for example, ExecutionContext.global).

    Here is a full working example:

    import cats.effect._
    import fs2.Stream
    import fs2.kafka.{AutoOffsetReset, ConsumerSettings, consumerStream}
    import net.manub.embeddedkafka.{EmbeddedKafka, EmbeddedKafkaConfig}
    import org.scalatest.FunSuite
    
    import scala.concurrent.ExecutionContext
    
    // Working version of the test, applying the three fixes described above:
    // a pooled execution context, a Unit key type, and a Resource-managed broker.
    class KafkaSuite extends FunSuite with EmbeddedKafka {

      // Fix 1: fs2-kafka needs more than one thread, so use the global pool
      // instead of a single-thread executor.
      implicit val ec = ExecutionContext.global
      implicit val contextShift = IO.contextShift(ec)
      implicit val timer = IO.timer(ec)

      val topic = "example"
      val partition = 0
      val clientId = "client"
      val userDefinedConfig = EmbeddedKafkaConfig(kafkaPort = 0, zooKeeperPort = 0)

      def broker(port: Long) = s"localhost:${port}"

      // Fix 2: key type is Unit — embedded-kafka publishes records with a null
      // key, and a String key deserializer would throw NullPointerException.
      val consumerSettings = ConsumerSettings[IO, Unit, String]
        .withAutoOffsetReset(AutoOffsetReset.Earliest)
        .withEnableAutoCommit(true)
        .withGroupId("group")
        .withClientId(clientId)

      // Fix 3: manage the broker as a Resource so it stays up for the whole
      // stream evaluation and is stopped (stop(true) also stops ZooKeeper)
      // afterwards, even on failure.
      val embeddedKafka = Resource.make(IO(EmbeddedKafka.start()))((kafka) => IO(kafka.stop(true)))

      test("works") {
        val r = Stream.resource(embeddedKafka).flatMap { kafka =>
          // kafka.config carries the actual ports chosen at startup
          // (userDefinedConfig requested port 0 = "pick a free port").
          implicit val actualConfig: EmbeddedKafkaConfig = kafka.config
          createCustomTopic(topic)
          publishStringMessageToKafka(topic, "example-message1")
          publishStringMessageToKafka(topic, "example-message2")
          publishStringMessageToKafka(topic, "example-message3")
          publishStringMessageToKafka(topic, "example-message4")

          consumerStream(consumerSettings.withBootstrapServers(broker(actualConfig.kafkaPort)))
            .evalTap(_.subscribeTo(topic))
            .evalTap(_.seekToBeginning)
            .flatMap(_.stream)
            .map(_.record.value)
            .take(1)          // stop after the first record instead of polling forever
        }
        val res = r.compile.toList.unsafeRunSync()
        assert(res.contains("example-message1"))
      }

    }