from multiprocessing import Process, Manager, Queue
import schedule
import time

def checkBirthdays(accountQueue):
    print('[CheckBirthdays] Initated')
    acc = {
        'email': 'demo@test.com'
    }
    accountProcessing = [] if accountQueue.empty() else list(accountQueue.get())
    accountProcessing.append(acc)
    accountQueue.put(accountProcessing) # ****

def createSchedule(accountQueue):
    # similar to cron, executes at a certain time
    schedule.every().day.at("23:51").do(checkBirthdays, accountQueue)
    while True:
        schedule.run_pending()
        # check every 60 seconds
        time.sleep(60)

def main():
    # FileNotFoundError: [Errno 2] No such file or directory
    manager = Manager()
    accountQueue = manager.Queue()

    # RuntimeError: Queue objects should only be shared between processes through inheritance
    # accountQueue = Queue()
    schedule = Process(target=createSchedule, args=(accountQueue,)).start()

if __name__ == '__main__':
    main()
Manager().Queue() gives this error: FileNotFoundError: [Errno 2] No such file or directory. I'm not certain why I get a FileNotFoundError; it loads the pickle file fine with Queue().
Queue() gives this error at the line marked ****: RuntimeError: Queue objects should only be shared between processes through inheritance
[CheckBirthdays] Initated
Process Process-2:
Traceback (most recent call last):
  File "/usr/local/lib/python3.8/multiprocessing/managers.py", line 827, in _callmethod
    conn = self._tls.connection
AttributeError: 'ForkAwareLocal' object has no attribute 'connection'

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/usr/local/lib/python3.8/multiprocessing/process.py", line 313, in _bootstrap
    self.run()
  File "/usr/local/lib/python3.8/multiprocessing/process.py", line 108, in run
    self._target(*self._args, **self._kwargs)
  File "/home/tehpirate/Documents/Kelloggs/test.py", line 18, in createSchedule
    schedule.run_pending()
  File "/home/tehpirate/.local/lib/python3.8/site-packages/schedule/__init__.py", line 780, in run_pending
    default_scheduler.run_pending()
  File "/home/tehpirate/.local/lib/python3.8/site-packages/schedule/__init__.py", line 100, in run_pending
    self._run_job(job)
  File "/home/tehpirate/.local/lib/python3.8/site-packages/schedule/__init__.py", line 172, in _run_job
    ret = job.run()
  File "/home/tehpirate/.local/lib/python3.8/site-packages/schedule/__init__.py", line 661, in run
    ret = self.job_func()
  File "/home/tehpirate/Documents/Kelloggs/test.py", line 10, in checkBirthdays
    accountProcessing = [] if accountQueue.empty() else list(accountQueue.get())
  File "<string>", line 2, in empty
  File "/usr/local/lib/python3.8/multiprocessing/managers.py", line 831, in _callmethod
    self._connect()
  File "/usr/local/lib/python3.8/multiprocessing/managers.py", line 818, in _connect
    conn = self._Client(self._token.address, authkey=self._authkey)
  File "/usr/local/lib/python3.8/multiprocessing/connection.py", line 502, in Client
    c = SocketClient(address)
  File "/usr/local/lib/python3.8/multiprocessing/connection.py", line 629, in SocketClient
    s.connect(address)
FileNotFoundError: [Errno 2] No such file or directory
I've only added manager.join() (and adjusted the schedule to seconds, obviously), as shown in the documentation, to keep the master running and the slaves active. If I remove the manager.join() line, it immediately drops to the error message you have.
from multiprocessing import Process, Manager, Queue
import schedule
import time

def checkBirthdays(accountQueue):
    print('[CheckBirthdays] Initated')
    acc = {
        'email': 'demo@test.com'
    }
    accountProcessing = [] if accountQueue.empty() else list(accountQueue.get())
    accountProcessing.append(acc)
    accountQueue.put(accountProcessing) # ****

def createSchedule(accountQueue):
    # run the job every second (shortened from the daily schedule for testing)
    schedule.every(1).seconds.do(checkBirthdays, accountQueue)
    while True:
        schedule.run_pending()
        # check every second
        time.sleep(1)

def main():
    # FileNotFoundError: [Errno 2] No such file or directory
    manager = Manager()
    accountQueue = manager.Queue()

    # RuntimeError: Queue objects should only be shared between processes through inheritance
    # accountQueue = Queue()
    schedule = Process(target=createSchedule, args=(accountQueue,)).start()
    manager.join() # <--------------------------------

if __name__ == '__main__':
    main()
It's happening because internal machinery handles the inter-process communication, via a socket in this case. If the master or one of its slaves is killed (or simply exits), the communication tunnel shuts down as well. That is why it says there is no such "file": in the end, a socket is still a (kind of) file on Unix-based systems. In your code, main() returns right after starting the child, so the manager is shut down while the child still needs it.
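To see the effect in isolation, here is a minimal sketch that shuts a manager down on purpose and then uses one of its proxies. The function name call_proxy_after_shutdown is made up for illustration, and the exact exception type is an assumption that depends on the platform (on Linux it is typically the FileNotFoundError from the question), so the sketch only catches the broad OSError/EOFError family.

```python
from multiprocessing import Manager


def call_proxy_after_shutdown():
    """Return the exception raised when a proxy outlives its manager.

    Hypothetical helper for illustration only.
    """
    manager = Manager()
    queue = manager.Queue()   # a proxy; the real queue lives in the manager process
    manager.shutdown()        # simulates the owning process going away
    try:
        queue.empty()         # the proxy must connect to the manager's socket
    except (OSError, EOFError) as exc:
        return exc
    return None


if __name__ == "__main__":
    print(repr(call_proxy_after_shutdown()))
```

The proxy itself is still a valid Python object after the shutdown; only the server behind it is gone, which is why the failure shows up at the first method call rather than earlier.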
What manager.join() really does is keep the master (your manager) waiting instead of letting it run off the end of the program: it checks whether the process it waits on can be discarded because there's no use for it anymore (last program line executed, crash, etc.), or whether to "reschedule" this check for the future and keep running.
Something like this, in a high-level code sketch:
import time

while True:  # executed in the master process
    for slave in get_all_slaves():  # hypothetical helper
        state = discard_if_possible(slave)  # hypothetical helper
        # do stuff with the final state from the slave
    time.sleep(0.5)
To go more in-depth, Manager._process is nothing more than a Process instance, and manager.join() ends up calling that process's join().
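Since it all comes down to Process.join(), a small sketch of its behaviour may help. The names slave_task and observe_join are invented for this example: join() with a timeout returns while the slave is still alive, and join() without one blocks until the slave has exited.

```python
import time
from multiprocessing import Process


def slave_task(seconds):
    """Hypothetical slave: just stays busy for a while."""
    time.sleep(seconds)


def observe_join():
    slave = Process(target=slave_task, args=(1.0,))
    slave.start()
    slave.join(timeout=0.1)          # gives up waiting after 0.1 s
    alive_after_timeout = slave.is_alive()
    slave.join()                     # blocks until the slave has exited
    return alive_after_timeout, slave.is_alive(), slave.exitcode


if __name__ == "__main__":
    print(observe_join())
```

A join() without a timeout on a process that never exits, such as the manager's server, therefore keeps the master blocked, which is exactly what keeps the queue reachable in the fixed code.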
Finally, consider using context managers for these classes to prevent shutdown/cleanup issues as well. There's a reason why with, __enter__() and __exit__() exist, especially in the multiprocessing codebase.
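As a rough sketch of that advice (not a drop-in replacement for your scheduler), the manager below is used in a with block and the worker is joined inside it, so the manager cannot be shut down while the worker still needs the queue. The names check_birthdays and run_once are chosen for this example, and the job runs once instead of on a schedule to keep it short.

```python
from multiprocessing import Manager, Process


def check_birthdays(account_queue):
    """Append one demo account to the list stored in the shared queue."""
    accounts = [] if account_queue.empty() else list(account_queue.get())
    accounts.append({"email": "demo@test.com"})
    account_queue.put(accounts)


def run_once():
    # Leaving the with block shuts the manager down, so everything that
    # talks to the queue has to finish before that point.
    with Manager() as manager:
        account_queue = manager.Queue()
        worker = Process(target=check_birthdays, args=(account_queue,))
        worker.start()
        worker.join()                # wait while the manager is still alive
        return account_queue.get()   # read the result before the manager exits


if __name__ == "__main__":
    print(run_once())
```

Joining the worker rather than the manager also means the program ends on its own once the worker is done.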
Edit: The code works just fine. Either you haven't provided a minimal working example that reproduces your problem, or you haven't applied the fix proposed in this answer.
/tmp$ mkdir mpanswer && cd mpanswer
/tmp/mpanswer$ cat << EOF > main.py
> from multiprocessing import Process, Manager, Queue
> import schedule
> import time
>
> def checkBirthdays(accountQueue):
>     print('[CheckBirthdays] Initated')
>     acc = {
>         'email': 'demo@test.com'
>     }
>     accountProcessing = [] if accountQueue.empty() else list(accountQueue.get())
>     accountProcessing.append(acc)
>     accountQueue.put(accountProcessing) # ****
>
> def createSchedule(accountQueue):
>     # run the job every second (shortened from the daily schedule for testing)
>     schedule.every(1).seconds.do(checkBirthdays, accountQueue)
>     while True:
>         schedule.run_pending()
>         # check every second
>         time.sleep(1)
>
> def main():
>     # FileNotFoundError: [Errno 2] No such file or directory
>     manager = Manager()
>     accountQueue = manager.Queue()
>
>     # RuntimeError: Queue objects should only be shared between processes through inheritance
>     # accountQueue = Queue()
>     schedule = Process(target=createSchedule, args=(accountQueue,)).start()
>     manager.join() # <--------------------------------
>
> if __name__ == '__main__':
>     main()
> EOF
/tmp/mpanswer$ cat << EOF > Dockerfile
> FROM python:alpine
> COPY main.py main.py
> RUN pip install schedule
> CMD python main.py
> EOF
/tmp/mpanswer$ docker build --no-cache --tag mpanswer . && docker run --rm -it mpanswer
Sending build context to Docker daemon 4.096kB
Step 1/4 : FROM python:alpine
---> 03c59395ddea
Step 2/4 : COPY main.py main.py
---> 4ebcad402bf1
Step 3/4 : RUN pip install schedule
---> Running in 3b4dcc189d48
Collecting schedule
Downloading schedule-1.1.0-py2.py3-none-any.whl (10 kB)
Installing collected packages: schedule
Successfully installed schedule-1.1.0
WARNING: You are using pip version 21.0; however, version 21.2.4 is available.
You should consider upgrading via the '/usr/local/bin/python -m pip install --upgrade pip' command.
Removing intermediate container 3b4dcc189d48
---> 039c1a333a5d
Step 4/4 : CMD python main.py
---> Running in 9dacf9084f28
Removing intermediate container 9dacf9084f28
---> de1391085794
Successfully built de1391085794
Successfully tagged mpanswer:latest
[CheckBirthdays] Initated
[CheckBirthdays] Initated
[CheckBirthdays] Initated
[CheckBirthdays] Initated
^C
/tmp/mpanswer$