Tags: python, python-multiprocessing

Synchronization of multiple processes running with a 'while True' loop


I need the following:

  • Main function trains a model.
  • At every epoch, its parameters are copied to a test model.
  • The test model is used for testing on multiple datasets.
  • Testing must happen in parallel while training continues with the next epoch.
  • Wait for testing to be done on all datasets before moving on to the next training epoch.
  • The testing function reports some stats, which are read by the main function.

The following code uses a single Queue and tests only the first dataset. I need to extend it to all datasets.

import signal
import numpy as np
import multiprocessing as mp

STOP = -1

data = {'x': np.random.rand(), 'y': np.random.rand(), 'z': np.random.rand()}

def initializer():
    """Ignore CTRL+C in the worker process."""
    signal.signal(signal.SIGINT, signal.SIG_IGN)


def online_test(p2c_queue, c2p_queue, data_id, model, shared_stats):
    print(f'testing function for {data_id} has started')
    while True: # keep process alive for testing
        print(f'... {data_id} waiting ...')
        epoch = p2c_queue.get()
        if epoch == STOP:
            print(f'... testing {data_id} is over, function is ending ...')
            break
        shared_stats.update({data_id: {k: [] for k in ['prediction', 'error']}})
        print(f'... {data_id} evaluation ...')
        pred = model.value
        err = pred - data[data_id] # simplified version, the real one takes some time
        # shared_stats.update({data_id: {'prediction': pred, 'error': err}})
        shared_stats.update({data_id: {'prediction': epoch, 'error': -epoch}}) # to debug if order of calls is correct
        c2p_queue.put(True) # notify parent that testing is done for requested epoch


if __name__ == '__main__':
    stats = {**{'epoch': []},
             **{data_id: {k: [] for k in ['prediction', 'error']} for data_id in data.keys()}}
    mp.set_start_method('spawn')
    manager = mp.Manager()
    p2c_queue = manager.Queue() # parent-to-child: parent tells child to start testing
    c2p_queue = manager.Queue() # child-to-parent: child tells parent that testing is done
    test_model = manager.Value('d', 10.0)
    shared_stats = manager.dict()
    pool = mp.Pool(initializer=initializer)
    p2c_queue.put(0) # testing can start for raw model
    pool.apply_async(online_test,
        args=(p2c_queue, c2p_queue, 'x', test_model, shared_stats))

    try: # wrap all in a try-except to handle KeyboardInterrupt
        for epoch in range(10):
            print('training epoch', epoch)
            # ... here I do some training and then copy my parameters to test_model
            test_model.value = np.random.rand() # simplified version
            print('... waiting for testing before moving on to next epoch ...')
            if c2p_queue.get(): # keep training only if previous eval is done
                print(f'... epoch {epoch} testing is done, stats are')
                for data_id in shared_stats.keys(): # but first copy stats here
                    for k in stats[data_id].keys():
                        mu = np.mean(shared_stats[data_id][k])
                        stats[data_id][k].append(mu)
                        print('  ', data_id, k, mu)
                p2c_queue.put(epoch + 1)

        p2c_queue.put(STOP)
        print(stats)

    except KeyboardInterrupt:
        pool.terminate()
    else:
        pool.close()

    pool.join()

  1. How do I sync multiple processes? The example here spawns only one, for dataset 'x'. I tried:
  • Using multiple queues, but my code hangs. Each queue is supposed to hold only one item, corresponding to its testing dataset.
  • Using one queue that holds as many items as there are testing datasets. The idea is to check when the queue is empty, but I have read that empty() is unreliable (see the sketch after these questions).
  2. Do I need a lock? shared_stats is accessed by all processes, possibly at the same time, but each one sets only a specific key of the dictionary, so it should not be a problem. Right?
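
On the first question, one pattern that avoids empty() entirely is to read a fixed number of acknowledgments: if N datasets are being tested, the parent calls get() exactly N times, which blocks until every worker has reported back. A minimal self-contained sketch of that pattern; the names (worker, task_q, ack_q) are illustrative, not from the code above:

import multiprocessing as mp

def worker(task_q, ack_q, data_id):
    """Hypothetical per-dataset worker: take one task, then acknowledge."""
    epoch = task_q.get()
    # ... evaluate the model on `data_id` for `epoch` here ...
    ack_q.put(data_id) # one acknowledgment per dataset

if __name__ == '__main__':
    datasets = ['x', 'y', 'z']
    manager = mp.Manager()
    task_q, ack_q = manager.Queue(), manager.Queue()
    procs = [mp.Process(target=worker, args=(task_q, ack_q, d)) for d in datasets]
    for p in procs:
        p.start()
    for _ in datasets:
        task_q.put(0) # one task per dataset
    done = {ack_q.get() for _ in datasets} # blocks until all acks have arrived
    print('all tested:', sorted(done))
    for p in procs:
        p.join()

On the lock question: in CPython a single update() call on a Manager dict is handled as one operation, so concurrent writes to different keys, as in the code above, should not need an extra lock. A lock does become necessary for read-modify-write sequences, and note that mutating a nested object fetched from a dict proxy is not written back automatically; it must be reassigned. A self-contained sketch of the safe pattern:

import multiprocessing as mp

if __name__ == '__main__':
    manager = mp.Manager()
    shared_stats = manager.dict({'x': {'error': []}})
    lock = manager.Lock()
    with lock: # guard the read-modify-write as a single unit
        entry = shared_stats['x'] # this is a plain local copy
        entry['error'].append(0.5)
        shared_stats['x'] = entry # reassign so the proxy records the change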

Solution

  • I did it with a JoinableQueue; the code is below.

    However, this version does testing differently from how I originally planned: any process can end up testing any dataset, whereas I would like each process to always test the same dataset (see the sketch after the code for one way to pin workers to datasets). Any suggestion/comment is more than welcome.

    import signal
    import numpy as np
    import multiprocessing as mp
    
    STOP = 'STOP'
    
    data = {'x': np.random.rand(), 'y': np.random.rand(), 'z': np.random.rand()}
    debug_value = {'x': 1, 'y': 10, 'z': 100}
    
    def initializer():
        """Ignore CTRL+C in the worker process."""
        signal.signal(signal.SIGINT, signal.SIG_IGN)
    
    
    def online_test(queue, model, shared_stats):
        while True: # keep process alive for testing
            epoch, data_id = queue.get()
            if data_id == STOP:
                print('... test function is stopping ...')
                break
            print(f'testing function for {data_id} has started for epoch {epoch}')
            shared_stats.update({data_id: {k: [] for k in ['prediction', 'error']}})
            # print(f'... evaluation ...')
            pred = model.value
            err = pred - data[data_id] # simplified version, the real one takes some time
            checker = debug_value[data_id]
            shared_stats.update({data_id: {'prediction': epoch * checker, 'error': - epoch * checker}}) # to debug if order of calls is correct
            # shared_stats.update({data_id: {'prediction': pred, 'error': err}})
            queue.task_done() # notify parent that testing is done for requested epoch
    
    
    if __name__ == '__main__':
        stats = {**{'epoch': []},
                 **{data_id: {k: [] for k in ['prediction', 'error']} for data_id in data.keys()}}
        mp.set_start_method('spawn')
        manager = mp.Manager()
        test_queue = manager.JoinableQueue()
        test_model = manager.Value('d', 10.0)
        shared_stats = manager.dict()
        pool = mp.Pool(initializer=initializer)
        for data_id in data.keys():
            pool.apply_async(online_test,
                args=(test_queue, test_model, shared_stats))
            test_queue.put((0, data_id)) # testing can start
    
        try: # wrap all in a try-except to handle KeyboardInterrupt
            for epoch in range(10):
                print('training epoch', epoch)
                # ... here I do some training and then copy my parameters to test_model
                print('... waiting for testing before moving on to next epoch ...')
                test_queue.join() # keep training only if previous eval is done
                stats['epoch'].append(epoch) # the stats collected below belong to this epoch's tests
                test_model.value = np.random.rand() # simplified version
                print(f'... epoch {epoch} testing is done, stats are')
                for data_id in shared_stats.keys(): # but first copy stats here
                    for k in stats[data_id].keys():
                        mu = np.mean(shared_stats[data_id][k])
                        stats[data_id][k].append(mu)
                        # print('  ', data_id, k, mu)
                    test_queue.put((epoch + 1, data_id))
    
            for _ in data.keys(): # notify all workers to end: one STOP per process
                test_queue.put((-1, STOP))
    
            print(stats)
    
        except KeyboardInterrupt:
            pool.terminate()
        else:
            pool.close()
    
        pool.join()
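
  • For completeness, here is a sketch of the variant I would have preferred, where each worker is pinned to one dataset: one JoinableQueue per dataset, each served by a dedicated process. This is illustrative code under that assumption, not part of the solution above, and the stats bookkeeping is omitted.

    import multiprocessing as mp

    STOP = 'STOP'

    def online_test(queue, data_id):
        """Dedicated worker: always tests the same dataset."""
        while True:
            epoch = queue.get()
            if epoch == STOP:
                queue.task_done()
                break
            # ... evaluate the test model on `data_id` for `epoch` here ...
            queue.task_done() # this dataset's test for `epoch` is done

    if __name__ == '__main__':
        mp.set_start_method('spawn')
        manager = mp.Manager()
        queues = {d: manager.JoinableQueue() for d in ['x', 'y', 'z']}
        pool = mp.Pool(processes=len(queues)) # one pool slot per dataset
        for d, q in queues.items():
            pool.apply_async(online_test, args=(q, d))
        for epoch in range(10):
            for q in queues.values():
                q.put(epoch) # same queue, hence same worker, for each dataset
            for q in queues.values():
                q.join() # wait until every dataset finished this epoch
        for q in queues.values():
            q.put(STOP)
        pool.close()
        pool.join()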