Search code examples
pythonray

How do I wait for ray on Actor class?


I am developing Actor class and ray.wait() to collect the results.

Below is the code and console outputs which is collecting the result for only 2 Actors when there are 3 Actors.

import time
import ray


@ray.remote
class Tester:
    def __init__(self, param):
        self.param = param

    def run(self):
        return self.param

params = [0,1,2]
testers = []
for p in params:
    tester = Tester.remote(p)
    testers.append(tester)

runs = []
for i, tester in enumerate(testers):
    runs.append(tester.run.remote())

while len(runs):
    done_id, result_ids = ray.wait(runs)
    #runs size is not decreasing

    result = ray.get(done_id[0])
    print('result:{}'.format(result))
    time.sleep(1)
result:2
(pid=819202) 
(pid=819200) 
(pid=819198) 
result:1
result:0
result:0
result:0
result:0
result:0
...
...
...

The console is printing out forever because the runs variable's size is not reduced.

When I call ray.wait(runs) and get the done_id, runs's element with the done_id should be removed, but it is not removed.

I want the console output to be like below.

result:2
(pid=819202) 
(pid=819200) 
(pid=819198) 
result:1
result:0

Solution

  • The script you provided is using ray.wait incorrectly. The following code does what you want:

    import time
    import ray
    
    @ray.remote
    class Tester:
        def __init__(self, param):
            self.param = param
    
        def run(self):
            return self.param
    
    params = [0, 1, 2]
    
    # I use list comprehensions instead of for loops for terseness.
    testers = [Tester.remote(p) for p in params]
    not_done_ids = [tester.run.remote() for tester in testers]
    
    # len() is not required to check that the list is empty.
    while not_done_ids:
        
        # Replace not_done_ids with the list of object references that aren't
        # ready. Store the list of object references that are ready in done_ids.
        # timeout=1 means sleep at most 1 second, do not sleep if there are
        # new object references that are ready.
        done_ids, not_done_ids = ray.wait(not_done_ids, timeout=1)
        
        # ray.get can take an iterable of object references.
        done_return_values = ray.get(done_ids)
    
        # Process each result.
        for result in done_return_values:
            print(f'result: {result}')
    
    
    

    I added the following fixes:

    • ray.wait returns two lists, a list of objects that are ready, and a list of objects that may or may not be ready. You should iterate over the first list to get all object references that are ready.
    • Your while loop goes forever until the list is empty. I simply replaced the runs list with not_done_ids so that once all object references are ready, the while loop breaks.
    • ray.wait supports sleeping, with timeout. I removed your sleep and added timeout=1, which enables the program to run more efficiently (there is no sleep if another object is ready!).