Search code examples
pythonmultithreadingwebsocketbinance

Trigger function every new minute w/ websocket data


Using the Binance futures aggregate trades stream (websocket), I am trying to calculate the total value of all trades per side (market maker/taker), per minute. The part I'm struggling with is trying to find an effective way of identifying when a minute ends and a new one starts. My solution thus far has been to convert the unix value to a datetime value, keep only the 'minute' part of the datetime value, store it as a variable (named 'minute'), and check for each new message how the latest 'minute' value compares to the previous 'minute' value.

When I run the script the 'minute' variable is updated whenever a new minute starts, but none of the previous steps under the final elif statement seem to work.

elif unixmin != minute:
    sumprodmkr = sum(x*y for x, y in list(zip(qtymkr, pricemkr)))
    sumprodtkr = sum(x*y for x, y in list(zip(qtytkr, pricetkr)))
    print (sumprodmkr)
    print (sumprodtkr)
    qtymkr.clear()
    pricemkr.clear()
    qtytkr.clear()
    pricetkr.clear()
    minute = unixmin

I'm sure there is a more efficient way of doing this, I would just appreciate if someone could point me in the right direction.

Full script below:

import websocket
import json
from datetime import datetime

socket = 'wss://stream.binance.com:9443/ws/btcusdt@trade'

ws = websocket.WebSocketApp(socket, on_message=on_message, on_error=on_error, on_close=on_close)

qtymkr = []
pricemkr = []
qtytkr = []
pricetkr = []

def on_message(ws, message):
    content = json.loads(message)
    ismaker = content['m']
    price = content['p']
    qty = content['q']
    unix = content['T']
    unix2 = int(content['T'])/1000
    unixmin = datetime.utcfromtimestamp(unix2).strftime('%M')
    
    if ismaker == 'True':
        qtymkr.append(float(qty))
        pricemkr.append(float(price))
    else:
        qtytkr.append(float(qty))
        pricetkr.append(float(price))
    
    global minute
    minute = 0
    
    if minute == 0:
        minute = unixmin
    elif unixmin == minute:
        pass
    elif unixmin != minute:
        sumprodmkr = sum(x*y for x, y in list(zip(qtymkr, pricemkr)))
        sumprodtkr = sum(x*y for x, y in list(zip(qtytkr, pricetkr)))
        print (sumprodmkr)
        print (sumprodtkr)
        qtymkr.clear()
        pricemkr.clear()
        qtytkr.clear()
        pricetkr.clear()
        minute = unixmin
    
def on_error(ws, error):
    print(error)
    
def on_close(ws, close_status_code, close_msg):
    print('Socket closed')

ws.run_forever()

Solution

  • The code definitely needs some work but it's functional and achieves what I was looking for.

    import websocket
    import json
    from datetime import datetime
    
    socket = 'wss://stream.binance.com:9443/ws/btcusdt@trade'
    
    qtymkr = []
    pricemkr = []
    qtytkr = []
    pricetkr = []
    
    minute = 0
    unixmin = 0
    sumprodmkr = 0 
    sumprodtkr = 0
    
    def minflag():
        global minute
        global unixmin
        global sumprodmkr
        global sumprodtkr
        if minute == 0:
            minute = unixmin
        elif unixmin == minute:
            break
        elif unixmin != minute:
            sumprodmkr = sum(x*y for x, y in list(zip(qtymkr, pricemkr)))
            sumprodtkr = sum(x*y for x, y in list(zip(qtytkr, pricetkr)))
            print (sumprodmkr)
            print (sumprodtkr)
            qtymkr.clear()
            pricemkr.clear()
            qtytkr.clear()
            pricetkr.clear()
            minute = unixmin
        
    
    def on_message(ws, message):
        global minute
        global unixmin
        global sumprodmkr
        global sumprodtkr
        content = json.loads(message)
        ismaker = content['m']
        price = content['p']
        qty = content['q']
        unix = content['T']
        unix2 = int(content['T'])/1000
        unixmin = datetime.utcfromtimestamp(unix2).strftime('%M')
        if ismaker == True:
            qtymkr.append(float(qty))
            pricemkr.append(float(price))
        else:
            qtytkr.append(float(qty))
            pricetkr.append(float(price))
        minflag()
    
    def on_error(ws, error):
        print(error)
        
    def on_close(ws, close_status_code, close_msg):
        print('Socket closed')
            
    ws = websocket.WebSocketApp(socket, on_message=on_message, on_error=on_error, on_close=on_close)
    
    ws.run_forever()