I'm working in OCaml.
I want to simulate communicating nodes to see how quickly messages propagate under different communication schemes.
The nodes can 1. send and 2. receive a fixed message. I guess the obvious thing to do is have each node as a separate thread.
Apparently you can get threads to pass messages to each other using the Event module and channels, but I can't find any examples of this. Can someone point me in the right direction or just give me a simple relevant example?
Thanks a lot.
If you are going to attempt a simulation, you will need a lot more control over your nodes than plain threads will give you, at least not without major pain.
My own approach would be to create a simple, single-threaded virtual machine in order to keep full control over the simulation. The easiest way to do so in OCaml is to use a monad-like structure (as is done in Lwt, for instance):
(* A thread is a piece of code that can be executed to perform some
   side-effects and fork zero, one or more threads before returning.
   Some threads may block when waiting for an event to happen. *)
type thread = < run : thread list ; block : bool >
(* References can be used as communication channels out of the box (simply
   read and write values to them). To implement a blocking communication
   pattern, use these two primitives: *)
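let write r x next = object (self)
  (* Blocked while the channel still holds an unread value. *)
  method block = !r <> None
  (* The parentheses matter: `;` binds more loosely than `if ... else`. *)
  method run = if self # block then [self]
    else (r := Some x ; [next ()])
end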
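let read r next = object (self)
  (* Blocked while the channel is empty. *)
  method block = !r = None
  (* Match on the contents of the reference, not on the reference itself. *)
  method run = match !r with
    | None -> [self]
    | Some x -> r := None ; [next x]
end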
You can create better primitives to suit your needs, for example by adding a "time required for transmitting" property to your channels.
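As one possible way to model transmission time, here is a minimal sketch that postpones a write by a number of scheduling steps. The helpers `delay` and `write_after` are hypothetical names introduced for illustration, and measuring time in scheduler steps is an assumption; a more realistic simulation would probably track a virtual clock instead.

```ocaml
(* Same thread type as above. *)
type thread = < run : thread list ; block : bool >

(* Blocking write, as above: waits until the channel is empty. *)
let write r x next : thread = object (self)
  method block = !r <> None
  method run =
    if self#block then [self]
    else (r := Some x; [next ()])
end

(* Hypothetical helper: a thread that never blocks. Each time it runs, it
   re-queues a delay with a smaller counter; once the counter reaches
   zero, it hands over to [next]. *)
let rec delay n next : thread = object
  method block = false
  method run = if n <= 0 then [next ()] else [delay (n - 1) next]
end

(* Hypothetical helper: a write that only reaches the channel after the
   delay has been scheduled [latency] times and has then handed over. *)
let write_after latency r x next = delay latency (fun () -> write r x next)
```

Because a delay never blocks, the engine keeps scheduling it like any other runnable thread, so a larger `latency` simply means more trips through the queue before the value shows up in the channel.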
The next step is defining a simulation engine.
(* The simulation engine can be implemented as a simple queue. It starts
   with a pre-defined set of threads and returns when no threads are left,
   or when all threads are blocking. *)
let simulate threads =
  let q = Queue.create () in
  let () = List.iter (fun t -> Queue.push t q) threads in
  let rec loop blocking =
    if Queue.is_empty q then `AllThreadsTerminated else
    if Queue.length q = blocking then `AllThreadsBlocked else
    let thread = Queue.pop q in
    if thread # block then (
      Queue.push thread q ;
      loop (blocking + 1)
    ) else (
      List.iter (fun t -> Queue.push t q) (thread # run) ;
      loop 0
    )
  in
  loop 0
Again, you may adjust the engine: keep track of which node is executing which thread, keep per-node priorities to simulate one node being massively slower or faster than the others, pick a random thread for execution on every step, and so on.
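To illustrate the "random thread on every step" idea, here is a rough sketch of an alternative engine. `simulate_random` is a hypothetical name, the list-based pool is chosen for brevity rather than speed, and `List.filteri` assumes OCaml 4.11 or later.

```ocaml
(* Same thread type as above. *)
type thread = < run : thread list ; block : bool >

(* Hypothetical variant of the engine: at every step, pick one of the
   currently runnable (non-blocked) threads uniformly at random. *)
let simulate_random (threads : thread list) =
  let rec loop pool =
    (* Split the pool into blocked and runnable threads. *)
    match List.partition (fun t -> t#block) pool with
    | [], [] -> `AllThreadsTerminated
    | _, [] -> `AllThreadsBlocked
    | blocked, runnable ->
      let i = Random.int (List.length runnable) in
      let chosen = List.nth runnable i in
      let rest = List.filteri (fun j _ -> j <> i) runnable in
      (* Run the chosen thread and put whatever it forked back in the pool. *)
      loop (chosen#run @ rest @ blocked)
  in
  loop threads
```

It returns the same two results as the queue-based engine, so it can be swapped in without changing the threads themselves. Each step costs time linear in the number of threads, which is fine for small experiments but worth revisiting for large ones.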
The last step is executing a simulation. Here, I'm going to have two threads sending random numbers back and forth.
let rec thread name input output =
  write output (Random.int 1024) (fun () ->
    read input (fun value ->
      Printf.printf "%s : %d" name value ;
      print_newline () ;
      thread name input output
  ))
let a = ref None and b = ref None
let _ = simulate [ thread "A -> B" a b ; thread "B -> A" b a ]
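Note that the example above never returns, because each thread starts over after every read. If you want to see the engine's result, one option is a bounded variant, sketched here as a complete program. The `rounds` parameter and the `stop` thread are hypothetical additions for illustration; everything else follows the definitions above.

```ocaml
type thread = < run : thread list ; block : bool >

let write r x next : thread = object (self)
  method block = !r <> None
  method run = if self#block then [self] else (r := Some x; [next ()])
end

let read r next : thread = object (self)
  method block = !r = None
  method run = match !r with
    | None -> [self]
    | Some x -> r := None; [next x]
end

let simulate threads =
  let q = Queue.create () in
  List.iter (fun t -> Queue.push t q) threads;
  let rec loop blocking =
    if Queue.is_empty q then `AllThreadsTerminated
    else if Queue.length q = blocking then `AllThreadsBlocked
    else
      let thread = Queue.pop q in
      if thread#block then (Queue.push thread q; loop (blocking + 1))
      else (List.iter (fun t -> Queue.push t q) (thread#run); loop 0)
  in
  loop 0

(* Hypothetical helper: a thread that terminates by forking nothing. *)
let stop : thread = object method block = false method run = [] end

(* Bounded version of [thread]: exchange [rounds] messages, then stop. *)
let rec thread name rounds input output =
  if rounds = 0 then stop
  else
    write output (Random.int 1024) (fun () ->
      read input (fun value ->
        Printf.printf "%s : %d\n" name value;
        thread name (rounds - 1) input output))

let () =
  let a = ref None and b = ref None in
  match simulate [ thread "A -> B" 3 a b ; thread "B -> A" 3 b a ] with
  | `AllThreadsTerminated -> print_endline "all threads terminated"
  | `AllThreadsBlocked -> print_endline "all threads blocked"
```

With three rounds per thread, both threads finish and the program ends by printing "all threads terminated". If one side expected more messages than the other sends, the engine would report `AllThreadsBlocked` instead, which is a handy way to spot a deadlock in a communication scheme.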