Tags: c#, linq, lazy-evaluation, rx.net, system.interactive

Is there an example of Ix.NET (System.Interactive) somewhere?


I have an async method, say:

public async Task<T> GetAsync()
{
    // ... awaits some slow operation and returns a T
}

which would be called from:

public async Task<IEnumerable<T>> GetAllAsync()
{
    foreach (var item in something)
    {
        var result = await GetAsync();
        yield return result;
    }
}

The syntax above is not valid, but what I am after is essentially asynchronous generators. I know this can be handled with Observables; I experimented with Rx.NET and it worked to some extent. However, I am trying to avoid the complexity it brings to the codebase and, more importantly, the requirement is not really a reactive system (ours is still pull-based). For example, I would only listen to the incoming async stream for a certain time, and I need to stop the producer (not just unsubscribe the consumer) from the consumer side.

I can invert the method signature like this:

public IEnumerable<Task<T>> GetAllAsync()

But this makes LINQ operations a bit tricky to do without blocking. I want it to be non-blocking and to avoid loading the entire sequence into memory. The AsyncEnumerable library does exactly what I am looking for, but how can the same be done with Ix.NET? I believe they are meant for the same thing.

In other words, how can I use Ix.NET to produce an IAsyncEnumerable when the items have to be awaited? Something like:

public async IAsyncEnumerable<T> GetAllAsync()
{
    foreach (var item in something)
    {
        var result = await GetAsync();
        return // what?
    }
}

Solution

  • (Edited)

    With System.Linq.Async 4.0.0 from NuGet, you can now use SelectAwait.

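    using System;
    using System.Collections.Generic;
    using System.Linq;
    using System.Threading.Tasks;

    class Program
    {
        // async Main (C# 7.1+) awaits the whole sequence instead of sleeping for a fixed 4 seconds.
        static async Task Main(string[] args)
        {
            await GetAllAsync().ForEachAsync((x) => Console.WriteLine(x));
        }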
    
        static IAsyncEnumerable<string> GetAllAsync()
        {
            var something = new[] { 1, 2, 3 };
    
            return something
                .ToAsyncEnumerable()
                .SelectAwait(async (x) => await GetAsync(x));
        }
    
        static async Task<string> GetAsync(int item)
        {
            await Task.Delay(1000); // heavy
            return "got " + item;
        }
    }
    

    (Obsolete)

    With System.Interactive.Async 3.2.0 from NuGet, how about this? In that version Select() does not support async lambdas, so you have to implement the projection yourself.

    Related: "Better support for async - Task based overloads for AsyncEnumerable"

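    using System;
    using System.Collections.Generic;
    using System.Linq;
    using System.Threading.Tasks;

    class Program
    {
        // async Main (C# 7.1+) awaits the whole sequence instead of sleeping for a fixed 4 seconds.
        static async Task Main(string[] args)
        {
            await GetAllAsync().ForEachAsync((x) => Console.WriteLine(x));
        }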
    
        static IAsyncEnumerable<string> GetAllAsync()
        {
            var something = new[] { 1, 2, 3 };
    
            return something.SelectAsync(async (x) => await GetAsync(x));
        }
    
        static async Task<string> GetAsync(int item)
        {
            await Task.Delay(1000); // heavy
            return "got " + item;
        }
    }
    
    static class AsyncEnumerableExtensions
    {
        public static IAsyncEnumerable<TResult> SelectAsync<T, TResult>(this IEnumerable<T> enumerable, Func<T, Task<TResult>> selector)
        {
            return AsyncEnumerable.CreateEnumerable(() =>
            {
                var enumerator = enumerable.GetEnumerator();
                var current = default(TResult);
                return AsyncEnumerable.CreateEnumerator(async c =>
                    {
                        var moveNext = enumerator.MoveNext();
                        current = moveNext
                            ? await selector(enumerator.Current).ConfigureAwait(false)
                            : default(TResult);
                        return moveNext;
                    },
                    () => current,
                    () => enumerator.Dispose());
            });
        }
    }
    

    The extension method is taken from this sample: https://github.com/maca88/AsyncGenerator/issues/94#issuecomment-385286972
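
For readers on C# 8 or later (.NET Core 3.0+, or older targets with the Microsoft.Bcl.AsyncInterfaces package), the shape asked for in the question can also be written directly as an async iterator, without Ix.NET. The following is only a minimal sketch under that assumption. The class name `AsyncGeneratorSketch`, the `something` parameter, and the 100 ms delay are illustrative and not part of the original question or answer. It also shows one way the consumer can stop the producer, by cancelling the token passed through `WithCancellation`.

```csharp
using System;
using System.Collections.Generic;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Tasks;

public static class AsyncGeneratorSketch
{
    // Stand-in for the question's GetAsync; the delay simulates slow I/O (assumption).
    public static async Task<string> GetAsync(int item, CancellationToken token)
    {
        await Task.Delay(100, token);
        return "got " + item;
    }

    // Async iterator: each item is awaited and yielded only when the consumer
    // asks for the next element, so nothing is buffered in memory.
    public static async IAsyncEnumerable<string> GetAllAsync(
        IEnumerable<int> something,
        [EnumeratorCancellation] CancellationToken token = default)
    {
        foreach (var item in something)
        {
            // Stop producing as soon as the consumer has cancelled.
            token.ThrowIfCancellationRequested();
            yield return await GetAsync(item, token);
        }
    }

    public static async Task Main()
    {
        using var cts = new CancellationTokenSource();
        var received = 0;
        try
        {
            // WithCancellation hands cts.Token to the iterator's token parameter.
            await foreach (var x in GetAllAsync(new[] { 1, 2, 3, 4, 5 })
                                       .WithCancellation(cts.Token))
            {
                Console.WriteLine(x);
                if (++received == 2)
                {
                    cts.Cancel(); // the consumer tells the producer to stop
                }
            }
        }
        catch (OperationCanceledException)
        {
            Console.WriteLine("producer stopped");
        }
    }
}
```

Run as written, this prints "got 1" and "got 2", then "producer stopped"; items 3 to 5 are never requested from `GetAsync`. Because the sequence is pull-based, simply using `break` inside the `await foreach` also ends production: the enumerator is disposed and the iterator body is never resumed. The token is mainly useful when an item that is already in flight should be abandoned as well.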