
Scala: Get Scala Stream+Process instance from ProcessBuilder


I have the following options:

Option #1: lineStream(log: ProcessLogger): Stream[String] (and 3 similar options)

Option #2: run(log: ProcessLogger): Process (and 4 similar options)

How can I get both, Stream[String] + Process?

If that is not possible, how can I destroy the system process early when I use the first option (which gives me a Stream[String] but no Process instance)?

Related API Docs Page (ProcessBuilder)


Solution

  • This is one of the things I don't like about the process API in Scala: you can usually get one or the other, not both. I don't think what you want is possible with the API. If you look at the ProcessBuilderImpl.lineStream() implementation, the process reference is only stored locally, so callers have no access to it.

    Here's a quick example of how you can implement such functionality yourself, inspired by how lineStream() is written under the hood (see ProcessBuilderImpl and BasicIO). I haven't had time to polish it (for example, to return something better than a tuple), but it should give you the idea.

      import java.util.concurrent.LinkedBlockingQueue
      import scala.sys.process._

      object StreamProcessLogger {
        private val nonzeroException = true // set it to whatever suits you

        // Starts the process and returns it together with a lazy stream of its output.
        // Note: this blocks until the first line arrives or the process exits.
        def run(processBuilder: ProcessBuilder): (Process, Stream[String]) = {
          val logger = new StreamProcessLogger
          val process = processBuilder.run(logger)
          waitForExitInAnotherThread(process, logger)
          (process, logger.stream)
        }

        // exitValue() blocks until the process ends, so wait for it off the caller's thread.
        private def waitForExitInAnotherThread(process: Process, logger: StreamProcessLogger) = {
          val thread = new Thread() {
            override def run() = { logger.setExitCode(process.exitValue()) }
          }
          thread.start()
        }

      }

      private class StreamProcessLogger extends ProcessLogger {
        // Right(line) is a line of output; Left(code) is the exit code and ends the stream.
        val queue = new LinkedBlockingQueue[Either[Int, String]]

        override def buffer[T](f: => T): T = f
        // stdout and stderr lines are merged into the same queue
        override def out(s: => String): Unit = queue.put(Right(s))
        override def err(s: => String): Unit = queue.put(Right(s))

        def stream = next()
        def setExitCode(exitCode: Int) = queue.put(Left(exitCode))

        // Each step blocks on the queue; the tail of Stream.cons is evaluated lazily.
        private def next(): Stream[String] = queue.take match {
          case Left(0) => Stream.empty
          case Left(code) =>
            if (StreamProcessLogger.nonzeroException) scala.sys.error("Nonzero exit code: " + code)
            else Stream.empty
          case Right(s) => Stream.cons(s, next())
        }
      }
    

    And here is the usage:

    test("returns stream and process") {
      val (process, stream) = StreamProcessLogger.run(Process("ls"))
      stream.foreach(println)
      println(process.exitValue())
    }
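
    For the second part of the question (destroying the process early), here is a minimal, self-contained sketch along the same lines. It rests on a few assumptions: a Unix-like system with `sh` and `sleep` on the PATH, and the names `EarlyDestroyDemo` and `runWithStream` are hypothetical, chosen only for illustration. Unlike the code above, it forwards only stdout and ignores the exit code when ending the stream. It also keeps `Stream` to match the question; on Scala 2.13+ `Stream` is deprecated in favor of `LazyList`.

    ```scala
    import java.util.concurrent.LinkedBlockingQueue
    import scala.sys.process._

    object EarlyDestroyDemo {
      // Hypothetical helper: starts the builder and returns the Process handle
      // together with a lazy stream of its stdout lines.
      def runWithStream(builder: ProcessBuilder): (Process, Stream[String]) = {
        // Some(line) is an output line; None marks the end of the output.
        val queue = new LinkedBlockingQueue[Option[String]]
        val process = builder.run(ProcessLogger(line => queue.put(Some(line)), _ => ()))

        // exitValue() blocks until the process ends, whether it exits on its own
        // or is destroyed, so the end marker is posted from a background thread.
        val waiter = new Thread(new Runnable {
          def run(): Unit = { process.exitValue(); queue.put(None) }
        })
        waiter.setDaemon(true)
        waiter.start()

        // The tail of #:: is lazy, so each step blocks only when it is forced.
        def next(): Stream[String] = queue.take() match {
          case Some(line) => line #:: next()
          case None       => Stream.empty
        }
        (process, next())
      }

      def main(args: Array[String]): Unit = {
        // Prints three lines, then would keep running for 30 seconds.
        val command = Seq("sh", "-c", "echo a; echo b; echo c; exec sleep 30")
        val (process, lines) = runWithStream(Process(command))

        lines.take(3).foreach(println) // consume only what is needed
        process.destroy()              // then stop the process early
        println("exit code: " + process.exitValue())
      }
    }
    ```

    Because the caller holds the Process handle, it can stop consuming the stream at any point and call destroy(). Only take as many elements as you know are available before destroying, since forcing a further element blocks until the process prints another line or exits.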