
Python Queue inheritance issues with multiple processes


from multiprocessing import Process, Manager, Queue
import schedule
import time

def checkBirthdays(accountQueue):
    print('[CheckBirthdays] Initated')
    acc = {
        'email': 'demo@test.com'
    }
    accountProcessing = [] if accountQueue.empty() else list(accountQueue.get())
    accountProcessing.append(acc)
    accountQueue.put(accountProcessing) # ****

def createSchedule(accountQueue):
    # similar to cron, executes at a certain time
    schedule.every().day.at("23:51").do(checkBirthdays, accountQueue)
    while True:
        schedule.run_pending()
        # check every 60 seconds
        time.sleep(60)

def main():
    # FileNotFoundError: [Errno 2] No such file or directory
    manager = Manager()
    accountQueue = manager.Queue()

    # RuntimeError: Queue objects should only be shared between processes through inheritance
    # accountQueue = Queue()
    schedule = Process(target=createSchedule, args=(accountQueue,)).start()

if __name__ == '__main__':
    main()

Manager().Queue() gives this error: FileNotFoundError: [Errno 2] No such file or directory. I'm not certain why I get a FileNotFoundError; it loads the pickle file fine with Queue().

Queue() gives this error at the line marked ****: RuntimeError: Queue objects should only be shared between processes through inheritance.

[CheckBirthdays] Initated
Process Process-2:
Traceback (most recent call last):
  File "/usr/local/lib/python3.8/multiprocessing/managers.py", line 827, in _callmethod
    conn = self._tls.connection
AttributeError: 'ForkAwareLocal' object has no attribute 'connection'

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/usr/local/lib/python3.8/multiprocessing/process.py", line 313, in _bootstrap
    self.run()
  File "/usr/local/lib/python3.8/multiprocessing/process.py", line 108, in run
    self._target(*self._args, **self._kwargs)
  File "/home/tehpirate/Documents/Kelloggs/test.py", line 18, in createSchedule
    schedule.run_pending()
  File "/home/tehpirate/.local/lib/python3.8/site-packages/schedule/__init__.py", line 780, in run_pending
    default_scheduler.run_pending()
  File "/home/tehpirate/.local/lib/python3.8/site-packages/schedule/__init__.py", line 100, in run_pending
    self._run_job(job)
  File "/home/tehpirate/.local/lib/python3.8/site-packages/schedule/__init__.py", line 172, in _run_job
    ret = job.run()
  File "/home/tehpirate/.local/lib/python3.8/site-packages/schedule/__init__.py", line 661, in run
    ret = self.job_func()
  File "/home/tehpirate/Documents/Kelloggs/test.py", line 10, in checkBirthdays
    accountProcessing = [] if accountQueue.empty() else list(accountQueue.get())
  File "<string>", line 2, in empty
  File "/usr/local/lib/python3.8/multiprocessing/managers.py", line 831, in _callmethod
    self._connect()
  File "/usr/local/lib/python3.8/multiprocessing/managers.py", line 818, in _connect
    conn = self._Client(self._token.address, authkey=self._authkey)
  File "/usr/local/lib/python3.8/multiprocessing/connection.py", line 502, in Client
    c = SocketClient(address)
  File "/usr/local/lib/python3.8/multiprocessing/connection.py", line 629, in SocketClient
    s.connect(address)
FileNotFoundError: [Errno 2] No such file or directory

Solution

  • The only thing I've added is manager.join(), as shown in the documentation, to keep the master running and the slaves active (I also shortened the schedule interval to one second, obviously, to make testing quicker). If I remove the manager.join() line, the program immediately fails with the error message you have.

    from multiprocessing import Process, Manager, Queue
    import schedule
    import time
    
    def checkBirthdays(accountQueue):
        print('[CheckBirthdays] Initated')
        acc = {
            'email': 'demo@test.com'
        }
        accountProcessing = [] if accountQueue.empty() else list(accountQueue.get())
        accountProcessing.append(acc)
        accountQueue.put(accountProcessing) # ****
    
    def createSchedule(accountQueue):
        # runs every second here (the question schedules it daily at 23:51)
        schedule.every(1).seconds.do(checkBirthdays, accountQueue)
        while True:
            schedule.run_pending()
            # check for pending jobs every second
            time.sleep(1)
    
    def main():
        # FileNotFoundError: [Errno 2] No such file or directory
        manager = Manager()
        accountQueue = manager.Queue()
    
        # RuntimeError: Queue objects should only be shared between processes through inheritance
        # accountQueue = Queue()
        schedule = Process(target=createSchedule, args=(accountQueue,)).start()
        manager.join()  # <--------------------------------
    
    if __name__ == '__main__':
        main()
    

    This happens because internal machinery handles the inter-process communication, in this case via a socket. If the master or one of its slaves is killed, the communication channel closes as well, so the error says there is no such "file": in the end, a socket is still a (kind of) file on Unix-based systems.

    What manager.join() really does is block the master (your manager's owner) until the manager's server process has exited. Conceptually, it keeps checking whether the slave processes can be discarded because there's no use for them anymore (last program line executed, crash, etc.) or whether to "reschedule" this action to the future and keep running.

    Something like this, in a high-level code sketch:

    import time
    
    while True:  # executed in the master process
        # get_all_slaves() and discard_if_possible() are hypothetical helpers
        for slave in get_all_slaves():
            state = discard_if_possible(slave)
            # do stuff with the final state from the slave
        time.sleep(0.5)
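
    For readers who want something they can run, here is a rough analogue of that sketch using only Process.is_alive(), Process.join() and Process.exitcode. It is an illustration of the polling idea, not how multiprocessing implements join() internally; short_task and wait_for_workers are made-up names, and the "fork" start method is assumed (Unix only).

```python
import multiprocessing as mp
import time


def short_task(seconds):
    """Stand-in for real work done by a slave process."""
    time.sleep(seconds)


def wait_for_workers(durations=(0.1, 0.2), poll_interval=0.05):
    """Poll the workers until all have exited; return their exit codes."""
    # Assumption: Unix with the "fork" start method, as in the question.
    ctx = mp.get_context("fork")
    workers = [ctx.Process(target=short_task, args=(d,)) for d in durations]
    for worker in workers:
        worker.start()

    exit_codes = {}
    while len(exit_codes) < len(workers):  # executed in the master process
        for index, worker in enumerate(workers):
            if index not in exit_codes and not worker.is_alive():
                worker.join()              # reap the finished worker
                exit_codes[index] = worker.exitcode
        time.sleep(poll_interval)          # "reschedule" the next check
    return [exit_codes[i] for i in range(len(workers))]


if __name__ == "__main__":
    print(wait_for_workers())
```

    In real code a plain worker.join() per process is usually enough; the explicit loop is only there to mirror the sketch above.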
    

    To go more in depth: Manager._process is nothing more than a Process instance, and manager.join() simply calls join() on that process.

    Finally, consider using context managers for these classes to prevent shutdown/cleanup issues as well. There's a reason why with, __enter__() and __exit__() exist, especially in the multiprocessing codebase.
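
    As a sketch of the context-manager variant: the manager lives for the duration of the with block, and joining the worker inside that block keeps the manager alive until the worker is done. The schedule loop is left out to keep the example short and finite; add_account and run_once are illustrative names, not part of the question's code, and the "fork" start method is assumed (Unix only).

```python
import multiprocessing as mp


def add_account(account_queue, email):
    """Worker: append one account dict to the list stored in the queue."""
    accounts = [] if account_queue.empty() else list(account_queue.get())
    accounts.append({"email": email})
    account_queue.put(accounts)


def run_once(email="demo@test.com"):
    """Run one worker against a managed queue and return the stored list."""
    # Assumption: Unix with the "fork" start method, as in the question.
    ctx = mp.get_context("fork")
    with ctx.Manager() as manager:   # __exit__ shuts the manager down
        account_queue = manager.Queue()
        worker = ctx.Process(target=add_account, args=(account_queue, email))
        worker.start()
        worker.join()                # keep the manager alive until done
        # Read the result while the manager is still running.
        return account_queue.get(timeout=5)


if __name__ == "__main__":
    print(run_once())
```

    With an endless schedule loop in the worker, worker.join() would block forever, which is the same effect manager.join() has in the fix above: the main process stays around, so the manager is not torn down underneath the child.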


    Edit: The code works just fine. Either you haven't provided a minimal working example that reproduces your problem, or you haven't applied the fix from the proposed answer.

    /tmp$ mkdir mpanswer && cd mpanswer
    /tmp/mpanswer$ cat << EOF > main.py 
    > from multiprocessing import Process, Manager, Queue
    > import schedule
    > import time
    > 
    > def checkBirthdays(accountQueue):
    >     print('[CheckBirthdays] Initated')
    >     acc = {
    >         'email': 'demo@test.com'
    >     }
    >     accountProcessing = [] if accountQueue.empty() else list(accountQueue.get())
    >     accountProcessing.append(acc)
    >     accountQueue.put(accountProcessing) # ****
    > 
    > def createSchedule(accountQueue):
    >     # similar to cron, executes at a certain time
    >     schedule.every(1).seconds.do(checkBirthdays, accountQueue)
    >     while True:
    >         schedule.run_pending()
    >         # check every 60 seconds
    >         time.sleep(1)
    > 
    > def main():
    >     # FileNotFoundError: [Errno 2] No such file or directory
    >     manager = Manager()
    >     accountQueue = manager.Queue()
    > 
    >     # RuntimeError: Queue objects should only be shared between processes through inheritance
    >     # accountQueue = Queue()
    >     schedule = Process(target=createSchedule, args=(accountQueue,)).start()
    >     manager.join()  # <--------------------------------
    > 
    > if __name__ == '__main__':
    >     main()
    > EOF
    /tmp/mpanswer$ cat << EOF > Dockerfile 
    > FROM python:alpine
    > COPY main.py main.py
    > RUN pip install schedule
    > CMD python main.py
    > EOF
    /tmp/mpanswer$ docker build --no-cache --tag mpanswer . && docker run --rm -it mpanswer
    Sending build context to Docker daemon  4.096kB
    Step 1/4 : FROM python:alpine
     ---> 03c59395ddea
    Step 2/4 : COPY main.py main.py
     ---> 4ebcad402bf1
    Step 3/4 : RUN pip install schedule
     ---> Running in 3b4dcc189d48
    Collecting schedule
      Downloading schedule-1.1.0-py2.py3-none-any.whl (10 kB)
    Installing collected packages: schedule
    Successfully installed schedule-1.1.0
    WARNING: You are using pip version 21.0; however, version 21.2.4 is available.
    You should consider upgrading via the '/usr/local/bin/python -m pip install --upgrade pip' command.
    Removing intermediate container 3b4dcc189d48
     ---> 039c1a333a5d
    Step 4/4 : CMD python main.py
     ---> Running in 9dacf9084f28
    Removing intermediate container 9dacf9084f28
     ---> de1391085794
    Successfully built de1391085794
    Successfully tagged mpanswer:latest
    [CheckBirthdays] Initated
    [CheckBirthdays] Initated
    [CheckBirthdays] Initated
    [CheckBirthdays] Initated
    ^C
    /tmp/mpanswer$