Search code examples
python asynchronous websocket python-asyncio binance

How to run websockets independently


I am trying to start a Binance websocket to collect candle data. It works well if there is no delay in the data-processing function. But when a pause occurs in the function while it processes one ticker's data, it also delays the response for the other ticker. Does anybody know how to run them independently?

from binance.client import Client
from binance.websockets import BinanceSocketManager

# Credentials are left blank in this example.
api_key = ''
api_secret = ''
client = Client(api_key, api_secret)
# One module-level socket manager; every stream below is registered on it.
bm = BinanceSocketManager(client, user_timeout=60)

def process(msg):
    """Kline callback: print the message's symbol, then stall on ETHUSDT.

    Args:
        msg: Message mapping; its ``'s'`` entry is read as the ticker symbol.

    Returns:
        None.

    The five-second sleep on ETHUSDT simulates slow per-message processing.
    """
    # Imported locally because this snippet never imports ``time``; without
    # it the ETHUSDT branch fails with NameError instead of sleeping.
    import time

    symbol = msg['s']
    print(symbol)
    if symbol == 'ETHUSDT':
        time.sleep(5)

def socket_1():
    """Register the ETHUSDT 1h kline stream on ``bm`` with ``process`` as callback."""
    symbol, interval = 'ETHUSDT', '1h'
    bm.start_kline_socket(symbol, process, interval)

def socket_2():
    """Register the BNBUSDT 1h kline stream on ``bm`` with ``process`` as callback."""
    symbol, interval = 'BNBUSDT', '1h'
    bm.start_kline_socket(symbol, process, interval)

# Register both kline streams on the module-level manager before starting it.
socket_1()
socket_2()

# Start the manager; both streams were registered with the same `process` callback.
bm.start()

I tried to make the socket run two separate tasks with asyncio as @Mike Malyi suggested, but it did not eliminate the delay:

import asyncio

def process(msg):
    """Socket callback: run ``main(msg)`` to completion via ``asyncio.run``.

    ``asyncio.run`` does not return until the coroutine has finished, so this
    callback blocks its caller for as long as the message takes to handle.

    Args:
        msg: Message passed through unchanged to ``main``.
    """
    coroutine = main(msg)
    asyncio.run(coroutine)

async def main(msg):
    """Wrap ``process_message(msg)`` in a task and wait for it to finish.

    The symbol in ``msg['s']`` only selects which branch creates the task;
    both branches schedule the same coroutine and await it immediately.

    Args:
        msg: Message mapping; its ``'s'`` entry is read as the ticker symbol.
    """
    symbol = msg['s']
    if symbol != 'ETHUSDT':
        other_task = asyncio.create_task(process_message(msg))
        await other_task
        return
    eth_task = asyncio.create_task(process_message(msg))
    await eth_task

async def process_message(msg):
    """Print the message's symbol; for ETHUSDT also await a five-second sleep.

    Args:
        msg: Message mapping; its ``'s'`` entry is read as the ticker symbol.
    """
    ticker = msg['s']
    print(ticker)
    if ticker != 'ETHUSDT':
        return
    await asyncio.sleep(5)

# Both streams are registered with the same `process` callback.
eth_key = bm.start_kline_socket('ETHUSDT', process, '1h')
bnb_key = bm.start_kline_socket('BNBUSDT', process, '1h')

# Start the socket manager once both streams are registered.
bm.start()

I also tried to make the functions run independently using a Queue and threads, but it did not help; one function still delays the other:

from queue import Queue

def consumer(in_q):
    """Forever take messages from ``in_q`` and pass each to ``process_message``.

    Args:
        in_q: Queue-like object; ``get()`` is called to obtain each message.

    Never returns. Messages are handled one at a time, in the order ``get()``
    yields them.
    """
    while True:
        process_message(in_q.get())
    
def producer(out_q):
    """Register the ETHUSDT and BNBUSDT 1h kline streams with ``out_q.put`` as callback.

    Args:
        out_q: Queue-like object whose ``put`` is handed to both sockets.
    """
    for symbol in ('ETHUSDT', 'BNBUSDT'):
        bm.start_kline_socket(symbol, out_q.put, '1h')

def process_message(msg):
    """Print the message's symbol with the current time; ETHUSDT sleeps first.

    Args:
        msg: Message mapping; its ``'s'`` entry is read as the ticker symbol.

    Returns:
        None.

    For ETHUSDT the function sleeps five seconds before printing, which
    simulates slow per-message processing.
    """
    # Imported locally because this snippet never imports ``time``; without
    # it both branches fail with NameError.
    import time

    symbol = msg['s']
    if symbol == 'ETHUSDT':
        time.sleep(5)
        print(f"{symbol} with delay, {time.strftime('%X')}")
    else:
        print(f"{symbol} {time.strftime('%X')}")


# ``Thread`` is used below but this snippet never imports it.
from threading import Thread

q = Queue()
# t1 is the only consumer: it handles every queued message, for both symbols,
# one after another. t2 registers the two sockets whose callback is ``q.put``.
t1 = Thread(target=consumer, args=(q,))
t2 = Thread(target=producer, args=(q,))
t1.start()
t2.start()

bm.start()

Solution

    from binance.client import Client
    from binance.websockets import BinanceSocketManager
    import _thread as thread
    import time
    import queue
    
    # Credentials are left blank in this example.
    api_key = ''
    api_secret = ''
    # Single client instance; build_thread passes it to each BinanceSocketManager it creates.
    client = Client(api_key, api_secret)
    
    def process_message(msg):
        """Print the message's symbol with the current time; ETHUSDT then sleeps.

        For ETHUSDT the line is printed first, followed by a five-second sleep
        and a 'delay end' marker. Any other symbol is printed immediately.

        Args:
            msg: Message mapping; its ``'s'`` entry is read as the ticker symbol.
        """
        if msg['s'] != 'ETHUSDT':
            print(f"{msg['s']} {time.strftime('%X')}")
            return
        print(f"{msg['s']} with delay, {time.strftime('%X')}")
        time.sleep(5)
        print('delay end')
      
    
    def build_thread(symbol):
        """Worker body: open one 1h kline stream for ``symbol`` and process its messages.

        Creates a private queue and a private ``BinanceSocketManager`` for this
        call, registers the stream with the queue's ``put`` as callback, starts
        the manager, then loops forever handing each queued message to
        ``process_message``.

        Args:
            symbol: Ticker symbol passed to ``start_kline_socket``.

        Never returns.
        """
        print('start thread', symbol)
        inbox = queue.Queue()
        manager = BinanceSocketManager(client, user_timeout=60)
        manager.start_kline_socket(symbol, inbox.put, '1h')
        manager.start()
        while True:
            process_message(inbox.get())
    
    # Launch one worker per symbol with threading.Thread instead of
    # _thread.start_new_thread. The _thread documentation warns that threads
    # started that way may be killed when the main thread exits, and this
    # script's main thread ends right after launching the workers. Non-daemon
    # Thread objects are waited for at interpreter shutdown instead.
    import threading

    for symbol in ('ETHUSDT', 'BNBUSDT'):
        threading.Thread(target=build_thread, args=(symbol,)).start()