In an attempt to solve this, I'm trying to simplify the problem. Suppose I have a receiver listening to both TCP and UDP messages. It will receive several strings, append them to a deque and after receiving "finish"
message, it will start processing the deque.
If I receive a UDP message, I need to stop the processing, remove the last item of deque and then continue the processing.
from collections import deque
host = commands.getoutput("hostname -I")
port = 5005
backlog = 5
BUFSIZE = 4096
q = deque()
def read_tcp(s):
conn, addr = s.accept()
print('Connected with', *addr)
while 1:
data = conn.recv(BUFFER_SIZE)
if not data: break
print "received data:", data
conn.send(data) # echo
conn.close()
if (data == 'finish'):
processP(q)
else:
q.append(data)
def read_udp(s):
data,addr = s.recvfrom(1024)
print("received message:", data)
del q[-1]
processP(q):
text = q.popleft()
textReverse = text[::-1]
print(textReverse)
def run():
# create tcp socket
tcp = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
tcp.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
try:
tcp.bind((host,port))
except socket.error as err:
print('Bind failed', err)
return
tcp.listen(1)
# create udp socket
udp = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) # UDP
udp.bind((host,port))
print('***Socket now listening at***:', host, port)
input = [tcp,udp]
try:
while True:
inputready,outputready,exceptready = select(input,[],[])
for s in inputready:
if s == tcp:
read_tcp(s)
elif s == udp:
read_udp(s)
else:
print("unknown socket:", s)
# Hit Break / Ctrl-C to exit
except KeyboardInterrupt:
print('\nClosing')
raise
tcp.close()
udp.close()
if __name__ == '__main__':
run()
I have problem in pausing the program upon receiving a UDP message and then returning to the processing phase. Right now, if a UDP message is sent to my program while processing, it won't receive the message until the end of processing (and then the deque is empty). I thought maybe threading or multiprocessing may help, but I can't figure out how to apply them to the code.
Nobody forces you to empty out the dequeue. You can check if an UDP message has arrived before dequeueing the next workload. And that is as far as you can get with threads, as they do not allow you to interrupt arbitrary code. They can always only be terminated cooperatively.
If your single item processing takes too long, then multiprocessing of work-items is an option, as you can kill an external process.
Use select.select
to check for incoming data on your sockets with a short timeout, before continuing to process the next workload. Alternatively you could use a thread waiting on input on the thread and manipulate the dequeue.
EDIT This is your code made to work with python3, select.select and a timeout. Triggering read_udp works with netcat with echo foo | nc -4 -u localhost 5005
but then triggers an exception because you assume the existence of elements in the dequeue - which is an application logic problem that is independent of the question how to interleave listening and working.
import socket
import select
from collections import deque
host = "localhost"
port = 5005
backlog = 5
BUFSIZE = 4096
q = deque()
def read_tcp(s):
conn, addr = s.accept()
print('Connected with', *addr)
while 1:
data = conn.recv(BUFFER_SIZE)
if not data: break
print("received data:", data)
conn.send(data) # echo
conn.close()
if (data == 'finish'):
processP(q)
else:
q.append(data)
def read_udp(s):
data,addr = s.recvfrom(1024)
print("received message:", data)
del q[-1]
def processP(q):
text = q.popleft()
textReverse = text[::-1]
print(textReverse)
def run():
# create tcp socket
tcp = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
tcp.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
try:
tcp.bind((host,port))
except socket.error as err:
print('Bind failed', err)
return
tcp.listen(1)
# create udp socket
udp = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) # UDP
udp.bind((host,port))
print('***Socket now listening at***:', host, port)
input = [tcp,udp]
try:
while True:
print("select.select")
inputready,outputready,exceptready = select.select(input,[],[], 0.1)
for s in inputready:
if s == tcp:
read_tcp(s)
elif s == udp:
read_udp(s)
else:
print("unknown socket:", s)
# Hit Break / Ctrl-C to exit
except KeyboardInterrupt:
print('\nClosing')
raise
tcp.close()
udp.close()
if __name__ == '__main__':
run()