Search code examples
garbage-collection ocaml reactive-programming ocaml-lwt

How to stop OCaml garbage collecting my reactive event handler?


I'm trying to use the OBus library with Lwt_react. This uses "functional reactive programming" for properties and signals.

The problem (as noted in the React documentation) is that OCaml may garbage collect your callback while you're still using it. There's a keep function, which keeps the handler forever, but I don't want that. I do want to free it eventually, just not while I still need it.

So, I thought I'd attach the handler to a switch:

(* Attempt to tie the lifetime of [handler] to [switch]: registers a hook on
   [switch] whose closure mentions [handler].  The hook itself does nothing
   useful -- it discards [handler] and returns an already-resolved promise.
   As the surrounding text explains, this is not sufficient when nothing else
   keeps [switch] itself reachable. *)
let keep ~switch handler =
  Lwt_switch.add_hook (Some switch) (fun () ->
    (* Mentioning [handler] here is the only reason this closure exists. *)
    ignore handler;
    Lwt.return ()
  )

But my event handler gets garbage-collected anyway (which makes sense, since the code to turn off the switch is called when the signal arrives, so it's only the signal handler keeping the switch alive in the first place).

Here's a simplified (stand-alone) version of my code:

(* ocamlfind ocamlopt -package react,lwt,lwt.react,lwt.unix -linkpkg -o test test.ml *)

(* The event that [setup] listens on, paired with the function the test
   driver calls to fire it. *)
let finished_event, fire_finished = React.E.create ()

(* Minimal reproduction of the problem: maps a handler over [finished_event]
   and returns a promise that, once the handler has run, turns off a freshly
   created switch.  Nothing in this function keeps the mapped event reachable
   after it returns. *)
let setup () =
  let switch = Lwt_switch.create () in

  (* [finished] is the promise; [handler] resolves it through [waker]. *)
  let finished, waker = Lwt.wait () in
  let handler () = Lwt.wakeup waker () in
  (* The mapped event is referenced only by this local binding, and [ignore]
     merely discards it -- this is the value the question is about. *)
  let dont_gc_me = Lwt_react.E.map handler finished_event in
  ignore dont_gc_me;  (* What goes here? *)

  print_endline "Waiting for signal...";
  (* When [finished] resolves, turn the switch off. *)
  Lwt.bind finished (fun () -> Lwt_switch.turn_off switch)

(* Test driver: calls [setup], forces a full major collection, fires the
   event, then runs the Lwt main loop on the resulting promise and prints
   "Done" once it returns. *)
let () =
  (* Wrapped in [Lwt.protected]; see the "Edit" note in the question text. *)
  let finished = Lwt.protected (setup ()) in

  Gc.full_major ();  (* Force GC, to demonstrate problem *)
  fire_finished ();  (* Simulate send *)

  Lwt_main.run finished;
  print_endline "Done";

Without the Gc.full_major line, this normally prints Done. With it, it just hangs at Waiting for signal....

Edit: I've split setup (the real code) from the test driver and wrapped the result in Lwt.protected, so that Lwt's cancellation behaviour cannot accidentally mask the problem.


Solution

  • Here is a snippet taken from a project of mine, fixed to work around this weak-reference issue (thanks!). The first part keeps a global root pointing to your object. The second part limits the liveness of a signal/event to the lifetime of an Lwt thread.

    Please note that the reactive entity is cloned and explicitly stopped, which may not exactly match your expectations.

    (** Global registry of values that must stay reachable.

        Kept values are stored in the nodes of a circular doubly-linked list
        hanging off a module-level sentinel, so each one is reachable from a
        global until it is released. *)
    module Keep : sig
      (** Handle identifying one kept value. *)
      type t

      (** [this v] links [v] into the global list and returns its handle. *)
      val this : 'a -> t

      (** [release h] unlinks [h] from the global list and drops its
          reference to the kept value.  Releasing the same handle more than
          once is harmless. *)
      val release : t -> unit
    end = struct
      (* Existential wrapper so that a node can hold a value of any type
         directly.  Holding the value itself, rather than a closure whose body
         only [ignore]s it, means reachability does not depend on the compiler
         preserving an otherwise-unused closure capture. *)
      type kept = Kept : 'a -> kept

      type t = {mutable prev: t; mutable next: t; mutable keep: kept}

      (* Sentinel node: the list is empty when it points at itself. *)
      let rec root = {next = root; prev = root; keep = Kept ()}

      let release item =
        (* Splice the node out of the list. *)
        item.next.prev <- item.prev;
        item.prev.next <- item.next;
        (* Make the node self-linked so a second [release] is a no-op. *)
        item.prev <- item;
        item.next <- item;
        (* In case user code keeps a reference to [item]. *)
        item.keep <- Kept ()

      (* Insert a new node right after the sentinel. *)
      let attach keep =
        let item = {next = root.next; prev = root; keep} in
        root.next.prev <- item;
        root.next <- item;
        item

      let this a = attach (Kept a)
    end
    
    (** Helpers that keep a clone of a signal or event alive for exactly as
        long as an Lwt computation is running. *)
    module React_utils : sig
      (** [with_signal s f] clones [s] with an identity map, registers the
          clone with [Keep], and runs [f] on it.  When the promise returned by
          [f] completes (or [f] fails), the clone is stopped and released. *)
      val with_signal : 'a signal -> ('a signal -> 'b Lwt.t) -> 'b Lwt.t

      (** Same as [with_signal], for events. *)
      val with_event  : 'a event -> ('a event -> 'b Lwt.t) -> 'b Lwt.t
    end = struct
      let identity x = x

      (* Shared skeleton: root [clone], run [body] on it, and afterwards stop
         the clone and drop the root, whatever the outcome of [body]. *)
      let scoped ~stop clone body =
        let root = Keep.this clone in
        let cleanup () =
          stop clone;
          Keep.release root;
          Lwt.return_unit
        in
        Lwt.finalize (fun () -> body clone) cleanup

      let with_signal s f =
        scoped ~stop:(fun c -> S.stop c) (S.map identity s) f

      let with_event e f =
        scoped ~stop:(fun c -> E.stop c) (E.map identity e) f
    end
    

    Solving your example with this:

    (* The question's example reworked on top of [React_utils.with_event]:
       the mapped event is held for as long as [finished] is pending. *)
    let run () =
      let switch = Lwt_switch.create () in
    
      let finished, waker = Lwt.wait () in
      let on_finished () = Lwt.wakeup waker () in
      (* Run through [Lwt.async]: nobody needs to know exactly when the
         reference is released. *)
      let hold_until_finished () =
        let mapped = Lwt_react.E.map on_finished finished_event in
        React_utils.with_event mapped (fun _kept_alive -> finished)
      in
      Lwt.async hold_until_finished;
      print_endline "Waiting for signal...";
    
      Gc.full_major ();  (* Force GC, to demonstrate problem *)
      fire_finished ();  (* Simulate send *)
    
      let shut_down () = Lwt_switch.turn_off switch in
      Lwt.bind finished shut_down