scala, akka-stream

Retain materialization type when combining Sources


I want to combine two Akka Streams sources and retain the ActorRef of the first source (apiSource) so that I can use it after materialization.

val buffer = 100

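// Note: Source.actorRef rejects OverflowStrategy.backpressure when the source is built;
// dropHead is used here as one possible substitute (an assumption, choose what fits your case).
val apiSource: Source[Data, ActorRef] = Source.actorRef[Data](buffer, OverflowStrategy.dropHead)
  .delay(2.second, DelayOverflowStrategy.backpressure)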

val kafkaSource: Source[Data, Consumer.Control] = createConsumer(config.kafkaConsumerConfig, "test")

val combinedSource: Source[Data, NotUsed] = Source.combine(kafkaSource, apiSource)(Merge(_))

The problem is that Source.combine discards the materialized values of its inputs, so the result is a Source[Data, NotUsed] and the ActorRef is lost. Is there another way to achieve this?


Solution

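  • You can use Source#mergeMat, which takes a function that decides how the two materialized values are combined. Keep.right keeps the value of the source passed as the argument (here apiSource), so the merged source materializes the ActorRef: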

    val combinedSource: Source[Data, ActorRef] = kafkaSource.mergeMat(apiSource)(Keep.right)
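
If you need both materialized values (for example the Kafka control as well as the ActorRef), Keep.both returns them as a tuple. Below is a minimal sketch of that idea, not the asker's actual setup. It assumes Akka 2.6, where an implicit ActorSystem is enough to run a stream and the two-argument Source.actorRef overload is deprecated but still available. Source.maybe is only a stand-in for kafkaSource so the example runs without a broker, and MergeMatSketch and run are hypothetical names.

```scala
import akka.actor.{ActorRef, ActorSystem, Status}
import akka.stream.OverflowStrategy
import akka.stream.scaladsl.{Keep, Sink, Source}

import scala.concurrent.{Await, Promise}
import scala.concurrent.duration._

object MergeMatSketch {

  // Runs a merged stream, feeds it through the retained ActorRef,
  // and returns the elements collected by the sink.
  def run(): Seq[String] = {
    implicit val system: ActorSystem = ActorSystem("merge-mat-sketch")
    try {
      // Stand-in for kafkaSource (assumption): its materialized Promise
      // plays the role of Consumer.Control.
      val controlledSource: Source[String, Promise[Option[String]]] =
        Source.maybe[String]

      // Actor-backed source, as in the question (dropHead instead of backpressure).
      val apiSource: Source[String, ActorRef] =
        Source.actorRef[String](100, OverflowStrategy.dropHead)

      // Keep.both retains both materialized values as a tuple.
      val combined: Source[String, (Promise[Option[String]], ActorRef)] =
        controlledSource.mergeMat(apiSource)(Keep.both)

      // Keep the tuple from the source side and the Future from the sink side.
      val ((control, ref), done) = combined.toMat(Sink.seq)(Keep.both).run()

      ref ! "a"
      ref ! "b"
      ref ! Status.Success(()) // completes the actor source after buffered elements are emitted
      control.success(None)    // completes the stand-in source without emitting anything

      Await.result(done, 5.seconds)
    } finally {
      system.terminate()
    }
  }
}
```

The merged stream only completes once both inputs have completed, which is why the sketch finishes both sources before awaiting the result. With the real kafkaSource you would keep the Consumer.Control from the tuple and use it to stop the consumer instead.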