python sockets tcp twisted twisted.internet

Twisted: how to read from a client socket after writing data to the same socket?


I am attempting to write a simple TCP server in twisted which has to perform the following operations in sequence:

  1. A client connects to the server and the KEEPALIVE flag for this connection is set to 1.
  2. The server receives data from the client.
  3. It then computes the response which is a list.
  4. The server then sends each item of the list one by one while waiting for explicit ACKs from the client in between, i.e., after sending a single item from the list, the server waits for an ACK packet from the client and only after receiving the ACK does it proceed to send the rest of the items in the same manner.

The following is the code:

class MyFactory(ServerFactory):
    protocol = MyProtocol

    def __init__(self, service):
        self.service = service

class MyProtocol(Protocol):

    def connectionMade(self):
         try:
             self.transport.setTcpKeepAlive(1)
         except AttributeError: 
             pass
         self.deferred = Deferred()
         self.deferred.addCallback(self.factory.service.compute_response)
         self.deferred.addCallback(self.send_response)

    def dataReceived(self, data):
         self.fire(data)

    def fire(self, data):
        if self.deferred is not None:
            d, self.deferred = self.deferred, None
            d.callback(data)

    def send_response(self, data):
        for item in data:
            d = Deferred()
            d.addCallback(self.transport.write)
            d.addCallback(self.wait_for_ack)
            d.callback(item)
        return   

    def wait_for_ack(self, dummy):
        try:
            self.transport.socket.recv(1024)
        except socket.error as e:
            print e
        return

Upon running the server and the client I get the following exception:

Resource temporarily unavailable

I understand the reason for this exception: I'm calling a blocking method on a non-blocking socket.

Please help me in finding a solution to this problem.


Solution

  • There are some problems with your example:

    1. You don't define compute_response anywhere (among other things), so I can't run your example. Consider making it an SSCCE (http://sscce.org).
    2. You should never call either send or recv on a socket underlying a Twisted transport; let Twisted call those methods for you. In the case of recv it will deliver the results of recv to dataReceived.
    3. You can't rely upon dataReceived to receive whole messages; packets may always be arbitrarily fragmented in transit so you need to have a framing protocol for encapsulating your messages.
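
    To illustrate point 3, here is a minimal, Twisted-free sketch of what a line-based framing layer has to do: buffer whatever fragments arrive and hand only complete lines to the application. The class name LineFramer and its feed method are hypothetical names made up for this illustration; this is not how Twisted's LineReceiver is implemented, only the general idea.

```python
class LineFramer:
    """Collects arbitrary byte fragments and returns complete lines."""

    def __init__(self, delimiter=b"\n"):
        self.delimiter = delimiter
        self._buffer = b""

    def feed(self, data):
        """Add a fragment; return the list of lines completed by it."""
        self._buffer += data
        # Everything before the last delimiter is complete; the tail is kept.
        *lines, self._buffer = self._buffer.split(self.delimiter)
        return lines


framer = LineFramer()
# One logical message may arrive split across several reads...
first = framer.feed(b"requ")
second = framer.feed(b"est foo\no")
# ...and one read may contain several messages (or parts of them).
third = framer.feed(b"k\nok\n")
```

    The point is that the boundaries of the data handed to dataReceived say nothing about the boundaries of your messages; only the delimiter does.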

    However, since my other answer was so badly botched, I owe you a more thorough explanation of how to set up what you want to do.

    As stipulated in your question, your protocol is not defined completely enough to give an answer: you cannot do requests and responses with raw TCP fragments, because your application can't know where they start and end (see point 3 above). So I've invented a little protocol to serve for this example. It's a line-delimited protocol where the client sends "request foo\n" and the server immediately sends "thinking...\n", computes a response, then sends "response foo\n" and waits for the client to send "ok\n". In reply, the server sends either the next "response ..." line or a "done\n" line indicating that it has finished sending responses.

    With that as our protocol, I believe the key element of your question is that you cannot "wait for acknowledgement", or for that matter, anything else, in Twisted. What you need to do is implement something along the lines of "when an acknowledgement is received...".

    Therefore, when a message is received, we need to identify the type of the message: acknowledgement or request?

    1. if it's a request, we need to compute a response; when the response is finished being computed, we need to enqueue all the elements of the response and send the first one.
    2. if it's an acknowledgement, we need to examine the outgoing queue of responses, and if it has any contents, send the first element of it; otherwise, send "done".
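
    The two rules above can be sketched without any networking at all. In this hypothetical AckDrivenSender, handle_line stands in for lineReceived and returns the lines that should be written back. The computation is synchronous here purely to keep the sketch short; in Twisted it would normally hand back a Deferred instead.

```python
from collections import deque


class AckDrivenSender:
    """Transport-free model of the request/acknowledgement rules."""

    def __init__(self, compute_response):
        # compute_response: callable taking a payload and returning a list
        self.compute_response = compute_response
        self.pending = deque()

    def handle_line(self, line):
        """Return the list of lines to send in reply to one received line."""
        parts = line.split(None, 1)
        if len(parts) == 2 and parts[0] == b"request":
            # Rule 1: enqueue every response item, send only the first.
            self.pending.extend(self.compute_response(parts[1]))
            return [b"thinking...", self._next_line()]
        if parts == [b"ok"]:
            # Rule 2: an acknowledgement releases the next item, if any.
            return [self._next_line()]
        return []  # unknown input is ignored in this sketch

    def _next_line(self):
        if self.pending:
            return b"response " + self.pending.popleft()
        return b"done"


sender = AckDrivenSender(lambda payload: [payload + b" 1", payload + b" 2"])
replies = [sender.handle_line(b"request foo")]
replies += [sender.handle_line(b"ok") for _ in range(2)]
```

    Nothing here ever waits: each incoming line is handled to completion, and the queue remembers what still has to be sent when the next acknowledgement arrives.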

    Here's a full, runnable example that implements the protocol I described in that way:

    from twisted.internet.protocol import ServerFactory
    from twisted.internet.task import deferLater
    from twisted.internet import reactor
    from twisted.internet.interfaces import ITCPTransport
    from twisted.protocols.basic import LineReceiver
    
    class MyProtocol(LineReceiver):
        delimiter = b"\n"
        def connectionMade(self):
            if ITCPTransport.providedBy(self.transport):
                self.transport.setTcpKeepAlive(1)
            self.pendingResponses = []
    
        def lineReceived(self, line):
            split = line.rstrip(b"\r").split(None, 1)
            if not split:
                # ignore blank lines
                return
            command = split[0]
            if command == b"request" and len(split) == 2:
                # requesting a computed response
                payload = split[1]
                self.sendLine(b"thinking...")
                (self.factory.service.computeResponse(payload)
                 .addCallback(self.sendResponses))
            elif command == b"ok":
                # acknowledging a response; send the next response
                if self.pendingResponses:
                    self.sendOneResponse()
                else:
                    self.sendLine(b"done")
    
        def sendOneResponse(self):
            self.sendLine(b"response " + self.pendingResponses.pop(0))
    
        def sendResponses(self, listOfResponses):
            self.pendingResponses.extend(listOfResponses)
            self.sendOneResponse()
    
    class MyFactory(ServerFactory):
        protocol = MyProtocol
    
        def __init__(self, service):
            self.service = service
    
    class MyService(object):
        def computeResponse(self, request):
            return deferLater(
                reactor, 1.0,
                lambda: [request + b" 1", request + b" 2", request + b" 3"]
            )
    
    from twisted.internet.endpoints import StandardIOEndpoint
    endpoint = StandardIOEndpoint(reactor)
    
    endpoint.listen(MyFactory(MyService()))
    reactor.run()
    

    I've made this runnable on standard I/O so that you can just run it and type into it to get a feel for how it works; if you want to run it on an actual network port, substitute a different type of endpoint, such as TCP4ServerEndpoint(reactor, port) from twisted.internet.endpoints. Hopefully this answers your question.
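
    For completeness, here is a rough sketch of what the client side of this invented protocol could look like, using only the standard library. The function name run_client is hypothetical, and the canned server output below merely stands in for a live connection so that the sketch can run on its own.

```python
import io


def run_client(rfile, wfile, payload):
    """Send one request, acknowledge every response, return the payloads."""
    wfile.write(b"request " + payload + b"\n")
    wfile.flush()
    responses = []
    for raw in iter(rfile.readline, b""):  # b"" means the peer closed
        line = raw.rstrip(b"\r\n")
        if line == b"done":
            break
        if line.startswith(b"response "):
            responses.append(line[len(b"response "):])
            wfile.write(b"ok\n")  # acknowledge so the server sends the next item
            wfile.flush()
        # anything else (such as b"thinking...") is informational
    return responses


# Canned server output stands in for a live connection here.
server_output = io.BytesIO(
    b"thinking...\nresponse foo 1\nresponse foo 2\nresponse foo 3\ndone\n"
)
client_output = io.BytesIO()
received = run_client(server_output, client_output, b"foo")
```

    Against a real TCP server, passing the file object returned by socket.create_connection((host, port)).makefile("rwb") as both rfile and wfile should work, where host and port are whatever your server listens on. A blocking client like this is fine for trying things out; the server itself must stay event-driven as described above.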