I'm playing a bit with Python's asyncio library. Following this example, I wrote the following scripts:
# file: get_rand.py
from random import choice
from time import sleep
import sys
def main():
    """Print five random lowercase letters to stdout, pausing after each.

    Progress messages are written to stderr so that they do not mix with
    the letters emitted on stdout.
    """
    alphabet = 'abcdefghijklmnopqrstuvwxyz'
    log = sys.stderr.write
    log('child: starting loop...\n')
    for _ in range(5):
        print(choice(alphabet))
        log('child: going to sleep\n')
        sleep(0.5)


if __name__ == '__main__':
    main()
and:
# file: async_test.py
import asyncio
import time
class Protocol(asyncio.SubprocessProtocol):
    """Subprocess protocol that logs pipe events and records the child's stdout.

    Bytes arriving on fd 1 are decoded as ASCII and appended to
    ``rand_file.txt``; bytes arriving on fd 2 are printed.  ``exit_f`` is a
    future that is resolved with ``True`` when the child process exits.
    """

    def __init__(self, exit_f):
        self.exit = exit_f
        print('Protocol initialised')

    def pipe_data_received(self, fd, data):
        """Handle a chunk of ``data`` read from the child's pipe ``fd``."""
        print('Data received')
        if fd == 1:
            with open('rand_file.txt', 'a') as out:
                out.write(bytes(data).decode('ascii'))
        elif fd == 2:
            print('Received error data!')
            print(data)

    def pipe_connection_lost(self, fd, exc):
        """Report that pipe ``fd`` was closed; re-raise ``exc`` if one is given."""
        print('Pipe connection lost')
        if exc is not None:
            print(exc)
            raise exc

    def process_exited(self):
        """Resolve the exit future (if still pending) and report the exit."""
        # The future may already be done or cancelled (e.g. the waiting
        # coroutine was cancelled); set_result() would then raise
        # InvalidStateError from inside the event-loop callback.
        if not self.exit.done():
            self.exit.set_result(True)
        print('Subprocess exited')
# NOTE: generator-based coroutines (@asyncio.coroutine / yield from) match the
# Python 3.5 interpreter this script targets; they were removed in Python 3.11.
@asyncio.coroutine
def mycoro():
    """Start ``get_rand.py`` as a subprocess and wait until it has exited.

    The subprocess transport is always closed, even if waiting for the exit
    future fails or is cancelled.
    """
    loop = asyncio.get_event_loop()
    exit_future = asyncio.Future(loop=loop)
    print('creating process...')
    subprocess = loop.subprocess_exec(lambda: Protocol(exit_future),
                                      'python3.5', 'get_rand.py',
                                      stdin=None, stderr=None)
    # subprocess_exec() only returns a coroutine; the child is not spawned
    # until that coroutine is actually run by the loop.
    transp, proto = yield from subprocess
    try:
        print('waiting for subprocess to finish...')
        yield from exit_future
    finally:
        transp.close()
def main():
    """Run ``mycoro`` to completion on the default event loop, then close it."""
    loop = asyncio.get_event_loop()
    try:
        loop.run_until_complete(mycoro())
    finally:
        # Close the loop even when the coroutine raises.
        loop.close()


if __name__ == '__main__':
    main()
When executing this code, I get the following:
$ python3.5 async_test.py
creating process...
Protocol initialised
waiting for subprocess to finish...
child: starting loop...
child: going to sleep
child: going to sleep
child: going to sleep
child: going to sleep
child: going to sleep
Data received
Pipe connection lost
Subprocess exited
I have many questions about all this:
Without the line `transp, proto = yield from subprocess`, the whole thing just hangs on `creating process...`, so it looks like the child is not started until the parent does `transp, proto = yield from subprocess`. Is that correct? Why is that?

`print` writes data to the stdout buffer; by default, when stdout is a pipe rather than a terminal, that buffer is flushed only once (when the process exits, unless it fills up first). You can add an explicit `flush`.
# Flush stdout after every letter so the parent sees it immediately.
letters = 'abcdefghijklmnopqrstuvwxyz'
for _ in range(5):
    print(choice(letters))
    sys.stdout.flush()
