How can I create a sink from a list of operations?


I want to create a sink in Akka Streams that is made up of several operations, e.g. map, filter and fold, followed by the sink itself. The best I have come up with so far is the following. I don't like it because I have to use a Broadcast even though it has only a single output. Does anyone know a better way of doing this?

def kafkaSink(): Sink[PartialBatchProcessedResult, NotUsed] = {
  Sink.fromGraph(GraphDSL.create() { implicit b =>
    import GraphDSL.Implicits._

    val broadcast = b.add(Broadcast[PartialBatchProcessedResult](1))

    broadcast.out(0)
      .fold(new BatchPublishingResponseCollator()) { (c, e) => c.consume(e) }
      .map(_.build())
      .map(a => FunctionalTesterResults(sampleProjectorConfig, 0, a)) ~>
      Sink.foreach(new KafkaTestResultsReporter().report)

    SinkShape(broadcast.in)
  })
}


Solution

  • One key point to remember with akka-stream is that any number of Flow values plus a Sink value is still a Sink.

    A couple of examples demonstrating this property:

    val intSink : Sink[Int, _] = Sink.head[Int]
    
    val anotherSink : Sink[Int, _] = 
      Flow[Int].filter(_ > 0)
               .to(intSink)
    
    val oneMoreSink : Sink[Int, _] = 
      Flow[Int].filter(_ > 0)
               .map(_ + 4)
               .to(intSink)
    

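    Therefore, you can implement the map and filter stages as Flows and attach them to a Sink with to, so neither GraphDSL nor a Broadcast is needed. The fold that you are asking about can be implemented with Sink.fold when it is the final stage. Because your fold is followed by further map stages, Flow.fold, which emits the accumulated value once upstream completes, is the closer fit for your pipeline.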
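
    As a rough illustration of how the sink from the question might look when built this way, here is a minimal sketch. It assumes Akka 2.6 or later (where an implicit ActorSystem supplies the materializer). Partial, Report and reportSink are hypothetical stand-ins for PartialBatchProcessedResult, FunctionalTesterResults and kafkaSink, and the report function stands in for KafkaTestResultsReporter().report.

```scala
import akka.Done
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Flow, Keep, Sink, Source}

import scala.concurrent.{Await, Future}
import scala.concurrent.duration._

object ComposedSinkSketch {
  // Hypothetical stand-ins for the types used in the question.
  final case class Partial(count: Int)
  final case class Report(total: Int)

  // Flow stages followed by a Sink are themselves a Sink.
  // Flow.fold emits its accumulated value once, when upstream completes,
  // so further stages (here a map) can still follow it.
  def reportSink(report: Report => Unit): Sink[Partial, Future[Done]] =
    Flow[Partial]
      .filter(_.count > 0)
      .fold(0)((acc, p) => acc + p.count)
      .map(total => Report(total))
      // Keep.right keeps Sink.foreach's Future[Done] as the materialized value;
      // plain .to(...) would keep the Flow's NotUsed instead.
      .toMat(Sink.foreach[Report](report))(Keep.right)

  def main(args: Array[String]): Unit = {
    implicit val system: ActorSystem = ActorSystem("composed-sink-sketch")
    val done: Future[Done] =
      Source(List(Partial(1), Partial(-5), Partial(3)))
        .runWith(reportSink(r => println(s"report: $r")))
    try Await.result(done, 5.seconds)
    finally system.terminate()
  }
}
```

    If the materialized value does not matter, .to(Sink.foreach(...)) gives a Sink[Partial, NotUsed], which matches the return type of the original kafkaSink.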