
Losing the reference to a thread in a subprocess


I am building an application that manages several child processes and threads using the multiprocessing and threading modules. I am getting unexpected results when launching threads within a subprocess.

I have a subprocess server that reads from a socket and calls an API.

ServerSubprocess:

import multiprocessing


class ServerSubprocess(multiprocessing.Process):
    def __init__(self):
        super().__init__()
        ...
        self._cnnr = ApiConnector()
        self._cnnr.connect()

    def run(self) -> None:
        while True:
            ...
            req_params = socket.recv()
            self.request_data(req_params)

    def request_data(self, req_params, max_retries=5):
        retries = 0

        while retries < max_retries:
            try:
                # Return as soon as the API answers
                return self._cnnr.request_api(**req_params)
            except (ConnectionError, TimeoutError):
                retries += 1

        raise ConnectionError(f"API request failed after {max_retries} attempts")

The connector class serves as an interface between the API and the server:

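Connector class:

import asyncio
import logging


class Connector(ConnectorInterface):

    def __init__(self):
        self._logger = logging.getLogger(__name__)
        # name, host and port are defined elsewhere (not shown)
        self._api = Api(name, host, port)

    def connect(self, n_attempts=10):
        n = 0

        while n < n_attempts:
            try:
                self._api.connect()
                return  # connected: stop retrying
            except (asyncio.TimeoutError, ConnectionError):
                n += 1
                self._logger.info(f"Couldn't connect. Attempting again... attempt {n + 1}")

        raise ConnectionError(f"Could not connect after {n_attempts} attempts")

    def request_api(self, sec, time_params, side):
        # Bundle the arguments into a single request payload
        req_params = {"sec": sec, "time_params": time_params, "side": side}
        bars = self._api.request_data(req_params)
        return bars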

The API consists of a client (running in the main thread), which connects another socket to the data source, and a read_thread, which reads the responses to the API requests:

API client:

import socket
import threading

import numpy as np


class Api:

    def __init__(self, name: str, host: str, port: int):
        self._host = host
        self._port = port
        self._name = name

        self._stop = threading.Event()
        self._req_event = {}  # req_id -> threading.Event
        self._data_buff = {}  # req_id -> response data
        self._sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self._read_thread = threading.Thread(group=None, target=self,
                                             name="%s-reader" % self._name,
                                             args=(), kwargs={}, daemon=None)

    def connect(self) -> None:
        self._sock.connect((self._host, self._port))
        self._send_connect_message()
        self._read_thread.start()

    def __call__(self) -> None:
        """Body of read_thread: poll the socket until stopped"""
        while not self._stop.is_set():
            if self._read_messages():
                self._process_messages()

    def _process_messages(self) -> None:
        """Retrieves req_id from message and processes raw message"""
        ...
        req_id, data = self._read_from_socket()
        self._data_buff[req_id] = data
        self._notify_response(req_id)

    def _read_messages(self) -> bool:
        """Checks if socket has data"""
        ...

    def _notify_response(self, req_id) -> None:
        """Sets the event for a req_id"""
        self._req_event[req_id].set()

    def request_data(self, data, timeout=None) -> np.ndarray:
        req_id = self._get_next_req_id()
        self._req_event[req_id] = threading.Event()
        self._send_cmd(data)
        if not self._req_event[req_id].wait(timeout=timeout):
            raise TimeoutError(f"No response for request {req_id}")
        data = self._data_buff[req_id]
        return data
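The request/response handshake in Api, where request_data() registers a threading.Event per req_id and read_thread stores the payload and sets that event, can be sketched without sockets. MiniApi, the in-memory queue standing in for the socket and the echo-style fake server are all hypothetical, and the lock around the two dicts is an addition that the original code does not have. Note that the pattern only works because both threads share the same memory.

```python
import itertools
import queue
import threading


class MiniApi:
    """Hypothetical, socket-free stand-in for Api: read_thread matches each
    response to its request by req_id and wakes the waiting caller."""

    def __init__(self):
        self._ids = itertools.count(1)
        self._req_event = {}           # req_id -> threading.Event
        self._data_buff = {}           # req_id -> response payload
        self._lock = threading.Lock()  # guards both dicts
        self._wire = queue.Queue()     # stands in for the socket
        self._stop = threading.Event()
        self._read_thread = threading.Thread(target=self._read_loop, daemon=True)

    def connect(self):
        self._read_thread.start()

    def _send_cmd(self, req_id, data):
        # Fake server: replies at once with the upper-cased request, tagged with req_id
        self._wire.put((req_id, data.upper()))

    def _read_loop(self):
        while not self._stop.is_set():
            try:
                req_id, payload = self._wire.get(timeout=0.1)
            except queue.Empty:
                continue
            with self._lock:
                self._data_buff[req_id] = payload
                event = self._req_event.get(req_id)
            if event is not None:
                event.set()  # wake the thread blocked in request_data()

    def request_data(self, data, timeout=5.0):
        req_id = next(self._ids)
        event = threading.Event()
        with self._lock:
            # Register the event BEFORE sending so a fast reply cannot be missed
            self._req_event[req_id] = event
        self._send_cmd(req_id, data)
        try:
            if not event.wait(timeout):
                raise TimeoutError(f"no response for req_id {req_id}")
            with self._lock:
                return self._data_buff.pop(req_id)
        finally:
            with self._lock:
                self._req_event.pop(req_id, None)

    def close(self):
        self._stop.set()
        self._read_thread.join()


if __name__ == "__main__":
    api = MiniApi()
    api.connect()
    print(api.request_data("bars"))
    api.close()
```

After connect(), calling request_data("bars") should return "BARS". If read_thread were running in a different process, the event registered here would never be seen by it.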

The issue is not in the implementation of the Api or the Connector; the code above is a reduced version that shows the main flow between the components. The problem appears when the server is wrapped in a subprocess. The read_thread of the Api seems to run without problems once the connector executes its connect() method. However, when request_data() creates a new req_id in the _req_event dict and the read_thread then checks for incoming responses for that req_id, the read_thread never sees the update. As a consequence, a KeyError exception is raised when the read_thread tries to look up the non-existent req_id key.

Furthermore, when debugging the Api while sending a new request, the read_thread appears to be stopped, even though it was started when the connector connected. This explains why the read_thread never saw the req_id of the new API request.
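The stopped read_thread can be reproduced without the socket or the API. The sketch below uses hypothetical names (Worker, _halt, _reader are illustrative, not part of the original code): a Process subclass starts a thread in __init__() and then checks from run() whether that thread is alive. It assumes the fork start method, which is not available on Windows, and recent Python versions may emit a warning about forking a multi-threaded process.

```python
import multiprocessing as mp
import threading

# Use the fork start method explicitly (not available on Windows)
_fork = mp.get_context("fork")


class Worker(_fork.Process):
    """Hypothetical stand-in for ServerSubprocess: starts a thread in __init__."""

    def __init__(self, results):
        super().__init__()
        self._results = results
        self._halt = threading.Event()
        # __init__ runs in the PARENT, so this thread is started in the parent
        self._reader = threading.Thread(target=self._halt.wait, daemon=True)
        self._reader.start()

    def run(self):
        # run() executes in the CHILD; fork copied only the calling thread,
        # so the copied Thread object has no running thread behind it
        self._results.put(self._reader.is_alive())


def main():
    results = _fork.Queue()
    worker = Worker(results)
    worker.start()
    alive_in_child = results.get(timeout=10)
    worker.join()
    alive_in_parent = worker._reader.is_alive()
    return alive_in_parent, alive_in_child


if __name__ == "__main__":
    print(main())
```

Under these assumptions main() should return (True, False): the thread keeps running in the parent, while the child's copy of the Thread object reports that it is not alive.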

However, the interesting part is that when the server is launched as a thread rather than as a process, the whole flow works fine. This does not make sense to me, since both threads live under the same PID. It seems as if, once the read_thread is started in Api.connect(), the reference to it is lost. Obviously I am missing something about launching threads in a child process.

Any help is welcome.


Solution

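  • The problem was how I was connecting the connector in the new process. The connection was created outside the context of the child process: ServerSubprocess.__init__() runs in the parent when the object is constructed, so the read_thread was started in the parent process, not in the child. With the fork start method (the default on Linux up to Python 3.13), the child inherits a copy of the parent's memory but only the thread that called fork(), so the child's copy of read_thread refers to a thread that does not exist there. That's why I seemed to be losing the reference; in fact I wasn't, because the thread was never launched in the child! To solve the issue, self._cnnr.connect() should be called inside run().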

    This was a problem of not understanding the multiprocessing.Process object correctly. Only the run() method executes in the child process; __init__() runs in the parent when the object is created. This means that threads (and connections) the child needs should be started in run(), not in __init__().
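A minimal sketch of the fix, assuming a hypothetical FakeConnector whose connect() starts a reader thread the way Api.connect() does: __init__() only builds the connector and run() connects it, so the thread is started inside the child. The fork context is pinned here only to mirror the question's setup; since __init__() no longer creates threads or sockets, the object should also be picklable under the spawn start method.

```python
import multiprocessing as mp
import threading

# Pinned to fork only to mirror the question's setup (not available on Windows)
_fork = mp.get_context("fork")


class FakeConnector:
    """Hypothetical stand-in for Connector/Api: connect() starts a reader thread."""

    def __init__(self):
        # Nothing thread- or socket-related is created here
        self._halt = None
        self._read_thread = None

    def connect(self):
        self._halt = threading.Event()
        self._read_thread = threading.Thread(target=self._halt.wait, daemon=True)
        self._read_thread.start()

    def is_reading(self):
        return self._read_thread is not None and self._read_thread.is_alive()

    def close(self):
        if self._read_thread is not None:
            self._halt.set()
            self._read_thread.join()


class ServerSubprocess(_fork.Process):
    def __init__(self, results):
        super().__init__()
        self._results = results
        self._cnnr = FakeConnector()  # build the connector here, but do not connect

    def run(self):
        # run() executes in the child, so the reader thread starts in the child
        self._cnnr.connect()
        try:
            self._results.put(self._cnnr.is_reading())
        finally:
            self._cnnr.close()


def main():
    results = _fork.Queue()
    server = ServerSubprocess(results)
    server.start()
    reading_in_child = results.get(timeout=10)
    server.join()
    return reading_in_child


if __name__ == "__main__":
    print(main())
```

Running it should print True: the reader thread is alive in the child because it was started there.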