Search code examples
pythonmultithreadingsocketsunix

How can I wait for a threading event and socket pollin at the same time?


I have a select.poll() object for incoming messages of a socket, and a queue.Queue() object that contains potential outgoing messages.

While both objects separately support waiting for a timeout without spending CPU cycles, I need to wait on both at the same time and resume the thread once either of them triggers. Is there a mechanism for that?

I have tried waiting on both after another in a loop with very short timeout, but this is basically busy-waiting and ended up costing too much CPU. I have also tried increasing the timeout, which frees up CPU as expected but increases latency too much.

It looks like select.poll() can wait on any UNIX file descriptor, so perhaps another way of asking my question is whether there is a queue.Queue() or threading.Event() alternative that exposes a file descriptor that can be polled?

Current

import socket
import select
import queue
import threading

socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
socket.connect(('localhost', 4200))
incoming = select.poll()
incoming.register(socket, select.POLLIN)

outgoing = queue.Queue()

while True:
  if incoming.poll(timeout=0.001):
    read(socket)
  try:
    msg = outgoing.get(block=False, timeout=0.001)
    send(socket, msg)
 except queue.Empty:
  pass

Desired

import socket
import queue
import threading

socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
socket.connect(('localhost', 4200))
outgoing = queue.Queue()

poller = HYBRID_POLLER()  # TODO
poller.register(socket)
poller.register(outgoing)

while True:
  available = poller.poll(timeout=1)
  if socket in available:
    read(socket)
  if outgoing in available:
    msg = outgoing.get(block=False, timeout=0.001)
    send(socket, msg)

Solution

  • See chapter 12.13 in the Python Cookbook, 3rd Edition, which describes how to make a queue.Queue instance "pollable" (a socket.socket instance is already pollable):

    import queue
    import socket
    import os
    
    class PollableQueue(queue.Queue):
        def __init__(self):
            super().__init__()
            # Create a pair of connected sockets
            if os.name == 'posix':
                self._putsocket, self._getsocket = socket.socketpair()
            else:
                # Compatibility on non-POSIX systems
                server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
                server.bind(('127.0.0.1', 0))
                server.listen(1)
                self._putsocket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
                self._putsocket.connect(server.getsockname())
                self._getsocket, _ = server.accept()
                server.close()
    
        def fileno(self):
            return self._getsocket.fileno()
    
        def put(self, item):
            super().put(item)
            self._putsocket.send(b'x')
    
        def get(self):
            self._getsocket.recv(1)
            return super().get()
    

    Then you can pass to select.select a list containing your queue and socket instances:

    import select
    
    socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    socket.connect(('localhost', 4200))
    outgoing = PollableQueue()
    ...
    
    while True:
        available, _, _ = select.select([outgoing, socket], [], [], timeout=1.0)
        if socket in available:
            read(socket)
        if outgoing in available:
            msg = outgoing.get()
            send(socket, msg)
    

    Is socket the best choice to name your socket instance since it is a possible conflict with the module named socket?