I looked at a lot of examples and posts about this. I got it working in one way but I haven't quite gotten the idea yet, I'm still getting tripped up by Future[IOResult]
when I'm trying to read a file into a stream of record objects, one per line, call it Future[List[LineRecordCaseClass]]
is what I want instead.
val source = FileIO.fromPath(Paths.get("/tmp/junk_data.csv"))
val flow = makeFlow() // Framing.delimiter->split(",")->map to LineRecordCaseClass
val sink = Sink.collection[LineRecordCaseClass, List[LineRecordCaseClass]]
val graph = source.via(flow).to(sink)
val typeMismatchError: Future[List[LineRecordCaseClass]] = graph.run()
Why does graph.run()
return a Future[IOResult]
instead? Perhaps I'm missing a Keep.left
somewhere, or something? If so what and where at?
Some concept I'm missing.
Here are the type of yours val
s
val source: Source[ByteString, Future[IOResult]] =
val flow: Flow[ByteString, LineRecordCaseClass, NotUsed] =
val sink: Sink[LineRecordCaseClass, Future[List[LineRecordCaseClass]]] =
From the akka-stream doc , in the code snippet
By default, the materialized value of the leftmost stage is preserved
The materialized value at your leftmost stage (the source) is Future[IOResult]
.
In source.via(flow).to(sink)
, if you look at the implementation of .to
, it calls .toMat
with a default Keep.left
The type for Keep.both
is
val check: RunnableGraph[(Future[IOResult], Future[List[LineRecordCaseClass]])] = source.via(flow).toMat(sink)(Keep.both)
So if you want Future[List[LineRecordCaseClass]]
, you can do
source.via(flow).toMat(sink)(Keep.right)
I recommend this video which explains the materialized value