scala akka-stream

Akka FileIO.fromPath - How to deal with IOResult and get the data instead?


I've looked at a lot of examples and posts about this. I got it working one way, but I haven't quite grasped the idea yet: I keep getting tripped up by Future[IOResult]. I'm trying to read a file into a stream of record objects, one per line (call the type LineRecordCaseClass), and what I want back is a Future[List[LineRecordCaseClass]].

val source = FileIO.fromPath(Paths.get("/tmp/junk_data.csv"))
val flow = makeFlow() // Framing.delimiter->split(",")->map to LineRecordCaseClass
val sink = Sink.collection[LineRecordCaseClass, List[LineRecordCaseClass]]

val graph = source.via(flow).to(sink)

val typeMismatchError: Future[List[LineRecordCaseClass]] = graph.run()

Why does graph.run() return a Future[IOResult] instead? Am I missing a Keep.left somewhere? If so, what do I need, and where does it go?

There's some concept I'm missing.


Solution

  • Here are the types of your vals:

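    val source: Source[ByteString, Future[IOResult]] = FileIO.fromPath(Paths.get("/tmp/junk_data.csv"))
    val flow: Flow[ByteString, LineRecordCaseClass, NotUsed] = makeFlow()
    val sink: Sink[LineRecordCaseClass, Future[List[LineRecordCaseClass]]] = Sink.collection[LineRecordCaseClass, List[LineRecordCaseClass]]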
    

    From the akka-stream docs, in one of the code snippets:

    By default, the materialized value of the leftmost stage is preserved

    The materialized value of your leftmost stage (the source) is Future[IOResult].

    In source.via(flow).to(sink), both steps keep the left value: .via calls .viaMat(flow)(Keep.left), and .to calls .toMat(sink)(Keep.left). So the source's Future[IOResult] is what graph.run() returns.
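    A minimal sketch of that default, assuming Akka 2.6 (where an implicit ActorSystem also supplies the Materializer). DefaultMatDemo and its members are illustrative names, and the String materialized value merely stands in for Future[IOResult]. The declared types show that source.to(sink) and source.toMat(sink)(Keep.left) are the same graph, while Keep.right switches to the sink's value.

    ```scala
    import akka.actor.ActorSystem
    import akka.stream.scaladsl.{Keep, RunnableGraph, Sink, Source}

    import scala.concurrent.duration._
    import scala.concurrent.{Await, Future}

    object DefaultMatDemo {
      // Source whose materialized value is a plain String (a stand-in for Future[IOResult]).
      val source: Source[Int, String] = Source(1 to 3).mapMaterializedValue(_ => "source value")
      // Sink whose materialized value is the collected elements.
      val sink: Sink[Int, Future[Seq[Int]]] = Sink.seq[Int]

      // .to keeps the left (source) materialized value...
      val implicitLeft: RunnableGraph[String] = source.to(sink)
      // ...which is exactly what an explicit Keep.left does.
      val explicitLeft: RunnableGraph[String] = source.toMat(sink)(Keep.left)
      // Keep.right selects the sink's materialized value instead.
      val right: RunnableGraph[Future[Seq[Int]]] = source.toMat(sink)(Keep.right)

      def runAll(): (String, String, Seq[Int]) = {
        implicit val system: ActorSystem = ActorSystem("default-mat-demo")
        try {
          val a = implicitLeft.run() // the source's value, returned immediately
          val b = explicitLeft.run() // same value as above
          val c = Await.result(right.run(), 5.seconds) // the sink's collected elements
          (a, b, c)
        } finally {
          system.terminate()
        }
      }
    }
    ```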

    With Keep.both you get both values as a tuple:

    val check: RunnableGraph[(Future[IOResult], Future[List[LineRecordCaseClass]])] = source.via(flow).toMat(sink)(Keep.both)
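    A sketch of consuming that tuple, assuming Akka 2.6. To stay short it collects plain text lines with Sink.seq rather than the question's case class; KeepBothDemo and readBoth are illustrative names. IOResult.count is the number of bytes the file source read.

    ```scala
    import java.nio.file.Path

    import akka.actor.ActorSystem
    import akka.stream.IOResult
    import akka.stream.scaladsl.{FileIO, Framing, Keep, RunnableGraph, Sink}
    import akka.util.ByteString

    import scala.concurrent.duration._
    import scala.concurrent.{Await, Future}

    object KeepBothDemo {
      // Reads `path` line by line and returns (bytes read, lines), keeping both materialized values.
      def readBoth(path: Path): (Long, Seq[String]) = {
        implicit val system: ActorSystem = ActorSystem("keep-both-demo")
        try {
          val graph: RunnableGraph[(Future[IOResult], Future[Seq[String]])] =
            FileIO.fromPath(path)
              .via(Framing.delimiter(ByteString("\n"), maximumFrameLength = 1024, allowTruncation = true))
              .map(_.utf8String)
              .toMat(Sink.seq[String])(Keep.both)

          // Destructure the tuple: left is the source's IOResult, right is the sink's lines.
          val (ioResultF, linesF) = graph.run()
          // zip waits for both futures; it needs no ExecutionContext.
          val (ioResult, lines) = Await.result(ioResultF.zip(linesF), 5.seconds)
          (ioResult.count, lines)
        } finally {
          system.terminate()
        }
      }
    }
    ```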
    

    So if you want Future[List[LineRecordCaseClass]], use Keep.right and then run the graph:

    val records: Future[List[LineRecordCaseClass]] = source.via(flow).toMat(sink)(Keep.right).run()
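    Putting it together, a runnable sketch assuming Akka 2.6. The question doesn't show LineRecordCaseClass or makeFlow(), so the two-column record (name, amount) and the flow below are hypothetical, built from the comment in the question (Framing.delimiter, then split(","), then map to the case class). ReadRecords is also an illustrative name.

    ```scala
    import java.nio.file.Path

    import akka.NotUsed
    import akka.actor.ActorSystem
    import akka.stream.IOResult
    import akka.stream.scaladsl.{FileIO, Flow, Framing, Keep, Sink, Source}
    import akka.util.ByteString

    import scala.concurrent.Future

    // Hypothetical record type: the question does not show the real fields.
    final case class LineRecordCaseClass(name: String, amount: Int)

    object ReadRecords {
      // Hypothetical makeFlow(): frame on "\n", split on ",", map each line to a record.
      def makeFlow(): Flow[ByteString, LineRecordCaseClass, NotUsed] =
        Framing
          .delimiter(ByteString("\n"), maximumFrameLength = 1024, allowTruncation = true)
          .map(_.utf8String.split(","))
          .map(cols => LineRecordCaseClass(cols(0).trim, cols(1).trim.toInt))

      def read(path: Path)(implicit system: ActorSystem): Future[List[LineRecordCaseClass]] = {
        val source: Source[ByteString, Future[IOResult]] = FileIO.fromPath(path)
        val sink: Sink[LineRecordCaseClass, Future[List[LineRecordCaseClass]]] =
          Sink.collection[LineRecordCaseClass, List[LineRecordCaseClass]]
        // Keep.right: keep the sink's Future[List[...]], discard the source's Future[IOResult].
        source.via(makeFlow()).toMat(sink)(Keep.right).run()
      }

      // Equivalent shorthand: runWith returns the sink's materialized value.
      def readWithRunWith(path: Path)(implicit system: ActorSystem): Future[List[LineRecordCaseClass]] =
        FileIO.fromPath(path)
          .via(makeFlow())
          .runWith(Sink.collection[LineRecordCaseClass, List[LineRecordCaseClass]])
    }
    ```

    If you never need the IOResult, runWith is the shortest option; keep toMat(...)(Keep.both) when you also want to check how many bytes were read.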
    

    I recommend this video, which explains materialized values.