I am new to handling multiprocessing, multithreading etc.. in python.
I am trying to subscribe to multiple Websocket streams from my crypto exchange (API Docs Here), using multiprocessing
.
However, when I run the code below, I only receive ticker information
, but not order book updates
.
How can I fix the code to get both information?
What is the reason that only one websocket seems to be working when it's run on multiprocessing
?
(When I run the functions ws_orderBookUpdates()
and ws_tickerInfo()
separately, without using multiprocessing
, it works fine individually so it is not the exchange's problem.)
import websocket
import json
import pprint
from datetime import datetime
import time
# Function to subscribe to ticker information.
def ws_tickerInfo():
def on_open(self):
print("opened")
subscribe_message = {
"method": "subscribe",
"params": {'channel': "lightning_ticker_BTC_JPY"}
}
ws.send(json.dumps(subscribe_message))
def on_message(self, message, prev=None):
print(f"Ticker Info, Received : {datetime.now()}")
###### full json payloads ######
# pprint.pprint(json.loads(message))
def on_close(self):
print("closed connection")
endpoint = 'wss://ws.lightstream.bitflyer.com/json-rpc'
ws = websocket.WebSocketApp(endpoint,
on_open=on_open,
on_message=on_message,
on_close=on_close)
ws.run_forever()
# Function to subscribe to order book updates.
def ws_orderBookUpdates():
def on_open(self):
print("opened")
subscribe_message = {
"method": "subscribe",
"params": {'channel': "lightning_board_BTC_JPY"}
}
ws.send(json.dumps(subscribe_message))
def on_message(self, message):
print(f"Order Book, Received : {datetime.now()}")
###### full json payloads ######
# pprint.pprint(json.loads(message))
def on_close(self):
print("closed connection")
endpoint = 'wss://ws.lightstream.bitflyer.com/json-rpc'
ws = websocket.WebSocketApp(endpoint,
on_open=on_open,
on_message=on_message,
on_close=on_close)
ws.run_forever()
# Multiprocessing two functions
if __name__ == '__main__':
import multiprocessing as mp
mp.Process(target=ws_tickerInfo(), daemon=True).start()
mp.Process(target=ws_orderBookUpdates(), daemon=True).start()
Update
You have created two daemon processes. They will terminate when all non-daemon processes have terminated, which in this case is the main process, which terminates immediately after creating the daemon processes. You are lucky that even one of the processes has a chance to produce output, but why take chances? Do not use dameon processes. Instead:
if __name__ == '__main__':
import multiprocessing as mp
p1 = mp.Process(target=ws_tickerInfo)
p2 = mp.Process(target=ws_orderBookUpdates)
p1.start()
p2.start()
p1.join() # wait for completion
p2.join() # wait for completion
But the real problem was staring us in the face and we both missed it! You had:
p1 = mp.Process(target=ws_tickerInfo(), daemon=True)
p2 = mp.Process(target=ws_orderBookUpdates(), daemon=True)
when it should have been:
p1 = mp.Process(target=ws_tickerInfo)
p2 = mp.Process(target=ws_orderBookUpdates)
See the difference? You were actually not passing to Process
the function ws_tickerInfo
but calling ws_tickerInfo
and attempting to pass the return value, which would have been the nonsensical None
if the function ever returned (which it doesn't). So you were never even getting to execute the second process-creation statement.
You could have probably also have used multithreading instead of multiprocessing for this, although the Ctrl-C interrupt handler might not work (see below). There should also be a mechanism to terminate the program. I have added some code to detect Ctrl-C and to terminate when that is entered. Also, you have used self
as a function argument as if the function were actually a class method, which it isn't. This is not good programming style. Here is updated source:
import websocket
import json
import pprint
from datetime import datetime
import time
import sys
import signal
# Function to subscribe to ticker information.
def ws_tickerInfo():
def on_open(wsapp):
print("opened")
subscribe_message = {
"method": "subscribe",
"params": {'channel': "lightning_ticker_BTC_JPY"}
}
wsapp.send(json.dumps(subscribe_message))
def on_message(wsapp, message, prev=None):
print(f"Ticker Info, Received : {datetime.now()}")
###### full json payloads ######
# pprint.pprint(json.loads(message))
def on_close(wsapp):
print("closed connection")
endpoint = 'wss://ws.lightstream.bitflyer.com/json-rpc'
ws = websocket.WebSocketApp(endpoint,
on_open=on_open,
on_message=on_message,
on_close=on_close)
ws.run_forever()
# Function to subscribe to order book updates.
def ws_orderBookUpdates():
def on_open(wsapp):
print("opened")
subscribe_message = {
"method": "subscribe",
"params": {'channel': "lightning_board_BTC_JPY"}
}
wsapp.send(json.dumps(subscribe_message))
def on_message(wsapp, message):
print(f"Order Book, Received : {datetime.now()}")
###### full json payloads ######
# pprint.pprint(json.loads(message))
def on_close(wsapp):
print("closed connection")
endpoint = 'wss://ws.lightstream.bitflyer.com/json-rpc'
ws = websocket.WebSocketApp(endpoint,
on_open=on_open,
on_message=on_message,
on_close=on_close)
ws.run_forever()
def handle_ctrl_c(signum, stack_frame):
sys.exit(0)
if __name__ == '__main__':
import multiprocessing as mp
signal.signal(signal.SIGINT, handle_ctrl_c) # terminate on ctrl-c
print('Enter Ctrl-C to terminate.')
p1 = mp.Process(target=ws_tickerInfo)
p2 = mp.Process(target=ws_orderBookUpdates)
p1.start()
p2.start()
p1.join() # wait for completion (will never happen)
p2.join() # wait for completion (will never happen)
To Use Multithreading
if __name__ == '__main__':
import threading
t1 = threading.Thread(target=ws_tickerInfo, daemon=True)
t2 = threading.Thread(target=ws_orderBookUpdates, daemon=True)
t1.start()
t2.start()
input('Hit enter to terminate...\n')