I have this simple GenServer:
def handle_info(:tick, items) do
  items2 = do_job(items)
  tick()
  {:noreply, items2}
end
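Here tick/0 (not shown) just schedules the next :tick message to this process; a minimal sketch, with a placeholder interval:

defp tick do
  # schedule the next :tick to this GenServer process; 1_000 ms is a placeholder
  Process.send_after(self(), :tick, 1_000)
end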
In "do_job" I need to a) iterate throug items, b) make an http request which can take long and c) depending on a response, update a database and finish with the current item by removing it from "items" or merely update the database:
def do_job(items) do
  Enum.each(items, fn a -> # or Enum.map
    Task.start(fn ->
      case external_request(a) do
        {:terminate, data} ->
          Repo.update(....)
          remove(a) # send a message to this GenServer to remove this item
        {:continue, data} ->
          Repo.update(....)
      end
    end)
  end)
end
1) Do I have to return a new value -- an updated list/state -- from "do_job" for my GenServer to work properly?
2) If so, how? I can't return an updated state from "do_job" because I create a Task for each item, since each one involves an HTTP request and a CRUD operation in a database. That's why it's an asynchronous task.
3) And in general, GenServer manages the state variable -- in this case "items" -- by itself. How does GenServer know how to update it? Let's consider this code:
def add(a) do
  GenServer.cast(__MODULE__, {:add, a}) # what/who utilizes this return value?
end

def remove(....) do
  # ....
end

def handle_cast({:add, a}, items) do
  {:noreply, [a | items]} # what/who utilizes this return value?
end
A client doesn't use the return value of "add" or "remove", so it gets thrown away. Nonetheless, when a client calls "add" 3 times, this GenServer will end up with 3 items in its list. But why? How does GenServer handle that?
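My rough mental model is that GenServer runs a receive loop under the hood and feeds whatever state a callback returns back into that loop, something like this simplified sketch:

# greatly simplified sketch of what I imagine GenServer does internally
defp loop(state) do
  receive do
    {:cast, msg} ->
      {:noreply, new_state} = handle_cast(msg, state)
      loop(new_state) # the returned state becomes the state for the next message
  end
end

Is that roughly how the return value of handle_cast gets used?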
1) Do I have to return a new value -- an updated list/state -- from "do_job" for my GenServer to work properly?
You don't have to return the new, updated list from handle_info. But if you return the same list, the next :tick message can trigger a new set of requests for the same items -- even items whose first run would eventually return {:terminate, _} -- because with your approach the removal only happens later, when do_job sends a message back to this GenServer.
2) If so, how? I can't return an updated state from "do_job" because I create a Task for each item, since each one involves an HTTP request and a CRUD operation in a database. That's why it's an asynchronous task.
You can return the new list while still running the jobs in parallel: spawn all the tasks first, then wait for all of them. I would definitely suggest this approach, as it's simple and effective against running duplicate jobs.
Here's an implementation of this method (untested):
def do_job(items) do
  items
  |> Enum.map(fn a ->
    Task.async(fn ->
      case external_request(a) do
        {:terminate, data} ->
          Repo.update(....)
          [] # remove this item from the list
        {:continue, data} ->
          Repo.update(....)
          [a] # keep this item in the list
      end
    end)
  end)
  |> Enum.flat_map(&Task.await/1)
end
do_job will now take a list of items, pass all of them to external_request in parallel, wait until all of them have returned, and remove from the list every item that hit the {:terminate, _} case.
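One caveat: Task.await/1 waits at most 5 seconds by default, and since the requests can take long, you may want to pass an explicit timeout via Task.await/2 in the last step -- 30_000 ms below is just an example value:

# replaces the last pipeline step above; pick a timeout above your
# worst-case request time (Task.await's default is 5_000 ms)
|> Enum.flat_map(&Task.await(&1, 30_000))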
You can keep the handle_info implementation the same as before.