Search code examples
pythonpython-3.xwindowsipctwisted

outReceived from twisted ProcessProtocol merges messages if received too fast (buffering problem?)


I am using Klein, a micro web-framework based on twisted. I have a server (running on windows!), which will spawn an external long running process (end-to-end test) via reactor.spawnProcess(). To send status information about the running test, I implemented a ProcessProtocol:

class IPCProtocol(protocol.ProcessProtocol):
    def __init__(self, status: 'Status', history: 'History'):
        super().__init__()
        self.status: Status = status
        self.history: History = history
        self.pid = None

    def connectionMade(self):
        self.pid = self.transport.pid
        log.msg("process started, pid={}".format(self.pid))

    def processExited(self, reason):
        log.msg("process exited, status={}".format(reason.value.exitCode))
        # add current run to history
        self.history.add(self.status.current_run)
        # create empty testrun and save status
        self.status.current_run = Testrun()
        self.status.status = StatusEnum.ready
        self.status.save()
        # check for more queue items
        if not self.status.queue.is_empty():
            start_testrun()

    def outReceived(self, data: bytes):
        data = data.decode('utf-8').strip()
        if data.startswith(constants.LOG_PREFIX_FAILURE):
            self.failureReceived()
        if data.startswith(constants.LOG_PREFIX_SERVER):
            data = data[len(constants.LOG_PREFIX_SERVER):]
            log.msg("Testrunner: " + data)
            self.serverMsgReceived(data)

I start the process with the following command:

ipc_protocol = IPCProtocol(status=app.status, history=app.history)
args = [sys.executable, 'testrunner.py', next_entry.suite, json.dumps(next_entry.testscripts)]
log.msg("Starting testrunn.py with args: {}".format(args))
reactor.spawnProcess(ipc_protocol, sys.executable, args=args)

To send information, I just print out messages (with a prefix to distinct them) in my testrunner.py.

The problem is that if I send the print commands to fast, then outReceived will merge the messages.

I already tried adding a flush=True for print() calls in the external process, but this didn't fix the problem. Some other question suggested using usePTY=True for the spawnProcess but this is not supported under windows. Is there a better way to fix this, than adding a small delay (like time.sleep(0.1)) to each print()call?


Solution

  • You didn't say it, but it seems like the child process writes lines to its stdout. You need to parse the output to find the line boundaries if you want to operate on these lines.

    You can use LineOnlyReceiver to help you with this. Since processes aren't stream transports, you can't just use LineOnlyReceiver directly. You have to adapt it to the process protocol interface. You can do this yourself or you can use ProcessEndpoint (instead of spawnProcess) to do it for you.

    For example:

    from twisted.protocols.basic import LineOnlyReceiver
    from twisted.internet.protocol import Factory
    from twisted.internet.endpoints import ProcessEndpoint
    from twisted.internet import reactor
    
    endpoint = ProcessEndpoint(reactor, b"/some/some-executable", ...)
    spawning_deferred = endpoint.connect(Factory.forProtocol(LineOnlyReceiver))
    ...