How to broadcast a message to a list of processes in Erlang? Console hanging


I am new to Erlang and I am trying to understand how to send a message from one process to a list of processes.

Suppose we have a list whose elements each contain a string and a Pid. How can I make one process send a message "M" to the Pids stored in those elements? What I have come up with is:

broadcast(P, M, R) ->
  P ! {self(), friends},
  receive
    {P, Friends} ->
  P ! {self(), {send_message, {M, R, P, Friends}}}
  end.

looper({Name, Friends, Messages}) ->
receive
  {From, friends} ->
    From ! {self(), Friends},
    looper({Name, Friends, Messages});
  {From, {send_message, {M, R, ID, [{FriendPid, FriendName} | FriendTale]}}} ->
    if R =< 0 ->
          From ! {From, {self(), {ID, M}}},
          looper({Name, [{FriendPid, FriendName} | FriendTale], [{ID, M} | Messages]});
       R > 0  andalso FriendTale =/= []->
         FriendPid ! {From, {send_message, {M, R-1, ID, FriendTale}}},
         looper({Name, FriendTale, [{ID, M} | Messages]})
    end;
   terminate ->
    ok
end.

As far as I can tell, either I am not pattern matching the list of Pids correctly, so I never "extract" the Pid from an element of the list, or I am not using the list correctly to send the message.

Basically, I have a function called "looper" that is constantly waiting for new messages to arrive. One of the messages it handles has the form

{send_message, {M, R, ID, [{FriendPid, FriendName} | FriendTale]}}

where "M" is the message that I want to broadcast to the list of Pids called "Friends", and R is an integer saying how far the message should go.

e.g. 0 = broadcast the message to self,
     1 = broadcast the message to the friends of the pid,
     2 = broadcast the message to the friends of the friends of the pid and so on...

What I get in the terminal after I set up the Pids, set the "friendships" between them and broadcast a message is:

1> f().
ok
2> c(facein).
facein.erl:72: Warning: variable 'From' is unused
{ok,facein}
3> {Message, Pid} = facein:start({"dummy", [], []}).
{ok,<0.186.0>}
4> {Message, Pid2} = facein:start({"dummy2", [], []}).
{ok,<0.188.0>}
5> facein:add_friend(Pid,Pid2).
ok
6> facein:broadcast(Pid,"hello",1).    
=ERROR REPORT==== 5-Oct-2014::12:12:58 ===
Error in process <0.186.0> with exit value: {if_clause,[{facein,looper,1,[{file,"facein.erl"},{line,74}]}]}

{<0.177.0>,{send_message,{"hello",1,#Ref<0.0.0.914>}}}

When I view the messages of the Pid that broadcast the message, the console just hangs, and the other Pids have received no messages.

Any help would be greatly appreciated. Thanks


Solution

  • error message

    What you get this time is an if_clause error. In Erlang every expression has to return a value, and if is no exception. That means you could write code like this

    SomeVar = if 
         R =< 0 ->
           [...]
         R > 0 andalso FriendTale =/= []->
           [...]
      end
    

    As you can see, if needs to "return" something, and to do that one of its branches needs to run. In other words, one of its clauses needs to match. In your case, when R > 0 and FriendTale =:= [], none of them does, hence the run-time error.

    As a general practice, the last clause is left as a catch-all. In an if the catch-all guard is true (the _ pattern plays that role in case)

      true ->
         [...]
    

    which will always match and saves you from such an error.
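
    A minimal sketch of that catch-all, assuming a made-up module if_demo and a hypothetical helper next_step/2 (neither is part of your facein module). The returned atoms are placeholders for whatever each branch should really do:

```erlang
-module(if_demo).
-export([next_step/2]).

%% Hypothetical helper, for illustration only: decide what to do with a
%% message given the hop count R and the list of friends still to visit.
next_step(R, FriendTale) ->
    if
        R =< 0 ->
            deliver_here;
        R > 0 andalso FriendTale =/= [] ->
            forward;
        true ->
            %% Catch-all: reached when R > 0 and FriendTale =:= [],
            %% the case that raised if_clause in the original code.
            nothing_to_do
    end.
```

    With this in place, if_demo:next_step(1, []) returns nothing_to_do instead of crashing the process.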

    In your example you don't have to use if at all. Instead, you could extend your receive clauses with guards

    looper({Name, Friends, Messages}) ->
      receive
        {From, {send_message, {M, R, ID, [{FriendPid, FriendName} | FriendTale]}}} 
           when R =< 0 ->
              From ! {From, {self(), {ID, M}}},
              looper({Name, [{FriendPid, FriendName} | FriendTale], [{ID, M} | Messages]});
        {From, {send_message, {M, R, ID, [{FriendPid, FriendName} | FriendTale]}}}  
           when R > 0  andalso FriendTale =/= [] ->
              FriendPid ! {From, {send_message, {M, R-1, ID, FriendTale}}},
              looper({Name, FriendTale, [{ID, M} | Messages]});
        terminate ->
              ok
      end.
    

    With receive, a message does not have to match any of the clauses. If it does not, it is simply left in the mailbox (ignored by this receive, but it can be picked up by another).
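
    That mailbox behaviour can be seen in a small standalone sketch. The module name mailbox_demo and the message shapes are assumptions made up for this illustration:

```erlang
-module(mailbox_demo).
-export([run/0]).

%% Hypothetical demo, for illustration only: a message that matches no
%% clause of one receive stays in the mailbox for a later receive.
run() ->
    self() ! {unexpected, 1},
    self() ! {expected, 2},
    %% {unexpected, 1} is first in the mailbox but matches no clause
    %% here, so it is skipped and left in place.
    First = receive
                {expected, N} -> N
            end,
    %% A later receive with a matching clause picks it up.
    Second = receive
                 {unexpected, M} -> M
             end,
    {First, Second}.
```

    Calling mailbox_demo:run() returns {2, 1}: the second message is handled first, and the first one waits until some receive matches it. Keep in mind that messages nobody ever matches pile up in the mailbox.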

    Or, following your logic, you could pattern match on R itself

    looper({Name, Friends, Messages}) ->
      receive
        {From, {send_message, {M, 0, ID, [{FriendPid, FriendName} | FriendTale]}}} ->
              From ! {From, {self(), {ID, M}}},
              looper({Name, [{FriendPid, FriendName} | FriendTale], [{ID, M} | Messages]});
        {From, {send_message, {M, 1, ID, [{FriendPid, FriendName} | FriendTale]}}}  
           when FriendTale =/= [] ->
              %% R is matched as the literal 1 above, so the next hop gets 0
              FriendPid ! {From, {send_message, {M, 0, ID, FriendTale}}},
              looper({Name, FriendTale, [{ID, M} | Messages]});
        terminate ->
              ok
      end.
    

    And to increase readability, you could change R from an opaque integer to meaningful atoms

    looper({Name, Friends, Messages}) ->
      receive
        {From, {send_message, {M, back_to_me, ID, [{FriendPid, FriendName} | FriendTale]}}} ->
              From ! {From, {self(), {ID, M}}},
              looper({Name, [{FriendPid, FriendName} | FriendTale], [{ID, M} | Messages]});
    
        {From, {send_message, {M, to_friends, ID, [{FriendPid, FriendName} | FriendTale]}}}  
           when FriendTale =/= [] ->
              %% R is no longer an integer here, so forward the next atom instead of R-1
              FriendPid ! {From, {send_message, {M, back_to_me, ID, FriendTale}}},
              looper({Name, FriendTale, [{ID, M} | Messages]});
    
        terminate ->
              ok
      end.
    

  • broadcast to friends

    If I understand correctly, looper is a function representing one "person". Each friend is a process that stores a list of friends, can add to and remove from it, and can send messages to other friends.

    Let's start by creating a clause for each of those functions (this defines the process interface)

    looper(Name, Friends, Messages) ->
      receive 
         {add_friend, Friend} ->
            [...];
         {remove_friend, Friend} ->
            [...];
         {receive_message, Message} ->
            [...];
         {broadcast_to_self, Message} ->
            [...];
         {broadcast_to_friends, Message} ->
            [...];
         terminate ->
            ok
      end.
    

    Most of those are easily implemented, like

    {add_friend, Friend} ->
        %% prepend the new friend with the cons operator
        looper(Name, [Friend | Friends], Messages);
    

    so I will not go into details.

    The ones that do the broadcast do not change state, so for now let's write something like this (mostly for the sake of readability)

         {broadcast_to_friends, Message} ->
            handle_broadcast_to_friends(Friends, Message),
            looper(Name, Friends, Messages);
    

    and implement the new function below

    handle_broadcast_to_friends(Friends, Message) ->
       [ F ! {receive_message, Message} || F <- Friends ].
    

    Now, since having to know exactly which tuple with which atoms to send is not convenient, we could wrap our "message interface" in a "function interface". For example

    add_friend(Person,  Friend) ->
       Person ! {add_friend, Friend}.
    
    receive_message(Person, Message) ->
        Person ! {receive_message, Message}.
    

    And we could also use those in your logic implementation

    handle_broadcast_to_friends(Friends, Message) ->
       [ receive_message(F, Message)  || F <- Friends ].
    

    That should get you on the right track. If you need a MessageID or something similar, just extend your interface. If you really need to create broadcast_to_all, you need to think about how you would handle messages circling around, which is not a simple problem.
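
    Putting the pieces above together, one possible shape for the whole process is sketched below. This is not your facein module: the module name person, the functions start/1, messages/1 and stop/1, the get_messages request and the one-second timeout are all assumptions added so the sketch can be run and inspected. It uses lists:foreach/2 in place of the list comprehension, since only the side effect matters.

```erlang
-module(person).
-export([start/1, add_friend/2, broadcast_to_friends/2, messages/1, stop/1]).

%% Hypothetical module; start/1, messages/1 and stop/1 are assumptions,
%% not part of the original facein module.

start(Name) ->
    spawn(fun() -> looper(Name, [], []) end).

add_friend(Person, Friend) ->
    Person ! {add_friend, Friend},
    ok.

broadcast_to_friends(Person, Message) ->
    Person ! {broadcast_to_friends, Message},
    ok.

%% Synchronous call; gives up after one second so that a dead process
%% cannot hang the shell.
messages(Person) ->
    Ref = make_ref(),
    Person ! {get_messages, self(), Ref},
    receive
        {Ref, Messages} -> {ok, Messages}
    after 1000 ->
        {error, timeout}
    end.

stop(Person) ->
    Person ! terminate,
    ok.

looper(Name, Friends, Messages) ->
    receive
        {add_friend, Friend} ->
            looper(Name, [Friend | Friends], Messages);
        {receive_message, Message} ->
            looper(Name, Friends, [Message | Messages]);
        {broadcast_to_friends, Message} ->
            %% Friends holds plain Pids in this sketch
            lists:foreach(fun(F) -> F ! {receive_message, Message} end, Friends),
            looper(Name, Friends, Messages);
        {get_messages, From, Ref} ->
            From ! {Ref, Messages},
            looper(Name, Friends, Messages);
        terminate ->
            ok
    end.
```

    In the shell you could start two processes with A = person:start("dummy") and B = person:start("dummy2"), call person:add_friend(A, B) and person:broadcast_to_friends(A, "hello"), and then inspect B with person:messages(B) once the message has had time to arrive. The after clause in messages/1 is one way to avoid the hanging console you saw: a receive without a timeout waits forever if the process it is waiting on has already crashed.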