I am using Klein, a micro web framework based on Twisted. I have a server (running on Windows!) that spawns an external long-running process (an end-to-end test) via reactor.spawnProcess(). To send status information about the running test to the server, I implemented a ProcessProtocol:
class IPCProtocol(protocol.ProcessProtocol):
    def __init__(self, status: 'Status', history: 'History'):
        super().__init__()
        self.status: Status = status
        self.history: History = history
        self.pid = None

    def connectionMade(self):
        self.pid = self.transport.pid
        log.msg("process started, pid={}".format(self.pid))

    def processExited(self, reason):
        log.msg("process exited, status={}".format(reason.value.exitCode))
        # add current run to history
        self.history.add(self.status.current_run)
        # create empty testrun and save status
        self.status.current_run = Testrun()
        self.status.status = StatusEnum.ready
        self.status.save()
        # check for more queue items
        if not self.status.queue.is_empty():
            start_testrun()

    def outReceived(self, data: bytes):
        data = data.decode('utf-8').strip()
        if data.startswith(constants.LOG_PREFIX_FAILURE):
            self.failureReceived()
        if data.startswith(constants.LOG_PREFIX_SERVER):
            data = data[len(constants.LOG_PREFIX_SERVER):]
            log.msg("Testrunner: " + data)
            self.serverMsgReceived(data)
I start the process with the following command:
ipc_protocol = IPCProtocol(status=app.status, history=app.history)
args = [sys.executable, 'testrunner.py', next_entry.suite, json.dumps(next_entry.testscripts)]
log.msg("Starting testrunner.py with args: {}".format(args))
reactor.spawnProcess(ipc_protocol, sys.executable, args=args)
To send information, I just print messages (with a prefix to distinguish them) in my testrunner.py.
The problem is that if I print the messages too quickly, outReceived merges several of them into a single call.
I already tried adding flush=True to the print() calls in the external process, but this didn't fix the problem. Another question suggested using usePTY=True for spawnProcess, but this is not supported on Windows.
Is there a better way to fix this than adding a small delay (like time.sleep(0.1)) to each print() call?
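You didn't say it, but it seems like the child process writes lines to its stdout. A pipe is a byte stream without message boundaries, so a single outReceived call may carry several lines or only part of one. You need to parse the output to find the line boundaries if you want to operate on these lines.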
You can use LineOnlyReceiver to help you with this. Since processes aren't stream transports, you can't just use LineOnlyReceiver directly. You have to adapt it to the process protocol interface. You can do this yourself or you can use ProcessEndpoint (instead of spawnProcess) to do it for you.
For example:
from twisted.protocols.basic import LineOnlyReceiver
from twisted.internet.protocol import Factory
from twisted.internet.endpoints import ProcessEndpoint
from twisted.internet import reactor
endpoint = ProcessEndpoint(reactor, b"/some/some-executable", ...)
spawning_deferred = endpoint.connect(Factory.forProtocol(LineOnlyReceiver))
...
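If you would rather keep spawnProcess and do the adaptation yourself, the core of it is buffering chunks until a full line has arrived. Here is a minimal sketch of that idea in plain Python, with no Twisted dependency. The LineBuffer class, its feed method and the "SERVER: " prefix are hypothetical names made up for illustration, and it assumes the child writes UTF-8 lines that end in "\n" (or "\r\n", which print() may produce on Windows).

```python
from typing import List


class LineBuffer:
    """Hypothetical helper: collect raw chunks, return only complete lines."""

    def __init__(self, delimiter: bytes = b"\n") -> None:
        self._delimiter = delimiter
        self._pending = b""  # bytes received after the last delimiter

    def feed(self, data: bytes) -> List[str]:
        # Everything before the last delimiter is complete; the tail is
        # kept until a later chunk finishes it.
        *complete, self._pending = (self._pending + data).split(self._delimiter)
        # Drop a trailing "\r" (Windows line endings) and decode per line,
        # so a multi-byte character split across chunks is never decoded early.
        return [line.rstrip(b"\r").decode("utf-8") for line in complete]


# Two messages arrive merged, and the second one is cut in the middle.
# "SERVER: " is an assumed stand-in for the real log prefix.
buffer = LineBuffer()
first = buffer.feed(b"SERVER: step 1\r\nSERVER: st")
second = buffer.feed(b"ep 2\r\n")
```

In the process protocol, outReceived would then loop over the result of feed(data) and run the existing prefix checks once per line instead of once per chunk.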