I need the following: the code below uses a single `Queue` and tests only the first dataset. I need to extend it so that all datasets are tested.
import signal
import numpy as np
import multiprocessing as mp
STOP = -1  # sentinel pushed on the parent-to-child queue to make the worker exit
data = {'x': np.random.rand(), 'y': np.random.rand(), 'z': np.random.rand()}  # toy stand-ins for the real datasets
def initializer():
    """Make worker processes ignore CTRL+C; only the parent handles it."""
    # Replace the default SIGINT handler with the "ignore" disposition so a
    # KeyboardInterrupt in the parent can terminate the pool cleanly.
    signal.signal(signal.SIGINT, signal.SIG_IGN)
def online_test(p2c_queue, c2p_queue, data_id, model, shared_stats):
    """Worker loop that evaluates `model` on one fixed dataset on demand.

    Blocks on `p2c_queue` for an epoch number from the parent, writes the
    evaluation result for `data_id` into `shared_stats`, and acknowledges
    completion on `c2p_queue`.  Exits when the STOP sentinel is received.
    """
    print(f'testing function for {data_id} has started')
    while True:  # stay alive between evaluation requests
        print(f'... {data_id} waiting ...')
        epoch = p2c_queue.get()
        if epoch != STOP:
            # Reset this dataset's slot, then evaluate and store the result.
            shared_stats.update({data_id: {key: [] for key in ('prediction', 'error')}})
            print(f'... {data_id} evaluation ...')
            pred = model.value
            err = pred - data[data_id]  # simplified version, the real one takes some time
            # shared_stats.update({data_id: {'prediction': pred, 'error': err}})
            shared_stats.update({data_id: {'prediction': epoch, 'error': -epoch}})  # to debug if order of calls is correct
            c2p_queue.put(True)  # notify parent that testing is done for requested epoch
        else:
            print(f'... testing {data_id} is over, function is ending ...')
            break
if __name__ == '__main__':
    # Result history: one 'epoch' list plus per-dataset prediction/error lists.
    stats = {**{'epoch': []},
             **{data_id: {k: [] for k in ['prediction', 'error']} for data_id in data.keys()}}
    mp.set_start_method('spawn')
    manager = mp.Manager()
    p2c_queue = manager.Queue()  # parent-to-child: parent tells child to start testing
    c2p_queue = manager.Queue()  # child-to-parent: child tells parent that testing is done
    test_model = manager.Value('d', 10.0)  # shared proxy the parent refreshes each epoch
    shared_stats = manager.dict()  # workers write their latest results here
    pool = mp.Pool(initializer=initializer)
    p2c_queue.put(0)  # testing can start for raw model
    # NOTE(review): only one worker is launched and it is hard-wired to 'x';
    # datasets 'y' and 'z' are never tested in this version.
    pool.apply_async(online_test,
                     args=(p2c_queue, c2p_queue, 'x', test_model, shared_stats))
    try:  # wrap all in a try-except to handle KeyboardInterrupt
        for epoch in range(10):
            print('training epoch', epoch)
            # ... here I do some training and then copy my parameters to test_model
            test_model.value = np.random.rand()  # simplified version
            print('... waiting for testing before moving on to next epoch ...')
            # Blocking get() is the handshake: the worker puts True only after
            # it has finished evaluating the current model.
            if c2p_queue.get():  # keep training only if previous eval is done
                print(f'... epoch {epoch} testing is done, stats are')
                for data_id in shared_stats.keys():  # but first copy stats here
                    for k in stats[data_id].keys():
                        mu = np.mean(shared_stats[data_id][k])
                        stats[data_id][k].append(mu)
                        print(' ', data_id, k, mu)
                p2c_queue.put(epoch + 1)  # request evaluation of the next epoch
        p2c_queue.put(STOP)  # tell the worker to leave its loop
        print(stats)
    except KeyboardInterrupt:
        pool.terminate()
    else:
        pool.close()
        pool.join()
So far only the `'x'` dataset is tested. I tried checking `empty()`, but it is unreliable. `shared_stats` is accessed by all processes, possibly at the same time, but each process sets only a specific key of the dictionary, so it should not be a problem — right?

