Search code examples
erlanggen-server

How to use a receive clause efficiently in an Erlang gen_server to resolve a timeout error?


Sometimes my loop returns ok because of a timeout; how can I write this code properly? When there is a timeout, it just returns ok instead of the value I expect. In handle_call I call the function loop(), and inside loop() I receive a message with a receive clause. I then pass this data to my database through the loop2 function, which returns the database's response (whether or not the data was saved successfully) back to loop(). But if there is a timeout, my loop function returns ok instead of the actual value.

% @Author: ZEESHAN AHMAD
% @Date:   2020-12-22 05:06:12
% @Last Modified by:   ZEESHAN AHMAD
% @Last Modified time: 2021-01-10 04:42:59


-module(getAccDataCons).

-behaviour(gen_server).

-include_lib("deps/amqp_client/include/amqp_client.hrl").

-export([start_link/0, stop/0]).
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, code_change/3,
         terminate/2]).
-export([get_account/0]).

%% @doc Start the server linked to the calling process and register it
%% locally under the module name. No start arguments or options are passed.
start_link() ->
    ServerName = {local, ?MODULE},
    gen_server:start_link(ServerName, ?MODULE, [], []).

%% @doc Ask the locally registered server to stop. The request is sent as
%% an asynchronous cast, so this returns without waiting for the shutdown.
stop() ->
    Request = stop,
    gen_server:cast(?MODULE, Request).

%% @doc Synchronously send `{get_account}' to the locally registered server
%% and return whatever it replies. Uses gen_server:call/2, so no explicit
%% call timeout is given here.
get_account() ->
    Request = {get_account},
    gen_server:call(?MODULE, Request).

%% @doc gen_server init callback: open an AMQP connection to "localhost",
%% open a channel on it and use that channel as the server state.
%% Either step crashes with badmatch if it does not return `{ok, _}'.
%% The connection itself is not kept in the state.
init(_Args) ->
    Params = #amqp_params_network{host = "localhost"},
    {ok, Conn} = amqp_connection:start(Params),
    {ok, Chan} = amqp_connection:open_channel(Conn),
    {ok, Chan}.

