asynchronous ocaml lwt

How to combine Lwt filters?


I am currently learning Lwt. I am interested in using asynchronous processes to replace some shell routines with OCaml routines.

Let us take a look at a simplified first attempt, where a filter is created by combining two threads running cat:

let filter_cat () =
  Lwt_process.pmap_lines ("cat", [| "cat" |])

let filter_t () =
  Lwt_io.stdin
  |> Lwt_io.read_lines
  |> filter_cat ()
  |> filter_cat ()
  |> Lwt_io.write_lines Lwt_io.stdout

let () =
  filter_t ()
  |> Lwt_main.run

This filter more or less works, but it hangs when its standard input closes instead of exiting. If I remove one of the filter_cat calls, it works as expected.

I am guessing that I do not compose these filters appropriately and therefore cannot join the two threads I am starting. What is the correct way to compose these filters, so that the program terminates after it reads EOF on stdin?


You can find this program, together with a BSD Owl Makefile, in a GitHub gist.


Solution

  • The answer is that there is a small bug in Lwt. An internal function, monitor, performs the piping:

    (* Monitor the thread [sender] in the stream [st] so write errors are
       reported. *)
    let monitor sender st =
      let sender = sender >|= fun () -> None in
      let state = ref Init in
      Lwt_stream.from
        (fun () ->
           match !state with
             | Init ->
                 let getter = Lwt.apply Lwt_stream.get st in
                 let result _ =
                   match Lwt.state sender with
                     | Lwt.Sleep ->
                         (* The sender is still sleeping, behave as the
                            getter. *)
                         getter
                     | Lwt.Return _ ->
                         (* The sender terminated successfully, we are
                            done monitoring it. *)
                         state := Done;
                         getter
                     | Lwt.Fail _ ->
                         (* The sender failed, behave as the sender for
                            this element and save current getter. *)
                         state := Save getter;
                         sender
                 in
                 Lwt.try_bind (fun () -> Lwt.choose [sender; getter]) result result
             | Save t ->
                 state := Done;
                 t
             | Done ->
                 Lwt_stream.get st)
    

    The problem is in the definition

    let getter = Lwt.apply Lwt_stream.get st
    

    When the getter reaches the end of the stream, the getter is saved but the sender is lost, which seems to prevent completion. This can be fixed by changing the definition of getter so that it behaves as the sender once the end of the stream has been reached.
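
The fix described above can be sketched as a standalone helper. This is only an illustration under assumptions: getter_or_sender is a hypothetical name that is not part of Lwt, and the answer does not say whether Lwt adopted this exact change. Inside monitor, the equivalent would be to define getter along these lines, reusing the sender that already resolves to None.

```ocaml
(* Sketch only; requires the lwt library. [getter_or_sender] is a
   hypothetical helper, not part of Lwt's API. *)
open Lwt.Infix

(* Read one element from [st]. At the end of the stream, wait for [sender]
   to finish before reporting [None], so the sender is not dropped. *)
let getter_or_sender (sender : unit Lwt.t) (st : 'a Lwt_stream.t)
  : 'a option Lwt.t =
  Lwt.apply Lwt_stream.get st >>= function
  | Some _ as item -> Lwt.return item
  | None -> sender >|= fun () -> None
```

With a getter like this, the end of the stream would be reported only after the sender has finished, and a failure of the sender would still surface, since the bind propagates a failed promise.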