
How do you write to and read from an external process using scalaz-stream?


I would like to send data from a scalaz-stream into an external program and get the result for each item back about 100 ms later. I was able to do this with the code below by zipping the Sink that writes to the program's output stream with the Process that reads from its input stream, and then discarding the Unit produced by the Sink side, but I feel this solution may be very brittle.

If the external program hits an error on one of the input items, everything after it will be out of sync. I think the best bet would be to send some sort of incrementing ID to the external program, which it echoes back along with the result, so that we can resync if an error occurs.

The main trouble I am having is joining the result of sending data into the external program, a Process[Task, Unit], with the output of the program, a Process[Task, String]. I feel like I should be using something from wye, but I'm not really sure.

import java.io.PrintStream
import scalaz._
import scalaz.concurrent.Task
import scalaz.stream.Process._
import scalaz.stream._

object Main extends App {
/*
  # echo.sh just prints to stdout what it gets on stdin
  while read line; do
    sleep 0.1
    echo $line
  done
*/
  val p: java.lang.Process = Runtime.getRuntime.exec("/path/to/echo.sh")

  val source: Process[Task, String] = Process.repeatEval(Task{
     Thread.sleep(1000)
     System.currentTimeMillis().toString
  })

  val linesR: stream.Process[Task, String] = stream.io.linesR(p.getInputStream)
  val printLines: Sink[Task, String] = stream.io.printLines(new PrintStream(p.getOutputStream))

  val in: Process[Task, Unit] = source to printLines

  val zip: Process[Task, (Unit, String)] = in.zip(linesR)
  val out: Process[Task, String] = zip.map(_._2) observe stream.io.stdOutLines
  out.run.run
}

Solution

  • After delving a little deeper into the more advanced types, it looks like Exchange does exactly what I want.

    import java.io.PrintStream
    
    import scalaz._
    import scalaz.concurrent.Task
    import scalaz.stream._
    import scalaz.stream.io._
    
    object Main extends App {
    /*
      # echo.sh just prints to stdout what it gets on stdin
      while read line; do
        sleep 0.1
        echo $line
      done
    */
      val program: java.lang.Process = Runtime.getRuntime.exec("./echo.sh")
    
      val source: Process[Task, String] = Process.repeatEval(Task{
         Thread.sleep(100)
         System.currentTimeMillis().toString
      })
    
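      // Lines the external program prints on its stdout.
      val read: stream.Process[Task, String] = linesR(program.getInputStream)
      // Sink that writes each String as one line to the program's stdin.
      val write: Sink[Task, String] = printLines(new PrintStream(program.getOutputStream))
      // Exchange bundles the read side and the write side of one external resource.
      val exchange: Exchange[String, String] = Exchange(read, write)
      // Feed `source` into the program and print the first 10 lines it sends back.
      println(exchange.run(source).take(10).runLog.run)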
    }
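
The Exchange version still pairs inputs and outputs only by arrival order, so the incrementing-ID idea from the question may still be worth having. Below is a minimal sketch of that idea in plain Scala, independent of scalaz-stream. The `Resync` object and its `tag`, `untag` and `correlate` helpers are hypothetical names made up for illustration, and the sketch assumes the external program echoes the `id<TAB>payload` line back unchanged.

```scala
// Hypothetical helpers (not part of scalaz-stream): tag each outgoing line
// with a sequence number and match echoed lines back to inputs by that number.
object Resync {
  // Wire format assumed here: the ID, a tab, then the payload.
  def tag(id: Long, payload: String): String = s"$id\t$payload"

  // Split an echoed line back into (id, payload); None if the line is malformed,
  // e.g. an error message printed by the external program.
  def untag(line: String): Option[(Long, String)] =
    line.split("\t", 2) match {
      case Array(id, payload) => scala.util.Try(id.toLong).toOption.map(_ -> payload)
      case _                  => None
    }

  // Pair every sent item with its response, looked up by ID rather than by position.
  // Items whose response is missing or malformed are paired with None.
  def correlate(sent: Seq[(Long, String)], received: Seq[String]): Seq[(String, Option[String])] = {
    val byId = received.flatMap(line => untag(line)).toMap
    sent.map { case (id, in) => in -> byId.get(id) }
  }
}
```

With this, an error line in the middle of the output only loses the one item it belongs to; later responses are still matched to the right inputs. In the stream version, `tag` would be applied to each item before it reaches the `write` sink and `untag` to each line coming out of `read`.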