Stumped on this thread sync I'm encountering. Basically, I'm writing to a out buffer, and waiting on a condition variable until the read buffer is populated with a response from a socket. It's an incredibly simple thread sync.
def write_wait_response(self, buffer, timeout=30):
'''
Write and wait for response
Params:
Buffer BYTE encoded data
Timeout timeout to wait for response
Returns:
response str if successful
'''
self.buffer = buffer
if self.waitLock(timeout):
# condition var was signaled, we can return a response
readbuf = bytes(self.readbuffer)
self.readbuffer = b''
return readbuf
else:
print("AsyncClientSocket: No response recieved from {} in {} seconds, dumping buffer".format(
self.sa, timeout))
self.buffer = ''
raise TimeoutError("AsyncClientSocket Timed Out")
def handle_read(self):
self.readbuffer, address = self.recvfrom(2048)
print(self.readbuffer)
print("notifying")
self.cond.notifyAll()
Seems straightforward enough, right? There's 1 thread waiting on the condition variable, and 1 thread (asyncore async callback loop) that will populate self.readbuffer and notify on the condition variable. Even more curious : if I do a time.sleep() instead of using a condition variable, I get a perfectly populated self.readbuffer on the calling thread of write_wait_response(). Obviously this is not a solution I can accept.
Here's what I'm expecting is happening:
Console output:
waiting <- thread 1 waiting on CV
AsyncClientSocket: writing 5 bytes <- thread 2: handle_write
b'200,2' <- thread 2: that's the server response
notifying <- thread 2: that's handle_read attempting to notify the held CV
error: uncaptured python exception, closing channel <my_socket_stuff.AsyncClientSocket connected 127.0.0.1:50000 at 0x1051bf438> (<class 'RuntimeError'>:cannot notify on un-acquired lock
Note: at the end of this log, thread 1 is STILL waiting on self.cond. What's going on?
Full class:
class AsyncClientSocket(asyncore.dispatcher):
def __init__(self, socketargs):
asyncore.dispatcher.__init__(self)
family, type, proto, canonname, sa = socketargs
self.sa = sa
self.create_socket(family, type)
if type == socket.SOCK_STREAM:
self.connect( sa )
elif type == socket.SOCK_DGRAM:
pass
self.buffer = b''
self.lock = threading.Lock()
self.cond = threading.Condition(self.lock)
self.readbuffer = b''
def write_wait_response(self, buffer, timeout=30):
'''
Write and wait for response
Params:
Buffer BYTE encoded data
Timeout timeout to wait for response
Returns:
response str if successful
'''
self.buffer = buffer
if self.waitLock(timeout):
# condition var was signaled, we can return a response
readbuf = bytes(self.readbuffer)
self.readbuffer = b''
return readbuf
else:
print("AsyncClientSocket: No response recieved from {} in {} seconds, dumping buffer".format(
self.sa, timeout))
self.buffer = ''
raise TimeoutError("AsyncClientSocket Timed Out")
def waitLock(self, timeout):
'''
Wait for timeout seconds on CV
'''
try:
self.cond.acquire()
print("waiting")
return self.cond.wait(timeout)
finally:
self.cond.release()
def handle_connect(self):
pass
def handle_close(self):
self.close()
def handle_read(self):
self.readbuffer, address = self.recvfrom(2048)
print(self.readbuffer)
print("notifying")
self.cond.notifyAll()
def writable(self):
return (len(self.buffer) > 0)
def handle_write(self):
print("AsyncClientSocket: writing {} bytes".format(len(self.buffer)))
self.readbuffer = b''
sent = self.sendto(self.buffer, self.sa)
self.buffer = self.buffer[sent:]
Figured it out. This isn't related to asyncore. I was just signaling the condition variable incorrectly. The python3 threading api doc says the calling thread of notify() must acquire the underlying lock which makes sense, wouldn't want two producers to notify on the same condition variable. Would want one to block on the critical section while the other performs its task.
def handle_read(self):
try:
self.cond.acquire()
self.readbuffer, address = self.recvfrom(2048)
print(self.readbuffer)
self.cond.notify()
finally:
self.cond.release()