I am modifying an existing stream graph by adding retry logic around several pieces of functionality. One of those pieces is the source, which in this case is a Kafka Consumer.committableSource from the Alpakka Kafka connector. Downstream, the graph expects a Source[ConsumerMessage.CommittableMessage[String, AnyRef], Control], but when I wrap the committable source in a RestartSource I end up with a Source[ConsumerMessage.CommittableMessage[String, AnyRef], NotUsed]. I tried adding (Keep.both) on the end, but that produced a compile-time error. Here are the two examples for reference:
val restartSource: Source[ConsumerMessage.CommittableMessage[String, AnyRef], NotUsed] =
  RestartSource.onFailuresWithBackoff(
    minBackoff = 3.seconds,
    maxBackoff = 60.seconds,
    randomFactor = .2
  ) { () => Consumer.committableSource(consumerSettings, subscription) }

val s: Source[ConsumerMessage.CommittableMessage[String, AnyRef], Control] =
  Consumer.committableSource(consumerSettings, subscription)
As you have observed, and as discussed in this currently open ticket, the materialized value of the original Source is not exposed in the return value of the wrapping RestartSource. To get around this, try using mapMaterializedValue (disclaimer: I didn't test the following):
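import java.util.concurrent.atomic.AtomicReference
import akka.NotUsed
import akka.kafka.ConsumerMessage
import akka.kafka.scaladsl.Consumer
import akka.stream.scaladsl.{RestartSource, Source}
import scala.concurrent.duration._

// Holds the Control of the most recently materialized inner source.
// It is replaced on every restart; an AtomicReference is used because it is
// written by the stream and read from other threads.
val control = new AtomicReference[Consumer.Control](Consumer.NoopControl)

// RestartSource discards the inner materialized value, so the wrapped source
// still materializes NotUsed. Code that needs the Control reads it from
// `control` rather than from the materialized value.
val restartSource: Source[ConsumerMessage.CommittableMessage[String, AnyRef], NotUsed] =
  RestartSource.onFailuresWithBackoff(
    minBackoff = 3.seconds,
    maxBackoff = 60.seconds,
    randomFactor = .2
  ) { () =>
    Consumer
      .committableSource(consumerSettings, subscription)
      .mapMaterializedValue(c => control.set(c))
  }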
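To show how a Control captured through mapMaterializedValue might be used once the stream is running, here is a minimal, untested sketch. It assumes Akka 2.6 or later (so the implicit ActorSystem supplies the materializer), and the object name, bootstrap server, group id, topic, and String value type are all hypothetical placeholders rather than anything from the question.

```scala
import java.util.concurrent.atomic.AtomicReference

import akka.Done
import akka.actor.ActorSystem
import akka.kafka.scaladsl.Consumer
import akka.kafka.{ConsumerSettings, Subscriptions}
import akka.stream.scaladsl.{RestartSource, Sink}
import org.apache.kafka.common.serialization.StringDeserializer

import scala.concurrent.Future
import scala.concurrent.duration._

object RestartingConsumerSketch extends App {
  implicit val system: ActorSystem = ActorSystem("restart-sketch")

  // Hypothetical connection details, for illustration only.
  val consumerSettings =
    ConsumerSettings(system, new StringDeserializer, new StringDeserializer)
      .withBootstrapServers("localhost:9092")
      .withGroupId("example-group")
  val subscription = Subscriptions.topics("example-topic")

  // Starts as a no-op and is overwritten each time the inner source (re)starts.
  val control = new AtomicReference[Consumer.Control](Consumer.NoopControl)

  // Completes when the whole restarting stream completes.
  val streamDone: Future[Done] =
    RestartSource
      .onFailuresWithBackoff(3.seconds, 60.seconds, 0.2) { () =>
        Consumer
          .committableSource(consumerSettings, subscription)
          .mapMaterializedValue(c => control.set(c))
      }
      .runWith(Sink.ignore)

  // Shuts down whichever consumer is current at the time of the call.
  def stop(): Future[Done] = control.get().shutdown()
}
```

Because onFailuresWithBackoff restarts only on failure, shutting down the current consumer through its Control completes the inner source normally and should let the outer stream complete instead of restarting. Note that a call to stop() made before the first materialization, or during a backoff pause, only reaches the no-op or the previous Control.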