I'am trying to read multiple files with akka streams and put result in a list. I can read one file with no problem. the return type is Future[Seq[String]]. problem is processing the sequence inside the Future must go inside an onComplete{}.
i'am trying the following code but abviously it will not work. the list acc outside of the onComplete is empty. but holds values inside the inComplete. I understand the problem but i don't know how to approach this.
// works fine
def readStream(path: String, date: String): Future[Seq[String]] = {
implicit val system = ActorSystem("Sys")
val settings = ActorMaterializerSettings(system)
implicit val materializer = ActorMaterializer(settings)
val result: Future[Seq[String]] =
FileIO.fromPath(Paths.get(path + "transactions_" + date +
".data"))
.via(Framing.delimiter(ByteString("\n"), 256, true))
.map(_.utf8String)
.toMat(Sink.seq)(Keep.right)
.run()
var aa: List[scala.Array[String]] = Nil
result.onComplete(x => {
aa = x.get.map(line => line.split('|')).toList
})
result
}
//this won't work
def concatFiles(path : String, date : String, numberOfDays : Int) :
List[scala.Array[String]] = {
val formatter = DateTimeFormatter.ofPattern("yyyyMMdd");
val formattedDate = LocalDate.parse(date, formatter);
var acc = List[scala.Array[String]]()
for( a <- 0 to numberOfDays){
val date = formattedDate.minusDays(a).toString().replace("-", "")
val transactions = readStream(path , date)
var result: List[scala.Array[String]] = Nil
transactions.onComplete(x => {
result = x.get.map(line => line.split('|')).toList
acc= acc ++ result })
}
acc}
General Solution
Given an Iterator of Paths
values a Source
of the file lines can be created by combining FileIO
& flatMapConcat
:
val lineSourceFromPaths : (() => Iterator[Path]) => Source[String, _] = pathsIterator =>
Source
.fromIterator(pathsIterator)
.flatMapConcat { path =>
FileIO
.fromPath(path)
.via(Framing.delimiter(ByteString("\n"), 256, true))
.map(_.utf8String)
}
Application to Question
The reason your List
is empty is because the Future
values have not completed and therefore your mutable list is not be updated before the function returns the list.
Critique of Code in Question
The organization and style of the code within the question suggest several misunderstandings related to akka
& Future
. I think you are attempting a rather complex workflow without understanding the fundamentals of the tools you are trying to use.
1.You should not create an ActorSystem
each time a function is being called. There is usually 1 ActorSystem per application and it's created only once.
implicit val system = ActorSystem("Sys")
val settings = ActorMaterializerSettings(system)
implicit val materializer = ActorMaterializer(settings)
def readStream(...
2.You should try to avoid mutable collections and instead use Iterator
with corresponding functionality:
def concatFiles(path : String, date : String, numberOfDays : Int) : List[scala.Array[String]] = {
val formattedDate = LocalDate.parse(date, DateTimeFormatter.ofPattern("yyyyMMdd"))
val pathsIterator : () => Iterator[Path] = () =>
Iterator
.range(0, numberOfDays+1)
.map(formattedDate.minusDays)
.map(_.String().replace("-", "")
.map(path => Paths.get(path + "transactions_" + date + ".data")
lineSourceFromPaths(pathsIterator)
3.Since you are dealing with Futures you should not wait for Futures to complete and should instead change the return type of concateFiles
to Future[List[Array[String]]]
.