Search code examples
c#system.reactivedynamic-datarx.net

How to change observables at runtime in C#?


Consider having two SourceCaches with a different key:

var sourceCacheA = new SourceCache<MyType, int>(x => x.Prop1);
var sourceCacheB = new SourceCache<MyType, string>(x => x.Prop2);

where are both connected:

var observableA = sourceCacheA.Connect();
var observableB = sourceCacheB.Connect();

Let's say the observableA is bound to a ReadOnlyObservableCollection as follows:

observableA.ObserveOn(RxApp.MainThreadScheduler).Bind(out _targetCollection).Subscribe();

How to build an observable that can be changed during runtime, while bound to the same _targetCollection.

So basically, it shall operate like this:

if(somethingHappenBool)
{
    **observableA**.ObserveOn(RxApp.MainThreadScheduler).Bind(out _targetCollection).Subscribe();
} 
else
{
    **observableB**.ObserveOn(RxApp.MainThreadScheduler).Bind(out _targetCollection).Subscribe();
}

EDIT:

Based on Jason's answer I came up with the following solution:

public enum SwitchDataSourceOption
{
    SourceA,
    SourceB
}
public SwitchDataSourceOption Option
{
    get => _option;
    set
    {
        _option = value;
        NotifyPropertyChanged(nameof(Option));
    }
}

sourceCacheA = new SourceCache<MyType, int>(x => x.AProp);
sourceCacheB = new SourceCache<MyType, int>(x => x.BProp);

this.WhenAnyValue(x => x.Option)
    .Select(opt => opt == SwitchDataSourceOption.SourceA ? sourceCacheA : sourceCacheB)
    .Switch()
    .AsObservableCache()
    .Connect()
    .ObserveOn(RxApp.MainThreadScheduler)
    .Bind(out _targetCollection)
    .Subscribe();

However, I don't know how to handle various key types, as in my original example I have int and string as key


Solution

  • It looks like you can make the collection yourself:

    var collection = new ObservableCollectionExtended<MyType>();
    

    And bind to it with

    observableA.Bind(collection);
    

    So you don't have to create a new collection with the bind.


    Generally when I want to change observables at runtime, I think of the event that would change the observable as an observable itself.

    I am not too familiar with ReactiveUI, but I suspect the behavior you want could be solved with the following code:

    var sourceCacheA = new SourceCache<MyType, int>(x => x.Prop1);
    var sourceCacheB = new SourceCache<MyType, string>(x => x.Prop2);
    
    var observableA = sourceCacheA.Connect();
    var observableB = sourceCacheB.Connect();
    
    // Some change stream
    var somethingHappened = new Subject<bool>(); 
    
    // Create observable collection manually
    var collection = new ObservableCollectionExtended<MyType>();
    
    // Project to Unit so that types are the same
    var bindToA = observableA.Bind(collection).Select(_ => Unit.Default);
    var bindToB = observableB.Bind(collection).Select(_ => Unit.Default);
    
    // Create observable that unsubscribes from the current and subscribes to the other when something happens
    var mergedObservable = somethingHappened
        .StartWith(false)
        .Select(s => s ? bindToA : bindToB)
        .Switch();
    
    // Begin monitoring changes
    mergedObservable.Subscribe();
    
    somethingHappened.OnNext(true);
    
    sourceCacheA.Edit(i => i.AddOrUpdate(new [] {new MyType { Prop1 = 1}, new MyType { Prop1 = 2}}));
    // Collection contains 2 elements
    
    sourceCacheA.Edit(i => i.Clear()); 
    // Collection contains 0 elements
    
    somethingHappened.OnNext(false);
    sourceCacheB.Edit(i => i.AddOrUpdate(new [] {new MyType { Prop1 = 0, Prop2 = "1"}, new MyType { Prop1 = 0, Prop2 = "2"}}));
    // Collection contains 2 elements
    
    // This is ignored as the observer is currently only listening to sourceCacheB
    sourceCacheA.Edit(i => i.AddOrUpdate(new [] {new MyType { Prop1 = 1}, new MyType { Prop1 = 2}}));
    // Collection contains previous 2 elements
    

    Note: Be aware that switch can potentially miss events between unsubscribing and subscribing to the new observable.