Tags: python, concurrency, python-asyncio

Running functions in parallel


I am simulating a CPU with a system clock. In my current asyncio setup, the clock.run() function has an infinite loop that waits for a period of time, fires an event, waits again, and then clears the event. This models the clock pulsing high and low.

The CPU also runs indefinitely and schedules functions to be run when the clock next goes high, so CPU execution blocks until the clock goes high and the function has run and returned.

The problem is that asyncio runs on a single main thread (I think), so if it's running the clock it cannot seem to run the CPU in parallel as well.

Here's my current setup:

class Clock:
    def __init__(self, frequency: int):
        if not isinstance(frequency, int) or frequency <= 0:
            raise ValueError('Frequency must be a positive integer.')

        self.frequency = frequency
        self.period = 1 / frequency
        self.half_period = self.period / 2
        self._clock_pulse = asyncio.Event()

    async def run(self):
        while True:
            self._clock_pulse.set()
            print('high', self._clock_pulse.is_set())
            await asyncio.sleep(self.half_period)
            self._clock_pulse.clear()
            print('low', self._clock_pulse.is_set())
            await asyncio.sleep(self.half_period)

    async def schedule(self, func, *args):
        print('scheduled')
        await self._clock_pulse.wait()
        return await func(*args)


class CPU:
    def __init__(self, clock):
        self.clock = clock

    async def do_nothing(self, n):
        return n

    async def run(self):
        self.n = 0
        while True:
            value = await clock.schedule(self.do_nothing, self.n)
            print(value)
            self.n += 1


clock = Clock(1)
cpu = CPU(clock)


async def main():
    clock_task = asyncio.create_task(clock.run())
    cpu_task = asyncio.create_task(cpu.run())


asyncio.run(main())

So I'm expecting the clock.run loop to run continuously in parallel with cpu.run. Maybe I could use threading, but I don't know much about it. Thanks for any help!


Solution

  • If I understand your situation:

    1. You have a running clock that periodically sends out pulses, alternating between "high" and "low".
    2. You have a loop in which you wish to schedule a task and await its completion, and the task is to start on the next "high" pulse. Depending on how long the task takes to complete, the clock may generate many high and low pulses between successive schedulings. All that we can guarantee is that a task always starts running on a high pulse.

    Your current Clock.run method does not seem to distinguish between high and low pulses. Instead of using an asyncio.Event to show that a pulse has occurred, I would suggest using an asyncio.Condition instance that represents a high pulse being generated. The Clock.schedule function then just needs to wait for the high-pulse condition to occur.
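
    The difference between the two primitives can be seen in a small sketch. The function names here (event_passes, condition_wakeups) are made up for illustration and are not part of the code in the question. A set asyncio.Event lets every wait() return immediately, while asyncio.Condition.wait() returns only after a later notify_all(), which is what makes it a reasonable model of "wake on the next high pulse":

```python
import asyncio


async def event_passes():
    # Once an Event is set, every wait() returns immediately.
    event = asyncio.Event()
    event.set()
    passes = 0
    for _ in range(3):
        await event.wait()  # does not block while the event stays set
        passes += 1
    return passes


async def condition_wakeups():
    # A Condition waiter wakes once per notification that follows its wait().
    cond = asyncio.Condition()
    wakeups = []

    async def waiter():
        for i in range(3):
            async with cond:
                await cond.wait()
            wakeups.append(i)

    async def notifier(pulses):
        for _ in range(pulses):
            await asyncio.sleep(0.01)
            async with cond:
                cond.notify_all()

    task = asyncio.create_task(waiter())
    await notifier(2)          # only two "high pulses"
    await asyncio.sleep(0.01)  # give the waiter time to react
    task.cancel()              # the third wait() never gets a pulse
    try:
        await task
    except asyncio.CancelledError:
        pass
    return wakeups


if __name__ == "__main__":
    print(asyncio.run(event_passes()))
    print(asyncio.run(condition_wakeups()))
```

    Here event_passes() gets through all three waits during a single "high" phase, whereas condition_wakeups() records one wakeup per pulse. In the question's loop this matters because wait() on a set event returns without suspending, so a CPU loop whose scheduled function never suspends either would likely spin without giving the clock a chance to run.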

    Note that instead of giving the Clock.schedule method func and args arguments, it is simpler to pass it a coroutine. Also, your main function needs some modification (see below):

    import asyncio
    import time
    
    class Clock:
        def __init__(self, frequency: int):
            if not isinstance(frequency, int) or frequency <= 0:
                raise ValueError('Frequency must be a positive integer.')
    
            self.frequency = frequency
            self.period = 1 / frequency
            self.half_period = self.period / 2
            self._high_pulse_condition = asyncio.Condition()
    
        async def run(self):
            while True:
                # Wake every coroutine currently waiting for a high pulse.
                async with self._high_pulse_condition:
                    self._high_pulse_condition.notify_all()  # high pulse event
                # Only high pulses are signalled, so sleep for a full period.
                await asyncio.sleep(self.period)
    
        async def schedule(self, coro):
            # Block until the next high pulse, then run the coroutine to completion.
            async with self._high_pulse_condition:
                await self._high_pulse_condition.wait()
            return await coro
    
    
    class CPU:
        def __init__(self, clock):
            self.clock = clock
    
        async def do_nothing(self, n):
            return n
    
        async def run(self):
            n = 0
            while True:
                value = await self.clock.schedule(self.do_nothing(n))
                print(f'value = {value} at time = {time.time()}')
                n += 1
    
    async def main():
        clock = Clock(1)
        cpu = CPU(clock)
    
        # Awaiting both coroutines keeps main() alive while they run.
        await asyncio.gather(cpu.run(), clock.run())
    
    asyncio.run(main())
    

    Prints:

    value = 0 at time = 1710281657.515421
    value = 1 at time = 1710281658.5301206
    value = 2 at time = 1710281659.53623
    value = 3 at time = 1710281660.5377345
    value = 4 at time = 1710281661.5463734
    value = 5 at time = 1710281662.5613523
    value = 6 at time = 1710281663.5721672
    value = 7 at time = 1710281664.5855374
    value = 8 at time = 1710281665.5871134
    value = 9 at time = 1710281666.6020265
    value = 10 at time = 1710281667.6114671
    value = 11 at time = 1710281668.6124766
    value = 12 at time = 1710281669.6271718
    ...
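
    As for why the main function needs to change: asyncio.run() returns as soon as main() returns and cancels any tasks that are still pending. The sketch below illustrates this with a stand-in ticker coroutine. The names ticker, main_without_await, main_with_await and run_demo are hypothetical, and the timeout exists only so the demo terminates:

```python
import asyncio


async def ticker(log):
    # Stand-in for Clock.run: records a tick forever.
    while True:
        log.append("tick")
        await asyncio.sleep(0.01)


async def main_without_await(log):
    # Mirrors the question's main(): the task is created but never awaited,
    # so main() returns at once and asyncio.run() cancels the task.
    asyncio.create_task(ticker(log))


async def main_with_await(log):
    # Awaiting keeps main() alive; the timeout only stops the demo.
    try:
        await asyncio.wait_for(ticker(log), timeout=0.1)
    except asyncio.TimeoutError:
        pass


def run_demo():
    unawaited, awaited = [], []
    asyncio.run(main_without_await(unawaited))
    asyncio.run(main_with_await(awaited))
    return len(unawaited), len(awaited)


if __name__ == "__main__":
    print(run_demo())
```

    The unawaited version records at most a single tick before being cancelled, while the awaited version keeps ticking until the timeout. Awaiting asyncio.gather(...) in main(), as in the answer's code, has the same keep-alive effect.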
    

    Update

    We can also more or less guarantee that a new task is scheduled on every high pulse. Instead of Clock.schedule awaiting the coroutine, so that CPU.run waits for it to finish before scheduling the next one, Clock.schedule creates a new task, and CPU.run schedules the next task on the following high pulse without waiting for the previous one to complete. The potential problem is that if the scheduled tasks take longer on average than the time between successive high pulses, the number of pending tasks will grow without limit.

    import asyncio
    import time
    
    class Clock:
        def __init__(self, frequency: int):
            if not isinstance(frequency, int) or frequency <= 0:
                raise ValueError('Frequency must be a positive integer.')
    
            self.frequency = frequency
            self.period = 1 / frequency
            self.half_period = self.period / 2
            self._high_pulse_condition = asyncio.Condition()
            # Strong references keep running tasks from being garbage collected.
            self._tasks = set()
    
        async def run(self):
            while True:
                async with self._high_pulse_condition:
                    self._high_pulse_condition.notify_all()  # high pulse event
                await asyncio.sleep(self.period)
    
        async def schedule(self, coro):
            async with self._high_pulse_condition:
                await self._high_pulse_condition.wait()
            # Start the coroutine as a task and return without awaiting it.
            task = asyncio.create_task(coro)
            self._tasks.add(task)
            task.add_done_callback(self._tasks.discard)
    
    
    class CPU:
        def __init__(self, clock):
            self.clock = clock
    
        async def do_nothing(self, n):
            print(f'value = {n} at time = {time.time()}')
    
        async def run(self):
            n = 0
            while True:
                await self.clock.schedule(self.do_nothing(n))
                n += 1
    
    async def main():
        clock = Clock(1)
        cpu = CPU(clock)
    
        await asyncio.gather(cpu.run(), clock.run())
    
    asyncio.run(main())
    

    Prints:

    value = 0 at time = 1710281994.7425532
    value = 1 at time = 1710281995.752669
    value = 2 at time = 1710281996.767249
    value = 3 at time = 1710281997.7693186
    value = 4 at time = 1710281998.7833076
    value = 5 at time = 1710281999.7873156
    value = 6 at time = 1710282000.7989564
    ...
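
    One possible way to keep the number of tasks from growing without limit is to cap how many may be in flight at once. The following is only a sketch under that assumption: BoundedSpawner, its limit parameter and slow_job are hypothetical names, not part of the code above. A semaphore makes spawn() wait whenever the cap is reached, which means a new task could then miss a high pulse:

```python
import asyncio


class BoundedSpawner:
    """Hypothetical helper: starts coroutines as tasks, at most `limit` at a time."""

    def __init__(self, limit):
        self._slots = asyncio.Semaphore(limit)
        self._tasks = set()  # strong references to the running tasks

    async def spawn(self, coro):
        await self._slots.acquire()  # waits here while `limit` tasks are running
        task = asyncio.create_task(coro)
        self._tasks.add(task)
        task.add_done_callback(self._release)
        return task

    def _release(self, task):
        # Called when a task finishes: free its slot.
        self._tasks.discard(task)
        self._slots.release()

    @property
    def in_flight(self):
        return len(self._tasks)

    async def drain(self):
        # Wait for whatever is still running.
        await asyncio.gather(*self._tasks)


async def demo():
    spawner = BoundedSpawner(limit=2)
    peak = 0

    async def slow_job():
        await asyncio.sleep(0.05)

    for _ in range(6):
        await spawner.spawn(slow_job())
        peak = max(peak, spawner.in_flight)
    await spawner.drain()
    return peak


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

    In this demo the peak number of simultaneous tasks never exceeds the limit of 2. Whether delaying a task past a high pulse is acceptable depends on what the simulation needs; dropping the task or raising an error when the cap is hit would be alternative policies.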