websocket async-await task-parallel-library ienumerable taskcompletionsource

How to convert this WebSocket pattern to a Task I can await/cancel/ContinueWith?


I have a third-party WebSocket client that I need to consume, where the request is sent in one method and the response arrives in another. How can I convert this pattern to a more typical async/await TPL Task that supports cancellation (via a token), ContinueWith, and all the other goodies?

This is what I came up with so far, although I'm not sure if it works. I can't test it until Monday.

So here are my questions:

  1. Will this work?

  2. I've been reading about TaskCompletionSource. Is there a better way to do any of this possibly with TaskCompletionSource?

  3. I really don't like the lock because I know it has the potential to block for a long time, but I'm not sure how to do it any better: if I don't lock, a second call to AsyncGetPositions could clear any positions already returned.

  4. Even with the lock, I know that a timeout or cancellation creates a problem, so maybe I should just remove the cancellation token. The only other thing I can think of would be to create multiple clients that are all authenticated and ready to process a request, and then manage them like a thread pool for these types of requests, but I'm not going to be doing that anytime soon, so other than that... idk.

    private object GetPositionsLock = new object();
    private IEnumerable<Position> Positions { get; } = new List<Position>();
    private Task PositionsReturned { get; set; }
    public async Task<List<Position>> AsyncGetPositions(CancellationToken token)
    {
        try
        {
            lock (GetPositionsLock)
            {
                Positions.Clear();
                IbWebSocket.reqPositions();
                PositionsReturned = new Task(null, token, TaskCreationOptions.None);
                PositionsReturned.GetAwaiter().GetResult();
                return token.IsCancellationRequested ? null : Positions.ToList().Where(x => x.Shares != 0).ToList();
            }
        }
        catch (TimeoutException ex)
        {
            //LogTimeout
            throw;
        }
        catch (Exception ex)
        {
            //LogError
            throw;
        }
    }
    
    ///  <summary>
    ///         Provides a position to the reqPositions() method.  When the last position has been received positionEnd() is called.
    ///  </summary>
    ///  <param name="contract"></param>
    ///  <param name="value"></param>
    ///  <param name="account"></param>
    public void position(string account, Contract contract, double value)
    {
        try
        {
            Positions.Concat(new Position(account, contract, value));
        }
        catch (Exception ex)
        {
            //LogError
            throw;
        }
    }
    
    /// <summary>
    ///     Indicates all the positions have been transmitted.
    /// </summary>
    public void positionEnd()
    {
        PositionsReturned = Task.CompletedTask;
    }
    

Solution

  • Will this work?

    No. You shouldn't use the Task constructor, shouldn't use lock with async code, and shouldn't mix blocking with asynchronous code.

  • I've been reading about TaskCompletionSource. Is there a better way to do any of this possibly with TaskCompletionSource?

    Yes, that's the type to use for this scenario.

  • I really don't like the lock because I know it has the potential to block for a long time, but I'm not sure how to do it any better: if I don't lock, a second call to AsyncGetPositions could clear any positions already returned.

    I recommend getting this working first, and then handling the additional requirement of reentrancy. Each of those is hard enough on its own.


    What you want to do is have a TaskCompletionSource<T> and complete it when positionEnd is invoked. For simplicity, start without reentrancy concerns and without the CancellationToken. Once you fully understand TaskCompletionSource<T>, then you can add complexity:

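    private List<Position> Positions { get; } = new();
    private TaskCompletionSource<List<Position>> PositionsReturned { get; set; }

    // Starts a request and returns a task that completes when positionEnd() fires.
    public Task<List<Position>> AsyncGetPositions()
    {
      Positions.Clear();
      // Create the completion source before sending, so the response cannot arrive first.
      PositionsReturned = new();
      IbWebSocket.reqPositions();
      return PositionsReturned.Task;
    }
    
    // Callback: invoked once per position.
    public void position(string account, Contract contract, double value)
    {
      Positions.Add(new Position(account, contract, value));
    }
    
    // Callback: invoked after the last position; completes the pending task.
    public void positionEnd()
    {
      // Null-conditional guards against a positionEnd() that arrives before any request.
      PositionsReturned?.TrySetResult(Positions);
    }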
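
    Once the simple version makes sense, one possible next step is sketched below: it adds the CancellationToken back and serializes callers with a SemaphoreSlim instead of lock, so waiting never blocks a thread. This is only a sketch under assumptions, not a tested implementation against the real client: Position, Contract, IPositionSocket, PositionClient, and FakeSocket are hypothetical stand-ins for the third-party types in the question, and the fake socket simply replays two canned positions.

```csharp
#nullable enable
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical stand-ins for the third-party types in the question.
public record Contract(string Symbol);
public record Position(string Account, Contract Contract, double Shares);
public interface IPositionSocket { void reqPositions(); }

public sealed class PositionClient
{
    private readonly IPositionSocket _socket;
    private readonly SemaphoreSlim _gate = new(1, 1); // async-friendly replacement for lock
    private readonly object _sync = new();            // guards the two fields below
    private List<Position> _positions = new();
    private TaskCompletionSource<List<Position>>? _pending;

    public PositionClient(IPositionSocket socket) => _socket = socket;

    public async Task<List<Position>> GetPositionsAsync(CancellationToken token = default)
    {
        // Only one request in flight at a time; later callers wait asynchronously.
        await _gate.WaitAsync(token);
        try
        {
            var tcs = new TaskCompletionSource<List<Position>>(
                TaskCreationOptions.RunContinuationsAsynchronously);
            lock (_sync)
            {
                _positions = new List<Position>(); // fresh list per request
                _pending = tcs;
            }
            // Cancelling the token moves the task to the Canceled state.
            using var registration = token.Register(() => tcs.TrySetCanceled(token));
            _socket.reqPositions();
            var all = await tcs.Task;
            return all.Where(p => p.Shares != 0).ToList();
        }
        finally
        {
            lock (_sync) { _pending = null; }
            _gate.Release();
        }
    }

    // Callback: invoked once per position.
    public void position(string account, Contract contract, double value)
    {
        lock (_sync) { _positions.Add(new Position(account, contract, value)); }
    }

    // Callback: invoked after the last position.
    public void positionEnd()
    {
        lock (_sync) { _pending?.TrySetResult(_positions); }
    }
}

// Hypothetical fake that answers from another thread, like a real socket would.
public sealed class FakeSocket : IPositionSocket
{
    public PositionClient? Client { get; set; }

    public void reqPositions()
    {
        Task.Run(() =>
        {
            Client!.position("DU1", new Contract("AAPL"), 100);
            Client.position("DU1", new Contract("MSFT"), 0);
            Client.positionEnd();
        });
    }
}

public static class Demo
{
    public static async Task Main()
    {
        var socket = new FakeSocket();
        var client = new PositionClient(socket);
        socket.Client = client;

        using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(5));
        List<Position> positions = await client.GetPositionsAsync(cts.Token);
        Console.WriteLine(positions.Count); // prints 1 (the zero-share position is filtered out)
    }
}
```

    One caveat with this sketch: cancelling only stops the caller from waiting. If the server still sends the old response afterwards, those late position/positionEnd callbacks would be attributed to the next request, because the callbacks shown in the question carry no request identifier. Handling that depends on what the real client offers, so it is left out here.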