
Websocket data processing speed (binance api, order book)


I have been working on a WebSocket connection for some time and have run into large latency and overflow problems in my processing.

My current architecture consists of two processes:

  • WebSocket connection and pushing to the database (WebsocketProcessor)
  • fetching data from the database and processing it (DataProcessor)

The main problem is that I still don't satisfy all the rules Binance gives for maintaining a local order book:

How to manage a local order book correctly

  1. Open a stream to wss://stream.binance.com:9443/ws/bnbbtc@depth.
  2. Buffer the events you receive from the stream.
  3. Get a depth snapshot from https://api.binance.com/api/v3/depth?symbol=BNBBTC&limit=1000 .
  4. Drop any event where u is <= lastUpdateId in the snapshot.
  5. The first processed event should have U <= lastUpdateId+1 AND u >= lastUpdateId+1.
  6. While listening to the stream, each new event's U should be equal to the previous event's u+1. ...
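The checks in rules 4 and 5 can be sketched as a small pure function. This is a minimal sketch, assuming the buffered events are already parsed into Python dicts carrying the stream's U (first update ID in the event) and u (final update ID in the event) fields; sync_with_snapshot is a hypothetical name, and fetching the snapshot and reading the stream are left out.

```python
from typing import List, Optional


def sync_with_snapshot(events: List[dict], last_update_id: int) -> Optional[List[dict]]:
    """Apply rules 4 and 5 to a buffer of depth events (hypothetical helper).

    Each event is a dict with at least 'U' (first update ID in the event) and
    'u' (final update ID in the event). Returns the events to apply, in order,
    or None if the buffer does not line up with the snapshot yet.
    """
    # Order by final update ID, like the SortedDict keyed on 'u' in the question.
    ordered = sorted(events, key=lambda e: e['u'])
    # Rule 4: drop every event whose final ID is already covered by the snapshot.
    # Building a new list avoids deleting from a container while iterating it.
    fresh = [e for e in ordered if e['u'] > last_update_id]
    if not fresh:
        return None  # nothing newer than the snapshot has been buffered yet
    first = fresh[0]
    # Rule 5: the first processed event must contain lastUpdateId + 1.
    if not (first['U'] <= last_update_id + 1 <= first['u']):
        return None  # gap between snapshot and buffer: take a new snapshot
    return fresh


buffered = [{'U': 5, 'u': 8}, {'U': 1, 'u': 4}, {'U': 9, 'u': 12}]
print(sync_with_snapshot(buffered, last_update_id=6))
# [{'U': 5, 'u': 8}, {'U': 9, 'u': 12}]
```

Returning None instead of raising lets the caller simply fetch a new snapshot and try again.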

Condition 5 is never met. We delete the stale updates, but the condition still fails. It also seems that we create gaps in the update ID chain: when entering the orderBookUpdate loop, we get warnings that update entries are missing for about 10 entries (the ones pushed back into the database after the initial deletions).

INFO:root:Websocket connection established
INFO:root:Initiated trades: {'_id': 1, 'trades': []}
INFO:root:Deleted 6 updates from depth buffer
INFO:root:Order book initialization updateId not stable
INFO:root:Initiated orderbook: ...
INFO:root:Missing depth update id. Current id: 47878509997, next id: 47878507790
INFO:root:Missing depth update id. Current id: 47878507790, next id: 47878507809
INFO:root:Missing depth update id. Current id: 47878507809, next id: 47878507822
INFO:root:Missing depth update id. Current id: 47878507822, next id: 47878507832
INFO:root:Missing depth update id. Current id: 47878507832, next id: 47878507839
INFO:root:Missing depth update id. Current id: 47878507839, next id: 47878507847

Here is the setup and processing of the order book and updates in process 2, DataProcessor:

    def initiateOrderBook(self):
        # check if we already have updates in database
        while True:
            # delete old updates
            updates = self.collection.find_one_and_update(
                filter={"_id": 4},
                update={
                    "$set": {"depthUpdates": []}  # Reset depthUpdates to an empty list
                },
                projection={"depthUpdates": 1},  # Only return the pulled updates
                return_document=pymongo.ReturnDocument.BEFORE
            )

            if updates is None:
                logging.info('Updates not ready in database trying again...')
                time.sleep(2)
            else:
                break

        # order updates
        updates = SortedDict(list((x['u'], x) for x in updates['depthUpdates']))  # order updates

        # get orderbook snapshot
        orderBook = self.getOrderbook()

        # filter out updates already contained in the snapshot
        c = 0
        for last_update_id, data in updates.items():
            if last_update_id <= orderBook['lastUpdateId']:
                updates.pop(last_update_id)
                c += 1
        logging.info(f'Deleted {c} updates from depth buffer')

        # check if orderbook and updates are in order
        first_item = updates.peekitem()[1]
        if first_item['U'] <= orderBook['lastUpdateId'] + 1 <= first_item['u']:
            logging.info(f"Order book is ready to update")
        else:
            logging.info(f"Order book initialization updateId not stable")

        # push rest of updates back into db
        filter_criteria = {"_id": 4}
        update = {
            "$push": {"depthUpdates": {"$each": list(updates.values())}}
        }

        self.collection.update_one(filter_criteria, update)

        # set order book in database
        document = self.collection.find_one_and_update(
            filter={"_id": 2},
            update={
                "$set": {"orderBook": orderBook}
            },
            upsert=True,
            return_document=pymongo.ReturnDocument.AFTER
        )
        logging.info(f'Initiated orderbook: {document}')

    def orderBookUpdate(self, updatingSpeed):
        while True:
            # fetch
            document = self.collection.find_one(filter={"_id": 2})

            updates = self.collection.find_one_and_update(
                filter={"_id": 4},
                update={
                    "$set": {"depthUpdates": []}  # Reset depthUpdates to an empty list
                },
                projection={"depthUpdates": 1},  # Only return the pulled updates
                return_document=pymongo.ReturnDocument.BEFORE
            )

            # processing update
            orderBook = document['orderBook']
            updates = SortedDict(list((x['u'], x) for x in updates['depthUpdates']))  # order updates

            # order book update logic
            for entry in updates.values():
                # check update chain integrity
                if orderBook['lastUpdateId'] + 1 != entry['u']:
                    logging.info(f'Missing depth update id. Current id: {orderBook["lastUpdateId"]}, next id: {entry["u"]}')

                for side in ['b', 'a']:
                    for price_level in entry[side]:
                        quantity = float(price_level[1])
                        if quantity == 0:
                            orderBook[side].pop(price_level[0], None)
                        else:
                            orderBook[side][price_level[0]] = quantity

                orderBook['lastUpdateId'] = entry['u']

            # Keep the 500 best levels per side (1000 entries in total)
            orderBook['b'] = SortedDict(list(orderBook['b'].items())[-500:])
            orderBook['a'] = SortedDict(list(orderBook['a'].items())[:500])

            # Push the updated order book back to MongoDB
            self.collection.update_one(filter={"_id": 2}, update={"$set": {"orderBook": orderBook}})

            # updating speed
            time.sleep(updatingSpeed)
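Rule 6 compares each new event's first update ID U with the previous event's u + 1, whereas the integrity check in orderBookUpdate compares the event's final ID u. One event usually bundles several update IDs, so its u is normally more than one above the previous u, and that check warns even when the chain is intact. A minimal sketch of applying one event that follows the first (the first is validated by rule 5 instead), assuming the same dict layout as the question; apply_event is a hypothetical helper.

```python
def apply_event(book: dict, event: dict) -> None:
    """Apply one depth event after the first one (hypothetical helper).

    book:  {'lastUpdateId': int, 'b': {price: qty}, 'a': {price: qty}}
    event: {'U': int, 'u': int, 'b': [[price, qty], ...], 'a': [[price, qty], ...]}
    Prices stay strings and quantities become floats, as in the question.
    """
    # Rule 6: the event's FIRST update ID must follow the last applied ID.
    expected = book['lastUpdateId'] + 1
    if event['U'] != expected:
        raise ValueError(f"gap in update chain: expected U={expected}, got U={event['U']}")
    for side in ('b', 'a'):
        for price, qty in event[side]:
            quantity = float(qty)
            if quantity == 0:
                # A zero quantity removes the price level.
                book[side].pop(price, None)
            else:
                book[side][price] = quantity
    # The book now reflects every update up to the event's final ID.
    book['lastUpdateId'] = event['u']


book = {'lastUpdateId': 10, 'b': {'1.0': 2.0}, 'a': {'1.1': 3.0}}
apply_event(book, {'U': 11, 'u': 15, 'b': [['1.0', '0.0'], ['0.9', '5']], 'a': [['1.1', '1.5']]})
print(book)  # {'lastUpdateId': 15, 'b': {'0.9': 5.0}, 'a': {'1.1': 1.5}}
```

Here the event covers IDs 11 to 15, so comparing lastUpdateId + 1 against u (11 versus 15) would report a gap, while the U check passes. Raising on a real gap is an assumption about how to react; it lets the caller rebuild the book from a fresh snapshot.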

Solution

  • Luckily nobody answered, because the problems were simply programming mistakes.

    Unsurprisingly, removing entries from a dict while iterating over that same dict misbehaves, and here it led to the wrong updates being deleted. New approach, which collects the keys first and removes them afterwards:

        # collect the updates already contained in the snapshot
        c = 0
        toRemove = []
        for last_update_id, data in updates.items():
            if last_update_id <= orderBook['lastUpdateId']:
                toRemove.append(last_update_id)
                c += 1
        # Remove the collected keys from the updates
        for key in toRemove:
            updates.pop(key)
        logging.info(f'Deleted {c} updates from depth buffer')
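A plain Python dict makes this mistake explicit: it raises RuntimeError as soon as the loop continues after a deletion. The stdlib-only sketch below reproduces the pattern and shows a filter that builds a new mapping instead; drop_stale is a hypothetical helper, and the integer keys stand in for the u values the question uses as SortedDict keys.

```python
def drop_stale(updates: dict, last_update_id: int) -> dict:
    """Return a copy of `updates` without events already in the snapshot.

    Keys are each event's final update ID 'u' (hypothetical helper).
    """
    # Build a new mapping instead of popping from the one being iterated.
    return {u: event for u, event in updates.items() if u > last_update_id}


buffer = {101: {'u': 101}, 102: {'u': 102}, 103: {'u': 103}}
print(drop_stale(buffer, last_update_id=102))  # {103: {'u': 103}}

# The original pattern, on a plain dict, fails on the next loop step:
try:
    for u in buffer:
        if u <= 102:
            buffer.pop(u)
except RuntimeError as exc:
    print(exc)  # dictionary changed size during iteration
```

For a SortedDict, the stale keys can also be collected in one call with list(updates.irange(maximum=orderBook['lastUpdateId'])) and popped afterwards.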
    

    Not directly related, but we also need to move the order book snapshot fetch into the loop, followed by a short delay, so that the buffer is guaranteed to contain updates from both before and after the snapshot.

        # check if we already have updates in database
        while True:
            # get orderbook snapshot
            orderBook = self.getOrderbook()

            # short delay so that updates with u > lastUpdateId can be buffered
            time.sleep(2)
            ...
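The reordered initialization can be sketched as a retry loop: snapshot first, then a short wait, then drain the buffer and check that it lines up. This is a minimal sketch, assuming the caller supplies three callables; fetch_snapshot, drain_buffer and lines_up are hypothetical parameters standing in for getOrderbook(), the MongoDB find_one_and_update() that empties the buffer, and a check of rules 4 and 5. The 2-second default mirrors time.sleep(2) above.

```python
import time
from typing import Callable, List, Tuple


def initialize_book(
    fetch_snapshot: Callable[[], dict],
    drain_buffer: Callable[[], List[dict]],
    lines_up: Callable[[dict, List[dict]], bool],
    delay: float = 2.0,
    max_attempts: int = 5,
) -> Tuple[dict, List[dict]]:
    """Retry until a snapshot and the buffered events line up (hypothetical helper)."""
    for _ in range(max_attempts):
        # Take the snapshot first ...
        snapshot = fetch_snapshot()
        # ... then wait so the websocket process can buffer newer events.
        time.sleep(delay)
        # The buffer now holds events from before and after the snapshot.
        events = drain_buffer()
        if lines_up(snapshot, events):
            return snapshot, events
    raise RuntimeError(f"no usable snapshot after {max_attempts} attempts")


# Fake data: only the second snapshot/buffer pair lines up.
snapshots = iter([{'lastUpdateId': 10}, {'lastUpdateId': 20}])
buffers = iter([[{'U': 1, 'u': 5}], [{'U': 18, 'u': 25}]])


def contains_next_id(snapshot: dict, events: List[dict]) -> bool:
    # Simplified rule-5 style check: some event contains lastUpdateId + 1.
    nxt = snapshot['lastUpdateId'] + 1
    return any(e['U'] <= nxt <= e['u'] for e in events)


snapshot, events = initialize_book(
    lambda: next(snapshots), lambda: next(buffers), contains_next_id, delay=0
)
print(snapshot)  # {'lastUpdateId': 20}
```

Passing the I/O in as callables keeps the retry logic testable without a network connection or database.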
    

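    Another issue is that .peekitem() does not behave as I expected: called without an argument it returns the last item, because its index defaults to -1. .peekitem(0) delivers the first entry, which is the one needed here. Now all conditions are met.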

    Finally, in orderBookUpdate we need to convert the bids and asks loaded from the database back to SortedDicts before the slicing step, so that they are guaranteed to be in order when sliced.

        # convert to sorted dicts
        orderBook = document['orderBook']
        orderBook['b'] = SortedDict(orderBook['b'])
        orderBook['a'] = SortedDict(orderBook['a'])
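One assumption is worth checking here: the prices are still the strings Binance sends (MongoDB field names have to be strings anyway), so a SortedDict built from them orders keys as text. That matches numeric order only while every price has the same number of digits before the decimal point, which is likely fine for BNBBTC but not for a pair trading around 10. sortedcontainers accepts a key function as the first argument, e.g. SortedDict(float, orderBook['b']), to sort numerically. The stdlib-only sketch below instead trims each side by numeric price; trim_book is a hypothetical name, and depth=500 mirrors the slicing in the question.

```python
import heapq
from typing import Dict, Tuple


def trim_book(
    bids: Dict[str, float], asks: Dict[str, float], depth: int = 500
) -> Tuple[Dict[str, float], Dict[str, float]]:
    """Keep the `depth` best price levels per side (hypothetical helper).

    Keys are price strings; they are compared via float() so that "10.0"
    ranks above "9.9", unlike plain string ordering.
    """
    # Best bids are the highest prices, listed best first.
    best_bids = heapq.nlargest(depth, bids.items(), key=lambda kv: float(kv[0]))
    # Best asks are the lowest prices, listed best first.
    best_asks = heapq.nsmallest(depth, asks.items(), key=lambda kv: float(kv[0]))
    return dict(best_bids), dict(best_asks)


bids = {"9.5": 1.0, "10.0": 2.0, "9.9": 0.5}
asks = {"10.5": 1.0, "11.0": 2.0, "10.25": 0.3}
print(trim_book(bids, asks, depth=2))
# ({'10.0': 2.0, '9.9': 0.5}, {'10.25': 0.3, '10.5': 1.0})
print(sorted(bids)[-2:])  # string order keeps ['9.5', '9.9'] and drops the best bid
```

heapq.nlargest and nsmallest avoid sorting the whole side when only the top levels are kept.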