Tags: python, python-3.x, multiprocessing, python-multithreading

When running two functions simultaneously, how do I return the first result and use it for further processing?


So I have two web scrapers that collect data from two different sources. I am running them both simultaneously to collect a specific piece of data (e.g. COVID numbers). When one of the functions finds the data, I want to use it without waiting for the other one to finish.

So far I have tried the multiprocessing Pool and returning the results with get(), but calling get() on each result means I have to wait for both to finish before I can continue with my code. My goal is to keep the code as simple and as short as possible.

My web scraper functions can be run with arguments and return a result if one is found. It is also possible to modify them.

Here is the code I have so far, which waits for both get() calls to finish:

from multiprocessing import Pool
from scraper1 import main_1
from scraper2 import main_2
from twitter import post_tweet

if __name__ == '__main__':
    with Pool(processes=2) as pool:
        r1 = pool.apply_async(main_1, ('www.website1.com','June'))
        r2 = pool.apply_async(main_2, ())
        
        data = r1.get()   # blocks until main_1 has finished
        data2 = r2.get()  # blocks until main_2 has finished

    post_tweet("New data is {}".format(data))
    post_tweet("New data is {}".format(data2))

From what I have seen, threading might be a better option since web scraping involves a lot of waiting and only a little parsing, but I am not sure how I would implement it.

I think the solution is fairly easy, but I have been searching and trying different things all day without much success, so I thought I would just ask here. (I only started programming two months ago.)


Solution

  • As always, there are many ways to accomplish this task.

    You have already mentioned using a Queue:

    from multiprocessing import Process, Queue
    from scraper1 import main_1
    from scraper2 import main_2
    
    def simple_worker(target, args, ret_q):
        ret_q.put(target(*args))  # multiprocessing.Queue has its own mutex, so we don't need to worry about concurrent reads/writes
        
    if __name__ == "__main__":
        q = Queue()
        p1 = Process(target=simple_worker, args=(main_1, ('www.website1.com','June'), q))
        p2 = Process(target=simple_worker, args=(main_2, ('www.website2.com','July'), q))
        p1.start()
        p2.start()
        first_result = q.get()  # blocks until whichever scraper finishes first
        do_stuff(first_result)
        # Don't forget to get() the second result before you quit. It's not a good idea to
        # leave things in a Queue and just assume it will be properly cleaned up at exit.
        second_result = q.get()
        p1.join()
        p2.join()
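
    Note that both results are fetched with q.get() before join() is called: a process that has put items on a multiprocessing.Queue may not terminate until those items have been flushed to the underlying pipe and consumed, so draining the queue first avoids a potential hang at join().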
    

    You could also still use a Pool with imap_unordered and just take the first result:

    from multiprocessing import Pool
    from scraper1 import main_1
    from scraper2 import main_2
    
    def simple_worker2(args):
        target, arglist = args  # unpack the (function, argument tuple) pair
        return target(*arglist)
        
    if __name__ == "__main__":
        tasks = ((main_1, ('www.website1.com','June')),
                 (main_2, ('www.website2.com','July')))
        with Pool() as p:  # the Pool context manager handles worker cleanup (your target function may however be interrupted at any point if the pool exits before a task is complete)
            for result in p.imap_unordered(simple_worker2, tasks, chunksize=1):
                do_stuff(result)
                break  # don't bother with further results
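
    imap_unordered yields results in whichever order they complete, so the first iteration gives you the result of whichever scraper finished first.

    Since the question mentions threading: web scraping is mostly waiting on network I/O, during which other Python threads can run, so the same first-result pattern also works with threads and the thread-safe queue.Queue. Below is a minimal sketch of that variant; it reuses the do_stuff placeholder from above and, purely for illustration, assumes main_2 takes the same kind of arguments as main_1:

    import threading
    import queue

    from scraper1 import main_1
    from scraper2 import main_2

    def thread_worker(target, args, ret_q):
        # queue.Queue is thread-safe, so concurrent put() calls from both threads are fine
        ret_q.put(target(*args))

    if __name__ == "__main__":
        q = queue.Queue()
        t1 = threading.Thread(target=thread_worker, args=(main_1, ('www.website1.com', 'June'), q))
        t2 = threading.Thread(target=thread_worker, args=(main_2, ('www.website2.com', 'July'), q))
        t1.start()
        t2.start()
        first_result = q.get()   # whichever scraper finishes first
        do_stuff(first_result)
        second_result = q.get()  # drain the second result too
        t1.join()
        t2.join()

    Threads are cheaper to start than processes and avoid pickling the arguments and return values, which is usually a good trade-off for I/O-bound scrapers.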