I am getting a stream of events from MQ into my Elixir consumer.
In the consumer I need to:
Data set is not big in my case. May be few hundreds of IDs and few thousands updates a day.
Is there any way I can solve this problem using GenServer magic?
Thank you!
I'd do it like this:
Whenever a new event comes:
If it's the first event with that id, create a timer ref using Process.send_after/3
with a timeout of 3 minutes, and store the event and the timer in the state.
If it's not the first event with that id, cancel the stored timer ref with Process.cancel_timer/1
, create a new timer as mentioned in the previous step, and store the new timer along with the new event concatenated with the old events.
And in the handle_info
triggered by the timer, push the events for that id downstream and remove that entry from the state.
Here's a simple implementation of the above:
defmodule DebouncedEcho do
@timeout 1000
use GenServer
def start_link do
GenServer.start_link __MODULE__, []
end
def init(_) do
{:ok, %{}}
end
def handle_cast({:store, id, event}, state) do
case state[id] do
nil ->
timer = Process.send_after(self, {:timer, id}, @timeout)
state = Map.put(state, id, %{events: [event], timer: timer})
{:noreply, state}
%{events: events, timer: timer} ->
Process.cancel_timer(timer)
timer = Process.send_after(self, {:timer, id}, @timeout)
state = Map.put(state, id, %{events: [event | events], timer: timer})
{:noreply, state}
end
end
def handle_info({:timer, id}, state) do
%{events: events} = state[id]
IO.inspect {:flush, id, events}
state = Map.delete(state, id)
{:noreply, state}
end
end
Test:
{:ok, server} = DebouncedEcho.start_link
GenServer.cast server, {:store, 1, :foo}
GenServer.cast server, {:store, 1, :bar}
GenServer.cast server, {:store, 2, :foo}
:timer.sleep(500)
GenServer.cast server, {:store, 2, :bar}
:timer.sleep(500)
GenServer.cast server, {:store, 2, :baz}
:timer.sleep(500)
GenServer.cast server, {:store, 1, :baz}
:timer.sleep(2000)
Output:
{:flush, 1, [:bar, :foo]}
{:flush, 2, [:baz, :bar, :foo]}
{:flush, 1, [:baz]}