c# asynchronous task-parallel-library nats.io event-based-programming

How to convert EventHandler<MyMessage> parameter into async/await?


I have to consume the following API (basically, I have to subscribe to a subject/channel/topic in Stan):

IStanSubscription Subscribe(string subject, StanSubscriptionOptions options, EventHandler<StanMsgHandlerArgs> handler);

I would like to convert the EAP pattern to the TAP pattern. What I have done is the following:

public Task<MyMessage> Subscribe(string subject)
{
    var tcs = new TaskCompletionSource<MyMessage>();

    _stanConnection.Subscribe(subject, GetStanSubscriptionOptions(), (sender, args) =>
    {
        string messageAsString = Encoding.UTF8.GetString(args.Message.Data, 0, args.Message.Data.Length);

        try
        {
            args.Message.Ack();

            MyMessage message = JsonConvert.DeserializeObject<MyMessage>(messageAsString);

            tcs.SetResult(message);

        }
        catch (Exception ex)
        {
            Console.WriteLine(ex);
        }
    });

    return tcs.Task;
}

With the first message everything is fine, but the next one throws the following exception:

An attempt was made to transition a task to a final state when it had already completed

I read some articles and threads like:

As far as I can understand (I am not that good with EventHandler, my apologies in advance), in both cases they access the EventHandler directly from the class, whereas in my case the EventHandler is passed as a parameter, and I am not sure how to unsubscribe.

Cheers!


Solution

  • I would like to convert the EAP pattern to the TAP pattern.

    That would work if the API were written with the EAP pattern, but it's not. Instead, it uses its own subscription pattern, in which a delegate is invoked for every update.
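
    This is also where the exception in the question comes from: the handler runs once per message, but a Task can complete only once, so the second SetResult call on the same TaskCompletionSource throws. A minimal sketch of that behavior, with no Stan involved (the class and method names are made up for illustration):

```csharp
using System;
using System.Threading.Tasks;

public static class SingleCompletionDemo
{
    // Returns true if a second SetResult on the same source throws.
    public static bool SecondSetResultThrows()
    {
        var tcs = new TaskCompletionSource<string>();
        tcs.SetResult("first");          // first message: the task completes
        try
        {
            tcs.SetResult("second");     // second message: task is already completed
            return false;
        }
        catch (InvalidOperationException)
        {
            return true;
        }
    }

    // TrySetResult does not throw, but the later value is silently dropped.
    public static string ValueAfterSecondTrySet()
    {
        var tcs = new TaskCompletionSource<string>();
        tcs.TrySetResult("first");
        tcs.TrySetResult("second");      // returns false and has no effect
        return tcs.Task.Result;          // still "first"
    }
}
```

    Switching to TrySetResult would only hide the exception; every message after the first would still be lost.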

    So, your options are:

    1. Map the custom subscription pattern into a standard Observable pattern.
    2. Build a buffer (e.g., System.Threading.Channels), have the event handler add items to the buffer, and then consume them as an asynchronous stream.

    With both of these options, be sure to handle the "unsubscription" logic as well.
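
    A rough sketch of the second option, under a few assumptions: HandlerStream and ToAsyncStream are hypothetical names, the subscribe delegate is an adapter you write yourself, and the object it returns removes the handler when disposed. It needs C# 8 or later for async streams.

```csharp
using System;
using System.Collections.Generic;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;

public static class HandlerStream
{
    // subscribe: hypothetical adapter that registers the handler and returns
    // an IDisposable that removes it again (the "unsubscription" logic).
    public static async IAsyncEnumerable<TArgs> ToAsyncStream<TArgs>(
        Func<EventHandler<TArgs>, IDisposable> subscribe,
        [EnumeratorCancellation] CancellationToken cancellationToken = default)
    {
        // Unbounded buffer: the handler never blocks, but the buffer grows
        // if the consumer is slower than the producer.
        var channel = Channel.CreateUnbounded<TArgs>();

        // The handler only enqueues; all real work happens in the consumer.
        // Disposed when the consumer stops enumerating or cancels.
        using IDisposable subscription =
            subscribe((sender, args) => channel.Writer.TryWrite(args));

        // Nothing completes the writer, so this loop ends only through
        // cancellation or the consumer breaking out of its await foreach.
        await foreach (TArgs item in channel.Reader.ReadAllAsync(cancellationToken))
        {
            yield return item;
        }
    }
}

// Possible wiring for the API in the question, assuming the returned
// IStanSubscription can be used as (or wrapped in) an IDisposable:
//
// await foreach (StanMsgHandlerArgs args in HandlerStream.ToAsyncStream<StanMsgHandlerArgs>(
//     h => _stanConnection.Subscribe(subject, GetStanSubscriptionOptions(), h), token))
// {
//     // decode, deserialize and Ack the message here
// }
```

    With this shape, deserialization and acknowledgement move out of the handler and into the consuming loop, and unsubscribing happens automatically when the loop exits. A bounded channel would be the alternative if memory growth under a slow consumer is a concern.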