OCaml event/channel tutorial?


I'm working in OCaml.

I want to simulate communicating nodes to see how quickly messages propagate under different communication schemes, etc.

The nodes can 1. send and 2. receive a fixed message. I guess the obvious thing to do is have each node as a separate thread.

Apparently you can get threads to pass messages to each other using the Event module and channels, but I can't find any examples of this. Can someone point me in the right direction or just give me a simple relevant example?

Thanks a lot.


Solution

  • If you are going to attempt a simulation, you will need far more control over your nodes than plain threads give you, at least without major pain.

    My own approach would be to create a simple, single-threaded virtual machine in order to keep full control over the simulation. The easiest way to do so in OCaml is to use a monad-like structure (as is done in Lwt, for instance):

    (* A thread is a piece of code that can be executed to perform some
       side-effects and fork zero, one or more threads before returning. 
       Some threads may block when waiting for an event to happen. *)
    type thread = < run : thread list ; block : bool >
    
    (* References can be used as communication channels out of the box (simply 
       read and write values to them). To implement a blocking communication 
       pattern, use these two primitives: *)
    
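    (* Blocks while the cell is full; otherwise stores x and continues
       with next. The parentheses matter: without them, `; [next ()]` would
       fall outside the else branch. *)
    let write r x next = object (self) 
      method block = !r <> None
      method run   = if self # block then [self]
                     else (r := Some x ; [next ()])
    end
    
    (* Blocks while the cell is empty; otherwise takes the value out and
       passes it to next. *)
    let read r next = object (self) 
      method block = !r = None
      method run   = match !r with 
                      | None -> [self]
                      | Some x -> r := None ; [next x]
    end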
    

    You can create better primitives that suit your needs, such as adding a "time required for transmitting" property in your channels.
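
    As a rough illustration of such a primitive, here is a minimal sketch of a delayed write. The names `write_after` and `stop`, the `delay` parameter, and the idea of counting scheduler steps as a stand-in for transmission time are all assumptions made for this example. The `thread` type and `write` are repeated so the block stands alone.

    ```ocaml
    type thread = < run : thread list ; block : bool >

    (* Blocking write, as above. *)
    let write r x next : thread = object (self)
      method block = !r <> None
      method run =
        if self#block then [self]
        else (r := Some x ; [next ()])
    end

    (* Hypothetical primitive: stay runnable for [delay] steps without doing
       anything, then behave exactly like [write]. Each step stands in for
       one unit of transmission time (an assumption of this sketch). *)
    let rec write_after delay r x next : thread =
      if delay <= 0 then write r x next
      else object
        method block = false
        method run = [write_after (delay - 1) r x next]
      end

    (* Hypothetical helper: a thread that forks nothing and terminates. *)
    let stop () : thread = object
      method block = false
      method run = []
    end
    ```

    With `write_after 2 r 7 stop`, the value only shows up in `r` the third time the thread is run. Because the waiting thread never reports itself as blocked, a scheduler that stops when everything is blocked will still treat it as making progress.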

    The next step is defining a simulation engine.

    (* The simulation engine can be implemented as a simple queue. It starts 
       with a pre-defined set of threads and returns when no threads are left, 
       or when all threads are blocking. *)
    let simulate threads = 
      let q = Queue.create () in 
      let () = List.iter (fun t -> Queue.push t q) threads in 
      let rec loop blocking = 
        if Queue.is_empty q then `AllThreadsTerminated else 
          if Queue.length q = blocking then `AllThreadsBlocked else 
            let thread = Queue.pop q in 
            if thread # block then ( 
              Queue.push thread q ; 
              loop (blocking + 1) 
            ) else ( 
              List.iter (fun t -> Queue.push t q) (thread # run) ; 
              loop 0
            ) 
      in
      loop 0 
    

    Again, you may adjust the engine to keep track of which node is executing which thread, to keep per-node priorities in order to simulate one node being massively slower or faster than the others, to pick a thread at random on every step, and so on.
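
    The random-pick variant could be sketched roughly as follows. The name `simulate_random`, the optional `max_steps` bound and the extra `` `StepLimitReached `` result are assumptions added for this example; they are not part of the engine above.

    ```ocaml
    type thread = < run : thread list ; block : bool >

    (* On every step, run one thread chosen uniformly at random among those
       that are not blocking. [max_steps] (hypothetical) bounds the run so
       that simulations which never terminate still return. *)
    let simulate_random ?(max_steps = 10_000) (threads : thread list) =
      let rec loop steps threads =
        match threads with
        | [] -> `AllThreadsTerminated
        | _ when steps >= max_steps -> `StepLimitReached
        | _ ->
          let runnable, blocked =
            List.partition (fun t -> not t#block) threads in
          match runnable with
          | [] -> `AllThreadsBlocked
          | _ ->
            let i = Random.int (List.length runnable) in
            let chosen = List.nth runnable i in
            let rest = List.filteri (fun j _ -> j <> i) runnable in
            (* The chosen thread is replaced by whatever it forks. *)
            loop (steps + 1) (chosen#run @ rest @ blocked)
      in
      loop 0 threads
    ```

    This trades the queue for a plain list, which is simple but linear in the number of threads on every step. Call `Random.self_init ()` first if you want a different schedule on each run.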

    The last step is executing a simulation. Here, I'm going to have two threads sending random numbers back and forth. Note that these two threads never finish, so the simulation runs until you interrupt it.

    let rec thread name input output = 
      write output (Random.int 1024) (fun () -> 
        read input (fun value ->
          Printf.printf "%s : %d" name value ; 
          print_newline () ;
          thread name input output
      ))
    
    let a = ref None and b = ref None 
    let _ = simulate [ thread "A -> B" a b ; thread "B -> A" b a ]
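
    To get closer to the propagation question, the same pieces can be wired into a chain of nodes that each receive a message once and pass it on. This is only a sketch: `relay`, `stop`, `run_chain` and the node names are hypothetical, and the primitives and engine are repeated so the block stands alone.

    ```ocaml
    type thread = < run : thread list ; block : bool >

    let write r x next : thread = object (self)
      method block = !r <> None
      method run = if self#block then [self] else (r := Some x ; [next ()])
    end

    let read r next : thread = object (self)
      method block = !r = None
      method run = match !r with
        | None -> [self]
        | Some x -> r := None ; [next x]
    end

    (* Hypothetical helper: a thread that terminates immediately. *)
    let stop () : thread = object
      method block = false
      method run = []
    end

    (* Same queue-based engine as above. *)
    let simulate threads =
      let q = Queue.create () in
      List.iter (fun t -> Queue.push t q) threads ;
      let rec loop blocking =
        if Queue.is_empty q then `AllThreadsTerminated
        else if Queue.length q = blocking then `AllThreadsBlocked
        else
          let thread = Queue.pop q in
          if thread#block then (Queue.push thread q ; loop (blocking + 1))
          else (List.iter (fun t -> Queue.push t q) (thread#run) ; loop 0)
      in
      loop 0

    (* Hypothetical relay node: wait for a message on [input], record the
       arrival in [log], forward the message on [output], then stop. *)
    let relay name log input output =
      read input (fun msg ->
        log := name :: !log ;
        write output msg stop)

    (* Build a chain of [n] relays fed by one initial message. Returns the
       engine's result and the node names in order of arrival. *)
    let run_chain n =
      let links = Array.init (n + 1) (fun _ -> ref None) in
      let log = ref [] in
      let nodes =
        List.init n (fun i ->
          relay (Printf.sprintf "node%d" i) log links.(i) links.(i + 1)) in
      let source = write links.(0) "hello" stop in
      let result = simulate (nodes @ [source]) in
      (result, List.rev !log)
    ```

    Unlike the ping-pong example, every thread here eventually stops, so `run_chain` returns. Measuring how long propagation takes would need a step counter or clock in the engine, which this sketch leaves out.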