I am trying to use the consumer library https://doc.akka.io/docs/alpakka-kafka/current/consumer.html the method committableSource
as the following:
Consumer
.committableSource(consumerSettings, Subscriptions.topics("SAP-EVENT-BUS"))
.map(_.committableOffset)
.toMat(Committer.sink(committerSettings))(Keep.both)
.mapMaterializedValue(DrainingControl.apply)
.run()
The problem here is, how to get the messages, that the consumer receives from Kafka
?
With the following code snippet works:
Consumer
.plainSource(
consumerSettings,
Subscriptions.topics("SAP-EVENT-BUS"))
.to(Sink.foreach(println))
.run()
The whole code snippet:
private implicit val materializer = ActorMaterializer()
private val config = context.system.settings.config.getConfig("akka.kafka.consumer")
private val consumerSettings =
ConsumerSettings(config, new StringDeserializer, new StringDeserializer)
.withBootstrapServers("localhost:9092")
.withGroupId("SAP-SENDER-GROUP")
.withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest")
private val committerSettings = CommitterSettings(context.system)
Consumer
.committableSource(consumerSettings, Subscriptions.topics("TOPIC"))
.map(_.committableOffset)
.toMat(Committer.sink(committerSettings))(Keep.both)
.mapMaterializedValue(DrainingControl.apply)
.run()
Consumer
.plainSource(
consumerSettings,
Subscriptions.topics("SAP-EVENT-BUS"))
.to(Sink.foreach(println))
.run()
Or do I have to use both, one for commit and another for consuming.
Instead of Committer.sink
, which terminates the stream, use Committer.flow
which allows you to continue the stream until you choose to terminate it with a different sink.