c# async-await concurrency iasyncenumerable

Why isn't my async method called immediately in C#?


Caller:

var snapshotMessages = snapshotRepository.GetMessages();
_ = Console.Error.WriteLineAsync("Loading XML timetables...");
// some lengthy operation which loads a large dataset from a SQL database
await foreach (var item in snapshotMessages) {
    // process the item
}

Callee:

    public async IAsyncEnumerable<Pport> GetMessages() {
        Console.Error.WriteLine("Start getting messages");
        var timestamp = DateTime.MinValue;
        // some code which starts downloading a large file from FTP
    }

I want to run the database loading and the download in parallel. However, the "Start getting messages" line doesn't appear, which indicates that the program isn't running them in parallel as I expect.

The documentation says that:

An async method runs synchronously until it reaches its first await expression, at which point the method is suspended until the awaited task is complete. In the meantime, control returns to the caller of the method, as the example in the next section shows.

which doesn't seem to be true here. What have I done wrong?


Solution

  • Iterators, both synchronous and asynchronous, have deferred execution: calling the method only creates the state machine, and none of its body runs until enumeration actually starts (here, the await foreach). You can fix this easily by restructuring a little:

    public IAsyncEnumerable<Pport> GetMessages()
    {
        // note: this is neither "async" nor has "yield"
    
        Console.Error.WriteLine("Start getting messages");
        var timestamp = DateTime.MinValue;
        return GetMessagesCore(timestamp);
    }
    private async IAsyncEnumerable<Pport> GetMessagesCore(DateTime timestamp)
    {
        // some code which starts downloading a large file from FTP
        ... yield etc
    } 
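To make the difference visible, here is a minimal sketch that records the order of events for both shapes. The names Demo, Deferred, Eager and EagerCore are hypothetical and exist only for this illustration; the int items stand in for Pport.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

var log = new List<string>();

var deferred = Demo.Deferred(log);   // iterator: none of its body runs yet
log.Add("after Deferred()");
var eager = Demo.Eager(log);         // plain method: its preamble runs right here
log.Add("after Eager()");

await foreach (var item in deferred) { }   // the iterator body starts only now
await foreach (var item in eager) { }

Console.WriteLine(string.Join(Environment.NewLine, log));

// Hypothetical demo type, not part of the original code.
static class Demo
{
    // Async iterator: the whole body, including the first log line, is deferred.
    public static async IAsyncEnumerable<int> Deferred(List<string> log)
    {
        log.Add("Deferred: start");
        await Task.Yield();
        yield return 1;
    }

    // Neither "async" nor "yield": the log line runs when the method is called.
    public static IAsyncEnumerable<int> Eager(List<string> log)
    {
        log.Add("Eager: start");
        return EagerCore();
    }

    private static async IAsyncEnumerable<int> EagerCore()
    {
        await Task.Yield();
        yield return 1;
    }
}
```

The log shows "Deferred: start" only after both calls have returned, while "Eager: start" appears at the call site. Note that only the wrapper's preamble becomes eager; the body of the core iterator is still deferred until enumeration.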
    

    This approach is also often used so that only the private method carries the [EnumeratorCancellation] CancellationToken cancellationToken parameter, for use with WithCancellation().
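As a rough sketch of that cancellation pattern: the public method stays free of the token, and the private iterator receives whatever token the caller passes to WithCancellation(). The Repo class, the int item type and the Task.Delay standing in for real work are assumptions made for this illustration only.

```csharp
using System;
using System.Collections.Generic;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Tasks;

var cts = new CancellationTokenSource();
var received = new List<int>();

try
{
    // The token given to WithCancellation() flows into the
    // [EnumeratorCancellation] parameter of the private iterator.
    await foreach (var item in new Repo().GetMessages().WithCancellation(cts.Token))
    {
        received.Add(item);
        if (item == 2) cts.Cancel();   // ask the producer to stop
    }
}
catch (OperationCanceledException)
{
    Console.WriteLine($"Cancelled after {received.Count} items");
}

// Hypothetical type for illustration.
class Repo
{
    // Public surface: no token parameter, and the preamble runs eagerly.
    public IAsyncEnumerable<int> GetMessages()
    {
        Console.Error.WriteLine("Start getting messages");
        return GetMessagesCore();
    }

    private async IAsyncEnumerable<int> GetMessagesCore(
        [EnumeratorCancellation] CancellationToken cancellationToken = default)
    {
        for (int i = 0; ; i++)
        {
            // Stand-in for real I/O; throws once the token is cancelled.
            await Task.Delay(10, cancellationToken);
            yield return i;
        }
    }
}
```

Callers that don't need cancellation can simply enumerate GetMessages() directly; the token parameter then keeps its default value.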


    However! An async iterator block, like a regular iterator block, is only a single passive pump that lets a sequence be fetched iteratively; it is not an actively parallel / concurrent machine. For that you probably want a Channel<T>, i.e. Channel<Pport>, with a separate worker initiated via Task.Run (or similar), so you have an active producer/consumer pair with a bounded or unbounded buffer between them (the Channel<T>). A channel reader has the ReadAllAsync() API, which exposes the channel as IAsyncEnumerable<T>.

    For example:

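    using System;
    using System.Collections.Generic;
    using System.Threading.Channels;
    using System.Threading.Tasks;

    await foreach(var i in new Foo().GetMessages())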
    {
        Console.WriteLine(i);
    }
    class Foo
    {
        public IAsyncEnumerable<int> GetMessages()
        {
            Console.Error.WriteLine("Start getting messages");
            // use a bounded channel as a buffer of 10 pending items;
            // there is also an unbounded option available
            Channel<int> channel = Channel.CreateBounded<int>(
                new BoundedChannelOptions(capacity: 10));
            _ = Task.Run(() => Produce(channel.Writer));
            return channel.Reader.ReadAllAsync();
        }
    
        private async Task Produce(ChannelWriter<int> writer)
        {
            try
            {
                for (int i = 0; i < 1000; i++)
                {
                    await writer.WriteAsync(i);
                }
                writer.TryComplete();
            }
            catch (Exception ex)
            {
                writer.TryComplete(ex);
            }
        }
    }