
How to make repeated requests for tasks


I want to make repeated requests to a server that returns some tasks. The response from the server is a dictionary with a list of functions that need to be called. For example:

{
   "tasks": [
      {
         "function": "HelloWorld",
         "id": 1212
      },
      {
         "function": "GoodbyeWorld",
         "id": 1222
      }
   ]
}

NOTE: I've simplified (dumbed down) this example.

For each of these tasks, I will run the specified function using multiprocessing. Here is an example of my code:

import multiprocessing

import requests

# HelloWorldClass and GoodbyeWorldClass are defined elsewhere (not shown).

if __name__ == "__main__":
  r = requests.get('https://localhost:5000', auth=('user', 'pass'))
  data = r.json()

  if len(data["tasks"]) > 0:
    manager = multiprocessing.Manager()
    for task in data["tasks"]:
      if task["function"] == "HelloWorld":
        helloObj = HelloWorldClass()
        hello = multiprocessing.Process(target=helloObj.helloWorld)
        hello.start()
        hello.join()  # blocks until this process has finished
      elif task["function"] == "GoodbyeWorld":
        byeObj = GoodbyeWorldClass()
        bye = multiprocessing.Process(target=byeObj.byeWorld)
        bye.start()
        bye.join()  # blocks until this process has finished
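In the snippet above, join() is called right after start(), so each task runs to completion before the next one is even started. A minimal sketch of the difference follows; the helper name `start_then_join` and its `ctx` parameter are hypothetical and not part of the question's code. It starts every process in a batch first and only then joins them, so the tasks from one response run in parallel:

```python
import multiprocessing


def start_then_join(targets, ctx=multiprocessing):
    """Start one process per target, then wait for all of them.

    No join() happens inside the start loop, so the children run
    concurrently; the caller blocks only once, at the end.
    `ctx` is the multiprocessing module or a context from get_context().
    """
    procs = [ctx.Process(target=t) for t in targets]
    for p in procs:
        p.start()  # returns immediately; the child runs in the background
    for p in procs:
        p.join()  # blocks until this particular child has exited
    return procs
```

For example, `start_then_join([HelloWorldClass().helloWorld, GoodbyeWorldClass().byeWorld])` would run both task methods at the same time. The caller still waits for the whole batch, so on its own this does not let the loop request new tasks while they run. Passing `multiprocessing.get_context("spawn")` (or another start method) as `ctx` selects how the children are created.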

The problem is that I want to keep making requests and adding to the data["tasks"] array while the other processes are running. If I put everything inside a while loop, it only makes a new request after all the processes from the previous response have finished (that is, after join() has returned for every process).

Can anyone help me to make repeated requests and fill the array continuously? Please let me know if I need to make any clarifications.


Solution

  • If I understood you correctly, you need something like this:

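    import time
    from multiprocessing import Process
    
    import requests
    
    # FunctionFactory lives in the answerer's own `task` module (not shown);
    # build(name) returns a callable for the given task function name.
    from task import FunctionFactory
    
    
    def get_tasks():
        resp = requests.get('https://localhost:5000', auth=('user', 'pass'))
        resp.raise_for_status()  # fail loudly on HTTP errors instead of parsing an error body
        data = resp.json()
    
        return data['tasks']
    
    
    if __name__ == '__main__':
        procs = {}  # task id -> Process that runs (or ran) that task
    
        for _ in range(10):  # poll the server 10 times
            for task in get_tasks():
    
                if task['id'] in procs:
                    # This task has already been submitted for execution.
                    continue
    
                func = FunctionFactory.build(task['function'])
    
                proc = Process(target=func)
                proc.start()  # no join() here, so polling continues while it runs
    
                procs[task['id']] = proc
    
            # Pause before the next request so the server is not polled in a tight loop.
            time.sleep(5)
    
        # Wait for all the submitted tasks to finish.
        for proc in procs.values():
            proc.join()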
    

    Here, get_tasks requests the current task list from the server: a list of dictionaries with id and function keys. In the main section, the procs dictionary maps each task id to the process that runs it, and each process executes a function that FunctionFactory builds from the task's function name. If a task with the same id has already been submitted, it is skipped, so a task that the server keeps returning is started only once.
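The answer imports FunctionFactory from a `task` module that is not shown. A minimal sketch of what it might look like, assuming the HelloWorldClass and GoodbyeWorldClass classes from the question; their method bodies and the `_builders` table below are hypothetical placeholders:

```python
class HelloWorldClass:
    """Placeholder for the question's HelloWorldClass (hypothetical body)."""

    def helloWorld(self):
        print("Hello, world")


class GoodbyeWorldClass:
    """Placeholder for the question's GoodbyeWorldClass (hypothetical body)."""

    def byeWorld(self):
        print("Goodbye, world")


class FunctionFactory:
    """Hypothetical factory mapping a task's "function" name to a callable."""

    # Each entry creates a fresh object and returns its bound method.
    _builders = {
        "HelloWorld": lambda: HelloWorldClass().helloWorld,
        "GoodbyeWorld": lambda: GoodbyeWorldClass().byeWorld,
    }

    @classmethod
    def build(cls, name):
        """Return the callable for `name`; raise ValueError for unknown names."""
        builder = cls._builders.get(name)
        if builder is None:
            raise ValueError(f"unknown task function: {name!r}")
        return builder()
```

`FunctionFactory.build(task['function'])` then returns a bound method such as `HelloWorldClass().helloWorld`, which can be passed as a Process target. The lambdas are only called in the parent; what reaches the child is the bound method of a module-level class, which stays picklable when processes are started with the spawn method (the default on Windows and macOS).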

    With this approach, you can poll for tasks as often as needed (here, the for loop makes 10 requests) and start a process for each new task without waiting for earlier ones to finish, so the tasks run in parallel. At the end, the script waits for all the submitted tasks to finish.
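In the answer, procs only ever grows, so a poller that runs indefinitely would keep a Process object for every task it has ever started. A minimal sketch of a helper (the name `reap_finished` is hypothetical) that could be called once per iteration of the polling loop to join and drop processes that have already exited, without blocking on those still running:

```python
from typing import Any, Dict, Hashable, List


def reap_finished(procs: Dict[Hashable, Any]) -> List[Hashable]:
    """Join and remove every process in `procs` that has already exited.

    `procs` maps task id -> multiprocessing.Process (or any object with
    is_alive() and join()). Still-running processes are left in place.
    Returns the ids that were removed.
    """
    # Collect ids first: the dict must not change size while iterating over it.
    finished = [task_id for task_id, proc in procs.items() if not proc.is_alive()]
    for task_id in finished:
        proc = procs.pop(task_id)
        # The process has already exited, so join() returns immediately.
        proc.join()
    return finished
```

Once an id is removed from procs, the `task['id'] in procs` check no longer skips it. If the server can return a task again after it has finished, keep the submitted ids in a separate set for deduplication.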