I have two processes: one runs the main loop and the other gets the latest prices through a websocket API. The two processes communicate through a Pipe object, so my core process sends a request message to the other process and then receives a message with the latest prices. One other thing is that the program has to wait until the websocket API process responds (it's better this way), which brings up the following problem: once the program has been running for a few hours, it hangs/freezes after core sends a request and waits for the message to be sent back, and it just stays that way. Since my code is very long, I created a sample which you can find below. I also left out the websocket code since it's not relevant here.
    from time import sleep
    from multiprocessing import Pipe, Process

    class websocket_stuff:
        def __init__(self, pipe: Pipe):
            self.process_pipe = pipe

        # Websocket functions are not added since its not necessary for demonstration
        def WS_run_forever(self):
            # here i do the WS infinite loop, and as part of the loop i do the following:
            while True:
                if self.process_pipe.poll() is True:
                    rec = self.process_pipe.recv()
                    if (rec is not None) and (isinstance(rec, dict) is True):
                        if ('request' in rec) and (rec['request'] == 'candle'):
                            # send() takes only the object to transmit
                            self.process_pipe.send({'message': 'candle', 'ticker': rec['ticker'],
                                                    'data': 'OHLC data'})

    class core:
        def __init__(self):
            self.name = 'ticker'
            self.WS_pipe, kid = Pipe()
            self.WS_obj = websocket_stuff(kid)
            self.WS_process = Process(target=self.WS_obj.WS_run_forever)
            self.WS_process.start()

        def get_data(self):
            self.WS_pipe.send({'request': 'candle', 'ticker': self.name})
            twe = 0
            while True:
                # It hangs/freezes here
                if self.WS_pipe.poll(2) is True:
                    rec = self.WS_pipe.recv()
                    break
                else:
                    twe += 1
                    if twe >= 3:
                        self.WS_pipe.send({'request': 'candle', 'ticker': self.name})
                    sleep(0.0001)
            if ('message' in rec) and (rec['message'] == 'candle') and (rec['ticker'] == self.name):
                return rec['data']
            return None

    if __name__ == '__main__':
        c = core()
        while True:
            data = c.get_data()
            print(data)
            sleep(1)
I've checked the processes and it seems like they're still running. I'm also not getting any exceptions in the processes, so it can't be an error. While tracing the issue, I saw that after the problematic pipe send from the main process, the other process never receives the request that was sent to it, hence the hang. The only thing I can think of is a problem with the pipe. Can anyone help me find out what's going on?
It looks like the "not needed websockets stuff" is where your processing halts, and that would be the ideal place to help you. There is no need for it to be complete, but the fixed values in the .send line do not help: you could write a pared-down method that retrieves your values with the same I/O methods you are actually using.
Other than that, it is possible to add some control in the parent process and, after a timeout (say 20 seconds to 1 minute), simply kill the networking process and spawn a new one. This may actually be simpler than trying to retry the websocket method in the child process. Just add a time-accounting variable in the parent process:
    from time import sleep
    from multiprocessing import Pipe, Process
    (...)
    import time

    timeout = 20  # seconds without a reply before the child is restarted

    class core:
        def __init__(self):
            self.name = 'ticker'
            self.WS_process = None
            self.reset_child()

        def reset_child(self):
            # terminate() is a method of Process, not a module-level function
            if self.WS_process:
                self.WS_process.terminate()
                self.WS_process.join()
            self.WS_pipe, kid = Pipe()
            self.WS_obj = websocket_stuff(kid)
            self.WS_process = Process(target=self.WS_obj.WS_run_forever)
            self.WS_process.start()

        def get_data(self):
            # keep the request around so it can be re-sent after a retry or a reset
            last_sent_data = {'request': 'candle', 'ticker': self.name}
            self.WS_pipe.send(last_sent_data)
            twe = 0
            last_rec = time.time()
            while True:
                # It hangs/freezes here
                if self.WS_pipe.poll(2):
                    rec = self.WS_pipe.recv()
                    break
                else:
                    twe += 1
                    if twe >= 3:
                        self.WS_pipe.send(last_sent_data)
                    sleep(0.0001)
                    if time.time() - last_rec > timeout:
                        # no answer for too long: replace the child and ask again
                        self.reset_child()
                        self.WS_pipe.send(last_sent_data)
                        last_rec = time.time()
            if ('message' in rec) and (rec['message'] == 'candle') and (rec['ticker'] == self.name):
                return rec['data']
            return None

    if __name__ == '__main__':
        c = core()
        while True:
            data = c.get_data()
            print(data)
            sleep(1)
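The time accounting in get_data can also be pulled out into a small helper, which makes the timeout easier to test on its own. What follows is only a minimal sketch under a few assumptions: the name request_with_deadline and its parameters are made up for illustration, and it runs in a single process, with the other end of the pipe standing in for the websocket child. It returns None when no reply arrives in time, so the caller can decide whether to call something like reset_child.

```python
import time
from multiprocessing import Pipe


def request_with_deadline(conn, request, poll_interval=0.5, deadline=2.0):
    """Send `request` on `conn` and wait for a reply.

    Hypothetical helper: returns the reply, or None if `deadline`
    seconds pass without one.
    """
    conn.send(request)
    started = time.monotonic()  # monotonic: unaffected by system clock changes
    while time.monotonic() - started < deadline:
        # poll() blocks for at most poll_interval seconds
        if conn.poll(poll_interval):
            return conn.recv()
    return None  # the caller decides whether to restart the child


if __name__ == '__main__':
    parent_end, child_end = Pipe()

    # Nobody answers on child_end, so this request times out.
    print(request_with_deadline(parent_end, {'request': 'candle'}, 0.1, 0.3))

    # Simulate a responsive child by queueing a reply before asking.
    child_end.send({'message': 'candle', 'data': 'OHLC data'})
    print(request_with_deadline(parent_end, {'request': 'candle'}, 0.1, 0.3))
```

In the real program the parent would call the reset logic whenever the helper returns None, and then send the request again on the new pipe.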
(Note that when checking for conditions there is no need to use "is True": the if statement will execute if the expression is in any way truthy, and "is True" is not only redundant: it may fail if the expression results in other valid "truthy" values like a non-zero number, or a sequence with contents.)
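To make the difference concrete, here is a small sketch comparing plain truthiness with the strict identity test. The helper name check is only for illustration.

```python
def check(value):
    """Return (truthiness of value, result of `value is True`)."""
    return bool(value), value is True


if __name__ == '__main__':
    # A non-zero number and a non-empty list are truthy but are not the object True.
    for value in (True, 1, [0], 'x', 0, [], None):
        print(repr(value), check(value))
```

An `if value:` test follows the first element of each pair, while `if value is True:` follows the second, so the two only agree when the expression yields an actual bool.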