Search code examples
c#multithreadingasync-awaitsynchronizationcontext

How to wrap a multithreaded callback for async/await?


I have async/await code and want to use an API similar to a websocket. It takes a callback for receiving new messages which is called from another thread.

Can I execute this callback in the same async/await context as the connection initiation without resorting to locking?

I think this is what SynchronizationContext is for but I can't tell if its threadsafe. If I log the thread-id, each callback will be on a different thread. If I log Task.CurrentId its null. I think the same synchronisation context moves across different threads so this might be ok but I don't know how to confirm it.

// External api, the callbacks will be from multiple threads
public class Api
{
    public static Connect(
        Action<Connection> onConnect,
        Action<Connection> onMessage) 
    {}
}

async Task<Connection> ConnectAsync(Action<Message> callback)
{
    if (SynchronizationContext.Current == null)
    {
        SynchronizationContext.SetSynchronizationContext(new SynchronizationContext());
    }

    var syncContext = SynchronizationContext.Current;

    var tcs = new TaskCompletionSource<Connection>();

    // use post() to ensure callbacks from other threads are executed thread-safely

    Action<Connection> onConnect = conn => 
    {
        syncContext.Post(o => tcs.SetResult(conn), null);
    };
    Action<Message> onMsg = msg => 
    { 
        syncContext.Post(o => callback(msg), null);
    };

    // call the multi-threaded non async/await api supplying the callbacks

    Api.Connect(onConnect, onMsg);

    return await tcs.Task;
}

var connection = await ConnectAsync(
    msg => 
    { 
        /* is code here threadsafe with the send without extra locking? */ 
    });

await connection.Send("Hello world);

Solution

  • Thanks to @Evk who pointed out that the default SynchronizationContext doesn't actually synchronise anything or implement send/post in the way you would expect.

    https://github.com/StephenClearyArchive/AsyncEx.Context

    The fix is to use Stephen Cleary's async library which implements a SynchronizationContext as a message pump in a single thread so that the post() calls are called in the same thread as the other awaited calls.

    // External api, the callbacks will be from multiple threads
    public class Api
    {
        public static Connect(
            Action<Connection> onConnect,
            Action<Connection> onMessage) 
        {}
    }
    
    async Task<Connection> ConnectAsync(Action<Message> callback)
    {
        var syncContext = SynchronizationContext.Current;
    
        var tcs = new TaskCompletionSource<Connection>();
    
        // use post() to ensure callbacks from other threads are executed thread-safely
    
        Action<Connection> onConnect = conn => 
        {
            syncContext.Post(o => tcs.SetResult(conn), null);
        };
        Action<Message> onMsg = msg => 
        { 
            syncContext.Post(o => callback(msg), null);
        };
    
        // call the multi-threaded non async/await api supplying the callbacks
    
        Api.Connect(onConnect, onMsg);
    
        return await tcs.Task;
    }
    
    //
    // https://github.com/StephenClearyArchive/AsyncEx.Context
    //
    Nito.AsyncEx.AsyncContext.Run(async () =>
    {
        var connection = await ConnectAsync(
            msg => 
            { 
                /* this will execute in same thread as ConnectAsync and Send */ 
            });
    
        await connection.Send("Hello world);
    
        ... more async/await code
    });