Tags: scala, akka, akka-stream

Reading multiple files with akka streams in scala


I'm trying to read multiple files with Akka Streams and put the result in a list. I can read one file with no problem; the return type is Future[Seq[String]]. The problem is that processing the sequence inside the Future must go inside an onComplete {}.

I'm trying the following code, but obviously it will not work: the list acc is empty outside of the onComplete, but it holds values inside the onComplete. I understand the problem, but I don't know how to approach it.

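// works fine
import java.nio.file.Paths
import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global
import akka.actor.ActorSystem
import akka.stream.{ActorMaterializer, ActorMaterializerSettings}
import akka.stream.scaladsl.{FileIO, Framing, Keep, Sink}
import akka.util.ByteString

def readStream(path: String, date: String): Future[Seq[String]] = {
  implicit val system = ActorSystem("Sys")
  val settings = ActorMaterializerSettings(system)
  implicit val materializer = ActorMaterializer(settings)

  val result: Future[Seq[String]] =
    FileIO.fromPath(Paths.get(path + "transactions_" + date + ".data"))
      .via(Framing.delimiter(ByteString("\n"), 256, true))
      .map(_.utf8String)
      .toMat(Sink.seq)(Keep.right)
      .run()
  var aa: List[scala.Array[String]] = Nil
  result.onComplete(x => {
    aa = x.get.map(line => line.split('|')).toList
  })
  result
}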

// this won't work
import java.time.LocalDate
import java.time.format.DateTimeFormatter

def concatFiles(path: String, date: String, numberOfDays: Int): List[scala.Array[String]] = {
  val formatter = DateTimeFormatter.ofPattern("yyyyMMdd")
  val formattedDate = LocalDate.parse(date, formatter)
  var acc = List[scala.Array[String]]()

  for (a <- 0 to numberOfDays) {
    val date = formattedDate.minusDays(a).toString().replace("-", "")

    val transactions = readStream(path, date)
    var result: List[scala.Array[String]] = Nil
    transactions.onComplete(x => {
      result = x.get.map(line => line.split('|')).toList
      acc = acc ++ result
    })
  }
  acc
}

Solution

    General Solution

    Given an Iterator of Path values, a Source of the file lines can be created by combining FileIO and flatMapConcat:

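    import java.nio.file.Path
    import akka.stream.scaladsl.{FileIO, Framing, Source}
    import akka.util.ByteString

    val lineSourceFromPaths : (() => Iterator[Path]) => Source[String, _] = pathsIterator =>
      Source
        .fromIterator(pathsIterator)
        // each file is read completely before the next one is opened
        .flatMapConcat { path =>
          FileIO
            .fromPath(path)
            .via(Framing.delimiter(ByteString("\n"), 256, true))
            .map(_.utf8String)
        }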
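    To see the combined Source in action, here is a small sketch that writes two temporary files and collects all of their lines with Sink.seq. It assumes Akka 2.6 or later, where an implicit ActorSystem is enough to materialize a stream; LineSourceDemo and readAll are hypothetical names, and the Await is only there to make the demo easy to check.

```scala
import java.nio.file.{Files, Path}
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._

import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.scaladsl.{FileIO, Framing, Sink, Source}
import akka.util.ByteString

object LineSourceDemo {
  // Assumes Akka 2.6+: the implicit ActorSystem also provides the Materializer
  implicit val system: ActorSystem = ActorSystem("demo")

  // Same combination as above: the lines of each file, one file after another
  val lineSourceFromPaths: (() => Iterator[Path]) => Source[String, NotUsed] = pathsIterator =>
    Source
      .fromIterator(pathsIterator)
      .flatMapConcat { path =>
        FileIO
          .fromPath(path)
          .via(Framing.delimiter(ByteString("\n"), 256, allowTruncation = true))
          .map(_.utf8String)
      }

  // Hypothetical helper: writes two small files and gathers every line into one Future
  def readAll(): Seq[String] = {
    val dir = Files.createTempDirectory("lines")
    val a = Files.write(dir.resolve("a.data"), "1|x\n2|y\n".getBytes("UTF-8"))
    val b = Files.write(dir.resolve("b.data"), "3|z\n".getBytes("UTF-8"))
    val all: Future[Seq[String]] = lineSourceFromPaths(() => Iterator(a, b)).runWith(Sink.seq)
    Await.result(all, 10.seconds) // blocking only for the sake of this demo
  }
}
```

    Because flatMapConcat finishes one inner Source before starting the next, the lines come out in file order.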
    

    Application to Question

    Your List is empty because the Future values have not completed yet: the onComplete callbacks run asynchronously, so your mutable list has not been updated by the time the function returns it.
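    Instead of mutating a list from onComplete callbacks, the per-day Futures can be combined into a single Future that completes once all of them have. A minimal sketch with the standard library's Future.traverse; readLines is a hypothetical stand-in for readStream (it fabricates two lines per date) so that the example runs without files or Akka.

```scala
import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global

object CombineFutures {
  // Hypothetical stand-in for readStream: pretends each date's file has two lines
  def readLines(date: String): Future[Seq[String]] =
    Future(Seq(s"$date|a", s"$date|b"))

  // No var: the result is built by transforming the Futures, not by side effects
  def concatDays(dates: List[String]): Future[List[Array[String]]] =
    Future
      .traverse(dates)(readLines)       // Future[List[Seq[String]]], keeps input order
      .map(_.flatten.map(_.split('|'))) // flatten all files, then split every line
}
```

    The caller then works with the returned Future (map, flatMap, onComplete) rather than with a list that may not be filled yet.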

    Critique of Code in Question

    The organization and style of the code in the question suggest several misunderstandings related to Akka and Futures. I think you are attempting a rather complex workflow without understanding the fundamentals of the tools you are trying to use.

    1. You should not create an ActorSystem each time a function is called. There is usually one ActorSystem per application, and it is created only once.

    implicit val system = ActorSystem("Sys")
    val settings = ActorMaterializerSettings(system)
    implicit val materializer = ActorMaterializer(settings)
    
    def readStream(...
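    One way to arrange this is to keep the single ActorSystem in an object and let readStream reuse it on every call. A sketch assuming Akka 2.6 or later (so no separate ActorMaterializer is needed) and, as in the question, a path that ends with a separator; TransactionFiles and shutdown are hypothetical names.

```scala
import java.nio.file.Paths
import scala.concurrent.Future

import akka.actor.{ActorSystem, Terminated}
import akka.stream.scaladsl.{FileIO, Framing, Sink}
import akka.util.ByteString

// One ActorSystem for the whole application, created once on first use of the object.
// Assumes Akka 2.6+, where the implicit ActorSystem also supplies the stream Materializer.
object TransactionFiles {
  implicit val system: ActorSystem = ActorSystem("Sys")

  // Every call shares the same system instead of starting a new one
  def readStream(path: String, date: String): Future[Seq[String]] =
    FileIO
      .fromPath(Paths.get(path + "transactions_" + date + ".data"))
      .via(Framing.delimiter(ByteString("\n"), 256, allowTruncation = true))
      .map(_.utf8String)
      .runWith(Sink.seq)

  // Call once, when the application shuts down
  def shutdown(): Future[Terminated] = system.terminate()
}
```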
    

    2. You should try to avoid mutable state (the var accumulators) and instead use an Iterator with the corresponding functionality:

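    import java.nio.file.{Path, Paths}
    import java.time.LocalDate
    import java.time.format.DateTimeFormatter
    import akka.stream.scaladsl.Source

    def concatFiles(path : String, date : String, numberOfDays : Int) : Source[scala.Array[String], _] = {

      val formattedDate = LocalDate.parse(date, DateTimeFormatter.ofPattern("yyyyMMdd"))

      val pathsIterator : () => Iterator[Path] = () =>
        Iterator
          .range(0, numberOfDays+1)
          .map(n => formattedDate.minusDays(n.toLong))
          .map(_.toString.replace("-", ""))  // yyyy-MM-dd -> yyyyMMdd
          .map(day => Paths.get(path + "transactions_" + day + ".data"))

      // still a Source rather than a List: see point 3 for the return type
      lineSourceFromPaths(pathsIterator).map(_.split('|'))
    }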
    
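    The date arithmetic inside the Iterator can be checked on its own, without Akka. A minimal sketch; DateStrings and dateStrings are hypothetical names:

```scala
import java.time.LocalDate
import java.time.format.DateTimeFormatter

object DateStrings {
  // The start date and the numberOfDays days before it, each formatted as yyyyMMdd
  def dateStrings(date: String, numberOfDays: Int): List[String] = {
    val start = LocalDate.parse(date, DateTimeFormatter.ofPattern("yyyyMMdd"))
    Iterator
      .range(0, numberOfDays + 1)
      .map(n => start.minusDays(n.toLong))
      .map(_.toString.replace("-", "")) // LocalDate.toString is ISO yyyy-MM-dd
      .toList
  }
}
```

    For example, dateStrings("20200102", 2) yields List("20200102", "20200101", "20191231"), so month and year boundaries are handled by LocalDate.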

    3. Since you are dealing with Futures, you should not wait for them to complete; instead, change the return type of concatFiles to Future[List[Array[String]]].
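    Putting the three points together, a sketch of concatFiles that returns a Future might look like this. It assumes Akka 2.6 or later (the implicit ActorSystem supplies the Materializer, and its dispatcher serves as the ExecutionContext) and a path that ends with a separator; ConcatFiles is a hypothetical object name.

```scala
import java.nio.file.{Path, Paths}
import java.time.LocalDate
import java.time.format.DateTimeFormatter
import scala.concurrent.Future

import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.scaladsl.{FileIO, Framing, Sink, Source}
import akka.util.ByteString

object ConcatFiles {
  // Single shared ActorSystem (assumes Akka 2.6+)
  implicit val system: ActorSystem = ActorSystem("Sys")
  import system.dispatcher // ExecutionContext for Future.map

  val lineSourceFromPaths: (() => Iterator[Path]) => Source[String, NotUsed] = pathsIterator =>
    Source
      .fromIterator(pathsIterator)
      .flatMapConcat { path =>
        FileIO
          .fromPath(path)
          .via(Framing.delimiter(ByteString("\n"), 256, allowTruncation = true))
          .map(_.utf8String)
      }

  // Returns a Future instead of blocking; callers compose on it with map/flatMap
  def concatFiles(path: String, date: String, numberOfDays: Int): Future[List[Array[String]]] = {
    val formattedDate = LocalDate.parse(date, DateTimeFormatter.ofPattern("yyyyMMdd"))
    val pathsIterator: () => Iterator[Path] = () =>
      Iterator
        .range(0, numberOfDays + 1)
        .map(n => formattedDate.minusDays(n.toLong))
        .map(_.toString.replace("-", ""))
        .map(day => Paths.get(path + "transactions_" + day + ".data"))

    lineSourceFromPaths(pathsIterator)
      .map(_.split('|'))
      .runWith(Sink.seq) // Future of all rows, in file order
      .map(_.toList)
  }
}
```

    If one of the day files does not exist, FileIO should fail the stream, and with it the returned Future, so callers may want to handle that case explicitly.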