Search code examples
c#multithreadingthreadpoolmessagebrokerwebsocket-sharp

Broker that makes async calls with a synchronous facade in c#


I want to make many calls down a websocket and get a result for each call.

ie

_svc.DoAthing(param) =broker calls=> ws.SendMessage(doathingmessage(ticket))
                     <=broker returns= ws.Onmessage+=handler=>(doathingresult(ticket))

what is the best way to make a broker such that

  1. This async request appears synchronous
  2. The broker can handle hundreds of requests
  3. The client should not be constantly polling on its thread, it should either block or await.

Not sure if a having a bunch of threads polling for tickets completed is the best way.


Solution

  • I think this will work well but am still looking for a more elegant solution

        private ConcurrentDictionary<Guid, ManualResetEvent> _waitingClients = new ConcurrentDictionary<Guid, ManualResetEvent>();
        private ConcurrentDictionary<Guid, byte[]> _hostResponses = new ConcurrentDictionary<Guid, byte[]>();
    
        public TEventResult SendCommand<TEventResult>(ICommand cmd)
        {
            var mre = new ManualResetEvent(false);
            var s = _serialize(cmd);
            if(_waitingClients.TryAdd(cmd.Ticket, mre))
            {
                if (!_sendDownPipe(s))
                {
                    _waitingClients.TryRemove(cmd.Ticket, out mre);
                    throw new Exception("Could not get Response");
                }
            }
            mre.WaitOne();//todo timeout
    
            byte[] resp;
            if(_hostResponses.TryRemove(cmd.Ticket, out resp))
            {
                var o = _deserialize<TEventResult>(resp);
                return o;
            }
    
            throw new Exception("Could not get response!");
        }
        private void _eventRecieved(byte[] s)
        {
            var evt = _deserialize<IEvent>(s);
            if(_hostResponses.TryAdd(evt.Ticket, s))
            {
                ManualResetEvent mre;
                if(_waitingClients.TryRemove(evt.Ticket, out mre))
                {
                    mre.Set();
                }
            }
        }