Search code examples
akkaakka-stream

Akka: is there a Sink that never pulls?


Need a Sink that never pulls, to use in unit tests. Is there one already available or do I need to code it myself?

Please note that Sink.ignore() won't help, because it ALWAYS pulls. I need a Sink that NEVER pulls.


Solution

  • Direct Answer

    You can create an org.reactivestreams.Subscriber that never calls Subscription.request:

    import org.reactivestreams.Subscriber
    
    def nonSubscriber[T] : Subscriber[T] = new Subscriber[T] {
      override def onComplete() : Unit = {}
    
      override def onError(throwable: java.lang.Throwable) : Unit = {}
    
      //should never be called therefore definition is not implemented
      override def onNext(t: T) : Unit = ???
    
      //does not call s.request
      override def onSubscribe(s: Subscription) : Unit = {}
    } 
    

    This Subscriber can then be used to instantiate a Sink:

    import akka.NotUsed
    import akka.stream.scaladsl.Sink
    
    def nonSubscribingSink[T] : Sink[T, NotUsed] = 
      Sink.fromSubscriber[T](nonSubscriber[T])
    

    Indirect Answer

    The nature of the question suggest that you are mixing your "business logic" with your akka stream logic. You may want to consider a re-design that could make the answer to your question unnecessary.