Search code examples
scalaakkaakka-stream

RestartSource masking the materialized value for the wrapped source?


I am modifying an existing stream graph by adding some retry logic around various functionality. One of those pieces is the source, which in this case happens to be a kafka Consumer.committableSource from the alpakka kafka connector. Downstream, the graph is expecting a type of Source[ConsumerMessage.CommittableMessage[String, AnyRef], Control], but when I wrap the committable source in a RestartSource I end up with Source[ConsumerMessage.CommittableMessage[String, AnyRef], NotUsed]

I tried adding (Keep.both) on the end, but ended up with a compile time error. Here are the two examples for reference:

val restartSource: Source[ConsumerMessage.CommittableMessage[String, AnyRef], NotUsed] = RestartSource.onFailuresWithBackoff(
  minBackoff = 3.seconds,
  maxBackoff = 60.seconds,
  randomFactor = .2
) {() => Consumer.committableSource(consumerSettings, subscription)}

val s: Source[ConsumerMessage.CommittableMessage[String, AnyRef], Control] = Consumer.committableSource(consumerSettings, subscription)

Solution

  • As you have observed, and as discussed in this currently open ticket, the materialized value of the original Source is not exposed in the return value of the wrapping RestartSource. To get around this, try using mapMaterializedValue (disclaimer: I didn't test the following):

    val restartSource: Source[ConsumerMessage.CommittableMessage[String, AnyRef], Control] = {
      var control: Option[Control] = None
    
      RestartSource.onFailuresWithBackoff(
        minBackoff = 3.seconds,
        maxBackoff = 60.seconds,
        randomFactor = .2
      ) { () =>
        Consumer
          .committableSource(consumerSettings, subscription)
          .mapMaterializedValue { c =>
            control = Some(c)
          }
      }
      .mapMaterializedValue(_ => control)
      .collect { case Some(c) => c }
    }