Search code examples
pythonpython-3.xnonblockingpython-asyncioperiodic-task

python: how-to do a periodic non-blocking lookup


Could you please advise on how to periodically execute a task that it takes more time to execute than the periodic interval ?

For example:

def lookup():
    # do some lookups, retrieve info, let's assume it takes 60sec to complete
    msg = {'abc':123}
    time.sleep(60)
    return msg

class Publisher(object):
    def __init__(self):
        self._TIMEIT = 0
        self._INTERVAL = 5
        self._counter = 0

    def xxx():
        t_start = time.time()
        msg = lookup()
        # do something with the value returned
        save_msg_to_db(msg)
        self._counter += 1
        t_end = time.time()
        self._TIMEIT = int(math.ceil(t_end - t_start))

    def run():
        while True:
            # let's do the lookup every 5sec, but remember that lookup takes 60sec to complete
            time.sleep(self._INTERVAL)
            # the call to xxx() should be non-blocking
            xxx()

But the run method is responsible for scheduling the periodic task, and as it iterates it should not block when calling the function xxx.

I am thinking to create an event loop on every call to xxx function, as described in A Bad Coroutine Example but how to do the call to xxx non-blocking ?

PS. I am using Python3.4 new to asyncio (was using gevent in the past), not sure if I am asking sth stupid here.

So lookup will create a async loop that will take let's say 60sec to complete. But, in the run method there is a endless loop running that i would like it to do the lookup every 5sec, in other words I would like to (1) how often i call the lookup function, independent of (2) how long it takes lookup to complete


Solution

  • Since your lookup() is primarily I/O intensive, you can run your xxx() method as a thread and be perfectly fine (code shortened for brevity):

    import threading
    import time
    
    class Publisher(object):
    
        def __init__(self):
            self._INTERVAL = 5
            self._counter = 0
            self._mutex = threading.Lock()
    
        def xxx(self):
            msg = lookup()
            save_msg_to_db(msg)
            with self._mutex:  # make sure only one thread is modifying counter at a given time
                self._counter += 1
    
        def run(self):
            while True:
                time.sleep(self._INTERVAL)
                t = threading.Thread(target=self.xxx)
                t.setDaemon(True)  # so we don't need to track/join threads
                t.start()  # start the thread, this is non-blocking