Search code examples
pythonsocketssimultaneous

Receiving UDP message while processing a deque


In an attempt to solve this, I'm trying to simplify the problem. Suppose I have a receiver listening to both TCP and UDP messages. It will receive several strings, append them to a deque and after receiving "finish" message, it will start processing the deque.

If I receive a UDP message, I need to stop the processing, remove the last item of deque and then continue the processing.

from collections import deque

host = commands.getoutput("hostname -I")
port = 5005
backlog = 5
BUFSIZE = 4096
q = deque()


def read_tcp(s):
    conn, addr = s.accept()
    print('Connected with', *addr)
    while 1:
        data = conn.recv(BUFFER_SIZE)
        if not data: break
        print "received data:", data
        conn.send(data)  # echo
    conn.close()
    if (data == 'finish'):
        processP(q)
    else:
        q.append(data)

def read_udp(s):
    data,addr = s.recvfrom(1024)
    print("received message:", data)
    del q[-1]


processP(q):
    text = q.popleft()
    textReverse = text[::-1]
    print(textReverse)

def run():
    # create tcp socket
    tcp = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    tcp.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    try:
        tcp.bind((host,port))
    except socket.error as err:
        print('Bind failed', err)
        return
    tcp.listen(1)
    # create udp socket
    udp = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) # UDP
    udp.bind((host,port))
    print('***Socket now listening at***:', host, port)
    input = [tcp,udp]
    try:
        while True:
            inputready,outputready,exceptready = select(input,[],[])
            for s in inputready:
                if s == tcp:
                    read_tcp(s)
                elif s == udp:
                    read_udp(s)
                else:
                    print("unknown socket:", s)
    # Hit Break / Ctrl-C to exit
    except KeyboardInterrupt:
        print('\nClosing')
        raise
    tcp.close()
    udp.close()

if __name__ == '__main__':
    run()

I have problem in pausing the program upon receiving a UDP message and then returning to the processing phase. Right now, if a UDP message is sent to my program while processing, it won't receive the message until the end of processing (and then the deque is empty). I thought maybe threading or multiprocessing may help, but I can't figure out how to apply them to the code.


Solution

  • Nobody forces you to empty out the dequeue. You can check if an UDP message has arrived before dequeueing the next workload. And that is as far as you can get with threads, as they do not allow you to interrupt arbitrary code. They can always only be terminated cooperatively.

    If your single item processing takes too long, then multiprocessing of work-items is an option, as you can kill an external process.

    Use select.select to check for incoming data on your sockets with a short timeout, before continuing to process the next workload. Alternatively you could use a thread waiting on input on the thread and manipulate the dequeue.

    EDIT This is your code made to work with python3, select.select and a timeout. Triggering read_udp works with netcat with echo foo | nc -4 -u localhost 5005 but then triggers an exception because you assume the existence of elements in the dequeue - which is an application logic problem that is independent of the question how to interleave listening and working.

    import socket
    import select
    from collections import deque
    
    host = "localhost"
    port = 5005
    backlog = 5
    BUFSIZE = 4096
    q = deque()
    
    
    def read_tcp(s):
        conn, addr = s.accept()
        print('Connected with', *addr)
        while 1:
            data = conn.recv(BUFFER_SIZE)
            if not data: break
            print("received data:", data)
            conn.send(data)  # echo
        conn.close()
        if (data == 'finish'):
            processP(q)
        else:
            q.append(data)
    
    def read_udp(s):
        data,addr = s.recvfrom(1024)
        print("received message:", data)
        del q[-1]
    
    
    def processP(q):
        text = q.popleft()
        textReverse = text[::-1]
        print(textReverse)
    
    def run():
        # create tcp socket
        tcp = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        tcp.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        try:
            tcp.bind((host,port))
        except socket.error as err:
            print('Bind failed', err)
            return
        tcp.listen(1)
        # create udp socket
        udp = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) # UDP
        udp.bind((host,port))
        print('***Socket now listening at***:', host, port)
        input = [tcp,udp]
        try:
            while True:
                print("select.select")
                inputready,outputready,exceptready = select.select(input,[],[], 0.1)
                for s in inputready:
                    if s == tcp:
                        read_tcp(s)
                    elif s == udp:
                        read_udp(s)
                    else:
                        print("unknown socket:", s)
        # Hit Break / Ctrl-C to exit
        except KeyboardInterrupt:
            print('\nClosing')
            raise
        tcp.close()
        udp.close()
    
    if __name__ == '__main__':
        run()