
Akka Streams: Throttling a File Source


I've got a file containing several thousand lines like this:

Mr|David|Smith|[email protected]
Mrs|Teri|Smith|[email protected]
...

I want to read the file and emit each line downstream, but throttled, i.e. one line per second.

I cannot quite figure out how to get the throttling working in the flow.

flow1 (below) outputs the first line after 1 sec and then terminates.
flow2 (below) waits 1 sec then outputs the whole file.

val source: Source[ByteString, Future[IOResult]] = FileIO.fromPath(file)

val flow1 = Flow[ByteString]
  .via(Framing.delimiter(ByteString(System.lineSeparator), 10000))
  .throttle(1, 1.second, 1, ThrottleMode.shaping)
  .map(bs => bs.utf8String)

val flow2 = Flow[ByteString]
  .throttle(1, 1.second, 1, ThrottleMode.shaping)
  .via(Framing.delimiter(ByteString(System.lineSeparator), 10000))
  .map(bs => bs.utf8String)

val sink = Sink.foreach(println)
val res = source.via(flow2).to(sink).run().onComplete(_ => system.terminate())

I couldn't glean any solution from studying the docs.

Would greatly appreciate any pointers.


Solution

  • Keep flow1 (frame the bytes into lines first, then throttle), but use runWith instead of to:

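    import akka.stream.{IOResult, ThrottleMode}
    import akka.stream.scaladsl.{FileIO, Flow, Framing, Sink, Source}
    import akka.util.ByteString
    import scala.concurrent.Future
    import scala.concurrent.duration._

    // Assumes an implicit ActorSystem `system` and a java.nio.file.Path `file` are in scope
    // (on Akka 2.5 an implicit ActorMaterializer is also needed).
    import system.dispatcher // ExecutionContext for onComplete

    val source: Source[ByteString, Future[IOResult]] = FileIO.fromPath(file)

    val flow1 =
      Flow[ByteString]
        .via(Framing.delimiter(ByteString(System.lineSeparator), 10000)) // split into lines first
        .throttle(1, 1.second, 1, ThrottleMode.Shaping)                   // then throttle each line
        .map(bs => bs.utf8String)

    val sink = Sink.foreach(println)

    // runWith returns the Sink's materialized value, which completes after the last line
    source.via(flow1).runWith(sink).onComplete(_ => system.terminate())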
    

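    to keeps the materialized value of the Source (here, the Future[IOResult] of source.via(flow1)), so you're terminating the actor system when the "left-hand side" of the stream completes. For a file of this size, FileIO can finish reading it into the stream's internal buffers long before the throttle has released more than the first line, so that Future completes almost at once and system.terminate() tears down the stream. What you want is to shut down the system when the materialized value of the Sink (the Future[Done] of Sink.foreach) completes. runWith returns the materialized value of the Sink parameter and is equivalent to: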

    source
      .via(flow1)
      .toMat(sink)(Keep.right)
      .run()
      .onComplete(_ => system.terminate())
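To check the fix without a real file, here is a minimal sketch, assuming Akka 2.6+ and the Scala DSL, that feeds flow1's pipeline from an in-memory Source instead of FileIO.fromPath. The object name ThrottledLinesDemo, the sample chunks, and the 100 ms rate (used instead of 1 second to keep the run short) are illustrative assumptions, not part of the original answer. Because it uses runWith(Sink.seq), Await blocks until the Sink has received every throttled line, not just until the source has been drained.

```scala
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.ThrottleMode
import akka.stream.scaladsl.{Flow, Framing, Sink, Source}
import akka.util.ByteString

import scala.concurrent.Await
import scala.concurrent.duration._

object ThrottledLinesDemo {

  // Same shape as flow1: split the bytes into lines first, then throttle the lines.
  // "\n" replaces System.lineSeparator so the in-memory input below frames the
  // same way on every platform; 100.millis replaces 1.second to keep runs short.
  val throttledLines: Flow[ByteString, String, NotUsed] =
    Flow[ByteString]
      .via(Framing.delimiter(ByteString("\n"), 10000))
      .throttle(1, 100.millis, 1, ThrottleMode.Shaping)
      .map(_.utf8String)

  /** Runs the stream; returns the emitted lines and the elapsed wall-clock time in ms. */
  def runDemo(): (Seq[String], Long) = {
    // Assumes Akka 2.6+, where an implicit ActorSystem also provides the Materializer.
    implicit val system: ActorSystem = ActorSystem("throttle-demo")
    try {
      // Hypothetical stand-in for FileIO.fromPath(file): the second line straddles
      // two chunks, and Framing.delimiter reassembles it before the throttle sees it.
      val chunks: Source[ByteString, NotUsed] = Source(List(
        ByteString("Mr|David|Smith\nMrs|Teri|Sm"),
        ByteString("ith\nMs|Ann|Jones\n")
      ))
      val start = System.nanoTime()
      // runWith keeps the Sink's materialized value: this Future completes only
      // after the last throttled line has reached the Sink.
      val lines = Await.result(chunks.via(throttledLines).runWith(Sink.seq), 5.seconds)
      (lines, (System.nanoTime() - start) / 1000000)
    } finally {
      system.terminate()
    }
  }

  def main(args: Array[String]): Unit = {
    val (lines, elapsedMillis) = runDemo()
    lines.foreach(println)
    println(s"took about $elapsedMillis ms")
  }
}
```

The lines should come out one at a time, spaced by the throttle, rather than all together. Moving throttle in front of Framing.delimiter, as flow2 does, would throttle the ByteString chunks instead of the lines: FileIO.fromPath reads in chunks of up to 8192 bytes by default, so all the lines inside one chunk would pass the throttle together, and flow2 emits lines in large bursts rather than one per second.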