Tags: scala, akka, akka-stream, alpakka

Akka: How to extract a value in one graph stage and use it in the next


I am using Alpakka and Akka to process a CSV file. Since I have a bunch of CSV files that have to be added to the same stream, I would like to add a field that contains information from the file name or request. Currently I have something like this:

val source = FileIO.fromPath(Paths.get("10002070.csv"))
  .via(CsvParsing.lineScanner())

This emits a stream of lines, each a List of ByteStrings (one ByteString per field). The goal would be something like:

val filename = "10002070.csv"
val source = FileIO.fromPath(Paths.get(filename))
  .via(CsvParsing.lineScanner())
  .via(AddCSVFieldHere(filename)) // placeholder for the stage I am looking for

This would produce rows like:

10002070.csv,max,estimated,12,1,0

where the file name is a field that does not exist in the original source.

I think injecting values mid-stream does not look very pretty, and eventually I would like the file names fed to the parser to come from an upstream stage that reads a directory.

What is the correct/canonical way to pass values through stream stages for later re-use?


Solution

  • You could transform the stream with map to add the file name to each List[ByteString]:

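    import java.nio.file.Paths
    import akka.stream.alpakka.csv.scaladsl.CsvParsing
    import akka.stream.scaladsl.FileIO
    import akka.util.ByteString
    val fileName = "10002070.csv"
    val source =
      FileIO.fromPath(Paths.get(fileName))
        .via(CsvParsing.lineScanner())
        .map(List(ByteString(fileName)) ++ _) // prepend the file name as the first field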
    

    For example:

    Source.single(ByteString("""header1,header2,header3
                               |1,2,3
                               |4,5,6""".stripMargin))
      .via(CsvParsing.lineScanner())
      .map(List(ByteString("myfile.csv")) ++ _)
      .runForeach(row => println(row.map(_.utf8String)))
    
    // The above code prints the following:
    // List(myfile.csv, header1, header2, header3)
    // List(myfile.csv, 1, 2, 3)
    // List(myfile.csv, 4, 5, 6)
    

    The same approach works in the more general case where you don't know the file names upfront. If you want to read all the files in a directory (assuming they are all CSV files), concatenate them into a single stream, and preserve the file name in each stream element, you could do so with Directory.ls from Alpakka's File module:

    val source =
      Directory.ls(Paths.get("/my/dir")) // Source[Path, NotUsed]
        .flatMapConcat { path =>
          FileIO.fromPath(path)
            .via(CsvParsing.lineScanner())
            .map(List(ByteString(path.getFileName.toString)) ++ _)
        }
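For completeness, here is a self-contained sketch of the directory variant. It assumes Akka 2.6 or later (so an implicit ActorSystem supplies the materializer) plus the alpakka-csv and alpakka-file modules on the classpath; the names CsvWithFileName, rowsWithName, prependName and collectRows are made up for illustration. It differs from the snippet above in two ways: entries that are not regular .csv files are skipped, since Directory.ls also lists subdirectories, and the file name travels next to each row as a tuple until the last stage. Carrying the value in a tuple is one way to pass it through intermediate stages for later use without mixing it into the CSV fields; the name is only prepended where a flat row is actually needed.

```scala
import java.nio.charset.StandardCharsets
import java.nio.file.{Files, Path}

import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.alpakka.csv.scaladsl.CsvParsing
import akka.stream.alpakka.file.scaladsl.Directory
import akka.stream.scaladsl.{FileIO, Flow, Sink, Source}
import akka.util.ByteString

import scala.concurrent.Await
import scala.concurrent.duration._

object CsvWithFileName {

  // Emits (file name, parsed row) for every regular .csv file directly inside `dir`.
  // Directory.ls also lists subdirectories, so those (and non-.csv files) are filtered out.
  def rowsWithName(dir: Path): Source[(String, List[ByteString]), NotUsed] =
    Directory.ls(dir)
      .filter(p => Files.isRegularFile(p) && p.getFileName.toString.endsWith(".csv"))
      .flatMapConcat { path =>
        val name = path.getFileName.toString
        FileIO.fromPath(path)
          .via(CsvParsing.lineScanner())
          .map(row => (name, row)) // the name travels alongside the row, not inside it
      }

  // Turns each (name, row) pair into a flat row with the name as the first field.
  val prependName: Flow[(String, List[ByteString]), List[ByteString], NotUsed] =
    Flow[(String, List[ByteString])].map { case (name, row) => ByteString(name) :: row }

  // Runs the pipeline and collects every output row as a comma-joined string.
  def collectRows(dir: Path)(implicit system: ActorSystem): Seq[String] =
    Await.result(
      rowsWithName(dir)
        .via(prependName)
        .map(row => row.map(_.utf8String).mkString(","))
        .runWith(Sink.seq),
      1.minute)

  def main(args: Array[String]): Unit = {
    implicit val system: ActorSystem = ActorSystem("csv-with-file-name")
    try {
      // Demo input only: a temporary directory holding the question's sample row.
      val dir = Files.createTempDirectory("csv-demo")
      Files.write(dir.resolve("10002070.csv"), "max,estimated,12,1,0\n".getBytes(StandardCharsets.UTF_8))
      collectRows(dir).foreach(println) // 10002070.csv,max,estimated,12,1,0
    } finally {
      system.terminate()
    }
  }
}
```

Because flatMapConcat drains one inner source before starting the next, rows from different files are not interleaved, although the order in which Directory.ls returns the files is not something to rely on.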