I want to combine 2 akka streams sources and retain ActorRef
of the first source for the actual usage after materialization
val buffer = 100
val apiSource: Source[Data, ActorRef] = Source.actorRef[Data](buffer, OverflowStrategy.backpressure)
.delay(2.second, DelayOverflowStrategy.backpressure)
val kafkaSource: Source[Data, Consumer.Control] = createConsumer(config.kafkaConsumerConfig, "test")
val combinedSource: Source[Data, NotUsed] = Source.combine(kafkaSource, apiSource)(Merge(_))
The problem is that combined
method ignores materialization types and I wonder if there is another way of achieving this
You can use Source#mergeMat
:
val combinedSource: Source[Data, ActorRef] = kafkaSource.mergeMat(apiSource)(Keep.right)