Tags: c#, async-await, system.reactive, corecursion

Observable from chained Tasks


I'm trying to create an Observable where each item is produced by an asynchronous task, and the next item is produced by an async call on the result of the previous item (co-recursion). In "Generate" parlance this would look something like the following, except that Generate supports neither async delegates nor a delegate for the initial state:

var ob = Observable.Generate(
   async () => await ProduceFirst(),        // Task<T> ProduceFirst()
   prev => Continue(prev),                  // bool Continue(T)
   async prev => await ProduceNext(prev),   // Task<T> ProduceNext(T)
   item => item
);

As a more concrete example, to peek all messages from a ServiceBus queue by fetching them 100 messages at a time, implement ProduceFirst, Continue and ProduceNext as follows:

Task<IEnumerable<BrokeredMessage>> ProduceFirst() 
{
    const int batchSize = 100;
    return _serviceBusReceiver.PeekBatchAsync(batchSize);
}

bool Continue(IEnumerable<BrokeredMessage> prev)
{
    return prev.Any();
}

async Task<IEnumerable<BrokeredMessage>> ProduceNext(IEnumerable<BrokeredMessage> prev) 
{
    const int batchSize = 100;
    // The peek starts at prev's last message, so fetch one extra and skip that repeat.
    return (await _serviceBusReceiver.PeekBatchAsync(prev.Last().SequenceNumber, batchSize + 1)).Skip(1);
}

Then call .SelectMany(i => i) on the IObservable<IEnumerable<BrokeredMessage>> to turn it into an IObservable<BrokeredMessage>.

Here _serviceBusReceiver is an instance of the following interface:

public interface IServiceBusReceiver
{
    Task<IEnumerable<BrokeredMessage>> PeekBatchAsync(int batchSize);
    Task<IEnumerable<BrokeredMessage>> PeekBatchAsync(long fromSequenceNumber, int batchSize);
}

And BrokeredMessage is from https://msdn.microsoft.com/en-us/library/microsoft.servicebus.messaging.brokeredmessage.aspx
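To sanity-check the paging logic without a real queue, here is a minimal sketch that runs the same ProduceFirst/Continue/ProduceNext chain against a hypothetical in-memory store in which each message is represented only by its long sequence number (BrokeredMessage is not used, and the PeekBatchAsync stand-in is illustrative). It assumes, as the `batchSize + 1` / `Skip(1)` in ProduceNext implies, that peeking from a sequence number includes that message. A plain await loop drives the chain here only to show that every message is visited exactly once; AddRange plays the role of SelectMany(i => i).

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

// Hypothetical in-memory queue: each message is just its sequence number 1..250.
var store = Enumerable.Range(1, 250).Select(n => (long)n).ToList();
const int batchSize = 100;

// Stand-in for PeekBatchAsync(fromSequenceNumber, batchSize).
// Assumption: the message at fromSequenceNumber itself is included.
Task<IEnumerable<long>> PeekBatchAsync(long fromSequenceNumber, int count) =>
    Task.FromResult<IEnumerable<long>>(
        store.Where(n => n >= fromSequenceNumber).Take(count).ToList());

Task<IEnumerable<long>> ProduceFirst() =>
    Task.FromResult<IEnumerable<long>>(store.Take(batchSize).ToList());

bool Continue(IEnumerable<long> prev) => prev.Any();

// Restart from the last message already seen, fetch one extra, and skip the repeat.
async Task<IEnumerable<long>> ProduceNext(IEnumerable<long> prev) =>
    (await PeekBatchAsync(prev.Last(), batchSize + 1)).Skip(1);

// Drive the chain with a plain loop only to check the paging:
// AddRange flattens each batch, as SelectMany(i => i) would.
var seen = new List<long>();
for (var batch = await ProduceFirst(); Continue(batch); batch = await ProduceNext(batch))
    seen.AddRange(batch);

Console.WriteLine($"{seen.Count} messages, first {seen.First()}, last {seen.Last()}");
// prints "250 messages, first 1, last 250"
```

The last peek (from 250) returns only the already-seen message, so after Skip(1) the batch is empty and Continue stops the chain.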


Solution

  • If you are going to roll your own async Generate function I would recommend the use of recursive scheduling instead of wrapping a while loop.

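    using System;
    using System.Reactive.Concurrency;
    using System.Reactive.Linq;
    using System.Threading.Tasks;

    public static class ObservableAsync
    {
      public static IObservable<TResult> Generate<TResult>(
          Func<Task<TResult>> initialState,
          Func<TResult, bool> condition,
          Func<TResult, Task<TResult>> iterate,
          Func<TResult, TResult> resultSelector,
          IScheduler scheduler = null)
      {
        var s = scheduler ?? Scheduler.Default;

        return Observable.Create<TResult>(async obs =>
        {
          // Await the first state, then schedule each step recursively; the
          // returned IDisposable cancels the chain when the subscriber disposes.
          return s.Schedule(await initialState(), async (state, self) =>
          {
            // This lambda is async void, so exceptions must be routed to
            // OnError explicitly or they would escape the observable.
            try
            {
              if (!condition(state))
              {
                obs.OnCompleted();
                return;
              }

              obs.OnNext(resultSelector(state));

              // Schedule the next step with the result of the async iteration.
              self(await iterate(state));
            }
            catch (Exception ex)
            {
              obs.OnError(ex);
            }
          });
        });
      }
    }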
    

    This has a couple of advantages. First, you can cancel it: with a simple while loop there is no way to cancel directly, and in fact you don't even return from the subscribe function until the observable has completed. Second, it lets you control the scheduling/asynchrony of each item (which makes testing a breeze), and this also makes it a better overall fit for the library.
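Both advantages can be seen with recursive scheduling on its own. The following minimal sketch is separate from the Generate above and uses a plain counter instead of async work (the counter and the one-second interval are illustrative). It runs a recursive chain on Rx's virtual-time HistoricalScheduler via the time-based recursive `Schedule` overload, advances the clock so a few steps run, then disposes the returned IDisposable and checks that no further steps run.

```csharp
using System;
using System.Reactive.Concurrency;

// Virtual-time scheduler from System.Reactive: work only runs when the clock is advanced.
var scheduler = new HistoricalScheduler();
var steps = 0;

// Recursive scheduling: each step records progress and asks `self`
// to run the next step one virtual second later.
IDisposable chain = scheduler.Schedule(0, TimeSpan.FromSeconds(1),
    (int state, Action<int, TimeSpan> self) =>
    {
        steps = state + 1;
        self(state + 1, TimeSpan.FromSeconds(1));
    });

scheduler.AdvanceBy(TimeSpan.FromSeconds(3));   // let the first few steps run
var stepsBeforeDispose = steps;

chain.Dispose();                                // cancel the recursive chain
scheduler.AdvanceBy(TimeSpan.FromSeconds(10));  // no further steps should run

Console.WriteLine($"steps before dispose: {stepsBeforeDispose}, after: {steps}");
```

Because the test controls the clock, the same pattern lets you drive a scheduler-aware observable step by step without real delays.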