
Python multiprocessing safely writing to a file


I am trying to solve a big numerical problem which involves lots of subproblems, and I'm using Python's multiprocessing module (specifically Pool.map) to split up different independent subproblems onto different cores. Each subproblem involves computing lots of sub-subproblems, and I'm trying to effectively memoize these results by storing them to a file if they have not been computed by any process yet, otherwise skip the computation and just read the results from the file.

I'm having concurrency issues with the files: different processes sometimes check to see if a sub-subproblem has been computed yet (by looking for the file where the results would be stored), see that it hasn't, run the computation, then try to write the results to the same file at the same time. How do I avoid writing collisions like this?


Solution

  • @GP89 mentioned a good solution: use a queue to send the write tasks to a dedicated process that has sole write access to the file. All the other workers have read-only access, which eliminates write collisions. Here is an example that uses apply_async, but the same pattern works with map too:

    import multiprocessing as mp
    import time
    
    fn = 'c:/temp/temp.txt'
    
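    def worker(arg, q):
        '''Simulate a long-running computation, then report the result.'''
        start = time.perf_counter()  # time.clock() was removed in Python 3.8
        s = 'this is a test'
        txt = s
        for i in range(200000):
            txt += s
        done = time.perf_counter() - start
        # Workers only read the shared file; the listener is the sole writer.
        with open(fn, 'rb') as f:
            size = len(f.read())
        res = 'Process' + str(arg), str(size), done
        q.put(res)
        return res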
    
    def listener(q):
        '''Listen for messages on the queue and write them to the file.'''
        with open(fn, 'w') as f:
            while True:
                m = q.get()
                # 'kill' is the sentinel that tells the listener to stop
                if m == 'kill':
                    f.write('killed')
                    break
                f.write(str(m) + '\n')
                f.flush()
    
    def main():
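        # A plain mp.Queue cannot be passed as an argument to pool tasks;
        # a Manager queue is a picklable proxy, so it can.
        manager = mp.Manager()
        q = manager.Queue()
        # The listener occupies one pool slot for the whole run.
        pool = mp.Pool(mp.cpu_count() + 2)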
    
        #put listener to work first
        watcher = pool.apply_async(listener, (q,))
    
        #fire off workers
        jobs = []
        for i in range(80):
            job = pool.apply_async(worker, (i, q))
            jobs.append(job)
    
        # collect results from the workers through the pool result queue
        for job in jobs: 
            job.get()
    
        #now we are done, kill the listener
        q.put('kill')
        pool.close()
        pool.join()
    
    if __name__ == "__main__":
        main()
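
The answer says the same pattern works with map. Below is a minimal sketch of that variant, not the original author's code. The names `run` and `SENTINEL`, the toy squaring computation, the extra `path` argument to `listener`, and the temporary output file are all assumptions made for illustration. It binds the Manager queue to the worker with `functools.partial` so that `Pool.map` can pass a single argument per task.

```python
import multiprocessing as mp
import os
import tempfile
from functools import partial

# Hypothetical stop marker: any value the workers never put on the queue.
SENTINEL = None


def worker(arg, q):
    """Compute one toy result and hand a line of text to the writer."""
    result = arg * arg
    q.put(f"{arg},{result}")
    return result


def listener(q, path):
    """Sole writer: copy queue messages to the file until SENTINEL arrives."""
    count = 0
    with open(path, "w") as f:
        while True:
            msg = q.get()
            if msg is SENTINEL:
                break
            f.write(msg + "\n")
            f.flush()
            count += 1
    return count


def run(path, n_tasks=8):
    """Run n_tasks workers through Pool.map with one dedicated writer task."""
    with mp.Manager() as manager:
        q = manager.Queue()
        # The listener holds one pool slot for the whole run, so the pool
        # needs at least two processes or the workers would never start.
        with mp.Pool(max(2, mp.cpu_count())) as pool:
            writer = pool.apply_async(listener, (q, path))
            results = pool.map(partial(worker, q=q), range(n_tasks))
            q.put(SENTINEL)         # all workers are done; stop the writer
            written = writer.get()  # wait until the file is fully written
    return results, written


if __name__ == "__main__":
    out_path = os.path.join(tempfile.mkdtemp(), "results.txt")
    results, written = run(out_path)
    print(results, written)
```

Because `map` blocks until every worker has returned, the sentinel can be queued immediately afterwards. The lines in the file may appear in any order, since workers finish at different times.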