How can I make multiple historical data requests from the Interactive Brokers (IB) C# sample app? (UI threading problems, background threads)


I think this problem requires an understanding of how the C# sample app that ships with the Interactive Brokers API works. I am trying to modify that sample app to request multiple historical data series, but I have run into several problems. It seems that the program needs to wait until the current data request has completed before submitting the second, third, and so on. If I add the following code (assuming I have added a button to start the process), the second request seems to "run over" the first, messing up the charts that each series produces in the app: the data is merged into one chart. Also, too many requests cause a race condition in which IB starts rejecting the data requests. The code in this case is:

private void button7_Click(object sender, EventArgs e)
{
    if (IsConnected)
    {
        srInputTickerFile = new StreamReader(stInputTickerFileName);
        if (!inputTickerFileOpenFlag)
        {
            srInputTickerFile = new StreamReader(stInputTickerFileName);
            inputTickerFileOpenFlag = true;
        }

        while (srInputTickerFile.Peek() >= 0)
        {
            String line = srInputTickerFile.ReadLine();
            String[] lineSplit = line.Split(',');
            string ticker = lineSplit[0];
            string timeDateTo = lineSplit[1];
            string myDuration = lineSplit[2];
            string myBarSize = lineSplit[3];
            string myWhatToShow = lineSplit[4];
            int myRTH = Int32.Parse(lineSplit[5]);
            Contract contract = GetMDContractForHistorical(line);
            contract.ConId = histCounter + 1;
            string endTime = hdRequest_EndTime.Text.Trim();
            string duration = hdRequest_Duration.Text.Trim() + " " +
                hdRequest_TimeUnit.Text.Trim();
            string barSize = hdRequest_BarSize.Text.Trim();
            string whatToShow = hdRequest_WhatToShow.Text.Trim();
            int outsideRTH = contractMDRTH.Checked ? 1 : 0;
            historicalDataManager.AddRequest(contract, timeDateTo, myDuration, 
            myBarSize, myWhatToShow, myRTH, 1, cbKeepUpToDate.Checked);
            historicalDataTab.Text = Utils.ContractToString(contract) + " (HD)";
            ShowTab(marketData_MDT, historicalDataTab);
        }
        srInputTickerFile.Close();
    }
}

I have tried other things, such as adding the following line after the call to historicalDataManager.AddRequest so that it can process the request:

  System.Threading.Thread.Sleep(3000);

The problem here seems to be that the app needs control to return to the UI thread in order to draw the graphs, and the sleep blocks that thread, so the chart is never displayed.

I also tried a background worker thread, with the "while" part of the code inside it. That is:

new Thread(() =>
{
    Thread.CurrentThread.IsBackground = true;
    //while code above goes here
}).Start();

This code gives an error at historicalDataManager.AddRequest because that object resides on the UI thread, not the worker thread. It also generates an error if I add code to the while loop that updates the text boxes in the app, because they reside on the UI thread too.

So I guess it boils down to:

  1. How to properly throttle the data requests.
  2. If a background worker thread is used, how to access function calls on the UI thread in the IB app.

Solution

  • Quoting the question: "So I guess it boils down to: 1) How to properly throttle the data requests. 2) If a background worker thread is used, how to access function calls on the UI thread in the IB app."

    1) How to properly throttle the data requests. It depends on how far you want to go. If you want a simple solution you can use...

    // On UI thread, non-blocking
    Task.Delay(1000).ContinueWith(_ =>
    {
        // make request 2
    }, TaskScheduler.FromCurrentSynchronizationContext()); // run the continuation back on the UI thread
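
    The same idea can also be written with async/await, which keeps a loop over many requests readable. This is only a minimal sketch: `Throttle.SendAllAsync` and the `sendRequest` delegate are hypothetical names, with `sendRequest` standing in for whatever call actually submits one request (for example the AddRequest call from the question).

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

static class Throttle
{
    // Sends each item through sendRequest, pausing `gap` between consecutive
    // sends. `await` yields instead of blocking, so a UI thread stays responsive.
    public static async Task SendAllAsync<T>(
        IEnumerable<T> items, Action<T> sendRequest, TimeSpan gap)
    {
        bool first = true;
        foreach (T item in items)
        {
            if (!first)
                await Task.Delay(gap);   // non-blocking pause between requests
            first = false;
            sendRequest(item);           // hypothetical stand-in for the real request call
        }
    }
}
```

    Called with `await` from a click handler declared `async void`, the code after each delay resumes on the UI thread, so the handler can touch controls without Invoke. A gap of about ten seconds would correspond to the 6/min soft limit mentioned below.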
    

    For complete control of all requests, and to respect both the 50 messages/sec API limit and the 6/min (soft) or short-term burst limits on historical data requests, I would recommend at least two separate threads based on a producer/consumer model.

    A basic outline to get you started follows... (alternative to Task.Delay above)

    // Request types used in this outline (assumed: the enum is not shown in the original)
    enum ReqType { History, CancelHistory }
    
    // The data class used to queue a request
    class Request
    {
        public int? reqId;
        public ReqType? reqType, reqFrom;
        public Contract? contract;
        public bool snapshot = false, regulatorySnapshot = false;
        public string? genericTickList;
    
        // For historical data
        public string? endDateTime, duration, barSize, show;
        public int? useRTH, formatDate;
        public bool update = false;
    
        public Request(int reqId, ReqType reqType)
        {
            // Requests that only need an id, such as a cancel
            this.reqId = reqId;
            this.reqType = reqType;
        }
    
        public Request(int reqId, ReqType reqType, Contract contract, string endDateTime, string duration, string barSize, string show, int useRTH, int formatDate, bool update)
        {
            // Historical data
            this.reqId = reqId;
            this.reqType = reqType;
            this.contract = contract;
            this.endDateTime = endDateTime;
            this.duration = duration;
            this.barSize = barSize;
            this.show = show;
            this.useRTH = useRTH;
            this.formatDate = formatDate;
            this.update = update;
        }
    }
    
    /*
        * This class receives all requests, queues them, and processes them with a delay to avoid exceeding message rate limits.
        * It also needs to keep track of all outstanding requests, so requests for data can be cancelled in one place.
    */
    class Requests<T>:IDisposable where T : Request
    {
        private readonly IBClient ibClient;
        private readonly object msgLock = new();
        private readonly object historyLock = new();
        private readonly Thread msgThread, contractThread, mktDataThread, historyThread;
        private readonly Queue<Request> msgQueue = new();
        private readonly Queue<Request> historyQueue = new();
        private readonly static AutoResetEvent waitHistory = new(true);
        private readonly Dictionary<int, Request> historyReqs = new();
    
    
        public Requests(IBClient ibClient)
        {
            this.ibClient = ibClient;
            msgThread = new Thread(ConsumeMsg);
            msgThread.Start();
    
            historyThread = new Thread(ConsumeHistory);
            historyThread.Start();
        }
    
        private void EnqueueMsg(T req)
        {
            lock(msgLock)
            {
                msgQueue.Enqueue(req);
                Monitor.PulseAll(msgLock);
            }
        }
        private void ConsumeMsg()
        {
            /*
                * The message queue does not wait other than ~20ms for rate limitation of messages (50/sec max).
                * Other queues are responsible for limiting request rates depending on request type.
                * We do not increment any counters here, that is done in the respective queue that rate limits requests by type
                * EVERY counter is decremented here!
            */
            while(true)
            {
                Request req;
                lock(msgLock)
                {
                    while(msgQueue.Count == 0)
                        Monitor.Wait(msgLock);    // Wait for next Task in queue
                    req = msgQueue.Dequeue();
                    if(req == null)
                        return;      // This signals our exit
                }
    
                switch(req.reqType)
                {
                    case ReqType.History:
                        ibClient.ClientSocket.reqHistoricalData((int)req.reqId, req.contract, req.endDateTime, req.duration, req.barSize, req.show, req.useRTH ?? 1, req.formatDate ?? 1, req.update, new List<TagValue>());
                        break;
    
                    case ReqType.CancelHistory:
                        ibClient.ClientSocket.cancelHistoricalData((int)req.reqId);
                        historyReqs.Remove((int)req.reqId);
                        break;
                }
                Thread.Sleep(20); // This prevents over 50 msg per second.
            }
        }
    
    
        public void HistoricalData(int reqId, Contract contract, string endDateTime, string duration, string barSize, string show, int useRTH, int formatDate, bool update)
        {
            EnqueueHistoryRequest((T)new Request(reqId, ReqType.History, contract, endDateTime, duration, barSize, show, useRTH, formatDate, update));
        }
        public void CancelHistoryData(int reqId)
        {
            EnqueueMsg((T)new Request(reqId, ReqType.CancelHistory));
        }
        private void EnqueueHistoryRequest(T req)
        {
            lock(historyLock)
            {
                historyQueue.Enqueue(req);
                Monitor.PulseAll(historyLock);
            }
        }
        private void ConsumeHistory()
        {
            while(true)
            {
                Request req;
                lock(historyLock)
                {
                    while(historyQueue.Count == 0)
                        Monitor.Wait(historyLock);      // Wait for next Task in queue
                    req = historyQueue.Dequeue();
                }
                if(req == null) return;         // This signals our exit
                
    
                EnqueueMsg((T)req);
                // We actually have a soft 6/min limit on hist data.
                // This delay does not follow that, we're limiting the number to 50 instead.
                Thread.Sleep(800);
            }
        }
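        public void HistDataRecieved(int reqID)
        {
            historyReqs.Remove(reqID);
            // ReqCntHistory (outstanding historical requests) and MaxReqHistory (their cap)
            // are not shown in this outline.
            if(!waitHistory.SafeWaitHandle.IsClosed && ReqCntHistory < MaxReqHistory)
                waitHistory.Set();     // We can proceed if fewer than MaxReqHistory requests are outstanding
        }
    
        // Not shown in the original outline: a minimal Dispose, assuming a null
        // request is the exit signal that both consumer loops check for.
        public void Dispose()
        {
            EnqueueHistoryRequest(null!);
            EnqueueMsg(null!);
        }
    }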
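
    The fixed Thread.Sleep(800) in ConsumeHistory does not enforce the 6/min soft limit. One way to do that is a sliding-window limiter. The sketch below is an illustration only: `SlidingWindowLimiter` and `Reserve` are hypothetical names, and the caller passes the clock in so the logic can be checked without real waiting.

```csharp
using System;
using System.Collections.Generic;

// Sliding-window limiter: schedules sends so that at most `maxRequests`
// fall within any period of length `window`.
class SlidingWindowLimiter
{
    private readonly int maxRequests;
    private readonly TimeSpan window;
    // Scheduled send times of the most recent `maxRequests` requests, oldest first.
    private readonly Queue<DateTime> sent = new Queue<DateTime>();

    public SlidingWindowLimiter(int maxRequests, TimeSpan window)
    {
        this.maxRequests = maxRequests;
        this.window = window;
    }

    // Reserves the next free slot and returns how long the caller should wait
    // before sending (TimeSpan.Zero means "send now"). `now` must not go backwards.
    public TimeSpan Reserve(DateTime now)
    {
        lock (sent)
        {
            DateTime slot = now;
            if (sent.Count == maxRequests)
            {
                // The new request may go out once the oldest tracked one leaves the window.
                DateTime earliest = sent.Dequeue() + window;
                if (earliest > slot)
                    slot = earliest;
            }
            sent.Enqueue(slot);
            return slot - now;
        }
    }
}
```

    In ConsumeHistory this could replace the fixed sleep, for example by calling Thread.Sleep(limiter.Reserve(DateTime.UtcNow)) before EnqueueMsg, with the limiter created as new SlidingWindowLimiter(6, TimeSpan.FromMinutes(1)).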
    

    To use it in a UI form:

    Requests<Request> requests = new(ibClient);
    requests.HistoricalData(......);
    

    2) If a background worker thread is used, how to access function calls on the UI thread in the IB app.

    The IBClient class runs on a separate thread and sends events and data back to the UI thread. This means your UI thread only does work when data is received; there is never any need for Thread.Sleep or DoEvents on the UI thread.

    You can receive these events on the UI thread by registering handlers, as below...

    partial class MyForm:Form
    {
        private readonly EReaderMonitorSignal signal;
        private readonly IBClient ibClient;
    
        public MyForm()
        {
            InitializeComponent();
            signal = new EReaderMonitorSignal();
            ibClient = new(signal);
    
            // Register for events
    
            // Depending on your API version use one of the following to know when connected.
            ibClient.ConnectedAndReady += Recv_ConnectedAndReady;
            ibClient.NextOrderId += Recv_NextOrderId;
            
            ibClient.Error += Recv_Error;
            ibClient.HistoricalData += Recv_HistoricalData;
            ibClient.HistoricalDataEnd += Recv_HistoricalDataEnd;
    
    
            // Connect and start API thread.
            ibClient.ClientId = 0;
            ibClient.ClientSocket.eConnect("127.0.0.1", 7496, 0); // (ClientId = 0) This is the master client that will receive all orders (even from TWS)
            var reader = new EReader(ibClient.ClientSocket, signal);
            reader.Start();
    
            new Thread(() =>
            {
                while(ibClient.ClientSocket.IsConnected())
                {
                    signal.waitForSignal();
                    reader.processMsgs();
                }
            })
            { IsBackground = true }.Start();
        }
            
            
        private void Recv_ConnectedAndReady(ConnectionStatusMessage msg)
        {
            // We are now connected and can make requests
            // Older API versions use NextOrderId(int)
            // The false after formatDate is keepUpToDate, matching the 10-argument call used earlier.
            ibClient.ClientSocket.reqHistoricalData(reqId, contract, endDateTime, durationStr, "5 min", "Trades", 0, 0, false, new List<TagValue>());
        }
        
        private void Recv_Error(int reqId, int errorCode, string? str, string? ordRejectJson, Exception? ex)
        {
            // Possible combinations of parameters
            // reqId, errorCode, str, ex
            //
            // 0,0,null,Exception
            // 0,0,string,null
            // -1,int,string,null
            // int,int,string,null
        }
        
        private void Recv_HistoricalData(HistoricalDataMessage msg)
        {
            //msg.RequestId is the same Id you used to make the request. This is how you can separate data to different charts.
        }
        
        
        private void Recv_HistoricalDataEnd(HistoricalDataEndMessage msg)
        {
            //request complete.
        }
    }
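
    Putting the two handlers together, the next request can be sent only when the previous one has finished, with each request's bars kept in their own list so that charts do not get merged. This is a rough sketch under stated assumptions: `SequentialHistoryRequests` is a hypothetical helper, the `send` delegate is a stand-in for a wrapper around reqHistoricalData, a bar is reduced to a single closing price, and all calls are assumed to arrive on the same (UI) thread, so no locking is used.

```csharp
using System;
using System.Collections.Generic;

// Sends queued requests one at a time and keeps each request's bars separate.
class SequentialHistoryRequests
{
    private readonly Queue<string> pending = new Queue<string>();
    private readonly Action<int, string> send;   // hypothetical wrapper around reqHistoricalData
    private int nextReqId = 1;

    // Bars received so far, keyed by request id (one list per chart).
    public Dictionary<int, List<double>> BarsByRequest { get; } =
        new Dictionary<int, List<double>>();

    public SequentialHistoryRequests(IEnumerable<string> tickers, Action<int, string> send)
    {
        this.send = send;
        foreach (string ticker in tickers)
            pending.Enqueue(ticker);
    }

    // Call once the connection is ready.
    public void Start() => SendNext();

    // Call from the HistoricalData handler.
    public void OnBar(int reqId, double close)
    {
        if (BarsByRequest.TryGetValue(reqId, out List<double> bars))
            bars.Add(close);
    }

    // Call from the HistoricalDataEnd handler: that request is done, so send the next one.
    public void OnEnd(int reqId) => SendNext();

    private void SendNext()
    {
        if (pending.Count == 0)
            return;                               // nothing left to request
        int reqId = nextReqId++;
        BarsByRequest[reqId] = new List<double>();
        send(reqId, pending.Dequeue());
    }
}
```

    Recv_HistoricalData would then forward msg.RequestId and the bar's price to OnBar, and Recv_HistoricalDataEnd would call OnEnd, which is the point at which the tab or chart for the next ticker can be created.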