scala, scalaz, scalaz-stream

How do I cleanly log to io.stdOutLines and respond to the client with a scalaz.stream.tcp server?


I'm very new to scalaz-stream, and to scalaz.stream.tcp in particular. I'm trying to build a very simple server for my own educational purposes. I parse the requests into commands, execute them to produce responses, and write the responses back to the client. The part I am having trouble with is that I want to log each received command to stdout.

Here is my inner Process that I am passing to tcp.server:

def inner: Process[tcp.Connection, Unit] = {
    val requests: Process[Connection, String] = tcp.reads(1024) pipe text.utf8Decode
    val cmds: Process[Connection, Command] = requests.map(parseRequest)
    val header: Process[Task, ByteVector] = Process("HEADER\n").pipe(text.utf8Encode)
    val loggedCmds: Process[Connection, Command] = cmds.map { cmd =>
        println(cmd.toString)
        cmd
    }
    val results: Process[Connection, Process[Task, ByteVector]] = loggedCmds.map(_.execute)
    val processedRequests: Process[Connection, Unit] = results.flatMap(result =>  tcp.writes(tcp.lift(header ++ result)))
    processedRequests
}

(I am not in the habit of specifying the types everywhere; I just did that to try to get a handle on things. I plan to remove those.)

The above code compiles and runs correctly, but I do not feel it is very clean or idiomatic. Specifically, I am unhappy with the loggedCmds part. I wanted to use io.stdOutLines, either through .observe or using writer.logged/mapW/drainW, but no matter what I tried I could not get the types to line up: I kept getting type conflicts between Task and Connection. tcp.lift seems to help with an input stream, but it does not seem to work for a Sink. Is there a cleaner/better way to do the loggedCmds part? (FWIW: I'm open to corrections or improvements to any of the above code.)

I should note that if I just have the results go to stdout via io.stdOutLines, I do not have an issue ("through" seems to work in that case, which I have seen in examples). The problem arises only when I want to send the stream to io.stdOutLines and also continue using the stream to respond to the client.


Solution

  • Figured it out on my own (finally). Using ".toChannel" I was able to do it:

    val cmdFormatter = process1.id[Command].map(_.toString)
    val cmdPrinter = io.stdOutLines.pipeIn(cmdFormatter)
    

    ...

    val cmds: Process[Connection, Command] = requests.map(parseRequest) through
        cmdPrinter.toChannel
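
For anyone who wants to try the same idea outside of a tcp server, here is a minimal sketch on a plain Process[Task, _]. It assumes a scalaz-stream 0.7/0.8-style API; the Command case class and the sample values are hypothetical stand-ins, not the types from the question. As far as I understand it, toChannel turns the Sink into a Channel that runs the side effect and then echoes its input, which is why the commands are still available downstream. On a plain Task stream, observe should give the same result.

```scala
import scalaz.concurrent.Task
import scalaz.stream._

object ToChannelSketch {
  // Hypothetical stand-in for the question's Command type.
  final case class Command(name: String)

  // A sink that accepts Commands, formats them, and prints them to stdout.
  val cmdPrinter: Sink[Task, Command] =
    io.stdOutLines.pipeIn(process1.lift((c: Command) => c.toString))

  // Hypothetical sample input in place of the parsed tcp requests.
  val cmds: Process[Task, Command] =
    Process.emitAll(Seq(Command("GET"), Command("PUT"))).toSource

  // Log each command and keep passing it downstream.
  val logged: Process[Task, Command] = cmds through cmdPrinter.toChannel

  // Alternative that should be equivalent on a Task-based stream.
  val observed: Process[Task, Command] = cmds observe cmdPrinter

  def main(args: Array[String]): Unit = {
    // Assumption: Task#run is the scalaz 7.1 name; on scalaz 7.2 builds
    // the equivalent call is unsafePerformSync.
    val collected = logged.runLog.run
    assert(collected.toList == List(Command("GET"), Command("PUT")))
  }
}
```

Whether observe also type-checks against a Process[Connection, _] is something I have not verified; the toChannel version above is the one that worked in the tcp case.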