
Synchronization barrier in Elixir?


What would be the most elegant implementation of a (cyclic) barrier in Elixir? The algorithm to be implemented (vertex coloring) has a loop with a waiting phase for the spawned processes ("execute ... synchronously in parallel", then check the termination condition using all processes' results); it is Algorithm 5, "6-color", from Principles of Distributed Computing, Ch. 1.

Most references are for .NET, pthreads, and other thread-based environments, so I am not sure whether a barrier is even the right pattern to look for. Maybe there is a more "Elixirish" way.

I do not have any code for this yet (I am still looking for the right pattern), but here is code that implements a "slow" version of the same problem: https://codereview.stackexchange.com/questions/111487/coloring-trees-in-elixir

My idea is to have the top-level process (the one that spawns one process per graph node) send and receive messages that synchronize the node processes. Note that the node processes also communicate with each other: parents send messages to their children during a single top-level loop iteration. The complication is that no node process should continue past the point where the top-level process has received its message until all nodes have completed their iteration (I will most probably use tail recursion for the loop). This is why I thought of a barrier mechanism.
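
In rough pseudo-Elixir, the flow I have in mind would look something like this (the names are placeholders, and a fixed number of rounds stands in for the real termination check):

    defmodule RoundSync do
      # Coordinator: spawn one process per graph node, then drive the rounds.
      def run(vertices, rounds) do
        parent = self()
        pids = for vertex <- vertices, do: spawn_link(fn -> worker(vertex, parent) end)
        loop(pids, rounds)
      end

      defp loop(pids, rounds) do
        # Barrier phase: wait until every node has reported this iteration ...
        Enum.each(pids, fn _ -> receive do {:done, _pid, _result} -> :ok end end)

        if rounds > 1 do
          # ... and only then release all nodes into the next iteration.
          Enum.each(pids, &send(&1, :continue))
          loop(pids, rounds - 1)
        else
          Enum.each(pids, &send(&1, :stop))
        end
      end

      defp worker(vertex, parent) do
        # One iteration of the per-node work (recolouring, messaging children, ...)
        # would go here.
        send(parent, {:done, self(), vertex})

        receive do
          :continue -> worker(vertex, parent)
          :stop -> :ok
        end
      end
    end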


Solution

  • I'm not sure if this is exactly what you're looking for, but here is a cyclic barrier based on the java.util.concurrent.CyclicBarrier class in Java and the Concurrent::CyclicBarrier class in Ruby.

    defmodule CyclicBarrier do
    
      require Record
      Record.defrecordp :barrier, CyclicBarrier,
        pid: nil
    
      # Start the barrier server. `parties` is the number of processes that must
      # call wait before the barrier trips; `action` is an optional zero-arity
      # fun that is run once each time the barrier trips.
      def start(parties, action \\ nil)
          when (is_integer(parties) and parties > 0)
          and (action === nil or is_function(action, 0)),
        do: barrier(pid: spawn(CyclicBarrier.Server, :init, [parties, action]))
    
      def stop(barrier(pid: pid)) do
        call(pid, :stop)
        true
      end
    
      def alive?(barrier(pid: pid)) do
        Process.alive?(pid)
      end
    
      def broken?(barrier(pid: pid)) do
        case call(pid, :status) do
          :waiting ->
            false
          _ ->
            true
        end
      end
    
      def number_waiting(barrier(pid: pid)) do
        case call(pid, :number_waiting) do
          n when is_integer(n) ->
            n
          _ ->
            false
        end
      end
    
      def parties(barrier(pid: pid)) do
        case call(pid, :parties) do
          n when is_integer(n) ->
            n
          _ ->
            false
        end
      end
    
      def reset(barrier(pid: pid)) do
        case call(pid, :reset) do
          :reset ->
            true
          :broken ->
            true
          _ ->
            false
        end
      end
    
      # Block until `parties` processes have called wait on this barrier.
      # Returns true if the barrier tripped, false if it was broken or if the
      # optional timeout (in milliseconds) expired.
      def wait(barrier = barrier(pid: pid), timeout \\ nil)
          when timeout === nil or is_integer(timeout) do
        case call(pid, :wait, timeout) do
          :fulfilled ->
            true
          :broken ->
            false
          :timeout ->
            reset(barrier)
            false
          _ ->
            false
        end
      end
    
      # Synchronous request to the barrier server. The caller temporarily links
      # to the server while trapping exits, so a dead or dying server is detected
      # instead of blocking forever; `timeout` is in milliseconds (nil = infinity).
      defp call(pid, request, timeout \\ nil) do
        case Process.alive?(pid) do
          false ->
            {:EXIT, pid, :normal}
          true ->
            trap_exit = Process.flag(:trap_exit, true)
            Process.link(pid)
            ref = make_ref()
            send(pid, {ref, self(), request})
            case timeout do
              nil ->
                receive do
                  {^ref, reply} ->
                    Process.unlink(pid)
                    Process.flag(:trap_exit, trap_exit)
                    reply
                  exited = {:EXIT, ^pid, _} ->
                    Process.flag(:trap_exit, trap_exit)
                    exited
                end
              _ ->
                receive do
                  {^ref, reply} ->
                    Process.unlink(pid)
                    Process.flag(:trap_exit, trap_exit)
                    reply
                  exited = {:EXIT, ^pid, _} ->
                    Process.flag(:trap_exit, trap_exit)
                    exited
                after
                  timeout ->
                    Process.unlink(pid)
                    Process.flag(:trap_exit, trap_exit)
                    :timeout
                end
            end
    
        end
      end
    
      defmodule Server do
    
        require Record
        Record.defrecordp :state_data,
          waiting: 0,
          parties: nil,
          action:  nil,
          q:       :queue.new()
    
        def init(parties, action),
          do: loop(:waiting, state_data(parties: parties, action: action))
    
        # Once the number of waiting processes equals `parties`, run the action,
        # release all waiters with :fulfilled, and reset for the next cycle.
        defp loop(:waiting, sd = state_data(waiting: same, parties: same, action: action, q: q)),
          do: loop(done(:fulfilled, action, q), state_data(sd, waiting: 0, q: :queue.new()))
        defp loop(state_name, sd) do
          receive do
            {ref, pid, request} when is_reference(ref) and is_pid(pid) and is_atom(request) ->
              handle(state_name, request, {ref, pid}, sd)
          end
        end
    
        defp handle(:waiting, :wait, from, sd = state_data(waiting: w, q: q)),
          do: loop(:waiting, state_data(sd, waiting: w + 1, q: :queue.in(from, q)))
        # Resetting with no waiters simply acknowledges the caller; resetting while
        # processes are waiting breaks the barrier and releases the waiters with :broken.
        defp handle(:waiting, :reset, from, sd = state_data(waiting: 0, q: q)),
          do: loop(done(:reset, nil, :queue.in(from, q)), sd)
        defp handle(:waiting, :reset, from, sd = state_data(q: q)),
          do: loop(done(:broken, nil, :queue.in(from, q), false), state_data(sd, waiting: 0, q: :queue.new()))
        defp handle(:broken, :reset, from, sd = state_data(q: q)),
          do: loop(done(:reset, nil, :queue.in(from, q)), sd)
        defp handle(:broken, :wait, from, sd) do
          cast(from, :broken)
          loop(:broken, sd)
        end
        defp handle(state_name, :number_waiting, from, sd = state_data(waiting: number_waiting)) do
          cast(from, number_waiting)
          loop(state_name, sd)
        end
        defp handle(state_name, :parties, from, sd = state_data(parties: parties)) do
          cast(from, parties)
          loop(state_name, sd)
        end
        defp handle(state_name, :status, from, sd) do
          cast(from, state_name)
          loop(state_name, sd)
        end
        defp handle(_state_name, :stop, _from, _sd) do
          exit(:normal)
        end
    
        defp broadcast(q, message),
          do: for from <- :queue.to_list(q),
            do: cast(from, message)
    
        defp cast({ref, pid}, message),
          do: send(pid, {ref, message})
    
        defp done(state, action, q, continue \\ true) do
          run(action)
          broadcast(q, state)
          case continue do
            true ->
              :waiting
            false ->
              state
          end
        end
    
        defp run(nil),
          do: nil
        defp run(action),
          do: action.()
    
      end
    
    end
    

    Here's an example of using CyclicBarrier in an IEx shell:

    iex> barrier = CyclicBarrier.start(5, fn -> IO.puts("done") end)
    {CyclicBarrier, #PID<0.281.0>}
    iex> for i <- 1..5, do: spawn(fn -> IO.puts("process #{i}: #{CyclicBarrier.wait(barrier)}") end)
    done
    process 5: true
    process 1: true
    process 3: true
    process 2: true
    process 4: true
    [#PID<0.283.0>, #PID<0.284.0>, #PID<0.285.0>, #PID<0.286.0>, #PID<0.287.0>]
    

    The exact order of process execution is non-deterministic.

    More examples of the CyclicBarrier functions are shown below:

    iex> barrier = CyclicBarrier.start(2)
    {CyclicBarrier, #PID<0.280.0>}
    iex> CyclicBarrier.alive?(barrier)
    true
    iex> CyclicBarrier.broken?(barrier)
    false
    iex> CyclicBarrier.number_waiting(barrier)
    0
    iex> CyclicBarrier.parties(barrier)
    2
    iex> # let's spawn another process which will wait on the barrier
    iex> spawn(fn -> IO.puts("barrier returned: #{CyclicBarrier.wait(barrier)}") end)
    #PID<0.288.0>
    iex> CyclicBarrier.number_waiting(barrier)
    1
    iex> # if we reset the barrier while another process is waiting
    iex> # on the barrier, it will break
    iex> CyclicBarrier.reset(barrier)
    barrier returned: false
    true
    iex> CyclicBarrier.broken?(barrier)
    true
    iex> # however, the barrier can be reset again to its initial state
    iex> CyclicBarrier.reset(barrier)
    true
    iex> CyclicBarrier.broken?(barrier)
    false
    iex> # if a timeout is exceeded while waiting on the barrier, it
    iex> # will also break the barrier
    iex> CyclicBarrier.wait(barrier, 100)
    false
    iex> CyclicBarrier.broken?(barrier)
    true
    iex> # let's reset the barrier, spawn another process to wait,
    iex> # and wait with a timeout in the current process
    iex> CyclicBarrier.reset(barrier)
    true
    iex> spawn(fn -> IO.puts("barrier returned: #{CyclicBarrier.wait(barrier)}") end)
    #PID<0.289.0>
    iex> CyclicBarrier.wait(barrier, 100)
    barrier returned: true
    true
    iex> # if stop is called on the barrier, the barrier process will
    iex> # exit and all future calls to the barrier will return false
    iex> CyclicBarrier.stop(barrier)
    true
    iex> CyclicBarrier.alive?(barrier)
    false
    iex> CyclicBarrier.reset(barrier)
    false
    iex> CyclicBarrier.wait(barrier)
    false
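
    For the per-round loop in the question, one possible way to use this barrier (a sketch only; do_one_round/1 below is a placeholder for the actual colouring step) is to start it with one party per node process and have every node call CyclicBarrier.wait/1 at the end of each iteration, so that no node enters round k + 1 before all nodes have finished round k:

    defmodule ColouringRounds do
      def start(vertices, rounds) do
        # One barrier party per node process.
        barrier = CyclicBarrier.start(length(vertices))
        for vertex <- vertices, do: spawn(fn -> loop(vertex, barrier, rounds) end)
        barrier
      end

      defp loop(_vertex, _barrier, 0), do: :ok
      defp loop(vertex, barrier, rounds) do
        vertex = do_one_round(vertex)       # exchange messages with neighbours, recolour, ...
        CyclicBarrier.wait(barrier)         # block until every node has finished this round
        loop(vertex, barrier, rounds - 1)
      end

      defp do_one_round(vertex), do: vertex # placeholder for the real per-node step
    end

    Because the optional action fun runs once per cycle when the barrier trips, the top-level termination check could also live there instead of a fixed round count, and whoever started the workers can call CyclicBarrier.stop/1 on the returned barrier once they are done.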