performance wcf throughput wcf-callbacks

Monitor the queue of TCP messages awaiting transmission


I have a WCF client and server using a NetTcp binding. The server is hosted in a ServiceHost inside a Windows Service. The client subscribes to the WCF service and registers its callback interface and its InstanceContext. The callback interface has several one-way method calls. The service throttling settings are wide open.

All this works great. However, in my testing I have code in my Windows service that goes through a tight loop sending messages back to the client through one of the one-way method calls as fast as it can. I have exceeded the TCP connection's ability to pass the data and the result is that the messages get queued up. This is what I expected.

The question is this: Is there any way on the server to find out how backed up the queue is, so that I can limit the rate at which I send messages based on the real-time throughput?


Solution

  • We never found an answer for this, but we created our own workaround which seems to do the trick. For completeness, I will post it here. I hope it helps someone else faced with a similar situation.

    The Requirements:

    1. We have a long running task that will be running on a hardware server. When I say long-running I mean from one day to many days.
    2. We would like to have a user interface that can be started on any other desktop in the network to view statistics from the long running task in a graphic manner.
    3. The user interface can be started and stopped multiple times and multiple instances can occur at once.
    4. The user interface should not place an excessive burden on the long running task. Multiple UIs running should not slow it down.

    The Design:

    1. The long running task is contained in a DLL. There is one main class with a run() method that starts the long running task.
    2. We have created a Windows Service that will run automatically on the hardware server.
    3. The Windows service will create an instance of the main class and start the task by calling the run() method.
    4. The Windows service will also create an instance of ServiceHost and start an instance of a WCF service.
    5. The Windows service will pass a reference of the main class to the WCF service.
    6. The WCF service will create handlers for the six events that the main class can raise.
    7. All the communication from the main class to the WCF service is one way and by means of raising these six events.
    8. The UIs will be clients to the WCF service and the connection will be with a NetTcp binding.
    9. The WCF service has a subscribe() method and an unsubscribe() method so that potential UIs can join and leave.
    10. When a UI calls the subscribe() method, it passes a unique identifier as a string. The WCF service puts the identifier and its OperationContext into a ConcurrentDictionary.
    11. When a UI calls the unsubscribe() method, that entry is removed from the ConcurrentDictionary.
    12. The Contract between the UI and the WCF service has one-way messages from the WCF service to the clients for each type of event that the long running task can raise.
    13. When an event is raised during the long running task, the WCF service handles the event and iterates through the registered UIs and sends one-way messages to the UIs.
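
    Steps 9 through 13 can be sketched roughly as follows. This is a minimal illustration, not our actual service code: `IStatsCallback`, `SubscriberRegistry` and the string payload are hypothetical stand-ins, and the WCF attributes are left out so the sketch compiles without System.ServiceModel. In the real service the callback would typically come from `OperationContext.Current.GetCallbackChannel<T>()`. Removing a subscriber whose callback throws is an assumption added here, not something described above.

    ```csharp
    using System;
    using System.Collections.Concurrent;
    using System.Collections.Generic;

    // Hypothetical stand-in for the WCF callback contract. In the real service
    // each method would be marked [OperationContract(IsOneWay = true)].
    public interface IStatsCallback
    {
        void OnPlanSelectionChanged(string payload);
    }

    // Hypothetical registry: maps the identifier passed to subscribe()
    // to the callback used to reach that UI.
    public class SubscriberRegistry
    {
        private readonly ConcurrentDictionary<string, IStatsCallback> _subscribers =
            new ConcurrentDictionary<string, IStatsCallback>();

        public int Count { get { return _subscribers.Count; } }

        public void Subscribe(string id, IStatsCallback callback)
        {
            // Subscribing again with the same id replaces the old callback.
            _subscribers[id] = callback;
        }

        public bool Unsubscribe(string id)
        {
            IStatsCallback removed;
            return _subscribers.TryRemove(id, out removed);
        }

        // Sends the event to every registered UI. A subscriber whose callback
        // throws is removed (an assumption of this sketch) so that one dead UI
        // does not affect the others.
        public void Broadcast(string payload)
        {
            foreach (KeyValuePair<string, IStatsCallback> entry in _subscribers)
            {
                try
                {
                    entry.Value.OnPlanSelectionChanged(payload);
                }
                catch (Exception)
                {
                    IStatsCallback removed;
                    _subscribers.TryRemove(entry.Key, out removed);
                }
            }
        }
    }
    ```

    Enumerating a ConcurrentDictionary while other threads add or remove entries is safe, which is why UIs can join and leave while an event is being broadcast.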

    All this is working at this point.

    The Problem:

    As we were stress testing this system, we created a scenario where the long running task bombarded the WCF service with events as fast as it could. This would be a worst-case scenario, but we had to be able to handle it. The WCF service was able to handle the events and place the messages on the TCP channel. Since the messages are one-way, the WCF service doesn't block waiting for the send to complete, which enables it to keep up with the events that are being raised.

    The problem occurs when the user interface does not pull the messages off the channel as fast as the server pushes them in. The messages back up and eventually start timing out, which puts the channel into a faulted state. We were hoping to detect this condition before the channel faulted so we could start throwing away messages. Unfortunately, we could find no mechanism to detect the backlog on this channel. If we changed the messages to two-way, the WCF service would block until each message completed and the channel would not back up. However, this would slow down the long running task. Not good.

    The Solution:

    We solved this problem by creating a special class in the same DLL that contains the long running task. This class is responsible for communicating back to any attached user interfaces. This communication object contains a ConcurrentQueue for each event to be raised. Where the long running task would normally raise the event back to the WCF service, it now calls a method in this communication object instead.

    Inside this method, the communication object enters the event args into the ConcurrentQueue for that event. The communication object also has a method that is started on a separate thread when the object is created. This method continually loops through the ConcurrentQueues, dequeues the event args and actually raises the event. We changed the NetTcp calls to be two-way, so the routine in the thread is bound to the speed of the TCP channel, but because it runs on a separate thread, it does not slow down the main processing of the long running task.

    Now that we have a ConcurrentQueue that we can inspect, we can check the backlog. We set a logical limit for each ConcurrentQueue (currently 10). When the long running task calls the method to add an event args to a queue, the method first checks the count of the queue. If the count is below the limit, it enqueues the event args; otherwise it simply drops the event args and continues. The number of events dropped is carried on the next event args that is enqueued (the Dropped property in the code below). In this way the speed of the long running task is not impacted, and the WCF service does not back up and fault the channel.

    In Summary:

    We welcome any feedback or alternate ideas. This seems to be working fine for us and appears to be resilient.

    using System;
    using System.Collections.Concurrent;
    using System.Threading;

    class UI
    {
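        #region Class Scoped Variables
        private Int32 _threashold = 10;              // maximum backlog per queue
        private volatile bool _continue = true;      // written by Shutdown(), read by the worker thread
        private Int32 _cntPlanSelectionDropped = 0;  // events dropped since the last successful enqueue
        #endregion Class Scoped Variables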
    
        #region Public Delegate Definitions
        public delegate void OnPlanSelectionChangedDelegate(PlanSelectionChangedEventArgs e);
        // other lines deleted for brevity
        #endregion Public Delegate Definitions
    
        #region Local Delegate Instances
        private OnPlanSelectionChangedDelegate _onPlanSelectionChangedDelegate = null;
        // other lines deleted for brevity
        #endregion Local Delegate Instances
    
        #region Local Queues for Delegates
        private ConcurrentQueue<PlanSelectionChangedEventArgs> _planSelectionChangedQueue
            = new ConcurrentQueue<PlanSelectionChangedEventArgs>();
        // other lines deleted for brevity
        #endregion Local Queues for Delegates
    
        #region Constructor
        public UI(OnPlanSelectionChangedDelegate onPlanSelectionChanged)
        {
            _onPlanSelectionChangedDelegate = onPlanSelectionChanged;
            // other lines deleted for brevity
            ThreadPool.QueueUserWorkItem(new WaitCallback(DoWork), null);
        }
        #endregion Constructor
    
        #region Public Methods
        public void Shutdown()
        {
            _continue = false;
        }
        public void SendPlanSelection(PlanSelectionChangedEventArgs e)
        {
            if (_planSelectionChangedQueue.Count < _threashold)
            {
                if (_cntPlanSelectionDropped > 0)
                {
                    e.Dropped = _cntPlanSelectionDropped;
                }
                _planSelectionChangedQueue.Enqueue(e);
                _cntPlanSelectionDropped = 0;
            }
            else
            {
                _cntPlanSelectionDropped++;
            }
        }
        // other lines deleted for brevity
        #endregion Public Methods
    
        #region Private Asynchronous Method
        private void DoWork(object dummy)
        {
            PlanSelectionChangedEventArgs planSelectionChangedEventArgs = null;
            while (_continue)   // process this loop until told to quit
            {
                // Plan Selection Changed
                // Try to get the next event args in a thread safe way
                if (_planSelectionChangedQueue.TryDequeue(out planSelectionChangedEventArgs))
                {
                    // We got an event args from the queue, do we have a valid delegate?
                    if (_onPlanSelectionChangedDelegate != null)
                    {
                        // We have a delegate, call it with the event args and raise the event
                        _onPlanSelectionChangedDelegate(planSelectionChangedEventArgs);
                    }
                }
    
                // other lines deleted for brevity
            }
        }
        #endregion Private Asynchronous Method
    }
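
    An alternate idea, offered as a sketch rather than something we have run in production: the same drop-when-full policy can be expressed with `BlockingCollection<T>`, which supports a bounded capacity directly. `TryAdd` returns false immediately when the collection is full, and `GetConsumingEnumerable` blocks while the collection is empty, so the worker thread does not have to poll the queue in a loop. The class name `BoundedEventChannel` and the `send` delegate are hypothetical; `send` stands in for the two-way WCF callback.

    ```csharp
    using System;
    using System.Collections.Concurrent;
    using System.Threading;

    // Hypothetical helper: a bounded queue with a non-blocking producer side
    // and a blocking consumer running on a dedicated background thread.
    public sealed class BoundedEventChannel<T> : IDisposable
    {
        private readonly BlockingCollection<T> _queue;
        private readonly Action<T> _send;
        private readonly Thread _worker;
        private long _dropped;

        public BoundedEventChannel(int capacity, Action<T> send)
        {
            _queue = new BlockingCollection<T>(capacity);
            _send = send;
            _worker = new Thread(Consume) { IsBackground = true };
            _worker.Start();
        }

        // Total number of items rejected because the queue was full.
        public long Dropped { get { return Interlocked.Read(ref _dropped); } }

        // Called by the long running task. Never blocks: if the queue is
        // full the item is counted as dropped and false is returned.
        public bool TryPost(T item)
        {
            if (_queue.TryAdd(item))
            {
                return true;
            }
            Interlocked.Increment(ref _dropped);
            return false;
        }

        private void Consume()
        {
            // Blocks while the queue is empty; the loop ends once
            // CompleteAdding() has been called and the queue is drained.
            foreach (T item in _queue.GetConsumingEnumerable())
            {
                _send(item); // assumed to be the slow, two-way call
            }
        }

        // Stops accepting items, lets the worker drain the queue, then cleans up.
        public void Dispose()
        {
            _queue.CompleteAdding();
            _worker.Join();
            _queue.Dispose();
        }
    }
    ```

    One channel per event type would mirror the one-queue-per-event layout above. A real `send` delegate should catch channel exceptions itself, because an exception escaping the worker thread would end the process. Calling `TryPost` after `Dispose` is not supported in this sketch.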