I'm trying some variation of connecting a producer to a consumer, with the special case that sometimes I'd need to produce one extra message per input message (e.g. one to the output topic and one to a different topic) while keeping the commit guarantees intact.
I was thinking of doing mapConcat and outputting multiple ProducerRecord objects, but I'm concerned about losing guarantees in the edge case where producing the first message is enough for the commit to happen on that offset, potentially causing the loss of the second. Also, it seems you can't just do .flatMap, as you'd be going into the graph API, which gets even muddier: once you merge back into a commit flow it becomes harder to make sure you don't just commit the duplicated offset.
Consumer.committableSource(consumerSettings, Subscriptions.topics(inputTopic))
  .map(msg => (msg, addLineage(msg.record.value())))
  .mapConcat { case (msg, value) =>
    if (math.random > 0.25)
      List(
        ProducerMessage.Message(
          new ProducerRecord[Array[Byte], Array[Byte]](outputTopic, msg.record.key(), value),
          msg.committableOffset))
    else
      // Note: both messages carry the same committable offset, so the
      // same offset shows up twice in the commit flow below.
      List(
        ProducerMessage.Message(
          new ProducerRecord[Array[Byte], Array[Byte]](outputTopic, msg.record.key(), value),
          msg.committableOffset),
        ProducerMessage.Message(
          new ProducerRecord[Array[Byte], Array[Byte]](outputTopic2, msg.record.key(), value),
          msg.committableOffset))
  }
  .via(Producer.flow(producerSettings))
  .map(_.message.passThrough)
  .batch(max = 20, first => CommittableOffsetBatch.empty.updated(first)) {
    (batch, elem) => batch.updated(elem)
  }
  .mapAsync(parallelism = 3)(_.commitScaladsl())
  .runWith(Sink.ignore)
The original 1-to-1 example from the documentation is here: https://doc.akka.io/docs/akka-stream-kafka/current/consumer.html#connecting-producer-and-consumer
Has anyone thought of / solved this problem?
The Alpakka Kafka connector has recently introduced Producer.flexiFlow, which supports your use case: one stream element can produce multiple messages to Kafka while carrying a single pass-through (such as the committable offset), so the offset is only emitted towards the committing stage once all of its records have been produced.
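As a minimal sketch (reusing the consumerSettings, producerSettings, topics, and addLineage from your snippet, and assuming a flexiFlow-era Alpakka Kafka version): wrap all records for one input in a ProducerMessage.multi envelope with the offset as its single pass-through, then produce via Producer.flexiFlow:

import akka.kafka.{ProducerMessage, Subscriptions}
import akka.kafka.ConsumerMessage.CommittableOffsetBatch
import akka.kafka.scaladsl.{Consumer, Producer}
import akka.stream.scaladsl.Sink
import org.apache.kafka.clients.producer.ProducerRecord

Consumer.committableSource(consumerSettings, Subscriptions.topics(inputTopic))
  .map { msg =>
    val value = addLineage(msg.record.value())
    val records =
      if (math.random > 0.25)
        List(new ProducerRecord[Array[Byte], Array[Byte]](outputTopic, msg.record.key(), value))
      else
        List(
          new ProducerRecord[Array[Byte], Array[Byte]](outputTopic, msg.record.key(), value),
          new ProducerRecord[Array[Byte], Array[Byte]](outputTopic2, msg.record.key(), value))
    // All records of one input share a single pass-through offset; the
    // envelope's result (and thus the offset) is only emitted downstream
    // once every record in it has been produced.
    ProducerMessage.multi(records, msg.committableOffset)
  }
  .via(Producer.flexiFlow(producerSettings))
  .map(_.passThrough)
  .batch(max = 20, first => CommittableOffsetBatch.empty.updated(first)) {
    (batch, elem) => batch.updated(elem)
  }
  .mapAsync(parallelism = 3)(_.commitScaladsl())
  .runWith(Sink.ignore)

Because each envelope yields exactly one pass-through downstream, the duplicated-offset problem from the mapConcat version goes away, and the offset can't be committed before the second record is produced.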