c# task-parallel-library .net-4.5 system.reactive rx.net

Unwrapping IObservable<Task<T>> into IObservable<T> with order preservation


Is there a way to unwrap an IObservable<Task<T>> into an IObservable<T> while keeping the same order of events, like this?

Tasks:  ----a-------b--c----------d------e---f---->
Values: -------A-----------B--C------D-----E---F-->

Let's say I have a desktop application that consumes a stream of messages, some of which require heavy post-processing:

IObservable<Message> streamOfMessages = ...;

IObservable<Task<Result>> streamOfTasks = streamOfMessages
    .Select(async msg => await PostprocessAsync(msg));

IObservable<Result> streamOfResults = ???; // unwrap streamOfTasks

I imagine two ways of dealing with that.

First, I can subscribe to streamOfTasks using the asynchronous event handler:

streamOfTasks.Subscribe(async task =>
{
    var result = await task;
    Display(result);
});

Second, I can convert streamOfTasks using Observable.Create, like this:

var streamOfResults =
    from task in streamOfTasks
    from value in Observable.Create<Result>(async (obs, cancel) =>
    {
        var v = await task;
        obs.OnNext(v);

        // TODO: don't know when to call obs.OnCompleted()
    })
    select value;

streamOfResults.Subscribe(result => Display(result));

Either way, the order of messages is not preserved: some later messages that don't need any post-processing come out faster than earlier messages that require post-processing. Both my solutions handle the incoming messages in parallel, but I'd like them to be processed sequentially, one by one.

I can write a simple task queue to process just one task at a time, but perhaps that's overkill. It seems to me that I'm missing something obvious.
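For reference, the "simple task queue" idea could look roughly like the sketch below. It is only an illustration under assumptions: SequentialTaskQueue and Enqueue are hypothetical names, not part of Rx or the TPL. Each work item is chained onto the previous one, so only one item runs at a time and the returned tasks complete in the order they were enqueued.

```csharp
using System;
using System.Threading.Tasks;

// Hypothetical helper (not part of Rx or the TPL): runs queued work items one at a time.
public sealed class SequentialTaskQueue
{
    private readonly object gate = new object();

    // Tail of the chain; starts out as an already completed task.
    private Task tail = Task.FromResult(0);

    // Schedules 'work' to start only after every previously enqueued item has finished.
    public Task<T> Enqueue<T>(Func<Task<T>> work)
    {
        lock (gate)
        {
            // ContinueWith runs whether the previous item succeeded or failed,
            // so one faulted item does not block the rest of the queue.
            Task<T> next = tail
                .ContinueWith(_ => work(), TaskScheduler.Default)
                .Unwrap();

            tail = next;
            return next;
        }
    }
}
```

With such a queue, the async handler from the first approach would await queue.Enqueue(() => PostprocessAsync(msg)) instead of calling PostprocessAsync directly.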


UPD. I wrote a sample console program to demonstrate my approaches. None of the solutions so far preserves the original order of events. Here is the output of the program:

Timer: 0
Timer: 1
Async handler: 1
Observable.Create: 1
Observable.FromAsync: 1
Timer: 2
Async handler: 2
Observable.Create: 2
Observable.FromAsync: 2
Observable.Create: 0
Async handler: 0
Observable.FromAsync: 0

Here is the complete source code:

// "C:\Program Files (x86)\MSBuild\14.0\Bin\csc.exe" test.cs /r:System.Reactive.Core.dll /r:System.Reactive.Linq.dll /r:System.Reactive.Interfaces.dll

using System;
using System.Reactive;
using System.Reactive.Concurrency;
using System.Reactive.Linq;
using System.Threading.Tasks;

class Program
{
    static void Main()
    {
        Console.WriteLine("Press ENTER to exit.");

        // the source stream
        var timerEvents = Observable.Timer(TimeSpan.Zero, TimeSpan.FromSeconds(1));
        timerEvents.Subscribe(x => Console.WriteLine($"Timer: {x}"));

        // solution #1: using async event handler
        timerEvents.Subscribe(async x =>
        {
            var result = await PostprocessAsync(x);
            Console.WriteLine($"Async handler: {x}");
        });

        // solution #2: using Observable.Create
        var processedEventsV2 =
            from task in timerEvents.Select(async x => await PostprocessAsync(x))
            from value in Observable.Create<long>(async (obs, cancel) =>
            {
                var v = await task;
                obs.OnNext(v);
            })
            select value;
        processedEventsV2.Subscribe(x => Console.WriteLine($"Observable.Create: {x}"));

        // solution #3: using FromAsync, as answered by @Enigmativity
        var processedEventsV3 =
            from msg in timerEvents
            from result in Observable.FromAsync(() => PostprocessAsync(msg))
            select result;

        processedEventsV3.Subscribe(x => Console.WriteLine($"Observable.FromAsync: {x}"));

        Console.ReadLine();
    }

    static async Task<long> PostprocessAsync(long x)
    {
        // some messages require long post-processing
        if (x % 3 == 0)
        {
            await Task.Delay(TimeSpan.FromSeconds(2.5));
        }

        // and some don't
        return x;
    }
}

Solution

  • Combining @Enigmativity's simple approach with @VMAtm's idea of attaching the counter and some code snippets from this SO question, I came up with this solution:

    // usage
    var processedStream = timerEvents.SelectAsync(async t => await PostprocessAsync(t));
    
    processedStream.Subscribe(x => Console.WriteLine($"Processed: {x}"));
    
    // my sample console program prints the events ordered properly:
    Timer: 0
    Timer: 1
    Timer: 2
    Processed: 0
    Processed: 1
    Processed: 2
    Timer: 3
    Timer: 4
    Timer: 5
    Processed: 3
    Processed: 4
    Processed: 5
    ....
    

    Here is my SelectAsync extension method. It projects each element of an IObservable<TSource> through an asynchronous selector and returns an IObservable<TResult> that keeps the original order of events:

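    using System;
    using System.Collections.Generic;
    using System.Reactive.Linq;
    using System.Threading;
    using System.Threading.Tasks;
    
    // the class name is arbitrary; extension methods just need a static class
    public static class ObservableSelectAsyncExtensions
    {
        public static IObservable<TResult> SelectAsync<TSource, TResult>(
            this IObservable<TSource> src,
            Func<TSource, Task<TResult>> selectorAsync)
        {
            return Observable.Create<TResult>(observer =>
            {
                // per-subscription state, so every subscriber gets its own numbering;
                // using local variable for counter is easier than src.Scan(...)
                var counter = 0;
                var streamOfTasks =
                    from source in src
                    from result in Observable.FromAsync(async () => new
                    {
                        // the index is taken before the await, i.e. in arrival order
                        Index = Interlocked.Increment(ref counter) - 1,
                        Result = await selectorAsync(source)
                    })
                    select result;
    
                // buffer the results coming out of order
                var index = 0;
                var buffer = new Dictionary<int, TResult>();
    
                return streamOfTasks.Subscribe(item =>
                {
                    buffer.Add(item.Index, item.Result);
    
                    // flush every result that is next in line
                    TResult result;
                    while (buffer.TryGetValue(index, out result))
                    {
                        buffer.Remove(index);
                        observer.OnNext(result);
                        index++;
                    }
                },
                observer.OnError,       // forward errors instead of swallowing them
                observer.OnCompleted);  // forward completion of the source
            });
        }
    }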
    

    I'm not particularly satisfied with my solution, as it looks too complex to me, but at least it doesn't require any external dependencies. I'm using a simple Dictionary to buffer and reorder the task results because the subscriber doesn't need to be thread-safe (its OnNext handler is never called concurrently).

    Any comments or suggestions are welcome. I'm still hoping to find a native Rx way of doing this without a custom buffering extension method.
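One candidate for a built-in route is the Concat operator, which has an overload for IObservable<Task<T>> as well as one for IObservable<IObservable<T>>. The sketch below is not taken from the answers above. ConcatSketch and its method names are made up for illustration, and PostprocessAsync simply mirrors the one in the question. The first method lets tasks start as soon as messages arrive and emits their results in source order. The second wraps each call in Observable.FromAsync, so the next message is processed only after the previous one has finished.

```csharp
using System;
using System.Reactive.Linq;
using System.Threading.Tasks;

// Hypothetical container class, used for illustration only.
public static class ConcatSketch
{
    // Same shape as PostprocessAsync in the question: every third message is slow.
    static async Task<long> PostprocessAsync(long x)
    {
        if (x % 3 == 0)
        {
            await Task.Delay(TimeSpan.FromSeconds(2.5));
        }

        return x;
    }

    // Tasks are started when each message arrives (so they may overlap),
    // but Concat emits their results strictly in source order.
    public static IObservable<long> ParallelOrdered(IObservable<long> source)
    {
        return source
            .Select(x => PostprocessAsync(x))   // IObservable<Task<long>>
            .Concat();                          // IObservable<long>
    }

    // FromAsync defers the call until Concat subscribes to it,
    // so messages are post-processed one at a time.
    public static IObservable<long> Sequential(IObservable<long> source)
    {
        return source
            .Select(x => Observable.FromAsync(() => PostprocessAsync(x)))
            .Concat();
    }
}
```

In the sample console program this would be used as ConcatSketch.ParallelOrdered(timerEvents).Subscribe(...). The trade-off is latency against load: the first variant keeps every task running in the background and only holds back the results, while the second never runs more than one task at once but falls behind if messages arrive faster than they can be processed.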