I have this code, which I expect to run the is_prime function in child tasks for each number in number_list and return the results, but I'm having trouble retrieving the results with Task.await.
defmodule LeaderWorker do
  def start() do
    children = [{Task.Supervisor, name: Task.SomeThingSupervisor}]
    Supervisor.start_link(children, strategy: :one_for_one)
  end

  def run(number_list, num_workers) do
    results =
      number_list
      |> Task.async_stream(&check_prime/1, max_concurrency: num_workers)
      |> Enum.map(&Task.await/1)

    IO.inspect(results)
  end

  def check_prime(number) do
    prime = is_prime(number)
    IO.inspect({number, prime})
    {number, prime}
  end

  def is_prime(number) when number <= 1 do
    false
  end

  def is_prime(number) do
    not Enum.any?(2..(div(number, 2)), &(&1 != 1 and rem(number, &1) == 0))
  end
end

LeaderWorker.start()
number_list = [2, 3, 4, 5, 6, 7, 8, 9, 10, 11]
LeaderWorker.run(number_list, 4)
This is the error I'm getting:
elixir .\clean.ex
{2, false}
{3, true}
{4, false}
{5, true}
** (FunctionClauseError) no function clause matching in Task.await/2
The following arguments were given to Task.await/2:
# 1
{:ok, {2, false}}
# 2
5000
Attempted function clauses (showing 1 out of 1):
def await(%Task{ref: ref, owner: owner} = task, timeout) when timeout == :infinity or is_integer(timeout) and timeout >= 0
(elixir 1.16.2) Task.await/2
(elixir 1.16.2) lib/enum.ex:1708: anonymous fn/3 in Enum.map/2
(elixir 1.16.2) lib/enum.ex:4396: anonymous fn/3 in Enum.map/2
(elixir 1.16.2) lib/task/supervised.ex:386: Task.Supervised.stream_deliver/7
(elixir 1.16.2) lib/enum.ex:4396: Enum.map/2
clean.ex:14: LeaderWorker.run/2
clean.ex:36: (file)
I'm looking for a way to get the list of {number, boolean} tuples that I expect.
According to the documentation for Task.async_stream/3:
When streamed, each task will emit {:ok, value} upon successful completion […]
That said, Task.await/1 has nothing to do with this code. The result of a call to Task.async_stream/3 is a Stream, and each element of that stream blocks until its task completes, then yields {:ok, result}. Each task is awaited for you, so its result is already available when it appears in the stream. That is why Enum.map(&Task.await/1) fails: it receives {:ok, {2, false}} rather than a %Task{} struct, and no clause of Task.await/2 matches.
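To make the shape of the emitted elements concrete, here is a minimal sketch. The doubling function is only an illustration and is not part of the question's code.

```elixir
# Each element the stream emits is already a finished result wrapped in
# an {:ok, value} tuple, not a %Task{} struct, so there is nothing to await.
results =
  [1, 2, 3]
  |> Task.async_stream(fn n -> n * 2 end, max_concurrency: 2)
  |> Enum.to_list()

IO.inspect(results)
# prints [ok: 2, ok: 4, ok: 6]
```

The output is printed as a keyword list because every element is a two-element tuple whose first element is the atom :ok.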
If you just want to print the results, consider Stream.each/2 followed by Stream.run/1.
number_list
|> Task.async_stream(&check_prime/1, max_concurrency: num_workers)
|> Stream.each(&IO.inspect/1) # outputs each task result
|> Stream.run()
If you need the results themselves, collect them:
number_list
|> Task.async_stream(&check_prime/1, max_concurrency: num_workers)
|> Stream.map(fn {:ok, {value, prime?}} -> {value, prime?} end)
|> Enum.to_list()
|> IO.inspect(label: "Results")
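Putting the pieces together, run/2 could look roughly like the sketch below. The module name PrimeCheck and the function name prime?/1 are hypothetical and chosen only for this example. The sketch also adjusts the primality check: the question's output shows {2, false} because 2..div(2, 2) is the range 2..1, which counts down through 2 and 1, so 2 is tested against itself. Handling 2 and 3 in a separate clause is one possible way to avoid that.

```elixir
defmodule PrimeCheck do
  # Hypothetical module name; mirrors LeaderWorker.run/2 from the question.
  def run(number_list, num_workers) do
    number_list
    |> Task.async_stream(&check_prime/1, max_concurrency: num_workers)
    |> Enum.map(fn
      # Successful task: unwrap the value the task returned.
      {:ok, {number, prime?}} -> {number, prime?}
      # Emitted instead of {:ok, _} when the caller traps exits and a task
      # fails, or when a task times out with on_timeout: :kill_task.
      {:exit, reason} -> {:error, reason}
    end)
  end

  def check_prime(number), do: {number, prime?(number)}

  def prime?(number) when number < 2, do: false
  # 2 and 3 are handled here so the range below never counts down.
  def prime?(number) when number < 4, do: true

  def prime?(number) do
    # For number >= 4, div(number, 2) >= 2, so 2..div(number, 2) is ascending.
    not Enum.any?(2..div(number, 2), &(rem(number, &1) == 0))
  end
end

results = PrimeCheck.run([2, 3, 4, 5, 6, 7, 8, 9, 10, 11], 4)
IO.inspect(results, label: "Results")
```

Because Task.async_stream/3 preserves input order by default, the resulting list lines up with number_list, starting with {2, true}, {3, true}, {4, false}.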