Tags: parallel-processing, julia, interrupt, distributed-computing, pmap

Distributed Julia: parallel map (pmap) with a timeout / time limit for each map task to complete


My project computes a map in parallel using the pmap function from Julia's Distributed standard library.

Mapping a given element may take a few seconds, or it may take essentially forever. I want to impose a time limit on each individual map task.

If a map task finishes in time, return the result of the computation. If it does not finish within the time limit, stop the computation as soon as the limit is reached and return some value or message indicating that a timeout occurred.

A minimal example follows. First the Distributed module is imported, and then worker processes are launched:

num_procs = 1
using Distributed
# With num_procs == 1, addprocs is never called and pmap runs on the main process.
if num_procs > 1
    addprocs(num_procs - 1)  # launch num_procs - 1 worker processes
end
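With num_procs = 1 no workers are added, so pmap runs everything on the main process (process id 1). A small check using Distributed's nprocs, workers and myid can confirm which processes pmap actually uses; this snippet is an illustration, not part of the original setup:

```julia
using Distributed

num_procs = 1
if num_procs > 1
    addprocs(num_procs - 1)
end

# With only the main process, workers() returns [1] and pmap runs there.
println("processes: ", nprocs(), ", workers: ", workers())

# Each element reports the id of the process that mapped it.
ids = pmap(_ -> myid(), 1:3)
println(ids)
```

With a larger num_procs, workers() excludes process 1 and the reported ids come from the added workers instead.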

Next, the mapping task is defined on every process with @everywhere. The mapping task should time out after 1 second:

@everywhere import Random
@everywhere begin
    """
    Compute stuff for `wait_time` seconds, and return `wait_time`.
    If `timeout` seconds elapse first, stop computation and return something else.
    """
    function waitForTimeUnlessTimeout(wait_time, timeout=1)
        t0 = time()  # start time of this (simulated) computation

        # < Insert some sort of timeout code? >

        # This block of code simulates a long computation.
        # (pretend the computation time is unknown)
        x = 0.0
        while time() - t0 < wait_time
            x += Random.rand() - 0.5
        end

        # Computation completed before the time limit. Return wait_time.
        round(wait_time, digits=2)
    end
end
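When the long computation is a loop you control, the simplest way to fill in the "< Insert some sort of timeout code? >" placeholder is to check a deadline on every iteration and return early. The sketch below is a hypothetical variant (the name timeoutAwareWait and returning nothing on timeout are assumptions for illustration) that relies only on time():

```julia
import Random

"""
Hypothetical variant of waitForTimeUnlessTimeout: simulate a computation lasting
`wait_time` seconds, but give up once `timeout` seconds have elapsed.
Returns the rounded `wait_time` on success, or `nothing` on timeout.
"""
function timeoutAwareWait(wait_time, timeout=1)
    t0 = time()
    deadline = t0 + timeout
    x = 0.0
    while time() - t0 < wait_time
        # Cooperative timeout check: abandon the work once the deadline passes.
        time() > deadline && return nothing
        x += Random.rand() - 0.5
    end
    return round(wait_time, digits=2)
end

timeoutAwareWait(0.2, 1)    # returns 0.2
timeoutAwareWait(1.5, 0.3)  # returns nothing after about 0.3 s
```

Because the check lives inside the computation, the work genuinely stops at the deadline. To use it with pmap on several processes it would be wrapped in @everywhere, like the original function.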

The function that executes the parallel map (pmap) is defined on the main process. Each map task takes a random time between 0 and 2 seconds to complete, but should time out after 1 second.

function myParallelMapping(num_tasks=20, max_runtime=2)
    # Random task runtimes between 0 and max_runtime
    runtimes = Random.rand(num_tasks) * max_runtime

    # Return the parallel computation of the mapping tasks
    pmap((runtime)->waitForTimeUnlessTimeout(runtime), runtimes)
end

print(myParallelMapping())

How should this time-limited parallel map be implemented?


Solution

  • You could put something like this inside the body of your pmap call:

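    time_limit = 1  # seconds allowed per map task
    pmap(runtimes) do runtime
      t0 = time()
      # Run the computation in a separate task so this loop can monitor it.
      # @async tasks are cooperative: the computation must yield (e.g. call
      # yield() or sleep()) for this loop to regain control before it finishes.
      task = @async waitForTimeUnlessTimeout(runtime)
      while !istaskdone(task) && time() - t0 < time_limit
          sleep(0.05)  # poll every 50 ms rather than once per second
      end
      istaskdone(task) && return fetch(task)
      error("time over")  # the abandoned task is not stopped; it keeps running
    end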
    

    Also note that (runtime)->waitForTimeUnlessTimeout(runtime) is equivalent to passing waitForTimeUnlessTimeout directly.
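A self-contained sketch of the same polling idea follows. It swaps the hand-written while/sleep loop for Base.timedwait, which polls a condition until it is true or a timeout expires, and uses pmap's on_error keyword so a "time over" error becomes a placeholder result instead of aborting the whole map. The names yieldingWork and runWithTimeLimit and the :timeout placeholder are hypothetical. Because @async tasks are cooperative, the simulated computation calls yield() inside its loop; a busy loop that never yields would run to completion before the timeout check ever gets to run.

```julia
using Distributed

# Hypothetical stand-in for the real computation: a busy loop that calls
# yield() on each iteration so the monitoring code can run in between.
function yieldingWork(wait_time)
    t0 = time()
    x = 0.0
    while time() - t0 < wait_time
        x += rand() - 0.5
        yield()  # cooperative scheduling point; without it the timeout never fires
    end
    return round(wait_time, digits=2)
end

"""
Run `yieldingWork(runtime)` in a task and wait at most `time_limit` seconds.
Throws `error("time over")` if the limit is reached, like the answer above.
"""
function runWithTimeLimit(runtime, time_limit)
    task = @async yieldingWork(runtime)
    # timedwait polls the callback every `pollint` seconds and returns
    # :ok once it is true, or :timed_out after `time_limit` seconds.
    status = timedwait(() -> istaskdone(task), float(time_limit); pollint=0.05)
    status === :ok && return fetch(task)
    error("time over")  # the abandoned task keeps running in the background
end

runtimes = [0.2, 1.5, 0.4]

# on_error turns the "time over" exception into a placeholder result
# instead of letting it abort the whole pmap.
results = pmap(r -> runWithTimeLimit(r, 1.0), runtimes; on_error = e -> :timeout)
println(results)
```

The timed-out task is only abandoned, not killed, so it keeps using CPU until it finishes. Stopping the work for real needs either a deadline check inside the computation itself or running it in a separate worker process that can be interrupted.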