Distributed Erlang: multicall exceeds requested timeout


We run a distributed Erlang cluster, and I am now testing how it behaves during net splits.

To get information from all nodes of the cluster I use gen_server:multi_call/4 with a defined timeout. I need the information from the available nodes as soon as possible, so the timeout is not large (about 3000 ms). Here is an example call:

Timeout = 3000,
Nodes = AllConfiguredNodes,
gen_server:multi_call(Nodes, broker, get_score, Timeout).

I expect this call to return within Timeout ms, but during a net split it does not: it waits approximately 8 seconds.

What I found is that the multi_call request is held up for an additional 5 seconds in the call to erlang:monitor(process, {Name, Node}) before the request is sent.

I really do not care if some node does not reply, is busy, or is unavailable; I can use any other node. But because of this blocking I am forced to wait while the Erlang VM tries to establish a new connection to a dead or unavailable node.

The question is: do you know a solution that prevents this blocking? Or maybe another RPC mechanism that suits my situation?
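
One possible mitigation, sketched here under assumptions rather than taken from the question: since the delay comes from monitoring a node that is not currently connected, the node list can be restricted to the nodes reported by nodes() before calling multi_call. The module name connected_sketch and both function names are hypothetical. A node that is still listed in nodes() but has silently become unreachable will still cost up to Timeout, and the connection set can change between the check and the call.

```erlang
-module(connected_sketch).
-export([connected_only/1, multi_call_connected/4]).

%% Keep only the nodes this node is already connected to (plus itself),
%% so that no new connection attempt is triggered by the call.
connected_only(Nodes) ->
    Known = [node() | nodes()],
    [N || N <- Nodes, lists:member(N, Known)].

%% Call only the connected nodes and report the skipped ones as bad nodes.
multi_call_connected(Nodes, Name, Req, Timeout) ->
    Up = connected_only(Nodes),
    {Replies, Bad} = gen_server:multi_call(Up, Name, Req, Timeout),
    {Replies, Bad ++ (Nodes -- Up)}.
```

With the names from the question, the call would be connected_sketch:multi_call_connected(AllConfiguredNodes, broker, get_score, 3000).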


Solution

  • My solution to the problem.

    I've made my own implementation of multicall that uses gen_server:call. The basic idea is to call each node with gen_server:call() in a separate process and collect the results of these calls. Collection is done by receiving messages from the mailbox of the calling process.

    To control the timeout, I calculate the deadline at which the timeout expires and then use it as the reference point for calculating the timeout in the after clause of each receive.

    Implementation

    The main function is:

    multicall(Nodes, Name, Req, Timeout) ->
        Refs = lists:map(fun(Node) -> call_node(Node, Name, Req, Timeout) end, Nodes),
        Results = read_all(Refs, Timeout),
        PosResults = [ { Node, Result } || { ok, { ok, { Node, Result } } } <- Results ],
        { PosResults, calc_bad_nodes(Nodes, PosResults) }.
    

    The idea here is to call all nodes and wait for all results within a single Timeout.

    Each node is called from a spawned process, which catches the exits that gen_server:call raises on error.

    call_node(Node, Name, Req, Timeout) ->
        Ref = make_ref(),
        Self = self(),
        spawn_link(fun() ->
                           try
                               Result = gen_server:call({Name,Node},Req,Timeout),
                               Self ! { Ref, { ok, { Node, Result } } }
                           catch
                               exit:Exit ->
                                   Self ! { Ref, { error, { 'EXIT', Exit } } }
                           end
                   end),
        Ref.
    

    Bad nodes are calculated as those that do not respond within Timeout:

    calc_bad_nodes(Nodes, PosResults) ->
        { GoodNodes, _ } = lists:unzip(PosResults),
        [ BadNode || BadNode <- Nodes, not lists:member(BadNode, GoodNodes) ].
    

    Results are collected by reading the mailbox with a Timeout:

    read_all(ReadList, Timeout) ->
        Now = erlang:monotonic_time(millisecond),
        Deadline = Now + Timeout,
        read_all_impl(ReadList, Deadline, []).
    

    The implementation keeps reading until the Deadline has passed; after that, each remaining reference is polled once with a zero timeout:

    read_all_impl([], _, Results) ->
        lists:reverse(Results);
    read_all_impl([ W | Rest ], expired, Results) ->
        R = read(0, W),
        read_all_impl(Rest, expired, [R | Results ]);
    read_all_impl([ W | Rest ] = L, Deadline, Results) ->
        Now = erlang:monotonic_time(millisecond),
        case Deadline - Now of
            Timeout when Timeout > 0 ->
                R = read(Timeout, W),
                case R of
                    { ok, _ } ->
                        read_all_impl(Rest, Deadline, [ R | Results ]);
                    { error, { read_timeout, _ } } ->
                        read_all_impl(Rest, expired, [ R | Results ])
                end;
            Timeout when Timeout =< 0 ->
                read_all_impl(L, expired, Results)
        end.
    

    A single read is just a receive from the mailbox with a Timeout:

    read(Timeout, Ref) ->
        receive
            { Ref, Result } ->
                { ok, Result }
        after Timeout ->
                { error, { read_timeout, Timeout } }
        end.
    

    Further improvements:

    • The rpc module spawns a separate process so that late answers do not pile up as garbage in the caller's mailbox. It would be useful to do the same in this multicall function.
    • An infinity timeout can be handled in the obvious way.
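
    Both improvements might look roughly like the sketch below. It is an illustration under assumptions, not part of the implementation above: the module name multicall_extras and the functions in_proxy/1, deadline/1 and remaining/1 are hypothetical helpers. in_proxy/1 runs a fun in a short-lived monitored process, so replies that arrive after that process has exited are dropped instead of landing in the caller's mailbox. deadline/1 and remaining/1 pass infinity through unchanged, which works because receive accepts infinity in its after clause.

    ```erlang
    -module(multicall_extras).
    -export([in_proxy/1, deadline/1, remaining/1]).

    %% Run Fun in a short-lived, monitored helper process and return its
    %% result. Messages sent to the helper after it has exited are dropped
    %% by the runtime and never reach the caller's mailbox.
    in_proxy(Fun) when is_function(Fun, 0) ->
        Parent = self(),
        Tag = make_ref(),
        {Pid, MRef} = spawn_monitor(fun() -> Parent ! {Tag, Fun()} end),
        receive
            {Tag, Result} ->
                %% Drop the monitor and any 'DOWN' message already queued.
                erlang:demonitor(MRef, [flush]),
                Result;
            {'DOWN', MRef, process, Pid, Reason} ->
                %% The helper died before replying; propagate the reason.
                exit(Reason)
        end.

    %% Deadline in monotonic milliseconds; infinity means "no deadline".
    deadline(infinity) -> infinity;
    deadline(Timeout) when is_integer(Timeout), Timeout >= 0 ->
        erlang:monotonic_time(millisecond) + Timeout.

    %% Time left until Deadline, clamped at 0 so it is always a valid
    %% value for the after clause of a receive.
    remaining(infinity) -> infinity;
    remaining(Deadline) when is_integer(Deadline) ->
        max(0, Deadline - erlang:monotonic_time(millisecond)).
    ```

    Assuming the multicall/4 function above is in scope, the wrapped call would be multicall_extras:in_proxy(fun() -> multicall(Nodes, broker, get_score, 3000) end).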