Search code examples
multithreadingconcurrencyprocesselixir

Can someone tell me what's wrong with mi Task.await?


Have this code I expect to work on child doing the is_prime function for the given number_list and return the result, but I'm having issues to retrieve the result from the Task.await method.

defmodule LeaderWorker do
  def start() do
    children = [{Task.Supervisor,
      name: Task.SomeThingSupervisor,
    }]

    Supervisor.start_link(children, strategy: :one_for_one)
  end

  def run(number_list, num_workers) do
    results =
      number_list
      |> Task.async_stream(&check_prime/1, max_concurrency: num_workers)
      |> Enum.map(&Task.await/1)

    IO.inspect(results)
  end

  def check_prime(number) do
    prime = is_prime(number)
    IO.inspect({number, prime})
    {number, prime}
  end

  def is_prime(number) when number <= 1 do
    false
  end

  def is_prime(number) do
    not Enum.any?(2..(div(number, 2)), &(&1 != 1 and rem(number, &1) == 0))
  end
end

LeaderWorker.start()
number_list = [2, 3, 4, 5, 6, 7, 8, 9, 10, 11]
LeaderWorker.run(number_list, 4)

This is the error I'm getting:

 elixir .\clean.ex
{2, false}
{3, true}
{4, false}
{5, true}
** (FunctionClauseError) no function clause matching in Task.await/2    

    The following arguments were given to Task.await/2:

        # 1
        {:ok, {2, false}}

        # 2
        5000

    Attempted function clauses (showing 1 out of 1):

        def await(%Task{ref: ref, owner: owner} = task, timeout) when timeout == :infinity or is_integer(timeout) and timeout >= 0

    (elixir 1.16.2) Task.await/2
    (elixir 1.16.2) lib/enum.ex:1708: anonymous fn/3 in Enum.map/2
    (elixir 1.16.2) lib/enum.ex:4396: anonymous fn/3 in Enum.map/2
    (elixir 1.16.2) lib/task/supervised.ex:386: Task.Supervised.stream_deliver/7
    (elixir 1.16.2) lib/enum.ex:4396: Enum.map/2
    clean.ex:14: LeaderWorker.run/2
    clean.ex:36: (file)

I'm trying to get a solution for that problem to get the {number, boolean} list that I should be having


Solution

  • As per a documentation for Task.async_stream/3,

    When streamed, each task will emit {:ok, value} upon successful completion […]

    That said, Task.await/1 has nothing to do with this code. The result of a call to Task.async_stream/3 would be a Stream, and each element of this stream would hang up until the task is completed, and then return {:ok, result} to the stream. Each task will be awaited until the result of its execution becomes available in the stream.

    If you want to just spit out the results, consider using Stream.run/1.

      number_list
      |> Task.async_stream(&check_prime/1, max_concurrency: num_workers)
      |> Stream.each(&IO.inspect/1) # outputs each task result
      |> Stream.run()
    

    If you need the results themselves, collect them

      number_list
      |> Task.async_stream(&check_prime/1, max_concurrency: num_workers)
      |> Stream.map(fn {:ok, {value, prime?}} -> {prime?, value} end)
      |> Enum.to_list()
      |> IO.inspect(label: "Results")