Search code examples
pythonmultithreadingsocketsnetwork-programmingpython-multithreading

In ThreadPoolExecutor, socket.bind() is blocking without "Address already in use"


I'm writing a Python program using the socket module, and I tested listening on the same port twice using ThreadPoolExecutor. This is a minimal program that reproduces my problem:

import socket
import time
import concurrent.futures

def listen6666():
    """Open a TCP socket with SO_REUSEADDR and listen on localhost:6666.

    Prints "set socket" after the socket option has been applied and "done"
    once listen() has returned. Nothing is caught here, so any error raised
    by bind() or listen() propagates to whoever called this function.
    """
    address = ("localhost", 6666)
    server_socket = socket.socket(family=socket.AF_INET, type=socket.SOCK_STREAM)
    server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    print("set socket")
    # bind() is the call that is expected to fail when the port is taken.
    server_socket.bind(address)
    server_socket.listen()
    print("done")
# Bind and listen on localhost:6666 in the main thread first, so that every
# later attempt to bind the same address is expected to fail.
server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
server_socket.bind(("localhost", 6666))
server_socket.listen()
# Submit the same bind attempt twice to worker threads. The returned futures
# are stored in fn/fn2 but never inspected by the live code below.
with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
    fn = executor.submit(listen6666)
    fn2 = executor.submit(listen6666)
    # Calling listen6666() directly here raises "Address already in use".
# Uncommenting fn.result() raises "Address already in use".
# Uncommenting fn2.result() raises "Address already in use".
time.sleep(10)

I expected "Address already in use" to be raised by server_socket.bind in listen6666, but nothing is reported: no error appears and "done" is never printed. So the complete output is:

$ python test.py 
set socket
set socket
$

If I call listen6666() directly (without the executor), or call fn.result() before sleeping, "Address already in use" does occur. I want "Address already in use" to be reported when listen6666() runs in the executor as well.

How can I solve this?


Solution

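  • An exception raised inside a function submitted to the executor is captured in the returned Future and is only re-raised when you call result() on it; if you never do, it passes silently. So you need to manage exceptions in the thread itself.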

    Here's a revised version of your code that shows that an "Address already in use" exception is raised.

    import socket
    import concurrent.futures
    
    ADDRESS = "localhost", 6666
    
    def listen6666(keep=False):
        """Try to bind and listen on ADDRESS, printing progress and any error.

        Errors are caught and printed inside this function because it is run
        in executor worker threads, where an uncaught exception would only be
        stored on the Future and never shown unless result() is called.

        Args:
            keep: When true and setup succeeds, the listening socket is
                returned and the caller becomes responsible for closing it.
                When false, the socket is always closed before returning.

        Returns:
            The listening socket if ``keep`` is true and bind/listen
            succeeded; otherwise ``None``.
        """
        server_socket = None
        try:
            server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
            print("bind")
            server_socket.bind(ADDRESS)
            print("listen")
            server_socket.listen()
            print("done")
        except Exception as e:
            # This is the thread's outermost boundary: report the failure
            # here rather than letting it disappear into the Future.
            print(e)
        else:
            if keep:
                # Hand ownership to the caller; clearing the local name stops
                # the finally clause below from closing the socket.
                kept_socket, server_socket = server_socket, None
                return kept_socket
        finally:
            # No return in finally: that would discard an in-flight
            # KeyboardInterrupt/SystemExit. A socket that failed to set up,
            # or that the caller did not ask to keep, is always closed.
            if server_socket is not None:
                server_socket.close()
        return None
    
    # Holds the main-thread listener so it can be released in the finally
    # clause; stays None if listen6666 did not hand a socket back.
    server_socket = None
    try:
        # Occupy the address first so the worker attempts below collide.
        server_socket = listen6666(keep=True)

        with concurrent.futures.ThreadPoolExecutor(max_workers=5) as pool:
            # Two worker threads each try to bind the already-used address.
            for _ in range(2):
                pool.submit(listen6666)
    except Exception as exc:
        print(exc)
    finally:
        if server_socket is not None:
            server_socket.close()
    

    Output:

    bind
    listen
    done
    bind
    bind
    [Errno 48] Address already in use
    [Errno 48] Address already in use