Scala: Ending Stream.iterate


After some trial and error, I found a way to end Stream.iterate (in my case, when standard input ends). But to me it seems more like an evil hack than a best-practice solution.

Before (does not terminate when standard input ends, because Stream.iterate keeps running forever):

val initialDocument = Document()
val in: Stream[Document] = Stream.iterate(Stream[Document]()) { documents =>
  val lastDocument: Document = documents.lastOption.getOrElse(initialDocument)
  val line: String = io.StdIn.readLine
  if(line != null) {
    line
      .split(";")
      .map(_.trim)
      .scanLeft(lastDocument)((document: Document, line: String) => document.processInput(line))
      .drop(1) // drop the seed
      .toStream
  } else {
    Stream.empty
  }
}.flatten
for(document <- in) {
  // do something with the document snapshot
}

After (now working as expected):

val initialDocument = Document()
val in: Stream[Document] = Stream.iterate(Stream[Option[Document]]()) { documents =>
  val lastDocument: Option[Document] = Some(documents.lastOption.flatten.getOrElse(initialDocument))
  val line: String = io.StdIn.readLine
  if(line != null) {
    line
      .split(";")
      .map(_.trim)
      .scanLeft(lastDocument)((document: Option[Document], line: String) => document.map(_.processInput(line)))
      .drop(1) // drop the seed
      .toStream
  } else {
    Stream(None) // "None" is used by "takeWhile" to see we have no more input
  }
}.flatten.takeWhile(_.isDefined).map(_.get)
for(document <- in) {
  // do something with the document snapshot
}

As you can see, several new Option values are introduced. Their sole purpose is to tell takeWhile that the end has been reached.

How could I write this functionality in a more elegant form?


Solution

  • If I understand what you are doing correctly, this will solve your problem in a simpler way:

    val in = Iterator
      .continually(io.StdIn.readLine())       // Keep reading lines from StdIn
      .takeWhile(_ != null)                   // Stop at end of input (readLine returns null)
      .flatMap(_.split(';'))                  // Iterator of sublines
      .map(_.trim)                            // Iterator of trimmed sublines
      .scanLeft(Document())(_ processInput _) // Iterator of a Document snapshot per subline
      .drop(1)                                // Drop the empty Document
    
    for (document <- in) {
      // do something with the document snapshot
    }
    

    Basically, first create a lazy Iterator of trimmed line parts from the whole input, and then make document snapshots based on this iterator.
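
To try the pipeline without typing into a terminal, here is a minimal sketch that feeds it from a fixed list of lines. The Document class below is a hypothetical stand-in (the question does not show the real one), and the names SnapshotSketch, snapshots and readLine are made up for illustration. It only assumes that processInput returns a new Document.

```scala
// Hypothetical stand-in for the question's Document: it only records the commands seen so far.
final case class Document(commands: Vector[String] = Vector.empty) {
  def processInput(command: String): Document = copy(commands = commands :+ command)
}

object SnapshotSketch {
  // One Document snapshot per ';'-separated command.
  // `readLine` must return null at end of input, as io.StdIn.readLine does.
  def snapshots(readLine: () => String): Iterator[Document] =
    Iterator
      .continually(readLine())          // keep asking for lines
      .takeWhile(_ != null)             // stop at end of input
      .flatMap(_.split(';'))            // split each line into commands
      .map(_.trim)
      .scanLeft(Document())(_.processInput(_))
      .drop(1)                          // drop the empty seed Document

  def main(args: Array[String]): Unit = {
    // Simulated input: two lines, then null to signal the end.
    val lines = Iterator("a; b", "c")
    val readLine: () => String = () => if (lines.hasNext) lines.next() else null
    snapshots(readLine).foreach(println)
  }
}
```

With real input you would pass `() => io.StdIn.readLine()` instead of the simulated reader.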

    It's best to avoid Stream unless you really need its memoization. Stream is comparatively slow, and because it caches every element it has evaluated, holding a reference to the head of a Stream keeps the whole evaluated prefix in memory, which makes memory leaks easy to cause. Iterator has the same convenient methods for building finite or infinite lazy sequences, and should be the preferred collection for that purpose.
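
To make the memoization point concrete, here is a small sketch that counts how often the mapping function runs when the first three elements are traversed twice. The object and method names are made up for illustration. Note that Stream is deprecated since Scala 2.13 in favour of LazyList, so this may print deprecation warnings on newer compilers.

```scala
object MemoizationSketch {
  // Stream caches evaluated cells, so the second traversal reuses them.
  def streamEvaluations(): Int = {
    var evaluations = 0
    val doubled = Stream.from(1).map { n => evaluations += 1; n * 2 }
    val first = doubled.take(3).toList
    val second = doubled.take(3).toList // served from the cached cells
    assert(first == second)
    evaluations
  }

  // An Iterator is single-pass and keeps nothing, so each traversal needs a fresh one.
  def iteratorEvaluations(): Int = {
    var evaluations = 0
    def doubled = Iterator.from(1).map { n => evaluations += 1; n * 2 } // new iterator per call
    val first = doubled.take(3).toList
    val second = doubled.take(3).toList // recomputed from scratch
    assert(first == second)
    evaluations
  }

  def main(args: Array[String]): Unit = {
    println(s"Stream evaluations:   ${streamEvaluations()}")
    println(s"Iterator evaluations: ${iteratorEvaluations()}")
  }
}
```

The Stream version runs the function three times and the Iterator version six. The flip side of that saving is that the cached cells stay reachable for as long as `doubled` is, which is exactly what you do not want when streaming an unbounded input such as standard input.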