Tags: c#, observable, system.reactive, idisposable

Dispose the previous observable created by SelectMany in Rx


I'm monitoring a directory with the following setup:

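var folder = new Subject<string>();

folder.SelectMany(FileMonitor)
  .Subscribe(x => Console.WriteLine($"Found: {x}"));

// Push paths only after subscribing; a Subject does not replay earlier values.
folder.OnNext("somepath");

public IObservable<string> FileMonitor(string pathToWatch){
   return Observable.Create<string>(obs => {
     var dfs = CreateAndStartFileWatcher(pathToWatch,obs);
     // Return the teardown action so the watcher is disposed on unsubscribe.
     return () => dfs.Dispose();
   });
}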

This works, but if I pass a new path to the subject, the previous FileMonitor is not disposed.

Is there a way to cancel/dispose the previously generated Observable?

It looks like I need Switch (http://reactivex.io/documentation/operators/switch.html), but is this not implemented in C#?


Solution

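  • Sometimes asking a question gives you new insights. The solution is to use Switch, which is available in Rx.NET but only works on an observable of observables (IObservable<IObservable<T>>). Using Select instead of SelectMany produces that nested observable, and Switch then unsubscribes from the previous inner observable whenever a new one arrives.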

    So it should be:

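    var folder = new Subject<string>();
    
    folder.Select(FileMonitor)
      .Switch()
      .Subscribe(x => Console.WriteLine($"Found: {x}"));
    
    // Push paths only after subscribing; a Subject does not replay earlier values.
    folder.OnNext("somepath");
    
    public IObservable<string> FileMonitor(string pathToWatch){
       return Observable.Create<string>(obs => {
         var dfs = CreateAndStartFileWatcher(pathToWatch,obs);
         // Return the teardown action so the watcher is disposed on unsubscribe.
         return () => dfs.Dispose();
       });
    }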
    

    Leaving this question for reference instead of removing it.
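
To see the difference in disposal behaviour without touching the file system, here is a minimal sketch that assumes the System.Reactive NuGet package is referenced. The names SwitchDemo, FakeMonitor and Run are hypothetical and only for illustration. FakeMonitor stands in for FileMonitor: it never emits anything and only records when it is subscribed to and when it is disposed.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Disposables;
using System.Reactive.Linq;
using System.Reactive.Subjects;

public static class SwitchDemo
{
    // Hypothetical stand-in for FileMonitor: logs subscribe and dispose events.
    static IObservable<string> FakeMonitor(string path, List<string> log) =>
        Observable.Create<string>(obs =>
        {
            log.Add($"start {path}");
            return Disposable.Create(() => log.Add($"stop {path}"));
        });

    // Pushes two paths through the subject and returns the recorded events.
    public static List<string> Run(bool useSwitch)
    {
        var log = new List<string>();
        var folder = new Subject<string>();

        IObservable<string> flattened = useSwitch
            ? folder.Select(p => FakeMonitor(p, log)).Switch()
            : folder.SelectMany(p => FakeMonitor(p, log));

        // Disposing the outer subscription tears down whatever is still running.
        using (flattened.Subscribe(_ => { }))
        {
            folder.OnNext("a");
            folder.OnNext("b");
        }
        return log;
    }
}
```

With Run(true), the log should show "stop a" before "start b", because Switch drops the previous inner subscription as soon as a new path arrives. With Run(false), both monitors keep running until the outer subscription itself is disposed, which is the behaviour described in the question.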