Search code examples
pythonpandaslisttws

How to store market depth data stream into list/dataframe - TWS (Ibapi Python)


I have a script that can stream market depth data (QAN.ASX example) from TWS using the IBAPI. I am having some trouble trying to store the data that is being streamed. Seems a simple issue, I was hoping someone might have a suggestion or two to modify my script? For the sake of convenience, I thought it best to include the whole script. Note the issue is inside Main().

from ibapi.client import EClient
from ibapi.wrapper import EWrapper
from ibapi.common import *
from ibapi.contract import *
from ibapi.ticktype import *
import datetime
import threading
import time
import datetime
import pandas as pd
    
RUN_FLAG = False;
 
class TestClient(EClient):
    def __init__(self, wrapper):
        EClient.__init__(self, wrapper)
 
class TestWrapper(EWrapper):
    def __init__(self):
        EWrapper.__init__(self)
 
    def updateMktDepth(self, reqId: TickerId, position: int, operation: int,
                side: int, price: float, size: int):
            super().updateMktDepth(reqId, position, operation, side, price, size)
            print("UpdateMarketDepth", "ReqId:", reqId, "Position:", position, "Operation:",
                  operation, "Side:", side, "Price:", price, "Size:", size)
 
    def updateMktDepthL2(self, reqId: TickerId, position: int, marketMaker: str,
            operation: int, side: int, price: float, size: int, isSmartDepth: bool):
        super().updateMktDepthL2(reqId, position, marketMaker, operation, side,
            price, size, isSmartDepth)
        print("UpdateMarketDepthL2", "ReqId:", reqId, "Position:", position, "MarketMaker:", marketMaker, "Operation:",
            operation, "Side:", side, "Price:", price, "Size:", size, "isSmartDepth:", isSmartDepth)
 
    def error(self, reqId: TickerId, errorcode: int, errorString: str):
        print("Error= ", reqId, " ", errorcode, " ", errorString)
 
    def contratDetails(self, reqId: int, contractDetails: ContractDetails):
        print("ContractDetails: ", reqId, " ", contractDetails)
 
    def historicalData(self, reqId: int, bar: BarData):
        print("HistoricalData. ", reqId, " Date:", bar.date, "Open:", bar.open,
              "High:", bar.high, "Low:", bar.low, "Close:", bar.close, "Volume:", bar.volume,
              "Count:", bar.barCount, "WAP:", bar.average)
 
    def historicalDataEnd(self, reqId: int, start: str, end: str):
        global RUN_FLAG
        print("HistoricalDataEnd ", reqId, "from", start, "to", end)
        RUN_FLAG = False
        
    def historicalDataUpdate(self, reqId: int, bar: BarData):
        print("HistoricalDataUpdate. ", reqId, " Date:", bar.date, "Open:", bar.open,
              "High:", bar.high, "Low:", bar.low, "Close:", bar.close, "Volume:", bar.volume,
              "Count:", bar.barCount, "WAP:", bar.average)
 
    def tickPrice(self, reqId: TickerId, tickType: TickType, price: float,
                    attrib: TickAttrib):
        super().tickPrice(reqId, tickType, price, attrib)
        print("TickPrice. TickerId:", reqId, "tickType:", tickType,
            "Price:", price, "CanAutoExecute:", attrib.canAutoExecute,
            "PastLimit:", attrib.pastLimit, end=' ')
        if tickType == TickTypeEnum.BID or tickType == TickTypeEnum.ASK:
            print("PreOpen:", attrib.preOpen)
        else:
            print()
 
    def connectAck(self):   
        global RUN_FLAG
        print("Connect ACK")
        RUN_FLAG = True
 
    def nextValidId(self, orderId:int):
        self.nextOrderId = orderId
        print("I have nextValidId", orderId)
        
def main():
    data = []        # Attempting to append to list
    try:
        while True:
            global RUN_FLAG
            wrapper = TestWrapper()
            client = TestClient(wrapper)
            client.connect("127.0.0.1", 7496, 101)
            print("Done with connect()")
            
            t = threading.Thread(name="TWSAPI_worker", target=client.run)
            t.start()
            print("Returned from run()")
         
            while not RUN_FLAG:
                time.sleep(1)
                print("No orders")

            contract = Contract()
            contract.symbol = 'QAN'
            contract.secType = 'STK'
            contract.exchange = 'ASX'
            contract.currency = 'AUD'
  
            client.reqMktDepth(4002, contract, 20, False, [])         # tried data.append(....)

            print("Returned from call to reqHistoricalData()")
            print("Waiting to finish.")

            while RUN_FLAG:
                time.sleep(1)
                print("No orders")

    except KeyboardInterrupt:
            print("ALL DONE!")         
            print(data)                                    # data list remains blank
            client.disconnect()

if __name__ == "__main__":
    main()

Solution

  • It seems this issue was actually pretty easy to solve. I simply added a blank dataframe to the routine and appended my data inside one of the functions that is responsible for printing the stream. The routine now collects the appends data and updates a csv for me while the data is steaming.

    def updateMktDepth(self, reqId: TickerId, position: int, operation: int, side: int, price: float, size: int):
        global df
        super().updateMktDepth(reqId, position, operation, side, price, size)
        print("UpdateMarketDepth", "ReqId:", reqId, "Position:", position, "Operation:", operation, "Side:", side, "Price:", price, "Size:", size)
        
        time = datetime.datetime.now()
        cols = ['Time','ReqId','Position','Operation','Side','Price','Size']
        data = [time,reqId,position,operation,side,price,size]
        csv_file = 'C:/Users/Toby/Desktop/IB/Training/QAN_Depth.csv'
        
        d2 = pd.DataFrame(data, cols)
        d2 = d2.T
        df = df.append(d2)
        df.to_csv(csv_file)