Search code examples
python network-programming twisted reactor

Stopping a Reactor kills All Reactors Running on Different Ports


I'm trying to implement this code, in which each "node" is a self-contained "actor".

from twisted.internet.protocol import Protocol, Factory
from twisted.internet import reactor, defer
from twisted.protocols.basic import LineReceiver


class ChatProtocol(LineReceiver):
    """Line-based chat protocol serving one client connection.

    A connection starts in the ``"REGISTER"`` state, in which the first
    accepted line becomes the user's name. Every later line is treated as a
    chat message and relayed to the other users registered on the factory.

    Lines are handled as byte strings throughout: on Python 3,
    ``LineReceiver`` passes ``bytes`` to ``lineReceived`` and ``sendLine``
    requires ``bytes``. On Python 2 ``bytes`` is ``str``, so nothing changes
    there. The ``%`` formatting of bytes needs Python 2 or Python 3.5+.
    """

    def __init__(self, factory):
        """Store the owning factory; the user has no name until registered."""
        self.factory = factory
        self.name = None
        self.state = "REGISTER"

    def connectionMade(self):
        """Prompt the newly connected client for a name."""
        self.sendLine(b"What's your name?")

    def connectionLost(self, reason):
        """Unregister the user (if registered) and tell the others."""
        if self.name in self.factory.users:
            del self.factory.users[self.name]
            self.broadcastMessage(b"%s has left the channel." % (self.name,))

    def lineReceived(self, line):
        """Dispatch a received line according to the connection state."""
        if self.state == "REGISTER":
            self.handle_REGISTER(line)
        else:
            self.handle_CHAT(line)

    def handle_REGISTER(self, name):
        """Register ``name`` for this connection unless it is already taken."""
        if name in self.factory.users:
            self.sendLine(b"Name taken, please choose another!")
            return
        self.sendLine(b"Welcome, %s" % (name,))
        # Announce before adding ourselves to the registry; broadcastMessage
        # skips this connection either way.
        self.broadcastMessage(b"%s has joined the channel." % (name,))
        self.name = name
        self.factory.users[name] = self
        self.state = "CHAT"

    def handle_CHAT(self, message):
        """Relay a chat line, prefixed with the sender's name."""
        message = b"[%s]>> %s" % (self.name, message)
        self.broadcastMessage(message)

    def broadcastMessage(self, message):
        """Send ``message`` to every registered user except this one."""
        # values() exists on Python 2 and 3 (iteritems() is Python 2 only),
        # and the user names are not needed for the relay.
        for protocol in self.factory.users.values():
            if protocol != self:
                protocol.sendLine(message)

class ChatFactory(Factory):
    """Build a ChatProtocol per connection and hold the user registry.

    ``users`` maps each registered name to the protocol instance serving it.
    """

    def __init__(self):
        self.users = dict()

    def buildProtocol(self, addr):
        """Return a new protocol bound to this factory; ``addr`` is unused."""
        connection = ChatProtocol(self)
        return connection

class Node:
    """A chat server driven by the module-level Twisted reactor.

    Every ``Node`` stores the same imported ``reactor`` object, so ``run()``
    and ``stop()`` act on the one event loop that all nodes share, not on a
    loop private to this node.
    """

    def __init__(self, stop=None):
        """Create the node and optionally schedule its shutdown.

        :param stop: delay in seconds after which ``stop()`` is scheduled on
            the reactor; ``None`` (the default) schedules nothing.
        """
        self.Factory = ChatFactory
        self.reactor = reactor
        self.d = defer.Deferred()
        # Compare against None rather than truthiness so that a delay of 0
        # is honoured instead of being silently ignored.
        if stop is not None:
            self.reactor.callLater(stop, self.stop)

    def listen(self, port):
        """Start accepting TCP connections on ``port`` with a fresh factory."""
        self.reactor.listenTCP(port, self.Factory())

    def run(self):
        """Run the shared reactor; blocks until the reactor is stopped."""
        self.reactor.run()

    def stop(self):
        """Stop the shared reactor (and therefore every node using it)."""
        self.reactor.stop()

