I'm trying to create a simple prototype using the Alpakka Kafka connector (Akka Stream Kafka).
When running the application I get the following error:
com.typesafe.config.ConfigException$Missing: No configuration setting found for key 'kafka-clients'
I have the following code in ./src/main/scala/App.scala:
import akka.Done
import akka.actor.ActorSystem
import akka.kafka.ProducerSettings
import akka.kafka.scaladsl.Producer
import akka.stream.{ActorMaterializer, Materializer}
import akka.stream.scaladsl.Source
import com.typesafe.config.ConfigFactory
import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.kafka.common.serialization.StringSerializer
import scala.concurrent.Future

object App {
  def main(args: Array[String]): Unit = {
    println("Hello from producer")

    implicit val system = ActorSystem("fakeProducer")
    implicit val materializer: Materializer = ActorMaterializer()

    val config = system.settings.config // ConfigFactory.load()
    val producerSettings =
      ProducerSettings(config, new StringSerializer, new StringSerializer)
        .withBootstrapServers("localhost:9092")

    val done: Future[Done] =
      Source(1 to 100)
        .map(_.toString)
        .map(value => new ProducerRecord[String, String]("test-basic-numbers", value))
        .runWith(Producer.plainSink(producerSettings))

    println("Done")
  }
}
I have the following build.sbt:
name := "test-akka-stream"
version := "0.1"
scalaVersion := "2.11.8"
libraryDependencies += "com.typesafe.akka" %% "akka-stream-kafka" % "0.21.1"
I run the app using sbt run. I have not configured any uber/assembly jar.
I'm probably missing something obvious but I cannot see it ... I suspect there is some problem with akka dependencies.
As suggested by @terminally-chill, calling ProducerSettings(system, new StringSerializer, new StringSerializer) (passing the ActorSystem instead of a configuration) solves the problem. I just don't understand whether this is by design or a bug.
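A minimal sketch of why the lookup appears to fail, using only the Typesafe Config library. It assumes the 'kafka-clients' key lives under the akka.kafka.producer section rather than at the root of the configuration; the inline config and its acks entry are hypothetical stand-ins for the connector's defaults, not a copy of them.

```scala
import com.typesafe.config.{Config, ConfigFactory}

object ConfigLookupSketch {
  // Hypothetical stand-in for the section the connector provides by default.
  val root: Config = ConfigFactory.parseString(
    """
      |akka.kafka.producer {
      |  kafka-clients {
      |    acks = "all"
      |  }
      |}
      |""".stripMargin)

  // The sub-section that contains 'kafka-clients' as a direct child.
  val producerSection: Config = root.getConfig("akka.kafka.producer")

  def main(args: Array[String]): Unit = {
    // At the root there is no top-level 'kafka-clients' key.
    println(root.hasPath("kafka-clients"))            // false
    // Inside the producer section the key is found.
    println(producerSection.hasPath("kafka-clients")) // true
  }
}
```

Passing the root configuration (system.settings.config) to code that looks up 'kafka-clients' directly would therefore raise ConfigException$Missing, which matches the error above.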
I have created a GitHub issue, which has already been fixed. The documentation is now more accurate and explains the correct way to create the ProducerSettings/ConsumerSettings.
val config = system.settings.config.getConfig("akka.kafka.producer")
val producerSettings =
  ProducerSettings(config, new StringSerializer, new StringSerializer)
    .withBootstrapServers("localhost:9092")
Or you can pass the ActorSystem as explained above.
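For comparison, the ActorSystem variant might look like the sketch below. It assumes the same akka-stream-kafka 0.21.1 dependency as in the question; the object name ProducerSettingsSketch and the helper build are hypothetical, introduced only for illustration. Building the settings does not contact a broker, so this should run without Kafka.

```scala
import akka.actor.ActorSystem
import akka.kafka.ProducerSettings
import org.apache.kafka.common.serialization.StringSerializer

object ProducerSettingsSketch {
  // Hypothetical helper: passing the ActorSystem lets the library pick
  // its own config section instead of being handed the root config.
  def build(system: ActorSystem): ProducerSettings[String, String] =
    ProducerSettings(system, new StringSerializer, new StringSerializer)
      .withBootstrapServers("localhost:9092")

  def main(args: Array[String]): Unit = {
    val system = ActorSystem("fakeProducer")
    val settings = build(system)
    // Print the Kafka client property set by withBootstrapServers.
    println(settings.properties.get("bootstrap.servers"))
    system.terminate()
  }
}
```

Both variants should end up with equivalent settings; the ActorSystem form just avoids spelling out the "akka.kafka.producer" path by hand.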
Thank you for noticing and filing an issue in the Alpakka Kafka connector project. The documentation is now updated: https://doc.akka.io/docs/akka-stream-kafka/current/producer.html