%% @doc gen_server call handler. The state is the AMQP channel.
%%
%% `{get_account}': declares the "get" topic exchange and the "get_account"
%% queue, binds them with routing key "get.account", starts consuming from
%% the queue and then waits inside loop/0 for its result, which becomes the
%% reply. The server process is busy for the whole wait. Only the result of
%% the bind is asserted; the other channel calls are not checked.
%%
%% Any other request is printed and answered with `ok'.
handle_call({get_account}, _From, Channel) ->
    Exchange = <<"get">>,
    Queue = <<"get_account">>,
    amqp_channel:call(Channel, #'exchange.declare'{exchange = Exchange, type = <<"topic">>}),
    amqp_channel:call(Channel, #'queue.declare'{queue = Queue}),
    #'queue.bind_ok'{} =
        amqp_channel:call(Channel,
                          #'queue.bind'{exchange = Exchange,
                                        routing_key = <<"get.account">>,
                                        queue = Queue}),
    io:format(" [*] Waiting for logs. To exit press CTRL+C~n"),
    amqp_channel:call(Channel, #'basic.consume'{queue = Queue, no_ack = true}),
    Reply = loop(),
    io:format("~nReti=~p", [Reply]),
    {reply, Reply, Channel};
handle_call(Other, _From, Channel) ->
    io:format("received other handle_call message: ~p~n", [Other]),
    {reply, ok, Channel}.

%% @doc gen_server cast handler.
%% `stop' terminates the server with reason `normal'; any other cast is
%% printed and otherwise ignored.
handle_cast(stop, Channel) ->
    {stop, normal, Channel};
handle_cast(Unexpected, Channel) ->
    io:format("received other handle_cast call : ~p~n", [Unexpected]),
    {noreply, Channel}.

%% @doc gen_server info handler: print any message that reaches the server
%% outside of call/cast and keep the state unchanged.
handle_info(Info, Channel) ->
    io:format("received handle_info message : ~p~n", [Info]),
    {noreply, Channel}.

%% @doc gen_server code-upgrade callback: the state needs no conversion,
%% so it is returned as is.
code_change(_PreviousVsn, Channel, _Extra) ->
    {ok, Channel}.

%% @doc gen_server terminate callback: print the termination reason.
%% Returns the result of io:format/2; the state is not used.
terminate(Reason, _Channel) ->
    Args = [Reason],
    io:format("server is terminating with reason :~p~n", Args).


    %% Wait for the consumer confirmation, then for one delivery.
    %%
    %% The first receive has no timeout: it blocks until a
    %% #'basic.consume_ok'{} message arrives. The second receive waits up
    %% to 2000 ms for a `{#'basic.deliver'{}, Msg}' message, asserts that
    %% Msg is an #amqp_msg{} and returns loop2/1 applied to its payload.
    %%
    %% On timeout the last expression evaluated is io:format/1, so the
    %% function returns io:format's result (`ok') rather than a tagged
    %% timeout value.
    loop() ->
        receive
            #'basic.consume_ok'{} ->
                ok
        end,
        receive
            {#'basic.deliver'{}, Delivery} ->
                #amqp_msg{payload = Body} = Delivery,
                loop2(Body)
        after 2000 ->
            io:format("Server timeout")
        end.


  %% Decode a JSON payload and look the account up through getAccDataDb.
  %%
  %% The decoded term must be exactly `{[{<<"account_id">>, Id}]}',
  %% otherwise the match fails. getAccDataDb:create_AccountId_view/0 is
  %% called first and its result is ignored. Returns the result of
  %% getAccDataDb:getAccountNameDetails1/1 when getAccountNameDetails/1
  %% answers `success', and `user_not_exist' when it answers
  %% `details_not_matched'. Any other answer raises case_clause.
  loop2(Payload) ->
      {[{<<"account_id">>, _AccountId}]} = Doc = jiffy:decode(Payload),
      getAccDataDb:create_AccountId_view(),
      case getAccDataDb:getAccountNameDetails(Doc) of
          details_not_matched ->
              user_not_exist;
          success ->
              getAccDataDb:getAccountNameDetails1(Doc)
      end.

Solution

  • This is too long for an edit, I put it in a new answer.

    The reason why you receive ok when a timeout occurs is in the loop() code. In the second receive block, after 2000 ms, you return immediately after the io:format/1 statement.

    io:format returns ok and it is what you get in the Returned variable. You should change this code with

    %% Wait for the consumer confirmation, then for one delivery.
    %% Returns `{ok, Value}' with the result of loop2/1 on the delivered
    %% payload, or `{error, timeout}' when no delivery arrives within
    %% 2000 ms. The first receive has no timeout of its own.
    loop() ->
        receive
            #'basic.consume_ok'{} ->
                ok
        end,
        receive
            {#'basic.deliver'{}, #amqp_msg{payload = Body}} ->
                Value = loop2(Body),
                {ok, Value}
        after 2000 ->
            io:format("Server timeout"),
            {error, timeout}
        end.
    

    With this code your client will receive either {ok,Value} or {error,timeout}, and will be able to react accordingly.

    But there are still issues with this version: (1) the 2-second timeout may be too short, so you may be missing valid answers; (2) because you use pattern matching in the receive blocks and do not check the result of each amqp_channel:call, many different problems could occur and show up as a timeout.

    First, let's have a look at the timeout. It is possible that the 4 calls to amqp_channel really need more than 2 seconds in total to complete successfully. The simple solution is to increase your timeout, changing after 2000 to after 3000 or more. But then you will have 2 issues:

    • Your gen_server is blocked during all this time, and if it is not dedicated to a single client, it will be unavailable to serve any other request while it is waiting for the answer.
    • If you need to increase the timeout above 5 seconds, you will hit another timeout, managed internally by gen_server: by default, gen_server:call expects an answer in less than 5 seconds.

    The gen_server offers some interface functions to solve this kind of problem: 'send_request', 'wait_response' and 'reply'. Here is a basic gen_server which can handle 3 kinds of requests:

    • stop ... to stop the server, useful to update the code.
    • {blocking,Time,Value} the server will sleep for Time ms and then return Value. This simulates your case, and you can tweak how long it takes to get an answer.
    • {non_blocking,Time,Value} the server will delegate the job to another process and return immediately without an answer (therefore it is available for another request). The new process will sleep for Time ms and then return Value using gen_server:reply.

    The server module implements several user interfaces:

    • the standard start(), stop()
    • blocking(Time,Value) to call the server with the request {blocking,Time,Value} using gen_server:call
    • blocking_catch(Time,Value) same as the previous one, but catching the result of gen_server:call to show the hidden timeout
    • non_blocking(Time,Value,Wait) to call the server with the request {non_blocking,Time,Value} using gen_server:send_request and waiting for the answer for Wait ms maximum

    Finally it includes 2 test functions

    • test([Type,Time,Value,OptionalWait]) it spawns a process which will send a request of the given Type with the corresponding parameters. The answer is sent back to the calling process. The answer can be retrieved with flush() in the shell.
    • parallel_test([Type,Time,NbRequests,OptionalWait]) it calls test NbRequests times with the corresponding parameters. It collects all the answers and prints them using the local function collect_answers(NbRequests,Timeout).

    Code below

    -module (server_test).
    
    -behaviour(gen_server).
    
    %% API
    -export([start/0,stop/0,blocking/2,blocking_catch/2,non_blocking/3,test/1,parallel_test/1]).
    
    
    %% gen_server callbacks
    -export([init/1, handle_call/3, handle_cast/2, handle_info/2,
             terminate/2, code_change/3]).
    
    -define(SERVER, ?MODULE). 
    
    %%%===================================================================
    %%% API
    %%%===================================================================
    %% Start the server linked to the caller and register it locally as ?SERVER.
    start() ->
        ServerName = {local, ?SERVER},
        gen_server:start_link(ServerName, ?MODULE, [], []).
    
    %% Ask the registered server to stop, using an asynchronous cast.
    stop() ->
        Request = stop,
        gen_server:cast(?SERVER, Request).
    
    %% Send a `{blocking, Time, Value}' request with gen_server:call/2 and
    %% return the server's reply. No explicit call timeout is given.
    blocking(Time, Value) ->
        Request = {blocking, Time, Value},
        gen_server:call(?SERVER, Request).
    
    %% Same request as blocking/2, but the call is wrapped in an old-style
    %% `catch'. On success the result is `{ok, Reply}'; if the call exits
    %% (for example on a call timeout) the result is `{'EXIT', Reason}'
    %% instead of the caller crashing.
    blocking_catch(Time, Value) ->
        Request = {blocking, Time, Value},
        catch {ok, gen_server:call(?SERVER, Request)}.
    
    %% Send a `{non_blocking, Time, Value}' request asynchronously with
    %% gen_server:send_request/2, then wait up to Wait ms for the answer
    %% with gen_server:wait_response/2 and return its result.
    non_blocking(Time, Value, Wait) ->
        Request = {non_blocking, Time, Value},
        RequestId = gen_server:send_request(?SERVER, Request),
        gen_server:wait_response(RequestId, Wait).
    
    %% Spawn a client process that issues one request and reports back.
    %%
    %% Argument list: [Type, Time, Value] or [Type, Time, Value, Wait];
    %% Wait defaults to 5000 and is only used for `non_blocking'.
    %% Type names the API function of this module to call: `non_blocking'
    %% is called as Type(Time, Value, Wait), anything else as
    %% Type(Time, Value).
    %%
    %% The spawned process sends the caller
    %%   {request, Type, Time, Value, got_answer, Result, after_microsec, Elapsed}
    %% and this function returns the spawned pid.
    %%
    %% Elapsed is measured with erlang:monotonic_time(microsecond) so that
    %% it really is in microseconds, as the `after_microsec' tag says;
    %% erlang:monotonic_time/0 returns native time units, whose resolution
    %% depends on the platform.
    %%
    %% A plain spawn/1 is kept on purpose: the client is not linked, so a
    %% request that exits (e.g. a call timeout in `blocking') does not take
    %% the calling process down.
    test([Type,Time,Value]) -> test([Type,Time,Value,5000]);
    test([Type,Time,Value,Wait]) ->
        Start = erlang:monotonic_time(microsecond),
        From = self(),
        F = fun() ->
            R = case Type of
                non_blocking -> ?MODULE:Type(Time,Value,Wait);
                _ -> ?MODULE:Type(Time,Value)
            end,
            Elapsed = erlang:monotonic_time(microsecond) - Start,
            From ! {request,Type,Time,Value,got_answer,R,after_microsec,Elapsed}
        end,
        spawn(F).
    
    %% Launch NbRequests test clients and collect their answers.
    %%
    %% Argument list: [Type, Time, NbRequests] or
    %% [Type, Time, NbRequests, Wait]; Wait defaults to 5000 and is only
    %% forwarded for `non_blocking'. Request number X (1..NbRequests) uses
    %% X as its value. Answers are then gathered with collect_answers/2,
    %% allowing Time + 1000 ms of silence before giving up; its result is
    %% returned.
    parallel_test([Type, Time, NbRequests]) ->
        parallel_test([Type, Time, NbRequests, 5000]);
    parallel_test([Type, Time, NbRequests, Wait]) ->
        ArgsFor = fun(X) ->
            case Type of
                non_blocking -> [Type, Time, X, Wait];
                _ -> [Type, Time, X]
            end
        end,
        lists:foreach(fun(X) -> server_test:test(ArgsFor(X)) end,
                      lists:seq(1, NbRequests)),
        collect_answers(NbRequests, Time + 1000).
    
    
    %%%===================================================================
    %%% gen_server callbacks
    %%%===================================================================
    %% gen_server init callback: the server state is an empty map.
    init([]) ->
        {ok, maps:new()}.
    
    %% gen_server call handler.
    %%
    %% `{blocking, Delay, Value}': sleeps Delay ms inside the server process
    %% and replies `{ok, Value}'; the server handles nothing else meanwhile.
    %% `{non_blocking, Delay, Value}': hands the job to a spawned process
    %% that answers later through do_answer/3, and returns `noreply' so the
    %% server is immediately free again.
    %% Anything else is answered with `ok'.
    handle_call({blocking, Delay, Value}, _From, State) ->
        timer:sleep(Delay),
        {reply, {ok, Value}, State};
    handle_call({non_blocking, Delay, Value}, From, State) ->
        spawn(fun() -> do_answer(From, Delay, Value) end),
        {noreply, State};
    handle_call(_Request, _From, State) ->
        {reply, ok, State}.
    
    %% gen_server cast handler.
    %%
    %% `stop' terminates the server with reason `normal'. A requested stop
    %% is an orderly shutdown: any reason other than normal, shutdown or
    %% {shutdown, _} is treated by gen_server as a failure, which produces
    %% an error report and, because the server is started with start_link,
    %% propagates the exit to the linked starter.
    %% Every other cast is ignored.
    handle_cast(stop, State) ->
        {stop, normal, State};
    handle_cast(_Msg, State) ->
        {noreply, State}.
    
    %% gen_server info handler: silently drop any out-of-band message.
    handle_info(_Unexpected, State) ->
        {noreply, State}.
    
    %% gen_server terminate callback: nothing to clean up.
    terminate(_Why, _FinalState) ->
        ok.
    
    %% gen_server code-upgrade callback: print the version being replaced
    %% and keep the state unchanged.
    code_change(PreviousVsn, State, _Extra) ->
        io:format("changing code replacing version ~p~n", [PreviousVsn]),
        {ok, State}.
    
    %%%===================================================================
    %%% Internal functions
    %%%===================================================================
    
    %% Worker body for non-blocking requests: sleep Delay ms, then deliver
    %% Answer to the waiting client with gen_server:reply/2.
    %% Client is the `From' value the server received in handle_call/3.
    do_answer(Client, Delay, Answer) ->
        timer:sleep(Delay),
        gen_server:reply(Client, Answer).
    
    %% Receive and print messages from the caller's mailbox until the
    %% expected number has been seen.
    %% Returns `got_all_answers' once the counter reaches 0, or
    %% `missing_answers' if no message at all arrives within Timeout ms
    %% while some are still expected. Any message is counted as an answer.
    collect_answers(0, _Timeout) ->
        got_all_answers;
    collect_answers(Remaining, Timeout) ->
        receive
            Answer ->
                io:format("~p~n", [Answer]),
                collect_answers(Remaining - 1, Timeout)
        after Timeout ->
            missing_answers
        end.
    

    Session in the shell:

    44> c(server_test).                                    
    {ok,server_test}
    45> server_test:start().                               
    {ok,<0.338.0>}
    46> server_test:parallel_test([blocking,200,3]).
    {request,blocking,200,1,got_answer,{ok,1},after_microsec,207872}
    {request,blocking,200,2,got_answer,{ok,2},after_microsec,415743}
    {request,blocking,200,3,got_answer,{ok,3},after_microsec,623615}
    got_all_answers
    47> % 3 blocking requests in parallel, each lasting 200ms, they are executed in sequence but no timeout is reached
    47> % All the clients get their answers
    47> server_test:parallel_test([blocking,2000,3]).                                                                                                       
    {request,blocking,2000,1,got_answer,{ok,1},after_microsec,2063358}
    {request,blocking,2000,2,got_answer,{ok,2},after_microsec,4127740}
    missing_answers
    48> % 3 blocking requests in parallel, each lasting 2000ms, they are executed in sequence and the last answer exceeds the gen_server timeout.       
    48> % The client for this request doesn't receive an answer. The client should also manage its own timeout to handle this case
    48> server_test:parallel_test([blocking_catch,2000,3]).                                                                                             
    {request,blocking_catch,2000,1,got_answer,{ok,1},after_microsec,2063358}
    {request,blocking_catch,2000,2,got_answer,{ok,2},after_microsec,4127740}
    {request,blocking_catch,2000,3,got_answer,
             {'EXIT',{timeout,{gen_server,call,[server_test,{blocking,2000,3}]}}},
             after_microsec,5135355}
    got_all_answers
    49> % same thing but catching the exception. After 5 seconds the gen_server call throws a timeout exception.
    49> % The information can be forwarded to the client
    49> server_test:parallel_test([non_blocking,200,3]).                                                       
    {request,non_blocking,200,1,got_answer,{reply,1},after_microsec,207872}
    {request,non_blocking,200,2,got_answer,{reply,2},after_microsec,207872}
    {request,non_blocking,200,3,got_answer,{reply,3},after_microsec,207872}
    got_all_answers
    50> % using non blocking mechanism, we can see that all the requests were managed in parallel 
    50> server_test:parallel_test([non_blocking,5100,3]).                                        
    {request,non_blocking,5100,1,got_answer,timeout,after_microsec,5136379}
    {request,non_blocking,5100,2,got_answer,timeout,after_microsec,5136379}
    {request,non_blocking,5100,3,got_answer,timeout,after_microsec,5136379}
    got_all_answers
    51> % if we increase the answer delay above 5000ms, all requests fail in default timeout
    51> server_test:parallel_test([non_blocking,5100,3,6000]).                              
    {request,non_blocking,5100,1,got_answer,{reply,1},after_microsec,5231611}
    {request,non_blocking,5100,2,got_answer,{reply,2},after_microsec,5231611}
    {request,non_blocking,5100,3,got_answer,{reply,3},after_microsec,5231611}
    got_all_answers
    52> % but thanks to the send_request/wait_response/reply interfaces, the client can adjust the timeout to an accurate value
    52> % for each request
    

    The next reason why the request might not complete is that one of the amqp_channel:call invocations fails. Depending on what you want to do, there are several possibilities: do nothing, let it crash, catch the exception, or handle every case. The next proposal uses a global catch:

    %% `{get_account, Timeout}': delegate the whole job to a freshly spawned
    %% process so the server is free for other requests. That process
    %% answers the client itself (see do_get_account/3), hence `noreply'.
    %% The state is returned unchanged.
    handle_call({get_account, Timeout}, From, State) ->
        spawn(fun() -> do_get_account(From, State, Timeout) end),
        {noreply, State};
    
    ...
    
    %% Runs in the worker process spawned per request: sets up the AMQP
    %% topology, subscribes, waits for one delivery and answers the client.
    %%
    %% From    - the client reference received by handle_call/3
    %% State   - the AMQP channel held as server state
    %% Timeout - how long (ms) wait_account_reply/1 waits for a delivery
    %%
    %% The client receives `{ok, Answer}' when every step succeeds, where
    %% Answer is whatever wait_account_reply/1 returns (note that this
    %% includes its `server_timeout' value), or `{error, Class, Exception}'
    %% when any step raises.
    %%
    %% Each synchronous AMQP method is asserted against its own `*_ok'
    %% record: amqp_channel:call/2 returns that record for exchange.declare,
    %% queue.declare, queue.bind and basic.consume, not the atom `ok', so
    %% matching on `ok' would fail on every request.
    do_get_account(From,State,Timeout) ->
        Reply = try
            #'exchange.declare_ok'{} =
                amqp_channel:call(State, #'exchange.declare'{exchange = <<"get">>, type = <<"topic">>}),
            #'queue.declare_ok'{} =
                amqp_channel:call(State, #'queue.declare'{queue = <<"get_account">>}),
            Binding = #'queue.bind'{exchange = <<"get">>,
                                    routing_key = <<"get.account">>,
                                    queue = <<"get_account">>},
            #'queue.bind_ok'{} = amqp_channel:call(State, Binding),
            %% basic.consume is issued from this process, which is the one
            %% that then waits for the delivery in wait_account_reply/1.
            #'basic.consume_ok'{} =
                amqp_channel:call(State, #'basic.consume'{queue = <<"get_account">>, no_ack = true}),
            {ok,wait_account_reply(Timeout)}
        catch
            Class:Exception -> {error,Class,Exception}
        end,
        gen_server:reply(From, Reply).
    
    %% Wait up to Timeout ms for one delivery carrying an #amqp_msg{} and
    %% return extract_account/1 applied to its payload; return
    %% `server_timeout' if nothing matching arrives in time.
    %% A #'basic.consume_ok'{} message is deliberately not consumed here:
    %% it simply stays in the mailbox of this short-lived process.
    wait_account_reply(Timeout) ->
        receive
            {#'basic.deliver'{}, #amqp_msg{} = Msg} ->
                extract_account(Msg#amqp_msg.payload)
        after Timeout ->
            server_timeout
        end.
    
    
    %% Decode the JSON payload and look the account up through getAccDataDb.
    %%
    %% The decoded term must be exactly `{[{<<"account_id">>, Id}]}',
    %% otherwise the match fails. create_AccountId_view/0 is called before
    %% the lookup and its result is ignored (its effect is not visible
    %% here). Returns the result of getAccountNameDetails1/1 on `success'
    %% and `user_not_exist' on `details_not_matched'.
    extract_account(Payload) ->
        Decoded = jiffy:decode(Payload),
        {[{<<"account_id">>, _Id}]} = Decoded,
        getAccDataDb:create_AccountId_view(),
        Lookup = getAccDataDb:getAccountNameDetails(Decoded),
        case Lookup of
            details_not_matched ->
                user_not_exist;
            success ->
                getAccDataDb:getAccountNameDetails1(Decoded)
        end.
    

    And the client should look like:

    %% Client side: send `{get_account, 2000}' asynchronously to the process
    %% registered as `server_name', then wait up to 2200 ms for the answer.
    %% The client wait is slightly longer than the 2000 ms carried in the
    %% request.
    get_account() ->
        Request = {get_account, 2000},
        RequestId = gen_server:send_request(server_name, Request),
        gen_server:wait_response(RequestId, 2200).