or, on Python 3.3 and above:
# Same loop, letting print() do the flushing itself.
for _ in range(5):
    letter = choice('abcdefghijklmnopqrstuvwxyz')
    print(letter, flush=True)
More info: "How to flush output of Python print?".
`subprocess_exec` returns a coroutine. Every coroutine that you want to run must be scheduled on the loop. `yield from` just schedules it and waits until it is done (for `subprocess_exec`, "done" means the process has been launched).
To run a task in the background, you also have to schedule it on the loop, but without waiting for its result. You can use `ensure_future` for that.
@asyncio.coroutine
def mycoro():
    """Start ``get_rand.py`` in a background task and wait for it to exit.

    The spawn coroutine is scheduled with ``ensure_future`` rather than
    awaited directly, so this coroutine keeps running while the child is
    being started.  Once the child has exited, the transport is collected
    from the task and closed.
    """
    loop = asyncio.get_event_loop()
    exit_future = asyncio.Future(loop=loop)
    print('creating process...')
    subprocess = loop.subprocess_exec(lambda: Protocol(exit_future),
                                      'python3.5', 'get_rand.py',
                                      stdin=None, stderr=None)
    task = asyncio.ensure_future(subprocess)
    print('Subprocess is handled in the background task')
    # This coroutine is driven by run_until_complete(), which stops the loop
    # as soon as the coroutine returns; returning now would cut the
    # subprocess task short, so keep waiting until the child has exited.
    yield from exit_future
    # Collect the transport created by the background task and release it.
    transp, _ = yield from task
    transp.close()
Edit:
And here is a simple example of running the loop forever. I have removed all the `exit_future`-related code, as it is not needed.
import asyncio
import time
class Protocol(asyncio.SubprocessProtocol):
    """Subprocess protocol that logs pipe events and records the child's stdout.

    Derives from ``asyncio.SubprocessProtocol`` because it implements the
    subprocess callbacks (``pipe_data_received``, ``pipe_connection_lost``,
    ``process_exited``) and is handed to ``loop.subprocess_exec``.
    """

    def __init__(self):
        print('Protocol initialised')

    def pipe_data_received(self, fd, data):
        """Handle a chunk of ``data`` read from the child's pipe ``fd``.

        Data from fd 1 is decoded as ASCII and appended to ``rand_file.txt``;
        data from fd 2 is printed.
        """
        print('Data received %s' % data)
        if fd == 1:
            with open('rand_file.txt', 'a') as out:
                out.write(bytes(data).decode('ascii'))
        elif fd == 2:
            print('Received error data!')
            print(data)

    def pipe_connection_lost(self, fd, exc):
        """Report that pipe ``fd`` was closed; re-raise ``exc`` if one is given."""
        print('Pipe connection lost')
        if exc is not None:
            print(exc)
            raise exc

    def process_exited(self):
        """Report that the child process has exited."""
        print('Subprocess exited')
@asyncio.coroutine
def mycoro():
    """Schedule the subprocess and the dummy worker, then return immediately.

    Nothing is awaited here: both jobs are handed to the event loop as
    tasks and keep running after this coroutine has finished.
    """
    print('creating process...')
    spawn = asyncio.get_event_loop().subprocess_exec(
        lambda: Protocol(),
        'python3.5', 'get_rand.py',
        stdin=None, stderr=None,
    )
    for job in (spawn, dummy_work()):
        asyncio.ensure_future(job)
    print('Mycoro finished, tasks are scheduled')
@asyncio.coroutine
def dummy_work():
    """Print ``dummy work`` after every one-second sleep, forever."""
    pause = 1
    while True:
        yield from asyncio.sleep(pause)
        print('dummy work')
def main():
    """Schedule ``mycoro`` and run the event loop until it is stopped."""
    loop = asyncio.get_event_loop()
    asyncio.ensure_future(mycoro())
    try:
        loop.run_forever()
    finally:
        # Close the loop even if run_forever() is interrupted.
        loop.close()


# Defined and invoked once only: a second main() call would reuse an event
# loop that the first call had already closed.
if __name__ == '__main__':
    main()