class Organization:
    """
    An organization consists of several nodes, with one node as a leader.
    """
    def __init__(self):
        self.nodes = []

    def create_leader(self):
        """Create the leader node on port 8000 and add it to ``self.nodes``."""
        # The leader is intentionally scheduled to stop after 5 seconds.
        leader_node = Node(5)
        leader_node.listen(8000)
        self.nodes.append(leader_node)

    def create_more_nodes(self):
        """Append two more nodes; they start listening in ``activate()``."""
        node_1 = Node()
        node_2 = Node()
        self.nodes.append(node_1)
        self.nodes.append(node_2)

    def activate(self):
        """Start the two non-leader nodes listening, then run the reactor.

        Expects ``create_leader()`` and ``create_more_nodes()`` to have been
        called first, so that ``self.nodes`` holds the leader followed by two
        more nodes. Blocks until the reactor is stopped.
        """
        self.nodes[1].listen(8001)
        self.nodes[2].listen(8002)

        # Now leader_node listens at 8000, node_1 at 8001 and node_2 at 8002.

        # Every Node wraps the same global reactor, so it must be run exactly
        # once. Calling run() per node would block on the first call and then
        # try to restart an already-stopped reactor for the remaining nodes.
        self.nodes[0].run()

if __name__ == "__main__":
    # Build the leader and the two extra nodes, then start serving;
    # activate() is the call that runs the reactor.
    organization = Organization()
    organization.create_leader()
    organization.create_more_nodes()
    organization.activate()

After 5 seconds, the leader_node's reactor is stopped by the Node.stop() call that was scheduled with callLater. However, I have no idea why node_1 and node_2, listening on ports 8001 and 8002, were also stopped. If anyone with more experience with Twisted could point out why, that'd be great!


