I have a python "Device" running in a docker container. It's connected to a Crossbar-router, receiving autobahn/WAMP event messages on subscribed channels.
When a certain event is published, my Device is calling a method that's finishing in a few seconds. Now, i want it to skip or handle any messages of the same event that are received, while the method is still running. I tried to accomplish this by using the @inlinecallback decorator of Twisted and setting a "self.busy"-flag on the Device.
But it's not returning with a deferred immediately, instead it's behaving like a normal, blocking method, so that incoming messages are processed one after another.
Here's my Code:
from autobahn.twisted.wamp import ApplicationSession
from twisted.internet.defer import inlineCallbacks
class Pixel(ApplicationSession):
@inlineCallbacks
def onJoin(self, details):
yield self.subscribe(self.handler_no_access, 'com.event.no_access')
@inlineCallbacks
def handler_no_access(self, direction):
entries = len(self.handlers['no_access'][direction])
if entries == 0:
self.handlers['no_access'][direction].append(direction)
result = yield self._handler_no_access()
return result
else:
yield print('handler_no_access: entries not 0: ', self.handlers['no_access'])
@inlineCallbacks
def _handler_no_access(self):
for direction in self.handlers['no_access']:
for message in self.handlers['no_access'][direction]:
yield self._timed_switch(self.direction_leds[direction], 'red', 0.2, 5)
self.handlers['no_access'][direction] = []
I have already taken the hacky path with the self.handler dictionary, by the way.
EDIT
the blocking method is:
yield self._timed_switch(self.direction_leds[direction], 'red', 0.2, 5)
It controls a Neopixel at the GPIOs of a RaspberryPi, letting it flash on and off for 1s. Any further calls to the method
def handler_no_access(self, direction)
while the _timed_switch hasn't finished, shall be skipped, so they don't stack up.
SOLUTION
@inlineCallbacks
def handler_no_access(self, direction):
direction = str(direction)
if self.busy[direction] is False:
self.busy[direction] = True
# non-blocking now
yield deferToThread(self._handler_no_access, direction)
else:
yield print('handler_no_access: direction {} busy '.format(direction))
def _handler_no_access(self, direction):
# this takes 1s to execute
self._timed_switch(self.direction_leds[direction], 'red', 0.2, 5)
self.busy[direction] = False
inlineCallbacks
doesn't make blocking code into non-blocking code. It's just an alternate API for using Deferreds. Deferreds are just a way to manage callbacks.
You need to rewrite your blocking code to be non-blocking some other way. You haven't actually said which part of your code is blocking, nor what it is blocking on, so it's very difficult to suggest how you might do this. The only two general purpose tools for making blocking code into non-blocking are threads and processes. So, you could run the function in a separate thread or process. The function may or may not work in such an execution context (again, no way to know without knowing exactly what it does).