Search code examples
erlangtimeouttrace

Understanding why process stops receiving messages


I have a situation where I have 3 processes:

  • one process acting as a dispatcher for messages: server
  • one process acting as a supervisor (for a worker): monitor
  • one process acting as a worker that notifies supervisor when done: worker

The server sends each request to the monitor, which first checks whether the worker is busy. If the worker is busy, the monitor enqueues the message (provided the queue capacity has not been reached); otherwise it forwards the message to the worker. When the worker finishes processing, it notifies both the client and the monitor.

My problem is that my worker process stops responding after processing the first message.

-module(mq).
-compile(export_all).

%% State carried through the monitor/1 loop.
-record(monstate,{
    %% queue (stdlib `queue') of pending {From,Message} requests
    queue,
    %% number of items currently held in `queue' (compared against ?QUEUE_SIZE)
    qc,
    %% pid of the worker process
    wpid,
    %% set to false when a request is handed to the worker and to true when
    %% the worker reports finished with nothing queued, or is restarted
    free=true,
    %% monitor reference for the worker, matched in the 'DOWN' clause
    wref,
    %% false until the worker has been spawned and the queue created
    init=false,
    %% first-run flag: set to true by the init clause, cleared once the first
    %% request has been handled
    frun=false
}).
%% State carried through the server/1 (dispatcher) loop.
-record(sstate,{
    %% false until the monitor process has been spawned
    init=false,
    %% pid of the monitor process (null before initialisation)
    mpid=null,
    %% monitor reference for the monitor process (null before initialisation)
    mref=null
}).

%% Maximum number of requests tryEnqueue/2 will hold in the monitor's queue.
-define(QUEUE_SIZE,5).
%% Time, in milliseconds, the worker sleeps (timer:sleep/1) per message.
-define(PROC_SLEEP,2000).


%% @doc Spawns `M:F(A)' as a new process monitored by the caller.
%%
%% Takes a `{Module,Function,Arg}' triple; `Arg' is passed as the single
%% argument of the spawned function. Returns `{Pid,Ref}', where `Ref' is the
%% monitor reference carried by the 'DOWN' message sent to the caller when
%% the process terminates.
%%
%% spawn_monitor/3 creates the process and the monitor atomically, so the
%% 'DOWN' message always carries the real exit reason. A separate
%% spawn + erlang:monitor reports `noproc' when the child has already
%% exited by the time the monitor is set up.
createProcess({M,F,A})->
    spawn_monitor(M,F,[A]).

%% @doc Starts the dispatcher: spawns server/1 with an uninitialised state
%% and returns its pid. Clients send `{self(),Message}' to that pid.
start()->
    InitialState=#sstate{init=false},
    spawn(?MODULE,server,[InitialState]).

%% @doc Dispatcher loop.
%%
%% On the first call (init=false) it spawns and monitors the monitor process.
%% Afterwards it handles:
%%   {From,state}   - replies to From with the current #sstate{}
%%   {From,Message} - forwards the request to the monitor as
%%                    {request,{From,Message}}
%%   'DOWN' for the monitor - spawns a replacement monitor
%% Any other message terminates the server with reason invalid_message.
server(State=#sstate{init=false})->
    {MonitorPid,MonitorRef}=createProcess({?MODULE,monitor,#monstate{init=false}}),
    server(State#sstate{init=true,mpid=MonitorPid,mref=MonitorRef});
server(State=#sstate{mpid=MonitorPid,mref=MonitorRef})->
    NextState=
        receive
            {Client,state}->
                Client ! State,
                State;
            {Client,Request}->
                MonitorPid ! {request,{Client,Request}},
                State;
            {'DOWN',MonitorRef,process,MonitorPid,_Reason}->
                {NewPid,NewRef}=createProcess({?MODULE,monitor,#monstate{init=false}}),
                State#sstate{mpid=NewPid,mref=NewRef};
            _Other->
                exit(invalid_message)
        end,
    server(NextState).
  

%% @doc Appends Item to the monitor's pending queue when there is room.
%%
%% Returns `{queued,NewState}' with the item added and the counter bumped
%% while fewer than ?QUEUE_SIZE items are held; otherwise returns
%% `{queue_full,MState}' with the state untouched.
tryEnqueue(Item,MState=#monstate{queue=Pending,qc=Count}) when ?QUEUE_SIZE>Count->
    Grown=MState#monstate{queue=queue:in(Item,Pending),qc=Count+1},
    {queued,Grown};
tryEnqueue(_Item,MState)->
    {queue_full,MState}.

%% @doc Monitor loop: owns the worker process and the bounded request queue.
%%
%% On the first call (init=false) it spawns and monitors the worker, creates
%% an empty queue and raises the first-run flag. Afterwards it handles:
%%   {request,{From,Message}} - tries to enqueue the request (replying
%%       {queue_full,Message} to From when the queue is full); on the first
%%       run it also sends itself {worker,{finished,true}} to start the
%%       dequeue cycle
%%   {worker,{finished,_}}    - forwards the next queued item to the worker,
%%       or marks the worker free when the queue is empty
%%   'DOWN' for the worker    - spawns a replacement worker
%% Any other message terminates the monitor with reason invalid_message.
%%
%% NOTE(review): after the first request the first-run flag stays false, so a
%% request that arrives while the worker is idle is only queued; nothing
%% forwards it until another {worker,{finished,_}} message arrives.
monitor(MState=#monstate{init=false})->
    {WorkerPid,WorkerRef}=createProcess({?MODULE,worker,self()}),
    Fresh=MState#monstate{wpid=WorkerPid,wref=WorkerRef,init=true,
                          qc=0,queue=queue:new(),frun=true},
    monitor(Fresh);
monitor(MState=#monstate{wpid=Worker,wref=WorkerRef,queue=Pending,qc=Count,frun=FirstRun})->
    receive
        {request,{From,Message}}->
            {Outcome,Updated}=tryEnqueue({From,Message},MState),
            case Outcome of
                queue_full->From ! {queue_full,Message};
                _->ok
            end,
            %% Kick-start the dequeue cycle on the very first request only.
            case FirstRun of
                true->self() ! {worker,{finished,FirstRun}};
                false->ok
            end,
            monitor(Updated#monstate{frun=false});

        {worker,{finished,_}}->
            case queue:out(Pending) of
                {{_,Next},Remaining}->
                    Worker ! Next,
                    monitor(MState#monstate{free=false,queue=Remaining,qc=Count-1});
                {empty,Remaining}->
                    monitor(MState#monstate{free=true,queue=Remaining})
            end;

        {'DOWN',WorkerRef,process,_,_}->
            {NewWorkerPid,NewWorkerRef}=createProcess({?MODULE,worker,self()}),
            monitor(MState#monstate{wpid=NewWorkerPid,wref=NewWorkerRef,free=true});

        _->
            exit(invalid_message)
    end.

%% @doc Worker loop.
%%
%% For each `{From,MSG}' it sleeps ?PROC_SLEEP milliseconds, replies
%% `{processed,MSG}' to From, then tells the monitor (MPid) it is done with
%% `{worker,{finished,MSG}}'. Any other message terminates the worker with
%% reason bad_msg.
worker(MPid)->
    receive
        {Client,Payload}->
            timer:sleep(?PROC_SLEEP),
            Reply={processed,Payload},
            Done={worker,{finished,Payload}},
            Client ! Reply,
            MPid ! Done,
            worker(MPid);
        _Unexpected->
            exit(bad_msg)
    end.

Usage

2> A=mq:start().
<0.83.0>
3> A ! {self(),aa}.
{<0.76.0>,aa}
4> flush().
Shell got {processed,aa}
ok
5> A ! {self(),aa}.
{<0.76.0>,aa}
6> flush().
ok

I have added a tracer to see what is happening:

10> dbg:tracer().
{ok,<0.96.0>}
11> dbg:p(new,[sos,m]).
{ok,[{matched,nonode@nohost,0}]}

First run:

14> A ! {self(),aa}.
(<0.100.0>) << {<0.76.0>,aa}     // message received by server 
(<0.100.0>) <0.101.0> ! {request,{<0.76.0>,aa}}   //message forwarded by server to monitor
{<0.76.0>,aa}
(<0.101.0>) << {request,{<0.76.0>,aa}}         
15> (<0.101.0>) <0.101.0> ! {worker,{finished,true}} //monitor starting the cycle
15> (<0.101.0>) << {worker,{finished,true}}  
15> (<0.101.0>) <0.102.0> ! {<0.76.0>,aa}  // monitor sending message to worker
15> (<0.102.0>) << {<0.76.0>,aa}
15> (<0.105.0>) <0.62.0> ! {io_request,<0.105.0>,
                           #Ref<0.3226054513.2760638467.167990>,
                           {get_until,unicode,
                               ["15",62,32],
                               erl_scan,tokens,
                               [1,[text]]}}
15> (<0.102.0>) << timeout                      //worker getting timeout ??
15> (<0.102.0>) <0.76.0> ! {processed,aa}    //worker sends the reply to the client (the shell, self())
15> (<0.102.0>) <0.101.0> ! {worker,{finished,aa}}   //worker notifies monitor to update state
15> (<0.101.0>) << {worker,{finished,aa}}

Second run:

15> A ! {self(),aa}.
(<0.100.0>) << {<0.76.0>,aa}
(<0.100.0>) <0.101.0> ! {request,{<0.76.0>,aa}}   //server forwards the message to the monitor
{<0.76.0>,aa}
(<0.101.0>) << {request,{<0.76.0>,aa}}
16> (<0.106.0>) <0.62.0> ! {io_request,<0.106.0>,
                           #Ref<0.3226054513.2760638467.168007>,
                           {get_until,unicode,
                               ["16",62,32],
                               erl_scan,tokens,
                               [1,[text]]}}

As you can see from my trace, I do not understand what happens in the first call. Does my worker time out, and if so, why?

P.S. The frun variable is a flag that is true only on the monitor's first iteration, so that when the first item arrives the monitor sends itself a message to process it (i.e. forward it to the worker), since the worker is idle. After the first run, the monitor dequeues items from the queue whenever the worker signals that it is free.

Update

So, after the helpful comments, I have changed the logic in the monitor a bit: the worker now gets a message either on the first run or when, after it finishes and notifies the monitor, there are still items in the monitor's queue. I still can't make it work. Where is the deadlock?

%% Revised receive clause of the monitor loop.
%%
%%   {request,{From,Message}} - on the first run the request is sent straight
%%       to the worker; afterwards it is enqueued (replying
%%       {queue_full,Message} to From when the queue is full)
%%   {worker,{finished,_}}    - forwards the next queued item to the worker,
%%       or marks the worker free when the queue is empty
%%
%% The stray `;' after the last receive clause has been removed so the
%% snippet compiles.
%%
%% NOTE(review): once frun is false every request is only queued, and queued
%% items are dispatched solely on {worker,{finished,_}}. If the worker is
%% already idle when a request arrives, no such message is coming and the
%% request stays in the queue.
monitor(MState=#monstate{wpid=W,free=F,wref=Ref,queue=Q,qc=C,frun=FirstRun})->
        receive
            {request,{From ,Message}} ->
                case FirstRun of
                    true ->
                        W ! {From,Message},
                        monitor(MState#monstate{frun=false,free=false});
                    false ->
                        St=case tryEnqueue({From,Message},MState) of
                               {queue_full,S} ->
                                   From ! {queue_full,Message},
                                   S;
                               {queued,S} -> S
                           end,
                        monitor(St)
                end;

            {worker,{finished,_}} ->
                case queue:out(Q) of
                    {{_,Element},Rest} ->
                        W ! Element,
                        monitor(MState#monstate{free=false,queue=Rest,qc=C-1});
                    {empty,Rest} ->
                        monitor(MState#monstate{free=true,queue=Rest})
                end

        end.

Solution

  • The monitor's behavior does not need to depend on frun; it only needs to depend on whether the worker is free. I have updated the monitor function to reflect this in the following code.

    -module(mq).
    -compile(export_all).
    
    %% State carried through the monitor/1 loop.
    -record(monstate,{
        %% queue (stdlib `queue') of pending {From,Message} requests
        queue,
        %% number of items currently held in `queue' (compared against ?QUEUE_SIZE)
        qc,
        %% pid of the worker process
        wpid,
        %% true while the worker is idle; monitor/1 reads it to decide whether
        %% a request is forwarded immediately or enqueued
        free=true,
        %% monitor reference for the worker, matched in the 'DOWN' clause
        wref,
        %% false until the worker has been spawned and the queue created
        init=false
    }).
    %% State carried through the server/1 (dispatcher) loop.
    -record(sstate,{
        %% false until the monitor process has been spawned
        init=false,
        %% pid of the monitor process (null before initialisation)
        mpid=null,
        %% monitor reference for the monitor process (null before initialisation)
        mref=null
    }).
    
    %% Maximum number of requests tryEnqueue/2 will hold in the monitor's queue.
    -define(QUEUE_SIZE,5).
    %% Time, in milliseconds, the worker sleeps (timer:sleep/1) per message.
    -define(PROC_SLEEP,2000).
    
    
    %% @doc Spawns `M:F(A)' as a new process monitored by the caller.
    %%
    %% Takes a `{Module,Function,Arg}' triple; `Arg' is passed as the single
    %% argument of the spawned function. Returns `{Pid,Ref}', where `Ref' is
    %% the monitor reference carried by the 'DOWN' message sent to the caller
    %% when the process terminates.
    %%
    %% spawn_monitor/3 creates the process and the monitor atomically, so the
    %% 'DOWN' message always carries the real exit reason. A separate
    %% spawn + erlang:monitor reports `noproc' when the child has already
    %% exited by the time the monitor is set up.
    createProcess({M,F,A})->
        spawn_monitor(M,F,[A]).
    
    %% @doc Starts the dispatcher: spawns server/1 with an uninitialised state
    %% and returns its pid. Clients send `{self(),Message}' to that pid.
    start()->
        InitialState=#sstate{init=false},
        spawn(?MODULE,server,[InitialState]).
    
    %% @doc Dispatcher loop.
    %%
    %% On the first call (init=false) it spawns and monitors the monitor
    %% process. Afterwards it handles:
    %%   {From,state}   - replies to From with the current #sstate{}
    %%   {From,Message} - forwards the request to the monitor as
    %%                    {request,{From,Message}}
    %%   'DOWN' for the monitor - spawns a replacement monitor
    %% Any other message terminates the server with reason invalid_message.
    server(State=#sstate{init=false})->
        {MonitorPid,MonitorRef}=createProcess({?MODULE,monitor,#monstate{init=false}}),
        server(State#sstate{init=true,mpid=MonitorPid,mref=MonitorRef});
    server(State=#sstate{mpid=MonitorPid,mref=MonitorRef})->
        NextState=
            receive
                {Client,state}->
                    Client ! State,
                    State;
                {Client,Request}->
                    MonitorPid ! {request,{Client,Request}},
                    State;
                {'DOWN',MonitorRef,process,MonitorPid,_Reason}->
                    {NewPid,NewRef}=createProcess({?MODULE,monitor,#monstate{init=false}}),
                    State#sstate{mpid=NewPid,mref=NewRef};
                _Other->
                    exit(invalid_message)
            end,
        server(NextState).
    
    
    %% @doc Appends Item to the monitor's pending queue when there is room.
    %%
    %% Returns `{queued,NewState}' with the item added and the counter bumped
    %% while fewer than ?QUEUE_SIZE items are held; otherwise returns
    %% `{queue_full,MState}' with the state untouched.
    tryEnqueue(Item,MState=#monstate{queue=Pending,qc=Count}) when ?QUEUE_SIZE>Count->
        Grown=MState#monstate{queue=queue:in(Item,Pending),qc=Count+1},
        {queued,Grown};
    tryEnqueue(_Item,MState)->
        {queue_full,MState}.
    
    %% @doc Monitor loop: owns the worker process and the bounded request queue.
    %%
    %% On the first call (init=false) it spawns and monitors the worker and
    %% creates an empty queue. Afterwards it handles:
    %%   {request,{From,Message}} - forwarded straight to the worker when it is
    %%       free; otherwise enqueued, replying {queue_full,Message} to From
    %%       when the queue is full
    %%   {worker,{finished,_}}    - hands the next queued request to the
    %%       worker, or marks the worker free when the queue is empty
    %%   'DOWN' for the worker    - spawns a replacement worker and immediately
    %%       hands it the next queued request, so that requests queued before
    %%       the crash neither stall nor get overtaken by later arrivals
    %% Any other message terminates the monitor with reason invalid_message.
    %%
    %% NOTE: the request that was in flight when the worker died is not
    %% retried; its sender receives no reply.
    monitor(MState=#monstate{init=false})->
        {WorkerPid,WorkerRef}=createProcess({?MODULE,worker,self()}),
        monitor(MState#monstate{wpid=WorkerPid,wref=WorkerRef,init=true,qc=0,queue=queue:new()});

    monitor(MState=#monstate{wpid=W,free=F,wref=Ref})->
        receive
            {request,{From,Message}} ->
                %% check whether worker is free or not
                case F of
                    true ->
                        W ! {From,Message},
                        monitor(MState#monstate{free=false});

                    false ->
                        St=case tryEnqueue({From,Message},MState) of
                               {queue_full,S} ->
                                   From ! {queue_full,Message},
                                   S;
                               {queued,S} -> S
                           end,
                        monitor(St)
                end;

            {worker,{finished,_}} ->
                monitor(dispatchNext(MState));

            {'DOWN',Ref,process,_,_} ->
                {NewWorkerPid,NewWorkerRef}=createProcess({?MODULE,worker,self()}),
                %% Resume the backlog on the fresh worker instead of just
                %% flagging it free.
                monitor(dispatchNext(MState#monstate{wpid=NewWorkerPid,wref=NewWorkerRef}));

            _->exit(invalid_message)

        end.

    %% Sends the oldest queued request (if any) to the current worker and
    %% returns the updated state: free=false when something was dispatched,
    %% free=true when the queue was empty.
    dispatchNext(MState=#monstate{wpid=W,queue=Q,qc=C})->
        case queue:out(Q) of
            {{value,Element},Rest} ->
                W ! Element,
                MState#monstate{free=false,queue=Rest,qc=C-1};
            {empty,Rest} ->
                MState#monstate{free=true,queue=Rest}
        end.
    
    %% @doc Worker loop.
    %%
    %% For each `{From,MSG}' it sleeps ?PROC_SLEEP milliseconds, replies
    %% `{processed,MSG}' to From, then tells the monitor (MPid) it is done
    %% with `{worker,{finished,MSG}}'. Any other message terminates the worker
    %% with reason bad_msg.
    worker(MPid)->
        receive
            {Client,Payload}->
                timer:sleep(?PROC_SLEEP),
                Reply={processed,Payload},
                Done={worker,{finished,Payload}},
                Client ! Reply,
                MPid ! Done,
                worker(MPid);
            _Unexpected->
                exit(bad_msg)
        end.
    
    

    Usage

    Eshell V10.5  (abort with ^G)
    1> c(mq).
    mq.erl:2: Warning: export_all flag enabled - all functions will be exported
    {ok,mq}
    2> A=mq:start().
    <0.92.0>
    3> A ! {self(),aa}.
    {<0.85.0>,aa}
    4> flush().
    Shell got {processed,aa}
    ok
    5> A ! {self(),aa}.
    {<0.85.0>,aa}
    6> flush().
    Shell got {processed,aa}
    ok
    7> A ! {self(), aa}, A ! {self(), bb}.
    {<0.85.0>,bb}
    8> flush().                           
    Shell got {processed,aa}
    Shell got {processed,bb}
    ok
    9>