
Akka Streams File Handling and Termination


I have the following snippet that reads a CSV file and just prints something to the console:

def readUsingAkkaStreams = {

    import java.io.File
    import akka.stream.scaladsl._
    import akka.actor.ActorSystem
    import akka.stream.ActorMaterializer
    import java.security.MessageDigest


    implicit val system = ActorSystem("Sys")
    implicit val materializer = ActorMaterializer()

    val file = new File("/path/to/csv/file.csv")

    val fileSource = FileIO.fromFile(file, 65536)

    val flow = fileSource.map(chunk => chunk.utf8String)

    flow.to(Sink.foreach(println(_))).run
  }

I now have some questions around this:

  1. The chunk size is given in bytes. How is it handled internally? Could I end up with a chunk that contains only part of a line?

  2. How does this stream terminate? Right now it does not! I want it to detect that the whole file has been read and then trigger a stop signal. Is there a mechanism for this?

EDIT 1: After applying the suggestions from the answer below, I get an error message.

EDIT 2:

I managed to get rid of the error by setting maximumFrameLength to the chunk size, 65536:

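import java.io.File
import akka.stream.scaladsl.{FileIO, Framing}
import akka.util.ByteString

val file = new File("/path/to/csv/file.csv")
val chunkSize = 65536
// maximumFrameLength caps the length of one line (frame), not of a read chunk;
// it only has to exceed the longest line in the file
val fileSource = FileIO.fromFile(file, chunkSize).via(Framing.delimiter(
  ByteString("\n"),
  maximumFrameLength = chunkSize,
  allowTruncation = true))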
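The fix in EDIT 2 works because maximumFrameLength bounds the length of a single line (the bytes between two delimiters), not the size of a read chunk, so any value larger than the longest line is enough. The sketch below assumes Akka 2.6+ and uses hypothetical in-memory chunks instead of the CSV file. It shows both sides: lines cut across chunk boundaries are reassembled by Framing.delimiter, and a line longer than maximumFrameLength fails the stream. That the error in EDIT 1 was of this kind is an assumption, since its text is not reproduced.

```scala
import scala.concurrent.Await
import scala.concurrent.duration._
import scala.util.Try
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Framing, Sink, Source}
import akka.util.ByteString

// Sketch assuming Akka 2.6+, where an implicit ActorSystem is enough to run a stream.
object FramingDemo {
  // Re-frames arbitrary byte chunks into lines; maximumFrameLength is a parameter here.
  def frame(chunks: List[ByteString], maxLine: Int)(implicit system: ActorSystem): Try[Seq[String]] = {
    val lines = Source(chunks)
      .via(Framing.delimiter(ByteString("\n"), maximumFrameLength = maxLine, allowTruncation = true))
      .map(_.utf8String)
      .runWith(Sink.seq[String])
    // Blocking is only acceptable in a small demo like this one.
    Try(Await.result(lines, 5.seconds))
  }

  def main(args: Array[String]): Unit = {
    implicit val system: ActorSystem = ActorSystem("FramingDemo")

    // Hypothetical chunks that cut CSV lines in the middle, as a FileIO source may do.
    val csvChunks = List(ByteString("id,na"), ByteString("me\n1,"), ByteString("alice\n2,bob\n"))
    println(frame(csvChunks, maxLine = 256)) // the three lines, reassembled

    // One 300-byte line split over two chunks: longer than 256, so this run fails
    // with a Framing.FramingException, while a 65536 limit lets it through.
    val longLine = List(ByteString("x" * 150), ByteString("x" * 150 + "\n"))
    println(frame(longLine, maxLine = 256))
    println(frame(longLine, maxLine = 65536))

    system.terminate()
  }
}
```

Lowering maximumFrameLength is still useful as a guard: it bounds how much memory a single malformed line can consume.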

Solution

  1. As per the docs:

    Emitted elements are chunkSize sized ByteString elements, except the final element, which will be up to chunkSize in size.

    The FileIO source treats newlines like any other byte. So yes, a CSV line may start in one chunk and end in the next. If that is not what you want, you can re-chunk the ByteString stream on line boundaries with Framing.delimiter (see the docs for more info).

    As a side note, FileIO.fromFile has been deprecated; use FileIO.fromPath instead.

    An example would be:

    val fileSource = FileIO.fromPath(...)
      .via(Framing.delimiter(
        ByteString("\n"),
        maximumFrameLength = 256,
        allowTruncation = true))
    

    2. Sink.foreach materializes to a Future[Done] that completes when the stream terminates, so you can attach a callback to it:

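    import scala.concurrent.Future
    import akka.Done
    // runWith returns the sink's materialized value: Future[Done] for Sink.foreach
    val result: Future[Done] = flow.runWith(Sink.foreach(println(_)))
    import system.dispatcher // onComplete needs an implicit ExecutionContext
    result.onComplete(...)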
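The stream in the question does complete once FileIO has emitted the last chunk; what keeps the program running is the ActorSystem, whose threads stay alive until it is terminated. A minimal sketch, assuming Akka 2.6+ and a hypothetical temporary sample file, that keeps both materialized values with toMat(...)(Keep.both) (the source's Future[IOResult] and the sink's collected lines) and terminates the ActorSystem once both are done:

```scala
import java.nio.charset.StandardCharsets.UTF_8
import java.nio.file.{Files, Path}
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._
import akka.actor.ActorSystem
import akka.stream.IOResult
import akka.stream.scaladsl.{FileIO, Framing, Keep, Sink}
import akka.util.ByteString

// Sketch assuming Akka 2.6+, where an implicit ActorSystem provides the Materializer.
object ReadCsvThenStop {
  // Reads `path` line by line and keeps both materialized values:
  // the source's Future[IOResult] and the sink's Future of collected lines.
  def readLines(path: Path)(implicit system: ActorSystem): (Future[IOResult], Future[Seq[String]]) =
    FileIO.fromPath(path, chunkSize = 65536)
      .via(Framing.delimiter(ByteString("\n"), maximumFrameLength = 65536, allowTruncation = true))
      .map(_.utf8String)
      .toMat(Sink.seq[String])(Keep.both)
      .run()

  def main(args: Array[String]): Unit = {
    implicit val system: ActorSystem = ActorSystem("Sys")
    import system.dispatcher // ExecutionContext for the Future callbacks below

    // Hypothetical temporary CSV file so the sketch runs on its own.
    val path = Files.createTempFile("sample", ".csv")
    Files.write(path, "a,1\nb,2\nc,3\n".getBytes(UTF_8))

    val (ioResult, lines) = readLines(path)
    val finished = for { io <- ioResult; ls <- lines } yield (io.count, ls)

    // The stream completes on its own at end of file; terminating the
    // ActorSystem afterwards is what lets the JVM exit.
    finished.onComplete { outcome =>
      outcome.foreach { case (bytes, ls) =>
        ls.foreach(println)
        println(s"read $bytes bytes")
      }
      system.terminate()
    }
    Await.result(system.whenTerminated, 10.seconds)
  }
}
```

Keep.both is one choice among several: Keep.left or Keep.right retains only one of the two values, while runWith always returns the sink's value, which is why the snippet above gets a Future[Done] rather than an IOResult.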