When my class connects to the server, it should immediately send a sign-in string. Afterwards, when the session is over, it should send a sign-out string and clean up the sockets. Below is my code:
```python
import trio

class test:
    _buffer = 8192
    _max_retry = 4

    def __init__(self, host='127.0.0.1', port=12345, usr='user', pwd='secret'):
        self.host = str(host)
        self.port = int(port)
        self.usr = str(usr)
        self.pwd = str(pwd)
        self._nl = b'\r\n'
        self._attempt = 0
        self._queue = trio.Queue(30)
        self._connected = trio.Event()
        self._end_session = trio.Event()

    @property
    def connected(self):
        return self._connected.is_set()

    async def _sender(self, client_stream, nursery):
        print('## sender: started!')
        q = self._queue
        while True:
            cmd = await q.get()
            print('## sending to the server:\n{!r}\n'.format(cmd))
            if self._end_session.is_set():
                nursery.cancel_scope.shield = True
                with trio.move_on_after(1):
                    await client_stream.send_all(cmd)
                nursery.cancel_scope.shield = False
            await client_stream.send_all(cmd)

    async def _receiver(self, client_stream, nursery):
        print('## receiver: started!')
        buff = self._buffer
        while True:
            data = await client_stream.receive_some(buff)
            if not data:
                print('## receiver: connection closed')
                self._end_session.set()
                break
            print('## got data from the server:\n{!r}'.format(data))

    async def _watchdog(self, nursery):
        await self._end_session.wait()
        await self._queue.put(self._logoff)
        self._connected.clear()
        nursery.cancel_scope.cancel()

    @property
    def _login(self, *a, **kw):
        nl = self._nl
        usr, pwd = self.usr, self.pwd
        return nl.join(x.encode() for x in ['Login', usr,pwd]) + 2*nl

    @property
    def _logoff(self, *a, **kw):
        nl = self._nl
        return nl.join(x.encode() for x in ['Logoff']) + 2*nl

    async def _connect(self):
        host, port = self.host, self.port
        print('## connecting to {}:{}'.format(host, port))
        try:
            client_stream = await trio.open_tcp_stream(host, port)
        except OSError as err:
            print('##', err)
        else:
            async with client_stream:
                self._end_session.clear()
                self._connected.set()
                self._attempt = 0
                # Sign in as soon as connected
                await self._queue.put(self._login)
                async with trio.open_nursery() as nursery:
                    print("## spawning watchdog...")
                    nursery.start_soon(self._watchdog, nursery)
                    print("## spawning sender...")
                    nursery.start_soon(self._sender, client_stream, nursery)
                    print("## spawning receiver...")
                    nursery.start_soon(self._receiver, client_stream, nursery)

    def connect(self):
        while self._attempt <= self._max_retry:
            try:
                trio.run(self._connect)
                trio.run(trio.sleep, 1)
                self._attempt += 1
            except KeyboardInterrupt:
                self._end_session.set()
                print('Bye bye...')
                break

tst = test()
tst.connect()
```
My logic doesn't quite work. It does work if I kill the netcat listener, and in that case my session looks like this:
```
## connecting to 127.0.0.1:12345
## spawning watchdog...
## spawning sender...
## spawning receiver...
## receiver: started!
## sender: started!
## sending to the server:
b'Login\r\nuser\r\nsecret\r\n\r\n'
## receiver: connection closed
## sending to the server:
b'Logoff\r\n\r\n'
```
Note that the Logoff string has been sent, although that makes little sense here because the connection is already broken by then.
However, my goal is to log off when the user triggers a KeyboardInterrupt (presses control-C). In that case my session looks like this:
```
## connecting to 127.0.0.1:12345
## spawning watchdog...
## spawning sender...
## spawning receiver...
## receiver: started!
## sender: started!
## sending to the server:
b'Login\r\nuser\r\nsecret\r\n\r\n'
Bye bye...
```
Note that Logoff was never sent.
Any ideas?
Here your call tree looks something like this:

```
connect
  |
  +- _connect*
       |
       +- _watchdog*
       |
       +- _sender*
       |
       +- _receiver*
```
The * marks indicate the 4 trio tasks. The _connect task is sitting at the end of the nursery block, waiting for the child tasks to complete. The _watchdog task is blocked in await self._end_session.wait(), the _sender task is blocked in await q.get(), and the _receiver task is blocked in await client_stream.receive_some(...).
When you hit control-C, the standard Python semantics are that whatever bit of Python code is running suddenly raises KeyboardInterrupt. In this case you have 4 different tasks, so one of those blocked operations gets picked at random [1] and raises KeyboardInterrupt. This means a few different things might happen:
- If _watchdog's wait call raises KeyboardInterrupt, then the _watchdog method exits immediately, so it never even tries to send the logoff message. Then, as part of unwinding the stack, trio cancels all the other tasks. Once they've exited, the KeyboardInterrupt keeps propagating up (out of trio.run) until it reaches the except KeyboardInterrupt block in connect. At this point you try to notify the watchdog task using self._end_session.set(), but it's not running anymore, so it doesn't notice.
- If _sender's q.get() call raises KeyboardInterrupt, then the _sender method exits immediately. So even if the _watchdog did ask it to send a logoff message, it won't be there to notice. In any case, trio then proceeds to cancel the watchdog and receiver tasks anyway, and things proceed as above.
- If _receiver's receive_some call raises KeyboardInterrupt, the same thing happens.

Minor subtlety: _connect can also receive the KeyboardInterrupt, which has the same effect. It cancels all the children, then waits for them to stop before allowing the KeyboardInterrupt to keep propagating.
If you want to reliably catch control-C and then do something with it, the fact that it gets raised at some random point is quite a nuisance. The simplest way around this is to use trio's support for catching signals to catch signal.SIGINT, which is the signal that Python normally converts into a KeyboardInterrupt. (The "INT" stands for "interrupt".) Something like:
```python
import signal  # needed for signal.SIGINT

# A method of your class, started alongside the other tasks
async def _control_c_watcher(self):
    # This API is currently a little cumbersome, sorry, see
    # https://github.com/python-trio/trio/issues/354
    with trio.catch_signals({signal.SIGINT}) as batched_signal_aiter:
        async for _ in batched_signal_aiter:
            self._end_session.set()
            # We exit the loop, restoring the normal behavior of
            # control-C. This way hitting control-C once will try to
            # do a polite shutdown, but if that gets stuck the user
            # can hit control-C again to raise KeyboardInterrupt and
            # force things to exit.
            break
```
and then start this running alongside your other tasks.
Your _watchdog method has another problem. It puts the logoff request into the queue, which only schedules a message to be sent later by the _sender task, and then immediately cancels all the tasks. So the _sender task probably won't get a chance to see the message and react to it! In general, I find my code works better when I use tasks only when necessary. Instead of having a sender task and putting messages in a queue when you want to send them, why not have the code that wants to send a message call stream.send_all directly? The one thing to watch out for is that if multiple tasks might send things simultaneously, you'll want a trio.Lock() to make sure they don't bump into each other by calling send_all at the same time:
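```python
async def send_all(self, data):
    # self.send_lock is a trio.Lock(): only one task writes at a time
    async with self.send_lock:
        await self.send_stream.send_all(data)

async def do_logoff(self):
    # First send the message
    await self.send_all(b"Logoff\r\n\r\n")
    # And then, *after* the message has been sent, cancel the tasks
    # (nurseries have no cancel() method; cancel their cancel_scope)
    self.nursery.cancel_scope.cancel()
```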
If you do it this way, you might be able to get rid of the watchdog task and the _end_session event entirely.
A few other notes about your code while I'm here:
Calling trio.run multiple times like this is unusual. The normal style is to call it once at the top of your program and put all your real code inside it. Once you exit trio.run, all of trio's state is lost and you're definitely not running any concurrent tasks (so nothing could possibly be listening for your call to _end_session.set()!). In general, almost all trio functions assume that you're already inside a call to trio.run. It turns out that right now you can call trio.Queue() before starting trio without getting an exception, but that's basically just a coincidence.
The use of shielding inside _sender looks odd to me. Shielding is generally an advanced feature that you almost never want to use, and I don't think this is an exception.
Hope that helps! If you want to talk more about style and design issues like this but are worried they might be too vague for Stack Overflow ("is this program designed well?"), feel free to drop by the trio chat channel.
[1] Well, actually trio probably picks the main task for various reasons, but that's not guaranteed and in any case it doesn't make a difference here.