c#  system.reactive  rx.net

Rx.NET: Calling multiple IObservables in SelectMany


Please note: this is a continuation of a question posted earlier, but the solution I am looking for here covers a different situation.

I am trying to call several methods that each return an IObservable, but the value returned inside the SelectMany statement is a Task, so the Subscribe statement that follows does not compile.

This is the code snippet:

 var myWorkList = new List<MyWork>
                {
                    new MyWork(),// MyWork.Execute(data) returns IObservable
                    new MyWork()
                }.ToObservable();

 var results =
   myService
    .GetData(accountId)
    .SelectMany(data => myWorkList.ForEachAsync(r => r.Execute(data)))
    .Subscribe(result =>
    {
        Console.WriteLine($"Result Id: {result.Id}");
        Console.WriteLine($"Result Status: {result.Pass}");
    });

Solution

  • ForEachAsync returns a Task rather than an observable of results, so drop it and let .SelectMany do the flattening. Try this:

    var myWorkList = new List<MyWork>()
    {
        new MyWork(),
        new MyWork()
    }.ToObservable();
    
    var query =
        from data in myService.GetData(accountId)
        from myWork in myWorkList
        from result in myWork.Execute(data)
        select result;
    
    var results =
        query
            .Subscribe(result =>
            {
                Console.WriteLine($"Result Id: {result.Id}");
                Console.WriteLine($"Result Status: {result.Pass}");
            });
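
The query expression above can also be written with explicit SelectMany calls, which may make the link to the original attempt easier to see. What follows is a minimal sketch, not the compiler's literal translation (that uses a SelectMany overload with a result selector), though it should behave the same way. The stand-in types mirror the testing code in this answer with IsMatch left out; the class name MethodSyntaxDemo, the BuildQuery helper, and the account id 42 are made up for illustration.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq; // from the System.Reactive NuGet package

public class MyData { }

public class MyResult
{
    public int Id;
    public bool Pass;
}

public class MyWork
{
    // Stand-in for the real work: always yields a single passing result.
    public IObservable<MyResult> Execute(MyData data)
        => Observable.Return(new MyResult { Id = 1, Pass = true });
}

public static class myService
{
    // Stand-in for the real service call: yields a single data item.
    public static IObservable<MyData> GetData(int accountId)
        => Observable.Return(new MyData());
}

public static class MethodSyntaxDemo
{
    // Hypothetical helper that builds the flattened result sequence.
    public static IObservable<MyResult> BuildQuery(int accountId)
    {
        var myWorkList = new List<MyWork> { new MyWork(), new MyWork() }.ToObservable();

        // Outer SelectMany: one inner sequence per data item.
        // Inner SelectMany: one result sequence per MyWork, flattened together.
        return myService
            .GetData(accountId)
            .SelectMany(data => myWorkList.SelectMany(work => work.Execute(data)));
    }

    public static void Main()
    {
        // 42 is a hypothetical account id.
        using var subscription = BuildQuery(42).Subscribe(result =>
        {
            Console.WriteLine($"Result Id: {result.Id}");
            Console.WriteLine($"Result Status: {result.Pass}");
        });
    }
}
```

The difference from the code in the question is that the inner lambda returns an IObservable<MyResult> instead of the Task produced by ForEachAsync, so Subscribe receives MyResult values.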
    

    Here's my testing code:

    public static class myService
    {
        public static IObservable<MyData> GetData(int x)
            => Observable.Return(new MyData());
    }
    
    public class MyWork
    {
        public virtual IObservable<MyResult> Execute(MyData data)
        {
            return
                from isMatch in IsMatch(data)
                where isMatch
                select new MyResult() { Id = 1, Pass = true };
        }
    
        public IObservable<bool> IsMatch(MyData data)
        {
            return Observable.Return(true);
        }
    }
    
    public class MyResult
    {
        public int Id;
        public bool Pass;
    }
    
    public class MyData { }
    

    When I execute it, I get this:

    Result Id: 1
    Result Status: True
    Result Id: 1
    Result Status: True
    

    In the comments on your previous question I suggested doing this as a list of delegates. Here's how:

    var myWorkList = new Func<MyData, IObservable<MyResult>>[]
    {
        md => new MyWork().Execute(md),
        md => new MyWork().Execute(md),
    }.ToObservable();
    
    var query =
        from data in myService.GetData(accountId)
        from myWork in myWorkList
        from result in myWork(data)
        select result;
    

    You get the same result.
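
One thing the delegate list seems to make easy is mixing work that does not share a class. Here is a rough, self-contained sketch of that idea under some assumptions: CheckBalance, CheckLimits, DelegateListDemo, BuildQuery, and the account id 42 are hypothetical names and values, and the stand-in service simply returns one data item.

```csharp
using System;
using System.Reactive.Linq; // from the System.Reactive NuGet package

public class MyData { }

public class MyResult
{
    public int Id;
    public bool Pass;
}

public static class myService
{
    // Stand-in for the real service call: yields a single data item.
    public static IObservable<MyData> GetData(int accountId)
        => Observable.Return(new MyData());
}

public static class DelegateListDemo
{
    // Hypothetical work functions; neither has to live on a MyWork class.
    static IObservable<MyResult> CheckBalance(MyData data)
        => Observable.Return(new MyResult { Id = 1, Pass = true });

    static IObservable<MyResult> CheckLimits(MyData data)
        => Observable.Return(new MyResult { Id = 2, Pass = false });

    // Hypothetical helper that builds the flattened result sequence.
    public static IObservable<MyResult> BuildQuery(int accountId)
    {
        // Any function from MyData to IObservable<MyResult> fits in the list.
        var myWorkList = new Func<MyData, IObservable<MyResult>>[]
        {
            CheckBalance,
            md => CheckLimits(md),
        }.ToObservable();

        // Same query shape as in the answer: each delegate is invoked per data item.
        return
            from data in myService.GetData(accountId)
            from myWork in myWorkList
            from result in myWork(data)
            select result;
    }

    public static void Main()
    {
        // 42 is a hypothetical account id.
        using var subscription = BuildQuery(42).Subscribe(result =>
        {
            Console.WriteLine($"Result Id: {result.Id}");
            Console.WriteLine($"Result Status: {result.Pass}");
        });
    }
}
```

Each delegate should contribute one result here, so the subscriber sees one MyResult with Id 1 and one with Id 2.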