I am attempting to write a simple TCP server in Twisted which has to perform the following operations in sequence:
The KEEPALIVE flag for this connection is set to 1.
The following is the code:
class MyFactory(ServerFactory):
    protocol = MyProtocol

    def __init__(self, service):
        self.service = service


class MyProtocol(Protocol):
    def connectionMade(self):
        try:
            self.transport.setTcpKeepAlive(1)
        except AttributeError:
            pass
        self.deferred = Deferred()
        self.deferred.addCallback(self.factory.service.compute_response)
        self.deferred.addCallback(self.send_response)

    def dataReceived(self, data):
        self.fire(data)

    def fire(self, data):
        if self.deferred is not None:
            d, self.deferred = self.deferred, None
            d.callback(data)

    def send_response(self, data):
        for item in data:
            d = Deferred()
            d.addCallback(self.transport.write)
            d.addCallback(self.wait_for_ack)
            d.callback(item)
        return

    def wait_for_ack(self, dummy):
        try:
            self.transport.socket.recv(1024)
        except socket.error as e:
            print e
        return
Upon running the server and the client I get the following exception:
Resource temporarily unavailable
I understand the reason for this exception: I'm trying to call a blocking method on a non-blocking socket.
Please help me in finding a solution to this problem.
There are some problems with your example:
1. You don't define compute_response anywhere (among other things), so I can't run your example. Consider making it an SSCCE (http://sscce.org).
2. Don't call send or recv on a socket underlying a Twisted transport; let Twisted call those methods for you. In the case of recv, it will deliver the results of recv to dataReceived.
3. You can't rely on dataReceived to receive whole messages; packets may always be arbitrarily fragmented in transit, so you need a framing protocol for encapsulating your messages.
However, since my other answer was so badly botched, I owe you a more thorough explanation of how to set up what you want to do.
As stipulated in your question, your protocol is not defined well enough to give an answer; you cannot do requests and responses with raw TCP fragments, because your application can't know where they start and end (see point 3 above). So, I've invented a little protocol to serve for this example: it's a line-delimited protocol where the client sends "request foo\n" and the server immediately sends "thinking...\n", computes a response, then sends "response foo\n" and waits for the client to send "ok"; in response, the server will either send the next "response ..." line, or a "done\n" line indicating that it's finished sending responses.
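To make the framing point concrete, here is a minimal sketch, independent of Twisted, of what a line-delimited framer has to do when the input arrives in arbitrary fragments. LineBuffer is a hypothetical helper invented for this illustration; in a real Twisted server, LineReceiver takes care of this for you.

```python
class LineBuffer(object):
    """Hypothetical helper: collects raw chunks and returns complete lines."""

    def __init__(self, delimiter=b"\n"):
        self.delimiter = delimiter
        self._buffer = b""

    def feed(self, chunk):
        """Add a chunk of bytes; return the list of lines it completed."""
        self._buffer += chunk
        parts = self._buffer.split(self.delimiter)
        # The last element is the unfinished tail (possibly empty); keep it
        # in the buffer until its delimiter arrives.
        self._buffer = parts.pop()
        return parts


framer = LineBuffer()
lines = []
# Two messages ("request foo" and "ok") delivered as three arbitrary fragments.
for chunk in [b"requ", b"est foo\no", b"k\n"]:
    lines.extend(framer.feed(chunk))
print(lines)
```

The fragment boundaries do not line up with the message boundaries, yet the framer still hands back exactly two whole lines, which is the property dataReceived alone cannot give you.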
With that as our protocol, I believe the key element of your question is that you cannot "wait for acknowledgement", or for that matter, anything else, in Twisted. What you need to do is implement something along the lines of "when an acknowledgement is received...".
Therefore, when a message is received, we need to identify the type of the message: acknowledgement or request?
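The shift from "wait for an acknowledgement" to "when an acknowledgement is received" can be sketched without Twisted at all. The AckDrivenSender class and its method names are hypothetical, invented for this illustration; write stands in for whatever delivers a line to the client.

```python
class AckDrivenSender(object):
    """Hypothetical sketch: sends one queued response per acknowledgement."""

    def __init__(self, write):
        self.write = write      # callable that delivers one line to the client
        self.pending = []       # responses not yet sent

    def start(self, responses):
        """Queue the computed responses and send only the first one."""
        self.pending.extend(responses)
        self._send_next()

    def ack_received(self):
        """Called by the event loop when the client acknowledges a response."""
        self._send_next()

    def _send_next(self):
        if self.pending:
            self.write(b"response " + self.pending.pop(0))
        else:
            self.write(b"done")


sent = []
sender = AckDrivenSender(sent.append)
sender.start([b"foo 1", b"foo 2"])   # sends the first response, then returns
sender.ack_received()                # client said "ok": send the second
sender.ack_received()                # client said "ok": nothing left
print(sent)
```

Nothing here ever blocks: each method does a small amount of work and returns, and the next step happens only when the next event arrives.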
Here's a full, runnable example that implements the protocol I described in that way:
from twisted.internet.protocol import ServerFactory
from twisted.internet.task import deferLater
from twisted.internet import reactor
from twisted.internet.interfaces import ITCPTransport
from twisted.protocols.basic import LineReceiver


class MyProtocol(LineReceiver):
    # Bytes throughout, so the example behaves the same on Python 2 and 3.
    delimiter = b"\n"

    def connectionMade(self):
        # Keepalive only makes sense on TCP transports (not on standard I/O).
        if ITCPTransport.providedBy(self.transport):
            self.transport.setTcpKeepAlive(1)
        self.pendingResponses = []

    def lineReceived(self, line):
        split = line.rstrip(b"\r").split(None, 1)
        if not split:
            # blank line; nothing to do
            return
        command = split[0]
        if command == b"request":
            # requesting a computed response
            payload = split[1] if len(split) > 1 else b""
            self.sendLine(b"thinking...")
            (self.factory.service.computeResponse(payload)
             .addCallback(self.sendResponses))
        elif command == b"ok":
            # acknowledging a response; send the next response
            if self.pendingResponses:
                self.sendOneResponse()
            else:
                self.sendLine(b"done")

    def sendOneResponse(self):
        self.sendLine(b"response " + self.pendingResponses.pop(0))

    def sendResponses(self, listOfResponses):
        self.pendingResponses.extend(listOfResponses)
        self.sendOneResponse()


class MyFactory(ServerFactory):
    protocol = MyProtocol

    def __init__(self, service):
        self.service = service


class MyService(object):
    def computeResponse(self, request):
        # Simulate a slow computation: the Deferred fires after one second.
        return deferLater(
            reactor, 1.0,
            lambda: [request + b" 1", request + b" 2", request + b" 3"]
        )


from twisted.internet.endpoints import StandardIOEndpoint

endpoint = StandardIOEndpoint(reactor)
endpoint.listen(MyFactory(MyService()))
reactor.run()
I've made this runnable on standard I/O so that you can just run it and type into it to get a feel for how it works; if you want to run it on an actual network port, just replace StandardIOEndpoint with a different type of endpoint. Hopefully this answers your question.