I have a third-party WebSocket client that I need to consume, where the request is sent in one method and the response is delivered in another. How can I convert this pattern into a more typical async/await TPL Task that supports cancellation (via a token), ContinueWith, and all the other goodies?
This is what I came up with so far, although I'm not sure if it works. I can't test it until Monday.
So here are my questions:
Will this work?
I've been reading about TaskCompletionSource. Is there a better way to do any of this possibly with TaskCompletionSource?
I really don't like the lock because I know it has the potential to block for a long time, but I'm not sure how to do it any better: if I don't lock, a second call to AsyncGetPositions could clear any positions already returned.
Even with the lock, I know that a timeout or cancellation creates a problem, so maybe I should just remove the cancellation token. The only other thing I can think of would be to create multiple clients that are all authenticated and ready to process a request, and then manage them like a thread pool for these types of requests, but I'm not going to be doing that anytime soon, so other than that... idk.
private object GetPositionsLock = new object();
private IEnumerable<Position> Positions { get; } = new List<Position>();
private Task PositionsReturned { get; set; }
public async Task<List<Position>> AsyncGetPositions(CancellationToken token)
{
    try
    {
        lock (GetPositionsLock)
        {
            Positions.Clear();
            IbWebSocket.reqPositions();
            PositionsReturned = new Task(null, token, TaskCreationOptions.None);
            PositionsReturned.GetAwaiter().GetResult();
            return token.IsCancellationRequested ? null : Positions.ToList().Where(x => x.Shares != 0).ToList();
        }
    }
    catch (TimeoutException ex)
    {
        //LogTimeout
        throw;
    }
    catch (Exception ex)
    {
        //LogError
        throw;
    }
}
/// <summary>
/// Provides a position to the reqPositions() method. When the last position has been received positionEnd() is called.
/// </summary>
/// <param name="contract"></param>
/// <param name="value"></param>
/// <param name="account"></param>
public void position(string account, Contract contract, double value)
{
    try
    {
        Positions.Concat(new Position(account, contract, value));
    }
    catch (Exception ex)
    {
        //LogError
        throw;
    }
}
/// <summary>
/// Indicates all the positions have been transmitted.
/// </summary>
public void positionEnd()
{
    PositionsReturned = Task.CompletedTask;
}
Will this work?
No. You shouldn't use the Task constructor, use lock with async code, or mix blocking with asynchronous code.
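On the lock point specifically: the usual async-compatible substitute is a SemaphoreSlim with a single permit, awaited via WaitAsync. The following is only a minimal sketch; the class name SerializedRequests and the method RunExclusiveAsync are hypothetical and are not part of the WebSocket client in the question.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical helper: lets only one asynchronous request run at a time.
public sealed class SerializedRequests
{
    // One permit, so at most one request is in flight.
    private readonly SemaphoreSlim _gate = new SemaphoreSlim(1, 1);

    // Waiting for the gate is asynchronous and cancellable, so no thread is
    // blocked while another request is still running.
    public async Task<T> RunExclusiveAsync<T>(Func<Task<T>> request, CancellationToken token = default)
    {
        await _gate.WaitAsync(token).ConfigureAwait(false);
        try
        {
            return await request().ConfigureAwait(false);
        }
        finally
        {
            // Always release, even if the request throws or is cancelled.
            _gate.Release();
        }
    }
}
```

Unlike lock, the semaphore may be held across an await, and a caller that gives up while queued simply gets an OperationCanceledException from WaitAsync.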
I've been reading about TaskCompletionSource. Is there a better way to do any of this possibly with TaskCompletionSource?
Yes, that's the type to use for this scenario.
I really don't like the lock because I know it has the potential to block for a long time, but I'm not sure how to do it any better: if I don't lock, a second call to AsyncGetPositions could clear any positions already returned.
I recommend getting this working first, and then handling the additional requirement of reentrancy. Each of those is hard enough on its own.
What you want to do is have a TaskCompletionSource<T> and complete it when positionEnd is invoked. For simplicity, start without reentrancy concerns and without the CancellationToken. Once you fully understand TaskCompletionSource<T>, then you can add complexity:
private List<Position> Positions { get; } = new();
private TaskCompletionSource<List<Position>> PositionsReturned { get; set; }
public Task<List<Position>> AsyncGetPositions()
{
    Positions.Clear();
    PositionsReturned = new();
    IbWebSocket.reqPositions();
    return PositionsReturned.Task;
}
public void position(string account, Contract contract, double value)
{
    Positions.Add(new Position(account, contract, value));
}
public void positionEnd()
{
    PositionsReturned.TrySetResult(Positions);
}
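Once the basic version works, one way to layer cancellation on top is to register a callback on the token that cancels the TaskCompletionSource. This is a sketch under assumptions, not a drop-in replacement: PositionsRequest, OnPosition, OnPositionEnd, and the sendRequest delegate are hypothetical stand-ins for the real client and its callbacks, positions are plain strings to keep the example self-contained, and it still assumes requests do not overlap.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical wrapper around a "request in one method, response in callbacks" API.
public sealed class PositionsRequest
{
    private readonly List<string> _positions = new List<string>();
    private readonly Action _sendRequest;   // stand-in for IbWebSocket.reqPositions()
    private TaskCompletionSource<List<string>> _pending;

    public PositionsRequest(Action sendRequest) => _sendRequest = sendRequest;

    public async Task<List<string>> GetPositionsAsync(CancellationToken token)
    {
        token.ThrowIfCancellationRequested();

        // RunContinuationsAsynchronously keeps the awaiting code from running
        // inline on the thread that invokes the completion callback.
        var tcs = new TaskCompletionSource<List<string>>(
            TaskCreationOptions.RunContinuationsAsynchronously);
        _positions.Clear();
        _pending = tcs;

        // If the token fires first, the task ends as cancelled; the registration
        // is disposed as soon as the task completes either way.
        using (token.Register(() => tcs.TrySetCanceled(token)))
        {
            _sendRequest();
            return await tcs.Task.ConfigureAwait(false);
        }
    }

    // Called once per position (the position callback in the original code).
    public void OnPosition(string position) => _positions.Add(position);

    // Called after the last position (the positionEnd callback in the original code).
    public void OnPositionEnd()
    {
        // Hand out a copy so a later request cannot mutate a returned result.
        _pending?.TrySetResult(new List<string>(_positions));
    }
}
```

Note that cancelling here only stops the caller from waiting. The server may still deliver the remaining positions for the abandoned request, so a follow-up request could see stale callbacks unless the protocol offers a way to tell responses apart; that is part of the reentrancy problem and is not addressed by this sketch.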