What could be the most elegant implementation of (cyclic) barrier for Elixir? An algorithm to be implemented (vertex coloring) has a loop with a waiting phase for the spawned processes ("execute ... synchronously in parallel", and then checking termination condition using all processes' results), it's Algorithm 5 "6-color" from Principles of Distributed Computing, Ch. 1.
Most references are for .NET, pthreads, and other thread-based environments, so I am not sure whether a barrier is the right pattern I am after. Maybe there is a more "Elixir-ish" way.
I do not have any code yet (in search for pattern), but here is the code, implementing "slow" version of the same problem: https://codereview.stackexchange.com/questions/111487/coloring-trees-in-elixir
The idea I got is to have the top-level process (the one which spawns one process per graph node) send and receive messages that synchronize the node processes. It must be mentioned that node processes also communicate with each other: parents send messages to children during one top-level loop iteration. The complication, however, is that no process should continue after the top-level process has received a node's message and before all nodes have completed their iteration (most probably I will use tail recursion, though). This is why I thought about a barrier mechanism.
I'm not sure if this is exactly what you're looking for, but here is a cyclic barrier based on the java.util.concurrent.CyclicBarrier class in Java and the Concurrent::CyclicBarrier class in Ruby.
# A reusable ("cyclic") synchronization barrier modelled after
# java.util.concurrent.CyclicBarrier: `parties` processes call wait/1,2 and
# all block until the last one arrives; the optional zero-arity `action` then
# runs once, every waiter is released, and the barrier resets itself for the
# next round.
defmodule CyclicBarrier do
  require Record
  # Client handle: a tagged record tuple {CyclicBarrier, pid} wrapping the
  # pid of the barrier server process.
  Record.defrecordp :barrier, CyclicBarrier,
    pid: nil

  # Starts a new barrier for `parties` waiters (a positive integer).
  # `action` is an optional fn/0 executed by the server each time the
  # barrier trips.  Returns the client handle record.
  def start(parties, action \\ nil)
  when (is_integer(parties) and parties > 0)
  and (action === nil or is_function(action, 0)),
  do: barrier(pid: spawn(CyclicBarrier.Server, :init, [parties, action]))

  # Tells the server to exit(:normal); always returns true.
  def stop(barrier(pid: pid)) do
    call(pid, :stop)
    true
  end

  # True while the barrier server process is still running.
  def alive?(barrier(pid: pid)) do
    Process.alive?(pid)
  end

  # A barrier is "broken" when the server is in any state other than
  # :waiting (e.g. after a reset with waiters queued, or a wait timeout).
  def broken?(barrier(pid: pid)) do
    case call(pid, :status) do
      :waiting ->
        false
      _ ->
        true
    end
  end

  # Number of processes currently blocked in wait/1,2, or false when the
  # server is down (call/3 then yields an {:EXIT, ...} tuple, not an int).
  def number_waiting(barrier(pid: pid)) do
    case call(pid, :number_waiting) do
      n when is_integer(n) ->
        n
      _ ->
        false
    end
  end

  # The party count the barrier was started with, or false when the
  # server is down.
  def parties(barrier(pid: pid)) do
    case call(pid, :parties) do
      n when is_integer(n) ->
        n
      _ ->
        false
    end
  end

  # Resets the barrier to its initial state.  Returns true when the server
  # acknowledged (:reset, or :broken when queued waiters had to be kicked
  # out); false when the server is unreachable.
  def reset(barrier(pid: pid)) do
    case call(pid, :reset) do
      :reset ->
        true
      :broken ->
        true
      _ ->
        false
    end
  end

  # Blocks the caller until `parties` processes have called wait on this
  # barrier.  wait/1 waits forever; wait/2 takes a timeout in milliseconds.
  # Returns true when the barrier tripped; false when it was broken, the
  # wait timed out (which also breaks the barrier via reset), or the
  # server died.
  def wait(barrier = barrier()),
  do: wait(nil, barrier)
  def wait(timeout, barrier = barrier(pid: pid)) when timeout === nil or is_integer(timeout) do
    case call(pid, :wait, timeout) do
      :fulfilled ->
        true
      :broken ->
        false
      :timeout ->
        # Give up our slot: resetting breaks the barrier so the other
        # queued waiters are not left blocked forever.
        reset(barrier)
        false
      _ ->
        false
    end
  end

  # Synchronous request/reply to the server, without OTP.  The caller
  # temporarily links to the server with trap_exit enabled so a server
  # crash surfaces as a matchable {:EXIT, pid, reason} message instead of
  # killing the caller.
  defp call(pid, request, timeout \\ nil) do
    case Process.alive?(pid) do
      false ->
        # Dead server: fabricate the same shape an EXIT message would have.
        {:EXIT, pid, :normal}
      true ->
        # Remember the caller's previous trap_exit flag so it can be
        # restored once the call completes.
        trap_exit = Process.flag(:trap_exit, true)
        Process.link(pid)
        ref = make_ref()
        send(pid, {ref, self(), request})
        case timeout do
          nil ->
            receive do
              {^ref, reply} ->
                Process.unlink(pid)
                Process.flag(:trap_exit, trap_exit)
                reply
              exited = {:EXIT, ^pid, _} ->
                # Server died mid-call; hand the EXIT tuple back as reply.
                Process.flag(:trap_exit, trap_exit)
                exited
            end
          _ ->
            receive do
              {^ref, reply} ->
                Process.unlink(pid)
                Process.flag(:trap_exit, trap_exit)
                reply
              exited = {:EXIT, ^pid, _} ->
                Process.flag(:trap_exit, trap_exit)
                exited
            after
              timeout ->
                # NOTE(review): if the server replies after this timeout,
                # the late {ref, reply} message is never flushed and stays
                # in the caller's mailbox — consider draining it here.
                Process.unlink(pid)
                Process.flag(:trap_exit, trap_exit)
                :timeout
            end
        end
    end
  end

  # Barrier server: a hand-rolled two-state machine (:waiting / :broken)
  # run in a bare spawned process rather than a GenServer.
  defmodule Server do
    require Record
    # waiting: count of queued waiters; parties: trip threshold;
    # action: optional fn/0 run on trip; q: queue of {ref, pid} callers
    # to notify when the barrier trips, breaks, or resets.
    Record.defrecordp :state_data,
      waiting: 0,
      parties: nil,
      action: nil,
      q: :queue.new()

    def init(parties, action),
    do: loop(:waiting, state_data(parties: parties, action: action))

    # Trip clause: when waiting == parties (both bound to `same`), run the
    # action, release every queued waiter with :fulfilled, and start a new
    # generation with a zeroed count and empty queue.
    defp loop(:waiting, sd = state_data(waiting: same, parties: same, action: action, q: q)),
    do: loop(done(:fulfilled, action, q), state_data(sd, waiting: 0, q: :queue.new()))
    defp loop(state_name, sd) do
      receive do
        {ref, pid, request} when is_reference(ref) and is_pid(pid) and is_atom(request) ->
          handle(state_name, request, {ref, pid}, sd)
      end
    end

    # :wait while waiting: enqueue the caller; the trip clause above fires
    # once the count reaches `parties`.
    defp handle(:waiting, :wait, from, sd = state_data(waiting: w, q: q)),
    do: loop(:waiting, state_data(sd, waiting: w + 1, q: :queue.in(from, q)))
    # :reset with no waiters queued: harmless, just acknowledge :reset.
    defp handle(:waiting, :reset, from, sd = state_data(waiting: 0, q: q)),
    do: loop(done(:reset, nil, :queue.in(from, q)), sd)
    # :reset with waiters queued: break the barrier — every queued waiter
    # (and the resetter) is told :broken and the server enters :broken.
    defp handle(:waiting, :reset, from, sd = state_data(q: q)),
    do: loop(done(:broken, nil, :queue.in(from, q), false), state_data(sd, waiting: 0, q: :queue.new()))
    # :reset while broken: acknowledge :reset and return to :waiting.
    defp handle(:broken, :reset, from, sd = state_data(q: q)),
    do: loop(done(:reset, nil, :queue.in(from, q)), sd)
    # :wait on a broken barrier fails immediately.
    defp handle(:broken, :wait, from, sd) do
      cast(from, :broken)
      loop(:broken, sd)
    end
    # Read-only queries, answered in any state without changing it.
    defp handle(state_name, :number_waiting, from, sd = state_data(waiting: number_waiting)) do
      cast(from, number_waiting)
      loop(state_name, sd)
    end
    defp handle(state_name, :parties, from, sd = state_data(parties: parties)) do
      cast(from, parties)
      loop(state_name, sd)
    end
    defp handle(state_name, :status, from, sd) do
      cast(from, state_name)
      loop(state_name, sd)
    end
    # :stop terminates the server; linked callers trapping exits receive
    # the :normal EXIT message.
    defp handle(_state_name, :stop, _from, _sd) do
      exit(:normal)
    end

    # Send `message` to every {ref, pid} caller queued in `q`.
    defp broadcast(q, message),
    do: for from <- :queue.to_list(q),
    do: cast(from, message)
    # Reply to a single caller, tagged with its original request ref.
    defp cast({ref, pid}, message),
    do: send(pid, {ref, message})

    # Run the (optional) trip action, notify all queued callers of the
    # outcome, and return the next state name: :waiting when `continue`,
    # otherwise the outcome itself (used to enter :broken).
    defp done(state, action, q, continue \\ true) do
      run(action)
      broadcast(q, state)
      case continue do
        true ->
          :waiting
        false ->
          state
      end
    end

    # Invoke the trip action if one was given; no-op for nil.
    defp run(nil),
    do: nil
    defp run(action),
    do: action.()
  end
end
Here's an example using CyclicBarrier in an IEx shell for Elixir. (Note: the `barrier.wait` call style shown in the transcript relies on the old "tuple call" syntax; on modern Elixir versions call the functions explicitly, e.g. `CyclicBarrier.wait(barrier)`.)
iex> barrier = CyclicBarrier.start(5, fn -> IO.puts("done") end)
{CyclicBarrier, #PID<0.281.0>}
iex> for i <- 1..5, do: spawn(fn -> IO.puts("process #{i}: #{barrier.wait}") end)
done
process 5: true
process 1: true
process 3: true
process 2: true
process 4: true
[#PID<0.283.0>, #PID<0.284.0>, #PID<0.285.0>, #PID<0.286.0>, #PID<0.287.0>]
The exact order of process execution is non-deterministic.
Other examples of the functions on CyclicBarrier
are below:
iex> barrier = CyclicBarrier.start(2)
{CyclicBarrier, #PID<0.280.0>}
iex> barrier.alive?
true
iex> barrier.broken?
false
iex> barrier.number_waiting
0
iex> barrier.parties
2
iex> # let's spawn another process which will wait on the barrier
iex> spawn(fn -> IO.puts("barrier returned: #{barrier.wait}") end)
#PID<0.288.0>
iex> barrier.number_waiting
1
iex> # if we reset the barrier while another process is waiting
iex> # on the barrier, it will break
iex> barrier.reset
barrier returned: false
true
iex> barrier.broken?
true
iex> # however, the barrier can be reset again to its initial state
iex> barrier.reset
true
iex> barrier.broken?
false
iex> # if a timeout is exceeded while waiting for a barrier, it
iex> # will also break the barrier
iex> barrier.wait(100)
false
iex> barrier.broken?
true
iex> # let's reset the barrier, spawn another process to wait,
iex> # and wait with a timeout in the current process
iex> barrier.reset
true
iex> spawn(fn -> IO.puts("barrier returned: #{barrier.wait}") end)
#PID<0.289.0>
iex> barrier.wait(100)
barrier returned: true
true
iex> # if stop is called on the barrier, the barrier process will
iex> # exit and all future calls to the barrier will return false
iex> barrier.stop
true
iex> barrier.alive?
false
iex> barrier.reset
false
iex> barrier.wait
false