scala akka-stream alpakka

Change a materialized value in a source using the contents of the stream


Alpakka provides a great way to access dozens of different data sources. File-oriented sources such as HDFS and FTP are delivered as Source[ByteString, Future[IOResult]]. However, HTTP requests via Akka HTTP are delivered as entity streams of Source[ByteString, NotUsed]. In my use case, I would like to retrieve content from HTTP sources as Source[ByteString, Future[IOResult]] so that I can build a unified resource fetcher that works across multiple schemes (hdfs, file, ftp and S3 in this case).

In particular, I would like to convert the Source[ByteString, NotUsed] source to Source[ByteString, Future[IOResult]], where I am able to calculate the IOResult from the incoming byte stream. There are plenty of methods like flatMapConcat and viaMat, but none seem to be able to extract details from the input stream (such as the number of bytes read) or initialise the IOResult structure properly. Ideally, I am looking for a method with the following signature that will update the IOResult as the stream comes in.

  def matCalc(src: Source[ByteString, Any]): Source[ByteString, Future[IOResult]] = {
    // someMatFoldMagic is a placeholder for the operator being asked about
    src.someMatFoldMagic[ByteString, IOResult](IOResult.createSuccessful(0))((m, b) => m.withCount(m.count + b.length))
  }

Solution

  • I can't recall any existing functionality that does this out of the box, but you can use the alsoToMat flow function (surprisingly, I didn't find it in the Akka Streams docs, although it is documented in the source code and the Java API) together with Sink.fold to accumulate a value and deliver it at the very end. E.g.:

    def magic(source: Source[Int, Any]): Source[Int, Future[Int]] =
        source.alsoToMat(Sink.fold(0)((acc, _) => acc + 1))((_, f) => f)
    

    The point is that alsoToMat combines the input materialized value with the one provided by the sink passed to alsoToMat. At the same time, the values produced by the source are not affected by that sink:

    def alsoToMat[Mat2, Mat3](that: Graph[SinkShape[Out], Mat2])(matF: (Mat, Mat2) ⇒ Mat3): ReprMat[Out, Mat3] =
      viaMat(alsoToGraph(that))(matF)
    

    It's not hard to adapt this function to return IOResult, which according to the source code is:

    final case class IOResult(count: Long, status: Try[Done]) { ... }
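
    A minimal sketch of that adaptation, assuming the two-argument IOResult constructor shown above and the Akka 2.5-style ActorMaterializer used elsewhere in this answer. The names IOResultAdapter, withIOResult and IOResultDemo are hypothetical, not part of Akka:

    ```scala
    import akka.Done
    import akka.actor.ActorSystem
    import akka.stream.{ActorMaterializer, IOResult}
    import akka.stream.scaladsl.{Keep, Sink, Source}
    import akka.util.ByteString

    import scala.concurrent.duration._
    import scala.concurrent.{Await, ExecutionContext, Future}
    import scala.util.Success

    object IOResultAdapter {
      // Hypothetical helper: a side sink sums the chunk sizes, and the resulting
      // Future[Long] is mapped into the Future[IOResult] that the source materializes.
      def withIOResult(src: Source[ByteString, Any])(
          implicit ec: ExecutionContext): Source[ByteString, Future[IOResult]] =
        src.alsoToMat(Sink.fold[Long, ByteString](0L)((count, bytes) => count + bytes.length)) {
          (_, total) => total.map(n => IOResult(n, Success(Done)))
        }
    }

    object IOResultDemo extends App {
      implicit val sys: ActorSystem = ActorSystem()
      implicit val mat: ActorMaterializer = ActorMaterializer()
      implicit val ec: ExecutionContext = sys.dispatcher

      // Stand-in for an HTTP entity stream: two chunks, 2 + 3 bytes.
      val bytes = Source(List(ByteString("ab"), ByteString("cde")))
      val result = IOResultAdapter.withIOResult(bytes).toMat(Sink.ignore)(Keep.left).run()

      println(Await.result(result, 5.seconds).count)
      sys.terminate()
    }
    ```

    Note that in this sketch a failing stream fails the future itself rather than producing an IOResult with a Failure status; whether that matters depends on how the file-oriented sources you want to unify with are consumed.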
    

    One last thing to pay attention to: you want your source to look like:

    Source[ByteString, Future[IOResult]]
    

    But if you want to carry this materialized value to the very end of the stream definition and then act on the completion of this future, that can be an error-prone approach. E.g., in this example I finish the work based on that future, so the last value will not be processed:

    import akka.actor.ActorSystem
    import akka.stream.ActorMaterializer
    import akka.stream.scaladsl.{Keep, Sink, Source}
    
    import scala.concurrent.duration._
    import scala.concurrent.{Await, ExecutionContext, Future}
    
    object App extends App {
    
      private implicit val sys: ActorSystem = ActorSystem()
      private implicit val mat: ActorMaterializer = ActorMaterializer()
      private implicit val ec: ExecutionContext = sys.dispatcher
    
      val source: Source[Int, Any] = Source((1 to 5).toList)
    
      def magic(source: Source[Int, Any]): Source[Int, Future[Int]] =
        source.alsoToMat(Sink.fold(0)((acc, _) => acc + 1))((_, f) => f)
    
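      // Keep.left keeps only the counting future, not the completion of Sink.foreach
      val f = magic(source).throttle(1, 1.second).toMat(Sink.foreach(println))(Keep.left).run()
      f.onComplete(t => println(s"f1 completed - $t"))
      // Shutdown below is triggered by f alone, which is why the last value is lost
      Await.ready(f, 5.minutes)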
    
    
      mat.shutdown()
      sys.terminate()
    }
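
    One possible way around this, sketched under the same Akka 2.5-style setup as the example above, is to keep both materialized values with Keep.both and shut down only after the final sink has completed as well. The object name WaitForBoth and the value names are hypothetical:

    ```scala
    import akka.Done
    import akka.actor.ActorSystem
    import akka.stream.ActorMaterializer
    import akka.stream.scaladsl.{Keep, Sink, Source}

    import scala.concurrent.duration._
    import scala.concurrent.{Await, ExecutionContext, Future}

    object WaitForBoth extends App {
      implicit val sys: ActorSystem = ActorSystem()
      implicit val mat: ActorMaterializer = ActorMaterializer()
      implicit val ec: ExecutionContext = sys.dispatcher

      def magic(source: Source[Int, Any]): Source[Int, Future[Int]] =
        source.alsoToMat(Sink.fold[Int, Int](0)((acc, _) => acc + 1))((_, f) => f)

      // Keep.both exposes the element count and the completion of Sink.foreach.
      val (count, done): (Future[Int], Future[Done]) =
        magic(Source((1 to 5).toList))
          .throttle(1, 1.second)
          .toMat(Sink.foreach(println))(Keep.both)
          .run()

      // Completes only after the printing sink has finished, then yields the count.
      val finished: Future[Int] = done.flatMap(_ => count)
      println(s"elements seen: ${Await.result(finished, 1.minute)}")

      sys.terminate()
    }
    ```

    Here the shutdown is driven by the completion of the downstream sink rather than by the counting future alone, so every element should reach println before the actor system terminates.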