Tags: python, websocket, multiprocessing, algorithmic-trading, cryptoapi

How to Subscribe to Multiple WebSocket Streams Using Multiprocessing


I am new to multiprocessing, multithreading, etc. in Python.

I am trying to subscribe to multiple WebSocket streams from my crypto exchange (API Docs Here) using multiprocessing. However, when I run the code below, I receive only ticker information and no order book updates.

How can I fix the code so that I receive both?
Why does only one WebSocket seem to work when the functions are run with multiprocessing?

(When I run ws_orderBookUpdates() and ws_tickerInfo() separately, without multiprocessing, each one works fine, so the exchange is not the problem.)

import websocket
import json
import pprint
from datetime import datetime
import time

# Function to subscribe to ticker information.
def ws_tickerInfo():
    def on_open(self):
        print("opened")
        subscribe_message = {
            "method": "subscribe",
            "params": {'channel': "lightning_ticker_BTC_JPY"}
        }
        ws.send(json.dumps(subscribe_message))

    def on_message(self, message, prev=None):
        print(f"Ticker Info, Received : {datetime.now()}")

        ###### full json payloads ######
        # pprint.pprint(json.loads(message))

    def on_close(self):
        print("closed connection")

    endpoint = 'wss://ws.lightstream.bitflyer.com/json-rpc'
    ws = websocket.WebSocketApp(endpoint,
                                on_open=on_open,
                                on_message=on_message,
                                on_close=on_close)

    ws.run_forever()


# Function to subscribe to order book updates.
def ws_orderBookUpdates():
    def on_open(self):
        print("opened")
        subscribe_message = {
            "method": "subscribe",
            "params": {'channel': "lightning_board_BTC_JPY"}
        }
        ws.send(json.dumps(subscribe_message))

    def on_message(self, message):
        print(f"Order Book, Received : {datetime.now()}")

        ###### full json payloads ######
        # pprint.pprint(json.loads(message))

    def on_close(self):
        print("closed connection")

    endpoint = 'wss://ws.lightstream.bitflyer.com/json-rpc'
    ws = websocket.WebSocketApp(endpoint,
                                on_open=on_open,
                                on_message=on_message,
                                on_close=on_close)
    ws.run_forever()


# Multiprocessing two functions
if __name__ == '__main__':
    import multiprocessing as mp

    mp.Process(target=ws_tickerInfo(), daemon=True).start()
    mp.Process(target=ws_orderBookUpdates(), daemon=True).start()


Solution

  • Update

    You have created two daemon processes. Daemon processes are terminated automatically when the process that created them exits, and here the main process exits immediately after creating them. You are lucky that even one of the processes has a chance to produce output, but why take chances? Do not use daemon processes. Instead:

    if __name__ == '__main__':
        import multiprocessing as mp
    
        p1 = mp.Process(target=ws_tickerInfo)
        p2 = mp.Process(target=ws_orderBookUpdates)
        p1.start()
        p2.start()
        p1.join() # wait for completion
        p2.join() # wait for completion
    

    But the real problem was staring us in the face and we both missed it! You had:

        p1 = mp.Process(target=ws_tickerInfo(), daemon=True)
        p2 = mp.Process(target=ws_orderBookUpdates(), daemon=True)
    

    when it should have been:

        p1 = mp.Process(target=ws_tickerInfo)
        p2 = mp.Process(target=ws_orderBookUpdates)
    

    See the difference? You were not passing the function ws_tickerInfo to Process; you were calling ws_tickerInfo() yourself and attempting to pass its return value, which would have been a meaningless None if the function ever returned (it doesn't, because run_forever() blocks). The ticker subscription was therefore running in the main process, and execution never reached the second process-creation statement, which is why you saw no order book updates.
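
    To make the difference concrete, here is a minimal sketch that is not part of the original code. It uses threading.Thread rather than mp.Process only to stay short and portable; both treat the target parameter the same way. The helper names record_thread_name, run_wrong and run_right are made up for illustration.

```python
import threading


def record_thread_name(log):
    """Append the name of the thread this function is running in."""
    log.append(threading.current_thread().name)


def run_wrong(log):
    # BUG (same shape as in the question): record_thread_name(log) is called
    # right here, in the caller's thread. Thread only receives its return
    # value, None, as the target.
    t = threading.Thread(target=record_thread_name(log), name="worker")
    t.start()  # a thread whose target is None has nothing to run
    t.join()


def run_right(log):
    # Correct: pass the function object and its arguments separately.
    t = threading.Thread(target=record_thread_name, args=(log,), name="worker")
    t.start()
    t.join()


if __name__ == "__main__":
    wrong, right = [], []
    run_wrong(wrong)
    run_right(right)
    print(wrong)  # holds the name of the calling thread, not "worker"
    print(right)  # holds "worker"
```

    With target=record_thread_name(log), the function runs in the caller before any thread exists, and the thread itself is left with nothing to do. That mirrors what happened with ws_tickerInfo(), except that ws_tickerInfo() never returns.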

    You could probably also have used multithreading instead of multiprocessing for this, although the Ctrl-C interrupt handler might not work (see below). There should also be a mechanism to terminate the program, so I have added some code that detects Ctrl-C and terminates when it is pressed. Also, you named the first callback argument self as if the function were a method of a class, which it isn't; the argument is the WebSocketApp instance, so I have renamed it wsapp and use it to send the subscription instead of relying on the enclosing ws variable. Here is the updated source:

    import websocket
    import json
    import pprint
    from datetime import datetime
    import time
    import sys
    import signal
    
    # Function to subscribe to ticker information.
    def ws_tickerInfo():
        def on_open(wsapp):
            print("opened")
            subscribe_message = {
                "method": "subscribe",
                "params": {'channel': "lightning_ticker_BTC_JPY"}
            }
            wsapp.send(json.dumps(subscribe_message))
    
        def on_message(wsapp, message, prev=None):
            print(f"Ticker Info, Received : {datetime.now()}")
    
            ###### full json payloads ######
            # pprint.pprint(json.loads(message))
    
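        # Recent websocket-client releases also pass a close status code and message.
        def on_close(wsapp, close_status_code=None, close_msg=None):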
            print("closed connection")
    
        endpoint = 'wss://ws.lightstream.bitflyer.com/json-rpc'
        ws = websocket.WebSocketApp(endpoint,
                                    on_open=on_open,
                                    on_message=on_message,
                                    on_close=on_close)
    
        ws.run_forever()
    
    
    # Function to subscribe to order book updates.
    def ws_orderBookUpdates():
        def on_open(wsapp):
            print("opened")
            subscribe_message = {
                "method": "subscribe",
                "params": {'channel': "lightning_board_BTC_JPY"}
            }
            wsapp.send(json.dumps(subscribe_message))
    
        def on_message(wsapp, message):
            print(f"Order Book, Received : {datetime.now()}")
    
            ###### full json payloads ######
            # pprint.pprint(json.loads(message))
    
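        # Recent websocket-client releases also pass a close status code and message.
        def on_close(wsapp, close_status_code=None, close_msg=None):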
            print("closed connection")
    
        endpoint = 'wss://ws.lightstream.bitflyer.com/json-rpc'
        ws = websocket.WebSocketApp(endpoint,
                                    on_open=on_open,
                                    on_message=on_message,
                                    on_close=on_close)
        ws.run_forever()
    
    def handle_ctrl_c(signum, stack_frame):
        sys.exit(0)
    
    if __name__ == '__main__':
        import multiprocessing as mp
    
        signal.signal(signal.SIGINT, handle_ctrl_c) # terminate on ctrl-c
        print('Enter Ctrl-C to terminate.')
        p1 = mp.Process(target=ws_tickerInfo)
        p2 = mp.Process(target=ws_orderBookUpdates)
        p1.start()
        p2.start()
        p1.join() # wait for completion (will never happen)
        p2.join() # wait for completion (will never happen)
    

    To Use Multithreading

    if __name__ == '__main__':
        import threading
    
        t1 = threading.Thread(target=ws_tickerInfo, daemon=True)
        t2 = threading.Thread(target=ws_orderBookUpdates, daemon=True)
        t1.start()
        t2.start()
        input('Hit enter to terminate...\n')
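
    Since the two subscriber functions differ only in the channel name and the printed label, they could be folded into one parameterized function. The following is a sketch under stated assumptions, not the original author's code: build_subscribe_message, subscribe and main are hypothetical names, the optional close_status_code and close_msg arguments of on_close are an assumption meant to tolerate different websocket-client versions, and the endpoint and channel names are copied from the code above.

```python
import json
import threading
from datetime import datetime

ENDPOINT = 'wss://ws.lightstream.bitflyer.com/json-rpc'  # copied from the code above


def build_subscribe_message(channel):
    """Return the JSON text that subscribes to one channel."""
    return json.dumps({"method": "subscribe", "params": {"channel": channel}})


def subscribe(channel, label):
    """Open one connection, subscribe to channel, and block in run_forever()."""
    # Imported here so build_subscribe_message() stays usable without the package.
    import websocket

    def on_open(wsapp):
        print(f"{label}: opened")
        wsapp.send(build_subscribe_message(channel))

    def on_message(wsapp, message):
        print(f"{label}, Received : {datetime.now()}")

    # Assumption: the extra close arguments are optional so that both older
    # and newer websocket-client releases can call this handler.
    def on_close(wsapp, close_status_code=None, close_msg=None):
        print(f"{label}: closed connection")

    wsapp = websocket.WebSocketApp(ENDPOINT,
                                   on_open=on_open,
                                   on_message=on_message,
                                   on_close=on_close)
    wsapp.run_forever()


def main():
    streams = [("lightning_ticker_BTC_JPY", "Ticker Info"),
               ("lightning_board_BTC_JPY", "Order Book")]
    for channel, label in streams:
        # Pass the function object and its arguments separately; do not call it.
        threading.Thread(target=subscribe, args=(channel, label),
                         daemon=True).start()
    input('Hit enter to terminate...\n')

# To run this sketch, call main() from an `if __name__ == '__main__':` guard.
```

    The same target=subscribe, args=(channel, label) form should work with mp.Process as well, in which case the processes would be joined as in the multiprocessing version above rather than waiting on input().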