Search code examples
pythontwistedproducer-consumer

Irregular Transmission Problem with Python Twisted Push Producer


I want to transmit data from a Queue using Twisted. I currently use a push producer to poll the queue for items and write to the transport.

class Producer:

    implements(interfaces.IPushProducer)

    def __init__(self, protocol, queue):
        self.queue = queue
        self.protocol = protocol

    def resumeProducing(self):
        self.paused = False
        while not self.paused:
            try:
                data = self.queue.get_nowait()
                logger.debug("Transmitting: '%s'", repr(data))
                data = cPickle.dumps(data)
                self.protocol.transport.write(data + "\r\n")
            except Empty:
                pass

    def pauseProducing(self):
        logger.debug("Transmitter paused.")
        self.paused = True

    def stopProducing(self):
        pass

The problem is, that the data are sent very irregularly and if only one item was in the queue, the data is never going to be sent. It seems that Twisted waits until the data to be transmitted has grown to a specific value until it transmits it. Is the way I implemented my producer the right way? Can I force Twisted to transmit data now?

I've also tried using a pull producer, but Twisted does not call the resumeProducing() method of it at all. Do I have to call the resumeProducer() method from outside, when using a pull producer?


Solution

  • It's hard to say why your producer doesn't work well without seeing a complete example (that is, without also seeing the code that registers it with a consumer and the code which is putting items into that queue).

    However, one problem you'll likely have is that if your queue is empty when resumeProducing is called, then you will write no bytes at all to the consumer. And when items are put into the queue, they'll sit there forever, because the consumer isn't going to call your resumeProducing method again.

    And this generalizes to any other case where the queue does not have enough data in it to cause the consumer to call pauseProducing on your producer. As a push producer, it is your job to continue to produce data on your own until the consumer calls pauseProducing (or stopProducing).

    For this particular case, that probably means that whenever you're going to put something in that queue - stop: check to see if the producer is not paused, and if it is not, write it to the consumer instead. Only put items in the queue when the producer is paused.