Search code examples
c#asynchronousreactive-programmingsystem.reactiverx.net

How to call back async function from Rx Subscribe?


I would like to call back an async function within an Rx subscription.

E.g. like that:

public class Consumer
{
    private readonly Service _service = new Service();

    public ReplaySubject<string> Results = new ReplaySubject<string>();

    public void Trigger()
    {
        Observable.Timer(TimeSpan.FromMilliseconds(100)).Subscribe(async _ => await RunAsync());
    }

    public Task RunAsync()
    {
        return _service.DoAsync();
    }
}

public class Service
{
    public async Task<string> DoAsync()
    {
        return await Task.Run(() => Do());
    }

    private static string Do()
    {
        Thread.Sleep(TimeSpan.FromMilliseconds(200));
        throw new ArgumentException("invalid!");
        return "foobar";
    }
}

[Test]
public async Task Test()
{
    var sut = new Consumer();
    sut.Trigger();
    var result = await sut.Results.FirstAsync();
}

What needs to be done, in order to catch the exception properly?


Solution

  • You don't want to pass an async method to Subscribe, because that will create an async void method. Do your best to avoid async void.

    In your case, I think what you want is to call the async method for each element of the sequence and then cache all the results. In that case, use SelectMany to call the async method for each element, and Replay to cache (plus a Connect to get the ball rolling):

    public class Consumer
    {
        private readonly Service _service = new Service();
    
        public IObservable<string> Trigger()
        {
            var connectible = Observable.Timer(TimeSpan.FromMilliseconds(100))
                .SelectMany(_ => RunAsync())
                .Replay();
            connectible.Connect();
            return connectible;
        }
    
        public Task<string> RunAsync()
        {
            return _service.DoAsync();
        }
    }
    

    I changed the Results property to be returned from the Trigger method instead, which I think makes more sense, so the test now looks like:

    [Test]
    public async Task Test()
    {
        var sut = new Consumer();
        var results = sut.Trigger();
        var result = await results.FirstAsync();
    }