I am developing an Actor class and using ray.wait() to collect the results.
Below are my code and its console output. It collects the results for only 2 Actors even though there are 3 Actors.
```python
import time
import ray

@ray.remote
class Tester:
    def __init__(self, param):
        self.param = param

    def run(self):
        return self.param

params = [0,1,2]
testers = []
for p in params:
    tester = Tester.remote(p)
    testers.append(tester)

runs = []
for i, tester in enumerate(testers):
    runs.append(tester.run.remote())

while len(runs):
    done_id, result_ids = ray.wait(runs)
    #runs size is not decreasing
    result = ray.get(done_id[0])
    print('result:{}'.format(result))
    time.sleep(1)
```
```
result:2
(pid=819202)
(pid=819200)
(pid=819198)
result:1
result:0
result:0
result:0
result:0
result:0
...
...
...
```
The console prints forever because the size of the runs list never decreases.
I expected that when I call ray.wait(runs) and get done_id, the element of runs matching done_id would be removed, but it is not.
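For reference, wait-style APIs like this return new collections rather than shrinking the one you pass in, so the caller has to keep the "not done" result itself. The sketch below shows that contract with the standard library's concurrent.futures.wait, used only as a stand-in for Ray (it returns sets rather than lists); the slow_identity helper is hypothetical.

```python
import time
from concurrent.futures import FIRST_COMPLETED, ThreadPoolExecutor, wait


def slow_identity(x):
    # Hypothetical task: sleep briefly, then return the input unchanged.
    time.sleep(0.1 * x)
    return x


with ThreadPoolExecutor(max_workers=3) as pool:
    futures = [pool.submit(slow_identity, p) for p in [0, 1, 2]]

    # wait() returns new (done, not_done) sets; it never removes items from `futures`.
    done, not_done = wait(futures, return_when=FIRST_COMPLETED)

    print(len(futures))                # prints 3: the input list is untouched
    print(len(done) + len(not_done))   # prints 3: each future is in exactly one set
```

Looping on `futures` here would never end, just like looping on runs in the question; the loop has to continue on `not_done` instead.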
I want the console output to look like this:
```
result:2
(pid=819202)
(pid=819200)
(pid=819198)
result:1
result:0
```
The script you provided uses ray.wait incorrectly. The following code does what you want:
```python
import time
import ray

@ray.remote
class Tester:
    def __init__(self, param):
        self.param = param

    def run(self):
        return self.param

params = [0, 1, 2]

# I use list comprehensions instead of for loops for terseness.
testers = [Tester.remote(p) for p in params]
not_done_ids = [tester.run.remote() for tester in testers]

# len() is not required to check that the list is empty.
while not_done_ids:
    # Replace not_done_ids with the list of object references that aren't
    # ready. Store the list of object references that are ready in done_ids.
    # timeout=1 means sleep at most 1 second, do not sleep if there are
    # new object references that are ready.
    done_ids, not_done_ids = ray.wait(not_done_ids, timeout=1)

    # ray.get can take an iterable of object references.
    done_return_values = ray.get(done_ids)

    # Process each result.
    for result in done_return_values:
        print(f'result: {result}')
```
I added the following fixes:
- ray.wait returns two lists: a list of object references that are ready, and a list of object references that may or may not be ready. You should iterate over the first list to get all object references that are ready.
- I replaced the runs list with not_done_ids, so that once all object references are ready, the while loop ends.
- ray.wait supports waiting with a timeout. I removed your sleep and added timeout=1, which makes the program run more efficiently (there is no sleep if another object is ready!).
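The same drain-until-empty loop can be sketched without a Ray cluster, again using concurrent.futures.wait as a swapped-in stand-in for ray.wait. Rebinding `not_done` on every iteration is what lets the loop terminate, and because of the timeout an iteration may return an empty `done` set, which the inner loop simply skips. The slow_identity helper is hypothetical; unlike ray.wait with its default num_returns=1, FIRST_COMPLETED may report more than one finished future per call.

```python
import time
from concurrent.futures import FIRST_COMPLETED, ThreadPoolExecutor, wait


def slow_identity(x):
    # Hypothetical task: larger inputs finish sooner, then return the input.
    time.sleep(0.2 * (3 - x))
    return x


results = []
with ThreadPoolExecutor(max_workers=3) as pool:
    not_done = {pool.submit(slow_identity, p) for p in [0, 1, 2]}
    # Loop until every future is collected; `not_done` shrinks because it is
    # rebound to the set returned by wait() on each iteration.
    while not_done:
        # Block for at most 1 second, returning early once any future finishes.
        done, not_done = wait(not_done, timeout=1, return_when=FIRST_COMPLETED)
        # `done` is empty if the timeout expired first; nothing is printed then.
        for fut in done:
            result = fut.result()
            results.append(result)
            print(f"result: {result}")
```

Each result is printed exactly once and the loop exits after the last one, which is the behavior the question asks for.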