Solution

  • reactor.run() means "run the entire program". It does not forcibly terminate the process (the function does return), but it returns only so that you can clean up some state before exiting. So you should only ever run one reactor per process, and exit shortly after it has finished.

    If you want to have self-contained services which can shut down all of their incoming and outgoing connections as well as their listening ports, you have to track those connections in connectionMade and connectionLost. You also have to keep track of your listening port so that you can stopListening.

    As a side note, listenTCP is a very low-level API that you probably shouldn't be calling directly; instead, use the high-level Endpoints API, which is much more flexible.

    Here's a version of your code that tracks inbound connections and listening ports and shuts them down as you'd like, while sharing a reactor between all the nodes.

    from twisted.internet.protocol import Factory
    from twisted.internet.endpoints import TCP4ServerEndpoint
    from twisted.protocols.basic import LineReceiver
    from twisted.internet.defer import Deferred
    
    class ChatProtocol(LineReceiver):
        """Line-based chat protocol serving one client connection.
    
        A connection starts in the ``"REGISTER"`` state, in which the first
        accepted line becomes the user's name. Every later line is treated as
        a chat message and relayed to the other users registered on the
        factory. The connection's transport is recorded on the owning node
        while the connection is open.
    
        Lines are handled as byte strings throughout: on Python 3,
        ``LineReceiver`` passes ``bytes`` to ``lineReceived`` and ``sendLine``
        requires ``bytes``. On Python 2 ``bytes`` is ``str``, so nothing
        changes there. The ``%`` formatting of bytes needs Python 2 or 3.5+.
        """
    
        def __init__(self, factory):
            """Store the owning factory; the user has no name until registered."""
            self.factory = factory
            self.name = None
            self.state = "REGISTER"
    
        def connectionMade(self):
            """Record the transport on the node and prompt for a name."""
            self.factory.node.activeTransports.append(self.transport)
            self.sendLine(b"What's your name?")
    
        def connectionLost(self, reason):
            """Forget the transport, unregister the user and tell the others."""
            self.factory.node.activeTransports.remove(self.transport)
            if self.name in self.factory.users:
                del self.factory.users[self.name]
                self.broadcastMessage(b"%s has left the channel." % (self.name,))
    
        def lineReceived(self, line):
            """Dispatch a received line according to the connection state."""
            if self.state == "REGISTER":
                self.handle_REGISTER(line)
            else:
                self.handle_CHAT(line)
    
        def handle_REGISTER(self, name):
            """Register ``name`` for this connection unless it is already taken."""
            if name in self.factory.users:
                self.sendLine(b"Name taken, please choose another!")
                return
            self.sendLine(b"Welcome, %s" % (name,))
            # Announce before adding ourselves to the registry;
            # broadcastMessage skips this connection either way.
            self.broadcastMessage(b"%s has joined the channel." % (name,))
            self.name = name
            self.factory.users[name] = self
            self.state = "CHAT"
    
        def handle_CHAT(self, message):
            """Relay a chat line, prefixed with the sender's name."""
            message = b"[%s]>> %s" % (self.name, message)
            self.broadcastMessage(message)
    
        def broadcastMessage(self, message):
            """Send ``message`` to every registered user except this one."""
            # values() exists on Python 2 and 3 (iteritems() is Python 2
            # only), and the user names are not needed for the relay.
            for protocol in self.factory.users.values():
                if protocol != self:
                    protocol.sendLine(message)
    
    class ChatFactory(Factory):
        """Build a ChatProtocol per connection for one node.
    
        ``users`` maps each registered name to the protocol serving it, and
        ``node`` is the object the factory was created for.
        """
    
        def __init__(self, node):
            self.node = node
            self.users = dict()
    
        def buildProtocol(self, addr):
            """Return a new protocol bound to this factory; ``addr`` is unused."""
            connection = ChatProtocol(self)
            return connection
    
    class Node:
        """A chat service that can be shut down on its own.
    
        The node keeps track of its pending listen attempt, its listening
        port and the transports of its open connections, so that ``stop()``
        can close exactly those.
        """
    
        def __init__(self, endpoint, clock, stop=None):
            """Set up the node and optionally schedule its shutdown.
    
            :param endpoint: object whose ``listen(factory)`` returns a
                Deferred that fires with the listening port.
            :param clock: object providing ``callLater(delay, callable)``.
            :param stop: delay after which ``stop()`` is scheduled on
                ``clock``; ``None`` (the default) schedules nothing.
            """
            self.Factory = ChatFactory
            self._endpoint = endpoint
            self._listenStarting = None
            self._listeningPort = None
            self.activeTransports = []
            if stop is None:
                return
            print("Scheduling stop.", stop)
            clock.callLater(stop, self.stop)
    
        def _portOpened(self, port):
            """Remember the port produced by a successful listen."""
            self._listeningPort = port
    
        def _listenSettled(self, outcome):
            """Drop the pending-listen marker and pass the outcome through."""
            self._listenStarting = None
            return outcome
    
        def listen(self):
            """Start listening on the endpoint with a fresh factory."""
            factory = self.Factory(self)
            pending = self._endpoint.listen(factory)
            # Store the Deferred before attaching callbacks: if it has
            # already fired, the callbacks run immediately and must be able
            # to clear this attribute afterwards.
            self._listenStarting = pending
            pending.addCallback(self._portOpened)
            pending.addBoth(self._listenSettled)
    
        def stop(self):
            """Cancel a pending listen, close the port and abort connections."""
            pending = self._listenStarting
            if pending is not None:
                pending.cancel()
            port = self._listeningPort
            if port is not None:
                port.stopListening()
            # Iterate over a copy; ChatProtocol.connectionLost removes
            # transports from the live list.
            for transport in list(self.activeTransports):
                transport.abortConnection()
    
    class Organization:
        """A group of nodes that all use one reactor passed in by the caller."""
    
        def __init__(self, reactor):
            self.nodes = []
            self.reactor = reactor
    
        def port(self, number):
            """Return a TCP4 server endpoint for port ``number`` on the reactor."""
            endpoint = TCP4ServerEndpoint(self.reactor, number)
            return endpoint
    
        def create_leader(self):
            """Create the leader on port 8000, start it listening, and store it.
    
            The leader is given a stop delay of 5.
            """
            leader = Node(self.port(8000), self.reactor, 5)
            leader.listen()
            self.nodes.append(leader)
    
        def create_more_nodes(self):
            """Create nodes for ports 8001 and 8002 without starting them."""
            followers = [
                Node(self.port(number), self.reactor)
                for number in (8001, 8002)
            ]
            self.nodes.extend(followers)
    
        def activate(self):
            """Start listening on the second and third entries of ``nodes``."""
            for index in (1, 2):
                self.nodes[index].listen()
    
    def main(reactor):
        """Build the organization on ``reactor`` and start its nodes.
    
        Returns a new Deferred that nothing in this function ever fires.
        """
        organization = Organization(reactor)
        organization.create_leader()
        organization.create_more_nodes()
        organization.activate()
        never_fired = Deferred()
        return never_fired
    
    if __name__ == "__main__":
        # Hand main() to react, which supplies the reactor argument.
        from twisted.internet import task
        task.react(main)