I have the following snippet that reads a CSV file and just prints something to the console:
def readUsingAkkaStreams = {
  import java.io.File
  import akka.stream.scaladsl._
  import akka.actor.ActorSystem
  import akka.stream.ActorMaterializer
  import java.security.MessageDigest
  implicit val system = ActorSystem("Sys")
  implicit val materializer = ActorMaterializer()
  val file = new File("/path/to/csv/file.csv")
  val fileSource = FileIO.fromFile(file, 65536)
  val flow = fileSource.map(chunk => chunk.utf8String)
  flow.to(Sink.foreach(println(_))).run
}
I now have some questions around this:
The chunk size is given in bytes. How is it handled internally? Could I end up in a situation where a chunk contains only part of a line?
How does this stream terminate? Right now it does not! I want it to know that it has read the file completely and to trigger a stop signal. Is there a mechanism to do this?
EDIT 1: After suggestions from the post below, I get an error message as shown in the screenshot!
EDIT 2:
I managed to get rid of the error by setting maximumFrameLength to match the chunk size, which is 65536:
val file = new File("/path/to/csf/file.csv")
val chunkSize = 65536
val fileSource = FileIO.fromFile(file, chunkSize).via(Framing.delimiter(
  ByteString("\n"),
  maximumFrameLength = chunkSize,
  allowTruncation = true))
1. As per the docs:
Emitted elements are chunkSize sized ByteString elements, except the final element, which will be up to chunkSize in size.
The FileIO source treats newlines like any other character. So yes, you may see the first part of a CSV line in one chunk and the rest of it in the next chunk. If this is not what you want, you can restructure how your ByteString flow is chunked by using Framing.delimiter (see the docs for more info).
As a side note, FileIO.fromFile has been deprecated; it is better to use FileIO.fromPath.
An example would be:
val fileSource = FileIO.fromPath(...)
  .via(Framing.delimiter(
    ByteString("\n"),
    maximumFrameLength = 256,
    allowTruncation = true))
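To illustrate the point about chunk boundaries, here is a minimal sketch that feeds two hand-made chunks, split in the middle of a line, through Framing.delimiter. The object name FramingDemo and the helper frameLines are made up for this example, and it assumes the same Akka 2.4/2.5-era API (ActorMaterializer) used in the question. Note that maximumFrameLength limits the length of a single line, not the size of a chunk.

```scala
import akka.actor.ActorSystem
import akka.stream.{ActorMaterializer, Materializer}
import akka.stream.scaladsl.{Framing, Sink, Source}
import akka.util.ByteString
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._

object FramingDemo {
  // Hypothetical helper: re-frames arbitrary byte chunks into whole lines.
  def frameLines(chunks: List[ByteString])(implicit mat: Materializer): Future[Seq[String]] =
    Source(chunks)
      .via(Framing.delimiter(
        ByteString("\n"),
        maximumFrameLength = 256, // longest line we are willing to accept, in bytes
        allowTruncation = true))  // tolerate a last line without a trailing newline
      .map(_.utf8String)
      .runWith(Sink.seq)

  def main(args: Array[String]): Unit = {
    implicit val system = ActorSystem("FramingDemo")
    implicit val materializer = ActorMaterializer()

    // The line "c,d" is split across two chunks, as FileIO might deliver it.
    val chunks = List(ByteString("a,b\nc"), ByteString(",d\n"))

    // Blocking here is only for the sake of a short demo.
    val lines = Await.result(frameLines(chunks), 3.seconds)
    lines.foreach(println)

    system.terminate()
  }
}
```

With this input the framing stage should emit the two complete lines a,b and c,d, even though the second line arrived in two pieces.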
2. The sink materializes to a Future that you can attach a callback to in order to do something when the stream terminates:
val result: Future[Done] = flow.runWith(Sink.foreach(println(_))) // runWith keeps the sink's materialized value, a Future[akka.Done]
result.onComplete(...)
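Putting both points together, one possible sketch looks like the following. It assumes that the program appears to hang only because the ActorSystem is still running after the stream has completed, so the callback shuts the system down. The names ReadCsv and printLines are invented for this example, the path is the placeholder from the question, and the API is again the Akka 2.4/2.5-era one.

```scala
import java.nio.file.{Path, Paths}
import akka.Done
import akka.actor.ActorSystem
import akka.stream.{ActorMaterializer, Materializer}
import akka.stream.scaladsl.{FileIO, Framing, Sink}
import akka.util.ByteString
import scala.concurrent.Future
import scala.util.{Failure, Success}

object ReadCsv {
  // Hypothetical helper: prints every line of the file and returns a Future
  // that completes once the whole file has been read (or fails on error).
  def printLines(path: Path)(implicit mat: Materializer): Future[Done] =
    FileIO.fromPath(path)
      .via(Framing.delimiter(
        ByteString("\n"),
        maximumFrameLength = 65536, // assumed upper bound for a single line
        allowTruncation = true))
      .map(_.utf8String)
      .runWith(Sink.foreach(println(_)))

  def main(args: Array[String]): Unit = {
    implicit val system = ActorSystem("Sys")
    implicit val materializer = ActorMaterializer()
    import system.dispatcher // execution context for the Future callback

    printLines(Paths.get("/path/to/csv/file.csv")).onComplete { result =>
      result match {
        case Success(_) => println("Finished reading the file")
        case Failure(e) => println(s"Stream failed: ${e.getMessage}")
      }
      // Stop the actor system so that the JVM can exit.
      system.terminate()
    }
  }
}
```

The stream itself finishes as soon as the file source reaches end of file; shutting down the ActorSystem in onComplete is what lets the process exit.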