Search code examples
elixirgen-server

How do I make a GenServer that processes messages at a specific rate? (every n seconds)


One of my services talks to an external API that is rate-limited, so I want to ensure that I send no more than 1 call every 10 seconds.

My naive approach would be to have a long-running API service, and time it out after each call:

def handle_cast({:call_api, data}, state) do
  send_to_external_api(data)
  :timer.sleep(10000)
  {:noreply, state}
end

I'm not sure if there's a proper way to do this.


Solution

  • Edit: The original solution dropped messages in between 10s ticks, as burmajam suggested. The edit provides a more appropriate solution.


    EDIT

    Due to the fact that GenServer handle_* functions do not actually receive messages from the queue, but just process them, we cannot exploit pattern matching to receive selectively only each 10s from the processes queue.

    Therefore, since we're picking up messages in order of their arrival, we need out internal queue as part of GenServer's state.

    defmodule RateLimited do
      use GenServer
    
      def start_link do
        GenServer.start_link(__MODULE__, %{queue: []})
      end
    
      def init(state) do
        allow_work()
        {:ok, state}
      end
    
      def handle_cast({:call_api, data}, %{"queue" => queue} = state) do
        {:noreply, %{state | queue: queue ++ [data]}}
      end
    
      def handle_info(:work, %{"queue" => [data | queue]} = state) do
          send_to_external_api(data)
        allow_work()
    
        {:noreply, %{state | queue: queue}}
      end
    
      defp allow_work() do
        Process.send_after(self(), :work, 10000) # after 10s
      end
    
      defp send_to_external_api (data) do end
    end
    

    So we're just moving messages from process queue to state queue and we process the head when we signal ourselves that 10s have passed.

    But in the end, we actually achieve the same result as if we put the process to sleep for 10 seconds. Your solution seems easier and achieves the same result.


    The solution is based on How to run some code every few hours in Phoenix framework?

    First, let your GenServer store a flag in its state (work = true/false).

    Then let GenServer use Process.send_after to signal itself when it can work. You receive the signal in handle_info where you set the work state flag to true.

    Now notice pattern matching on state in the handle_cast function: it will only pick up the message when work state flag equals true. Otherwise, messages will be put in the queue waiting.

    And after you send the message to external service, you run Process.send_after again to schedule the next signal and return state which has work flag set to false to prevent next messages from being immediately picked up.

    defmodule RateLimited do
      use GenServer
    
      def start_link do
        GenServer.start_link(__MODULE__, %{work: false})
      end
    
      def init(state) do
        allow_work()
        {:ok, state}
      end
    
      def handle_cast({:call_api, data}, %{"work" => true} = state) do
        send_to_external_api(data)
        allow_work()
        {:noreply, %{state | work = false}}
      end
    
      def handle_info(:work, state) do
        {:noreply, %{state | work = true}}
      end
    
      defp allow_work() do
        Process.send_after(self(), :work, 10000) # after 10s
      end
    end