Search code examples
processerlangmonitor

Erlang monitor multiple processes


I need to monitor a bunch of worker processes. Currently I'm able to monitor 1 process through 1 monitor. How do I scale this to monitoring N worker processes? Do I need to spawn N monitors as well? If so, what happens if one of those spawned monitors fails or crashes?


Solution

  • Do I need to spawn N monitors as well?

    No:

    -module(mo).
    -compile(export_all).
    
    %% Worker loop: pauses for a random whole number of seconds (1 to 5),
    %% prints a heartbeat line tagged with Id, and then repeats forever.
    worker(Id) ->
        PauseMs = rand:uniform(5) * 1000,
        timer:sleep(PauseMs),
        io:format("Worker~w: I'm still alive~n", [Id]),
        worker(Id).
    
    %% Spawns N monitored workers with ids 1..N and then enters the monitor
    %% loop in the calling process. Each entry of the worker list has the
    %% shape {{Pid, Ref}, Id}, where {Pid, Ref} is what spawn_monitor/3 returns.
    create_workers(N) ->
        StartWorker = fun(Id) -> {spawn_monitor(?MODULE, worker, [Id]), Id} end,
        Workers = [StartWorker(Id) || Id <- lists:seq(1, N)],
        monitor_workers(Workers).
    
    %% Monitor loop. Waits for one message, works out the worker list to carry
    %% forward, and recurses with it:
    %%   * a 'DOWN' message for a known worker replaces that worker and prints
    %%     the list before and after the replacement;
    %%   * a 'DOWN' message for an unknown {Pid, Ref}, or any other message,
    %%     is dropped and the list is kept unchanged.
    monitor_workers(Workers) ->
        Next =
            receive
                {'DOWN', MonitorRef, process, DeadPid, Reason} ->
                    Dead = {DeadPid, MonitorRef},
                    case is_my_worker(Dead, Workers) of
                        false ->
                            Workers;
                        true ->
                            Replaced = replace_worker(Dead, Workers, Reason),
                            io:format("Old Workers:~n~p~n", [Workers]),
                            io:format("New Workers:~n~p~n", [Replaced]),
                            Replaced
                    end;
                _Ignored ->
                    Workers
            end,
        monitor_workers(Next).
        
    %% Returns true when Worker (a {Pid, Ref} pair) is the first element of
    %% some {{Pid, Ref}, Id} entry in Workers, and false otherwise.
    is_my_worker(Worker, Workers) ->
        lists:keyfind(Worker, 1, Workers) =/= false.
    
    %% Replaces a dead worker with a freshly spawned, monitored one.
    %%
    %% Worker is the {Pid, Ref} pair of the dead process and must be present
    %% in Workers (a list of {{Pid, Ref}, Id} entries); if it is not, the
    %% match on the lists:keytake/3 result fails with badmatch. Why is the
    %% exit reason taken from the 'DOWN' message.
    %%
    %% Returns the worker list with the dead entry removed and the
    %% replacement, which reuses the same Id, placed at the head.
    replace_worker(Worker, Workers, Why) ->
        %% keytake looks the entry up and removes it in a single pass.
        {value, {{Pid, _Ref}, Id}, Remaining} = lists:keytake(Worker, 1, Workers),
        %% ~p rather than ~s: an exit reason can be any term (for example a
        %% tuple carrying a stack trace), and ~s raises badarg for such
        %% terms, which would take the monitoring process down as well.
        io:format("Worker~w (~w) went down: ~p~n", [Id, Pid, Why]),
        NewWorker = spawn_monitor(?MODULE, worker, [Id]),
        [{NewWorker, Id} | Remaining].
    
    %% Demo entry point: opens the Observer GUI and then starts four monitored
    %% workers. The monitor loop runs in the calling process, so this call
    %% does not return.
    start() ->
        %% In Observer's Processes tab, right click on a worker and kill it
        %% to watch the monitor replace it.
        observer:start(),
        WorkerCount = 4,
        create_workers(WorkerCount).
    

    In the shell:

    $ ./run
    Erlang/OTP 19 [erts-8.2] [source] [64-bit] [smp:4:4] [async-threads:10] [hipe] [kernel-poll:false]
    
    Eshell V8.2  (abort with ^G)
    
    
    1> Worker3: I'm still alive
    Worker1: I'm still alive
    Worker2: I'm still alive
    Worker4: I'm still alive
    Worker3: I'm still alive
    Worker1: I'm still alive
    Worker4: I'm still alive
    Worker2: I'm still alive
    Worker3: I'm still alive
    Worker1: I'm still alive
    Worker4: I'm still alive
    Worker3 (<0.87.0>) went down: killed
    Old Workers:
    [{{<0.85.0>,#Ref<0.0.4.292>},1},
     {{<0.86.0>,#Ref<0.0.4.293>},2},
     {{<0.87.0>,#Ref<0.0.4.294>},3},
     {{<0.88.0>,#Ref<0.0.4.295>},4}]
    New Workers:
    [{{<0.2386.0>,#Ref<0.0.1.416>},3},
     {{<0.85.0>,#Ref<0.0.4.292>},1},
     {{<0.86.0>,#Ref<0.0.4.293>},2},
     {{<0.88.0>,#Ref<0.0.4.295>},4}]
    Worker2: I'm still alive
    Worker1: I'm still alive
    Worker2: I'm still alive
    Worker1: I'm still alive
    Worker1: I'm still alive
    Worker4: I'm still alive
    Worker3: I'm still alive
    Worker2: I'm still alive
    Worker1: I'm still alive
    Worker3: I'm still alive
    Worker4: I'm still alive
    Worker1: I'm still alive
    Worker4 (<0.88.0>) went down: killed
    Old Workers:
    [{{<0.2386.0>,#Ref<0.0.1.416>},3},
     {{<0.85.0>,#Ref<0.0.4.292>},1},
     {{<0.86.0>,#Ref<0.0.4.293>},2},
     {{<0.88.0>,#Ref<0.0.4.295>},4}]
    New Workers:
    [{{<0.5322.0>,#Ref<0.0.1.9248>},4},
     {{<0.2386.0>,#Ref<0.0.1.416>},3},
     {{<0.85.0>,#Ref<0.0.4.292>},1},
     {{<0.86.0>,#Ref<0.0.4.293>},2}]
    Worker3: I'm still alive
    Worker2: I'm still alive
    Worker4: I'm still alive
    Worker1: I'm still alive
    Worker3: I'm still alive
    Worker3: I'm still alive
    Worker2: I'm still alive
    Worker1 (<0.85.0>) went down: killed
    Old Workers:
    [{{<0.5322.0>,#Ref<0.0.1.9248>},4},
     {{<0.2386.0>,#Ref<0.0.1.416>},3},
     {{<0.85.0>,#Ref<0.0.4.292>},1},
     {{<0.86.0>,#Ref<0.0.4.293>},2}]
    New Workers:
    [{{<0.5710.0>,#Ref<0.0.1.10430>},1},
     {{<0.5322.0>,#Ref<0.0.1.9248>},4},
     {{<0.2386.0>,#Ref<0.0.1.416>},3},
     {{<0.86.0>,#Ref<0.0.4.293>},2}]
    Worker2: I'm still alive
    Worker3: I'm still alive
    Worker4: I'm still alive
    Worker3: I'm still alive
    

    I think the version below is probably more efficient: it uses lists:map() to both search for and replace the crashed worker, so it only traverses the Workers list once:

    -module(mo).
    -compile(export_all).
    
    %% Worker loop: waits a random 1 to 5 seconds, reports that worker Id is
    %% still alive, and starts over. Runs until the process is killed.
    worker(Id) ->
        Seconds = rand:uniform(5),
        timer:sleep(Seconds * 1000),
        io:format("Worker~w: I'm still alive~n", [Id]),
        worker(Id).
    
    %% Starts workers 1..N with spawn_monitor/3 and hands the resulting list
    %% of {{Pid, Ref}, Id} entries to the monitor loop, which runs in the
    %% calling process.
    create_workers(N) ->
        Ids = lists:seq(1, N),
        monitor_workers([{spawn_monitor(?MODULE, worker, [Id]), Id} || Id <- Ids]).
    
    %% Monitor loop. Each iteration takes one message from the mailbox:
    %%   * on a 'DOWN' message the worker list is passed through replace/3 and
    %%     printed before and after;
    %%   * any other message is discarded and the list is kept as it is.
    %% The loop then continues with the resulting list.
    monitor_workers(Workers) ->
        Next =
            receive
                {'DOWN', MonitorRef, process, DeadPid, Reason} ->
                    Updated = replace({DeadPid, MonitorRef}, Workers, Reason),
                    io:format("Old Workers:~n~p~n", [Workers]),
                    io:format("New Workers:~n~p~n", [Updated]),
                    Updated;
                _Ignored ->
                    Workers
            end,
        monitor_workers(Next).
    
    %% Walks Workers once and swaps the entry whose {Pid, Ref} pair equals
    %% CrashedWorker for a freshly spawned, monitored worker with the same Id.
    %% All other entries are returned untouched and in their original
    %% positions. If no entry matches, the list comes back unchanged and
    %% nothing is spawned.
    %%
    %% Workers is a list of {{Pid, Ref}, Id} entries; Why is the exit reason
    %% from the 'DOWN' message and is only used for the log line.
    replace(CrashedWorker, Workers, Why) ->
        lists:map(fun(PidRefId) ->
                          {{Pid, _Ref} = Worker, Id} = PidRefId,
                          case Worker =:= CrashedWorker of
                              true ->  % replace worker
                                  %% ~p rather than ~s: an exit reason can be
                                  %% any term (e.g. a tuple with a stack
                                  %% trace), and ~s raises badarg for those,
                                  %% which would crash the monitor itself.
                                  io:format("Worker~w (~w) went down: ~p~n",
                                            [Id, Pid, Why]),
                                  {spawn_monitor(?MODULE, worker, [Id]), Id}; % => {{Pid, Ref}, Id}
                              false ->  % leave worker alone
                                  PidRefId
                          end
                  end,
                  Workers).
    
    %% Demo entry point: launches the Observer GUI, then creates and monitors
    %% four workers. Monitoring happens in the calling process, so this call
    %% does not return.
    start() ->
        %% In Observer's Processes tab, right click on a worker and kill it
        %% to see it being replaced.
        observer:start(),
        NumWorkers = 4,
        create_workers(NumWorkers).
    

    If so, what happens if one of those spawned monitors fails or crashes?

    Erlang owns several server farms in different countries, and erlang has acquired several redundant power grids, so erlang will restart everything in a fault tolerant, distributed system that will never fail. It's all built in. You don't have to worry about anything. :)

    Actually... anything that you can imagine failing has to be backed up, e.g. by another monitoring process on another computer.