Search code examples
scalaapache-kafkaakkaakka-streamalpakka

Akka Stream Kafka: No configuration setting found for key 'kafka-clients'


I'm trying to create a simple prototype using Alpakka Kafka connector (Akka Stream Kafka).

When running the application I get the following error:

com.typesafe.config.ConfigException$Missing: No configuration setting found for key 'kafka-clients'

I have the following code in ./src/main/scala/App.scala:

import akka.Done
import akka.actor.ActorSystem
import akka.kafka.ProducerSettings
import akka.kafka.scaladsl.Producer
import akka.stream.{ActorMaterializer, Materializer}
import akka.stream.scaladsl.Source
import com.typesafe.config.ConfigFactory
import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.kafka.common.serialization.StringSerializer

import scala.concurrent.Future

object App {
  def main(args: Array[String]): Unit = {
    println("Hello from producer")

    implicit val system = ActorSystem("fakeProducer")
    implicit val materializer: Materializer = ActorMaterializer()

    val config = system.settings.config // ConfigFactory.load()

    val producerSettings =
      ProducerSettings(config, new StringSerializer, new StringSerializer)
      .withBootstrapServers("localhost:9092")

    val done: Future[Done] =
      Source(1 to 100)
        .map(_.toString)
        .map(value => new ProducerRecord[String, String]("test-basic-numbers", value))
        .runWith(Producer.plainSink(producerSettings))


    println("Done")
  }
}

The following build.sbt:

name := "test-akka-stream"

version := "0.1"

scalaVersion := "2.11.8"

libraryDependencies += "com.typesafe.akka" %% "akka-stream-kafka" % "0.21.1"

I run the app using sbt run. I have not configured any uber/assembly jar.

I'm probably missing something obvious but I cannot see it ... I suspect there is some problem with akka dependencies.

UPDATE

As suggested by @terminally-chill calling ProducerSettings(system, new StringSerializer, new StringSerializer) (passing ActorSystem instead of a configuration) solve the problem. I just don't understand if this is by design or a bug.

UPDATE 2

I have create a github issue that is already been fixed. Now documentation is more accurate and explain the correct way to create the ProducerSettings/ConsumerSettings.

val config = system.settings.config.getConfig("akka.kafka.producer")
val producerSettings =
      ProducerSettings(config, new StringSerializer, new StringSerializer)
      .withBootstrapServers("localhost:9092")

or you can pass the ActorSystem as explained above.


Solution

  • Thank you for noticing and filing an issue in the Alpakka Kafka connector project. The documentation is now updated: https://doc.akka.io/docs/akka-stream-kafka/current/producer.html