I implemented a version with `JoinableQueue`; the code is below. However, this version does testing differently from how I originally planned: here any process can be in charge of testing any dataset, while I would like to have one process always testing the same dataset. Any suggestion/comment is more than welcome.
import signal
import numpy as np
import multiprocessing as mp
STOP = 'STOP'  # sentinel data_id that tells a worker to exit
data = {'x': np.random.rand(), 'y': np.random.rand(), 'z': np.random.rand()}  # toy stand-ins for the real datasets
debug_value = {'x': 1, 'y': 10, 'z': 100}  # per-dataset scale used to verify call ordering in the debug output
def initializer():
    """Make worker processes ignore CTRL+C; only the parent handles it."""
    # Install the "ignore" disposition for SIGINT so pool.terminate() in the
    # parent's KeyboardInterrupt handler can shut workers down cleanly.
    signal.signal(signal.SIGINT, signal.SIG_IGN)
def online_test(queue, model, shared_stats):
    """Worker loop: pull (epoch, data_id) jobs, evaluate, record, acknowledge.

    Each job names the dataset to test, so any worker may serve any dataset.
    Results go into `shared_stats[data_id]`; `task_done()` signals the parent's
    `queue.join()`.  A job whose data_id is the STOP sentinel ends the loop.
    """
    while True:  # stay alive between jobs
        epoch, data_id = queue.get()
        if data_id != STOP:
            print(f'testing function for {data_id} has started for epoch {epoch}')
            # Reset this dataset's slot, then evaluate and store the result.
            shared_stats.update({data_id: {key: [] for key in ('prediction', 'error')}})
            # print(f'... evaluation ...')
            pred = model.value
            err = pred - data[data_id]  # simplified version, the real one takes some time
            scale = debug_value[data_id]
            shared_stats.update({data_id: {'prediction': epoch * scale, 'error': - epoch * scale}})  # to debug if order of calls is correct
            # shared_stats.update({data_id: {'prediction': pred, 'error': err}})
            queue.task_done()  # notify parent that testing is done for requested epoch
        else:
            print(f'... test function is stopping ...')
            break
if __name__ == '__main__':
    # Result history: one 'epoch' list plus per-dataset prediction/error lists.
    stats = {**{'epoch': []},
             **{data_id: {k: [] for k in ['prediction', 'error']} for data_id in data.keys()}}
    mp.set_start_method('spawn')
    manager = mp.Manager()
    test_queue = manager.JoinableQueue()  # jobs flow parent->workers; task_done flows back via join()
    test_model = manager.Value('d', 10.0)  # shared proxy the parent refreshes each epoch
    shared_stats = manager.dict()  # workers write their latest results here
    pool = mp.Pool(initializer=initializer)
    # One worker per dataset, plus an initial epoch-0 job per dataset.
    # NOTE(review): workers are not bound to a dataset — any worker may pick
    # up any job, which is the behavior the author wants to change.
    for data_id in data.keys():
        pool.apply_async(online_test,
                         args=(test_queue, test_model, shared_stats))
        test_queue.put((0, data_id))  # testing can start
    try:  # wrap all in a try-except to handle KeyboardInterrupt
        for epoch in range(10):
            print('training epoch', epoch)
            # ... here I do some training and then copy my parameters to test_model
            print('... waiting for testing before moving on to next epoch ...')
            # join() returns once every outstanding job has been task_done()'d.
            test_queue.join()  # keep training only if previous eval is done
            stats['epoch'].append(epoch + 1)
            test_model.value = np.random.rand()  # simplified version
            print(f'... epoch {epoch} testing is done, stats are')
            for data_id in shared_stats.keys():  # but first copy stats here
                for k in stats[data_id].keys():
                    mu = np.mean(shared_stats[data_id][k])
                    stats[data_id][k].append(mu)
                # print(' ', data_id, k, mu)
                test_queue.put((epoch + 1, data_id))
        # NOTE(review): the final jobs put above (epoch 10) are processed by the
        # workers but never join()'d or copied into `stats` before printing.
        for data_id in shared_stats.keys():  # notify all procs to end
            test_queue.put((-1, STOP))
        print(stats)
    except KeyboardInterrupt:
        pool.terminate()
    else:
        pool.close()
        pool.join()