Search code examples
pythonudptwisted

UDP client and server with Twisted Python


I want to create a server and client that sends and receives UDP packets from the network using Twisted. I've already written this with sockets in Python, but want to take advantage of Twisted's callback and threading features. However, I need help though with the design of Twisted.

I have multiple types of packets I want to receive, but let's pretend there is just one:

class Packet(object):
    def __init__(self, data=None):
        self.packet_type = 1
        self.payload = ''
        self.structure = '!H6s'
        if data == None:
            return

        self.packet_type, self.payload = struct.unpack(self.structure, data)

    def pack(self):
        return struct.pack(self.structure, self.packet_type, self.payload)

    def __str__(self):
        return "Type: {0}\nPayload {1}\n\n".format(self.packet_type, self.payload)

I made a protocol class (almost direct copy of the examples), which seems to work when I send data from another program:

class MyProtocol(DatagramProtocol):
    def datagramReceived(self, data, (host, port)):
        p = Packet(data)
        print p

reactor.listenUDP(3000, MyProtocol())
reactor.run()

What I don't know is how do I create a client which can send arbitrary packets on the network, which get picked up by the reactor:

# Something like this:
s = Sender()
p = Packet()
p.packet_type = 3
s.send(p.pack())
p.packet_type = 99
s.send(p.pack())

I also need to make sure to set the reuse address flag on the client and servers so I can run multiple instances of each at the same time on the same device (e.g. one script is sending heartbeats, another responds to heartbeats, etc).

Can someone show me how this could be done with Twisted?

Update:

This is how I do it with sockets in Python. I can run multiple listeners and senders at the same time and they all hear each other. How do I get this result with Twisted? (The listening portion need not be a separate process.)

class Listener(Process):
    def __init__(self, ip='127.0.0.1', port=3000):
        Process.__init__(self)
        self.ip = ip
        self.port = port

    def run(self):
        sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        sock.bind((self.ip, self.port))

        data, from_ip = sock.recvfrom(4096)
        p = Packet(data)
        print p

class Sender(object):
    def __init__(self, ip='127.255.255.255', port=3000):
        self.sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        self.sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        self.ip = (ip, port)

    def send(self, data):
        self.sock.sendto(data, self.ip)

if __name__ == "__main__":
    l = Listener()
    l.start()
    s = Sender()
    p = Packet()
    p.packet_type = 4
    p.payload = 'jake'
    s.send(p.pack())

Working solution:

class MySender(DatagramProtocol):
    def __init__(self, packet, host='127.255.255.255', port=3000):
        self.packet = packet.pack()
        self.host = host
        self.port = port

    def startProtocol(self):
        self.transport.write(self.packet, (self.host, self.port))

if __name__ == "__main__":
    packet = Packet()
    packet.packet_type = 1
    packet.payload = 'jake'

    s = MySender(packet)

    reactor.listenMulticast(3000, MyProtocol(), listenMultiple=True)
    reactor.listenMulticast(3000, s, listenMultiple=True)
    reactor.callLater(4, reactor.stop)
    reactor.run()

Solution

  • Just like the server example above, there is a client example to. This should help you get started:

    Ok, here is a simple heart beat sender and receiver using datagram protocol.

    from twisted.internet.protocol import DatagramProtocol
    from twisted.internet import reactor
    from twisted.internet.task import LoopingCall
    import sys, time
    
    class HeartbeatSender(DatagramProtocol):
        def __init__(self, name, host, port):
            self.name = name
            self.loopObj = None
            self.host = host
            self.port = port
    
        def startProtocol(self):
            # Called when transport is connected
            # I am ready to send heart beats
            self.loopObj = LoopingCall(self.sendHeartBeat)
            self.loopObj.start(2, now=False)
    
        def stopProtocol(self):
            "Called after all transport is teared down"
            pass
    
        def datagramReceived(self, data, (host, port)):
            print "received %r from %s:%d" % (data, host, port)
    
    
        def sendHeartBeat(self):
            self.transport.write(self.name, (self.host, self.port))
    
    
    
    class HeartbeatReciever(DatagramProtocol):
        def __init__(self):
            pass
    
        def startProtocol(self):
            "Called when transport is connected"
            pass
    
        def stopProtocol(self):
            "Called after all transport is teared down"
    
    
        def datagramReceived(self, data, (host, port)):
            now = time.localtime(time.time())  
            timeStr = str(time.strftime("%y/%m/%d %H:%M:%S",now)) 
            print "received %r from %s:%d at %s" % (data, host, port, timeStr)
    
    
    
    heartBeatSenderObj = HeartbeatSender("sender", "127.0.0.1", 8005)
    
    reactor.listenMulticast(8005, HeartbeatReciever(), listenMultiple=True)
    reactor.listenMulticast(8005, heartBeatSenderObj, listenMultiple=True)
    reactor.run()
    

    The broadcast example simply modifies the above approach:

    from twisted.internet.protocol import DatagramProtocol
    from twisted.internet import reactor
    from twisted.internet.task import LoopingCall
    import sys, time
    
    class HeartbeatSender(DatagramProtocol):
        def __init__(self, name, host, port):
            self.name = name
            self.loopObj = None
            self.host = host
            self.port = port
    
        def startProtocol(self):
            # Called when transport is connected
            # I am ready to send heart beats
            self.transport.joinGroup('224.0.0.1')
            self.loopObj = LoopingCall(self.sendHeartBeat)
            self.loopObj.start(2, now=False)
    
        def stopProtocol(self):
            "Called after all transport is teared down"
            pass
    
        def datagramReceived(self, data, (host, port)):
            print "received %r from %s:%d" % (data, host, port)
    
    
        def sendHeartBeat(self):
            self.transport.write(self.name, (self.host, self.port))
    
    
    
    class HeartbeatReciever(DatagramProtocol):
        def __init__(self, name):
            self.name = name
    
        def startProtocol(self):
            "Called when transport is connected"
            self.transport.joinGroup('224.0.0.1')
            pass
    
        def stopProtocol(self):
            "Called after all transport is teared down"
    
    
        def datagramReceived(self, data, (host, port)):
            now = time.localtime(time.time())  
            timeStr = str(time.strftime("%y/%m/%d %H:%M:%S",now)) 
            print "%s received %r from %s:%d at %s" % (self.name, data, host, port, timeStr)
    
    
    
    heartBeatSenderObj = HeartbeatSender("sender", "224.0.0.1", 8005)
    
    reactor.listenMulticast(8005, HeartbeatReciever("listner1"), listenMultiple=True)
    reactor.listenMulticast(8005, HeartbeatReciever("listner2"), listenMultiple=True)
    reactor.listenMulticast(8005, heartBeatSenderObj, listenMultiple=True)
    reactor.run()