Reactive Extensions in .Net (C#) - Subject<T> instance only processing one subscription


I'm hoping my title is accurate enough, but I'm very much in a 'can't see the wood for the trees' state right now.

I've been investigating reactive extensions for C# for a little while and up until now it has behaved intuitively and wonderfully, then I encountered an issue:

I create a Subject. I create a single-value observable with Observable.Return and subscribe the Subject to it. The value then happily pops into my OnNext() lambda, hooray!

However, if I create a subsequent Observable.Return and subscribe the Subject to it, nothing happens, and the Subject reports HasObservers: false.

I have thrown together a minimal console snippet that demonstrates the problem:

    using System;
    using System.Reactive.Linq;
    using System.Reactive.Subjects;

    class Program
    {
        static void Main(string[] args)
        {
            Subject<string> messenger = new Subject<string>();
            messenger.Where(o => o.Length > 0).Subscribe(file => { Console.WriteLine("got file request: " + file); });

            // First source: "File 1" reaches the lambda above as expected
            var pathObservable = Observable.Return<string>("File 1");
            pathObservable.Subscribe(messenger);

            // Second source: "File 2" never reaches the lambda
            var pathObservable2 = Observable.Return<string>("File 2");
            pathObservable2.Subscribe(messenger);

            Console.ReadKey();
        }
    }

Result: [screenshot: Subject subscription example]

Note: Before anyone asks why I'm doing something silly like using two observables where one sequence would do, this is merely an example. In reality the two observables are created independently, each in response to an external request for a file, and the trouble is that the first request succeeds while the second does not.

I have tried using Observable.Create() instead, and calling OnNext with the single value, with the same result. Interestingly, a breakpoint in the lambda in this case shows that OnNext is being called for the second observable; the value just isn't making it through the Subject.

Any ideas on what I've missed or misunderstood are very welcome. Thanks in advance.


Solution

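  • Observable.Return sends OnCompleted right after its single value. Passing the Subject itself to Subscribe forwards that completion too, which terminates the Subject, so everything sent to it afterwards is ignored. The simplest way around this is to subscribe to OnNext messages only, as Paulo suggests. You can also remove or suppress OnCompleted messages from the observable.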

    However, the best way to do this is probably to replace the messenger Subject. I don't know your larger use case, but a good goal is to minimize Subject use. They're great for learning, and terrible for applications.

    In this case, you can replace the whole code with a one-liner:

    Observable.Merge(
            Observable.Return<string>("File 1"),
            Observable.Return<string>("File 2")
        )
        .Where(o => o.Length > 0)
        .Subscribe(file => Console.WriteLine("got file request: " + file));
    

    If you wanted to simulate dynamically adding observables, you would probably want to do that with a Subject<IObservable<string>>:

    Subject<IObservable<string>> consoleWriter = new Subject<IObservable<string>>();
    consoleWriter
        .Merge()
        .Where(o => o.Length > 0)
        .Subscribe(file => Console.WriteLine("got file request: " + file));
    
    var pathObservable = Observable.Return<string>("File 1");
    consoleWriter.OnNext(pathObservable);
    
    var pathObservable2 = Observable.Return<string>("File 2");
    consoleWriter.OnNext(pathObservable2);
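
The first suggestion above (subscribing to OnNext messages only) has no code attached, so here is a minimal sketch of it, assuming the System.Reactive package is referenced. The class name `OnNextOnlyDemo`, the `Run` method and the `forwardCompletion` flag are made up for illustration and are not part of the question or of Rx. The sketch feeds the same two single-value sources into a Subject twice: once as in the question, and once forwarding values only.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;
using System.Reactive.Subjects;

public static class OnNextOnlyDemo
{
    // Hypothetical helper: feeds two single-value sources into a Subject
    // and returns the values the downstream subscriber actually received.
    public static List<string> Run(bool forwardCompletion)
    {
        var received = new List<string>();
        var messenger = new Subject<string>();
        messenger.Where(o => o.Length > 0).Subscribe(received.Add);

        foreach (var path in new[] { "File 1", "File 2" })
        {
            var source = Observable.Return(path);
            if (forwardCompletion)
            {
                // As in the question: the Subject is the observer, so it gets
                // OnNext and then OnCompleted, which terminates it.
                source.Subscribe(messenger);
            }
            else
            {
                // Values only: the source's OnCompleted never reaches the Subject.
                source.Subscribe(messenger.OnNext);
            }
        }

        return received;
    }

    public static void Main()
    {
        Console.WriteLine(string.Join(", ", Run(forwardCompletion: true)));
        Console.WriteLine(string.Join(", ", Run(forwardCompletion: false)));
    }
}
```

With completion forwarded, only the first value should come through; with `messenger.OnNext` both should. Note that errors from the source are not forwarded to the Subject in the second form either, so this is a quick fix rather than a replacement for the Merge-based approaches above.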