I was going through the documentation for the Kafka Consumer API in Alpakka and came across this piece of code. From my understanding, the offset is committed using msg.committableOffset(). Then why do we need .toMat() and mapMaterializedValue()? Can't I just attach it to Sink.ignore()?
    Consumer.committableSource(consumerSettings, Subscriptions.topics(topic))
        .mapAsync(
            1,
            msg ->
                business(msg.record().key(), msg.record().value())
                    .thenApply(done -> msg.committableOffset()))
        .toMat(Committer.sink(committerSettings.withMaxBatch(1)), Keep.both())
        .mapMaterializedValue(Consumer::createDrainingControl)
        .run(materializer);
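You can't attach Sink.ignore because the stream is already connected to Committer.sink. That sink is also what performs the commit: msg.committableOffset() only passes the offset downstream. You can, however, discard the materialized values.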
The example uses toMat with Keep.both to retain both materialized values: the Control from the source and the CompletionStage<Done> (Future[Done] in the Scala API) from the sink. mapMaterializedValue then combines the two into a DrainingControl, which lets you stop the stream, drain it before stopping, or be notified when it has stopped.
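To make the role of the two materialized values concrete, here is a minimal plain-Java sketch with no Akka dependency. Every name in it (Control, DrainingControl, runToyStream) is a hypothetical stand-in, not Alpakka's own class, and the toy stream simply pretends that three offsets are in flight. It only illustrates why both values are needed: the control stops the source, and the completion stage tells you when the sink has finished.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;

// Toy model only: none of these types are Alpakka classes.
final class DrainingControlSketch {

    /** Stand-in for the source's materialized value: it can only stop the source. */
    interface Control {
        void stop();
    }

    /** Holds both materialized values, as Keep.both() plus mapMaterializedValue would. */
    static final class DrainingControl {
        private final Control control;
        private final CompletionStage<List<Long>> streamCompletion;

        DrainingControl(Control control, CompletionStage<List<Long>> streamCompletion) {
            this.control = control;
            this.streamCompletion = streamCompletion;
        }

        /** Stops the source and returns the stage that completes when the sink is done. */
        CompletionStage<List<Long>> drainAndShutdown() {
            control.stop();
            return streamCompletion;
        }
    }

    /** Hypothetical stream: offsets 1..3 are in flight and are committed once the source stops. */
    static DrainingControl runToyStream() {
        List<Long> inFlight = List.of(1L, 2L, 3L);
        CompletableFuture<List<Long>> sinkDone = new CompletableFuture<>();
        // Stopping the source lets the toy sink "commit" what is in flight, then signal completion.
        Control control = () -> sinkDone.complete(new ArrayList<>(inFlight));
        return new DrainingControl(control, sinkDone);
    }

    public static void main(String[] args) {
        DrainingControl control = runToyStream();
        // Wait for the sink to finish before exiting, so no in-flight offset is lost.
        List<Long> committed = control.drainAndShutdown().toCompletableFuture().join();
        System.out.println("committed offsets: " + committed);
    }
}
```

In Alpakka Kafka itself, the corresponding call on the real DrainingControl is drainAndShutdown(executor), which returns a CompletionStage you can wait on before exiting the application.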
If you don't care about this DrainingControl (although you should), you can use to instead, which keeps only the source's Control:
    Consumer.committableSource(consumerSettings, Subscriptions.topics(topic))
        .mapAsync(
            1,
            msg ->
                business(msg.record().key(), msg.record().value())
                    .thenApply(done -> msg.committableOffset()))
        .to(Committer.sink(committerSettings.withMaxBatch(1)))
        .run(materializer);