I want to transmit data from a Queue using Twisted. I currently use a push producer to poll the queue for items and write to the transport.
    class Producer:
        implements(interfaces.IPushProducer)

        def __init__(self, protocol, queue):
            self.queue = queue
            self.protocol = protocol

        def resumeProducing(self):
            self.paused = False
            while not self.paused:
                try:
                    data = self.queue.get_nowait()
                    logger.debug("Transmitting: '%s'", repr(data))
                    data = cPickle.dumps(data)
                    self.protocol.transport.write(data + "\r\n")
                except Empty:
                    pass

        def pauseProducing(self):
            logger.debug("Transmitter paused.")
            self.paused = True

        def stopProducing(self):
            pass
The problem is that the data is sent very irregularly, and if there is only one item in the queue, it is never sent at all. It seems that Twisted waits until the pending data has grown to a certain size before transmitting it. Is this the right way to implement my producer? Can I force Twisted to transmit data immediately?
I've also tried using a pull producer, but Twisted never calls its resumeProducing() method. Do I have to call resumeProducing() from outside when using a pull producer?
It's hard to say why your producer doesn't work well without seeing a complete example (that is, without also seeing the code that registers it with a consumer and the code which is putting items into that queue).
However, one problem you'll likely have is that if your queue is empty when resumeProducing is called, then you will write no bytes at all to the consumer. And when items are put into the queue, they'll sit there forever, because the consumer isn't going to call your resumeProducing method again.
And this generalizes to any other case where the queue does not have enough data in it to cause the consumer to call pauseProducing on your producer. As a push producer, it is your job to continue to produce data on your own until the consumer calls pauseProducing (or stopProducing).
For this particular case, that probably means that whenever you're about to put something in that queue, stop and check whether the producer is paused. If it is not paused, write the item to the consumer directly instead. Only put items in the queue while the producer is paused.
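A minimal sketch of that approach follows. It is written without Twisted imports so it can be run on its own. The names `QueueingPushProducer`, `send`, and `RecordingConsumer` are hypothetical and are not part of Twisted or of the code in the question. In a real application the class would presumably declare `IPushProducer` and be registered with `transport.registerProducer(producer, True)`.

```python
import pickle
from collections import deque


class QueueingPushProducer:
    """Writes items straight to the consumer unless it has been paused."""

    def __init__(self, consumer):
        self.consumer = consumer  # anything with write(bytes), e.g. a transport
        self.backlog = deque()    # items held back while paused
        self.paused = False

    def send(self, item):
        """Hypothetical entry point: call this instead of queue.put()."""
        if self.paused:
            self.backlog.append(item)
        else:
            self._write(item)

    def _write(self, item):
        # Framing mirrors the question (pickle followed by CRLF).
        self.consumer.write(pickle.dumps(item) + b"\r\n")

    def pauseProducing(self):
        self.paused = True

    def resumeProducing(self):
        self.paused = False
        # Drain the backlog, stopping if the consumer pauses us again.
        while self.backlog and not self.paused:
            self._write(self.backlog.popleft())

    def stopProducing(self):
        self.backlog.clear()


class RecordingConsumer:
    """Hypothetical stand-in for a transport; it only records writes."""

    def __init__(self):
        self.written = []

    def write(self, data):
        self.written.append(data)


consumer = RecordingConsumer()
producer = QueueingPushProducer(consumer)
producer.send("first")       # not paused: written immediately
producer.pauseProducing()    # the consumer's buffer is full
producer.send("second")      # paused: held in the backlog
producer.resumeProducing()   # backlog is flushed in order
```

Unlike the loop in the question, `resumeProducing` here returns as soon as the backlog is empty, so it never spins while waiting for new items. If the items originate in another thread, which the use of a thread-safe queue may suggest, `send` would need to be invoked through `reactor.callFromThread`, because Twisted transports are not thread-safe.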