scala, akka, akka-stream

How to switch between multiple Sources?


Suppose I have two infinite sources of the same type, either of which could be connected to a single graph. I want to switch between them from outside the already materialized graph, much as a KillSwitch makes it possible to shut one of them down from outside.

val source1: Source[ByteString, NotUsed] = ???
val source2: Source[ByteString, NotUsed] = ???

val (switcher: Switcher, source: Source[ByteString, NotUsed]) = 
    Source.combine(source1,source2).withSwitcher.run()

switcher.switch()

By default I want to use source1, and after the switch I want to consume data from source2:

source1
        \
          switcher ~> source
        /
source2

Is it possible to implement this logic with Akka Streams?


Solution

  • OK, after some time I found a solution.

    The idea is the same principle that VLANs use: tag the elements of each source, then pass them all through a MergeHub. After that it is easy to filter the elements by tag and expose the result as a Source.

    Switching from one Source to the other then only requires changing the filter condition.

    source1.map(s => (tag1, s))
                               \
                                 MergeHub.filter(_._1 == tagX).map(_._2) -> Source
                               /
    source2.map(s => (tag2, s))
    

    Here is an example:

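    import akka.NotUsed
    import akka.actor.ActorSystem
    import akka.stream.scaladsl.{BroadcastHub, Keep, MergeHub, Sink, Source}
    import akka.util.ByteString

    object SomeSource {

      // Assumes Akka 2.6+, where an implicit ActorSystem also acts as the materializer
      implicit val system: ActorSystem = ActorSystem("some-source")

      // Written by the caller, read on the stream's thread, hence @volatile
      @volatile private var current = "tag1"

      val source1: Source[ByteString, NotUsed] = ???
      val source2: Source[ByteString, NotUsed] = ???

      // Declared with () so that SomeSource.switch() compiles
      def switch(): Unit = {
        current = if (current == "tag1") "tag2" else "tag1"
      }

      // Elements whose tag does not match `current` are dropped by the filter
      val (sink: Sink[(String, ByteString), NotUsed],
           source: Source[ByteString, NotUsed]) =
        MergeHub.source[(String, ByteString)]
          .filter(_._1 == current)
          .map(_._2)
          .toMat(BroadcastHub.sink[ByteString])(Keep.both).run()

      // Attach both tagged sources to the hub
      source1.map(s => ("tag1", s)).runWith(sink)
      source2.map(s => ("tag2", s)).runWith(sink)

    }

    SomeSource.source    // do something with Source

    SomeSource.switch()  // then switch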
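
To watch the switch take effect, the same tag-and-filter idea can be run end to end as a small demo. This is only a sketch under some assumptions: `SwitchDemo`, `run`, the tick-based stand-in sources, and the 450 ms delay are all hypothetical names and values chosen for illustration, and it assumes Akka 2.6+ so that the implicit ActorSystem serves as the materializer. It keeps the tag in an AtomicReference instead of a var, and collects the first ten elements with Sink.seq instead of exposing a BroadcastHub.

```scala
import java.util.concurrent.atomic.AtomicReference
import scala.concurrent.Await
import scala.concurrent.duration._
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Keep, MergeHub, Sink, Source}

object SwitchDemo {

  // Runs the stream, switches once, and returns the first ten elements let through.
  def run(): Seq[String] = {
    implicit val system: ActorSystem = ActorSystem("switch-demo")
    try {
      // Tag of the source whose elements currently pass the filter.
      val current = new AtomicReference[String]("tag1")

      // Hypothetical stand-ins for the two infinite sources.
      val source1 = Source.tick(0.millis, 100.millis, "from source1")
      val source2 = Source.tick(0.millis, 100.millis, "from source2")

      // The hub merges every attached producer; the filter keeps one tag only.
      val (sink, result) =
        MergeHub.source[(String, String)]
          .filter { case (tag, _) => tag == current.get() }
          .map { case (_, element) => element }
          .take(10)
          .toMat(Sink.seq[String])(Keep.both)
          .run()

      source1.map(e => ("tag1", e)).runWith(sink)
      source2.map(e => ("tag2", e)).runWith(sink)

      // Let source1 through for a while, then switch to source2.
      Thread.sleep(450)
      current.set("tag2")

      Await.result(result, 5.seconds)
    } finally system.terminate()
  }

  def main(args: Array[String]): Unit = run().foreach(println)
}
```

One consequence of this design is worth keeping in mind: the filter discards the elements of the deselected source rather than pausing that source, so anything it emits while it is not selected is lost.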