I have read a number of articles and questions here on StackOverflow about wrapping a callback based API with a Task
based one using a TaskCompletionSource
, and I'm trying to use that sort of technique when communicating with a Solace PubSub+ message broker.
My initial observation was that this technique seems to shift responsibility for concurrency. For example, the Solace broker library has a Send()
method which can possibly block, and then we get a callback after the network communication is complete to indicate "real" success or failure. So this Send()
method can be called very quickly, and the vendor library limits concurrency internally.
When you put a Task around that it seems you either serialize the operations ( foreach message await SendWrapperAsync(message)
), or take over responsibility for concurrency yourself by deciding how many tasks to start (eg, using TPL dataflow).
In any case, I decided to wrap the Send
call with a guarantor that will retry forever until the callback indicates success, as well as take responsibility for concurrency. This is a "guaranteed" messaging system. Failure is not an option. This requires that the guarantor can apply backpressure, but that's not really in the scope of this question. I have a couple of comments about it in my example code below.
What it does mean is that my hot path, which wraps the send + callback, is "extra hot" because of the retry logic. And so there's a lot of TaskCompletionSource
creation here.
The vendor's own documentation makes recommendations about reusing their Message
objects where possible rather then recreating them for every Send
. I have decided to use a Channel
as a ring buffer for this. But that made me wonder - is there some alternative to the TaskCompletionSource
approach - maybe some other object that can also be cached in the ring buffer and reused, achieving the same outcome?
I realise this is probably an overzealous attempt at micro-optimisation, and to be honest I am exploring several aspects of C# which are above my pay grade (I'm a SQL guy, really), so I could be missing something obvious. If the answer is "you don't actually need this optimisation", that's not going to put my mind at ease. If the answer is "that's really the only sensible way", my curiosity would be satisfied.
Here is a fully functioning console application which simulates the behaviour of the Solace library in the MockBroker
object, and my attempt to wrap it. My hot path is the SendOneAsync
method in the Guarantor
class. The code is probably a bit too long for SO, but it is as minimal a demo I could create that captures all of the important elements.
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;
internal class Message { public bool sent; public int payload; public object correlator; }
// simulate third party library behaviour
internal class MockBroker
{
public bool TrySend(Message m, Action<Message> callback)
{
if (r.NextDouble() < 0.5) return false; // simulate chance of immediate failure / "would block" response
Task.Run(() => { Thread.Sleep(100); m.sent = r.NextDouble() < 0.5; callback(m); }); // simulate network call
return true;
}
private Random r = new();
}
// Turns MockBroker into a "guaranteed" sender with an async concurrency limit
internal class Guarantor
{
public Guarantor(int maxConcurrency)
{
_broker = new MockBroker();
// avoid message allocations in SendOneAsync
_ringBuffer = Channel.CreateBounded<Message>(maxConcurrency);
for (int i = 0; i < maxConcurrency; i++) _ringBuffer.Writer.TryWrite(new Message());
}
// real code pushing into a T.T.T.DataFlow block with bounded capacity and parallelism
// execution options both equal to maxConcurrency here, providing concurrency and backpressure
public async Task Post(int payload) => await SendOneAsync(payload);
private async Task SendOneAsync(int payload)
{
Message msg = await _ringBuffer.Reader.ReadAsync();
msg.payload = payload;
// send must eventually succeed
while (true)
{
// *** can this allocation be avoided? ***
var tcs = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);
msg.correlator = tcs;
// class method in real code, inlined here to make the logic more apparent
Action<Message> callback = (msg) => (msg.correlator as TaskCompletionSource<bool>).SetResult(msg.sent);
if (_broker.TrySend(msg, callback) && await tcs.Task) break;
else
{
// simple demo retry logic
Console.WriteLine($"retrying {msg.payload}");
await Task.Delay(500);
}
}
// real code raising an event here to indicate successful delivery
await _ringBuffer.Writer.WriteAsync(msg);
Console.WriteLine(payload);
}
private Channel<Message> _ringBuffer;
private MockBroker _broker;
}
internal class Program
{
private static async Task Main(string[] args)
{
// at most 10 concurrent sends
Guarantor g = new(10);
// hacky simulation since in this demo there's nothing generating continuous events,
// no DataFlowBlock providing concurrency (it will be limited by the Channel instead),
// and nobody to notify when messages are successfully sent
List<Task> sends = new(100);
for (int i = 0; i < 100; i++) sends.Add(g.Post(i));
await Task.WhenAll(sends);
}
}
Yes, you can avoid the allocation of TaskCompletionSource
instances, by using lightweight ValueTask
s instead of Task
s. At first you need a reusable object that can implement the IValueTaskSource<T>
interface, and the Message
seems like the perfect candidate. For implementing this interface you can use the ManualResetValueTaskSourceCore<T>
struct. This is a mutable struct, so it should not be declared as readonly
. You just need to delegate the interface methods to the corresponding methods of this struct with the very long name:
using System.Threading.Tasks.Sources;
internal class Message : IValueTaskSource<bool>
{
public bool sent; public int payload; public object correlator;
private ManualResetValueTaskSourceCore<bool> _source; // Mutable struct, not readonly
public void Reset() => _source.Reset();
public short Version => _source.Version;
public void SetResult(bool result) => _source.SetResult(result);
ValueTaskSourceStatus IValueTaskSource<bool>.GetStatus(short token)
=> _source.GetStatus(token);
void IValueTaskSource<bool>.OnCompleted(Action<object> continuation,
object state, short token, ValueTaskSourceOnCompletedFlags flags)
=> _source.OnCompleted(continuation, state, token, flags);
bool IValueTaskSource<bool>.GetResult(short token) => _source.GetResult(token);
}
The three members GetStatus
, OnCompleted
and GetResult
are required for implementing the interface. The other three members (Reset
, Version
and SetResult
) will be used for creating and controlling the ValueTask<bool>
s.
Now lets wrap the TrySend
method of the MockBroker
class in an asynchronous method TrySendAsync
, that returns a ValueTask<bool>
static class MockBrokerExtensions
{
public static ValueTask<bool> TrySendAsync(this MockBroker source, Message message)
{
message.Reset();
bool result = source.TrySend(message, m => m.SetResult(m.sent));
if (!result) message.SetResult(false);
return new ValueTask<bool>(message, message.Version);
}
}
The message.Reset();
resets the IValueTaskSource<bool>
, and declares that the previous asynchronous operation has completed. A IValueTaskSource<T>
supports only one asynchronous operation at a time, the produced ValueTask<T>
can be awaited only once, and it can no longer be awaited after the next Reset()
. That's the price you have to pay for avoiding the allocation of an object: you must follow stricter rules. If you try to bend the rules (intentionally or unintentionally), the ManualResetValueTaskSourceCore<T>
will start throwing InvalidOperationException
s all over the place.
Now lets use the TrySendAsync
extension method:
while (true)
{
if (await _broker.TrySendAsync(msg)) break;
// simple demo retry logic
Console.WriteLine($"retrying {msg.payload}");
await Task.Delay(500);
}
You can print in the Console
the GC.GetTotalAllocatedBytes(true)
before and after the whole operation, to see the difference. Make sure to run the application in Release mode, to see the real picture. You might see that the difference in not that impressive, because the size of a TaskCompletionSource
instance is pretty small compared to the bytes allocated by the Task.Delay
, and by all the string
s generated for writing stuff in the Console
.