Tags: c#, system.reactive, dynamic-data, reactiveui, reactive

Create a derived collection of ViewModels with DynamicData that updates the existing item instead of creating a new one when the source item changes


For example, I have an observable that emits collections of objects describing user statuses (I fetch them periodically through a REST API).

class User
{
    public int Id { get; }
    public string Name { get; }
    public string Status { get; }
}

IObservable<IEnumerable<User>> source;

I want to create a SourceCache and update it each time the source emits a new result. So I wrote:

var models = new SourceCache<User,int>(user => user.Id);
models.Connect()
      .Transform(u => new UserViewModel() {...})
      ...
      .Bind(out viewModels)
      .Subscribe();

source.Subscribe(ul => models.EditDiff(ul, (a, b) => a.Status == b.Status));

But now, every time a user's status changes, the .Transform(...) operator creates a new instance of UserViewModel, which isn't the desired behaviour.

Can I define a rule that updates the properties of the existing ViewModel (in the derived collection) when a source item with the same Id changes, instead of creating a new one every time?


Solution

  • You need to create a custom operator. I have posted one as a gist, TransformWithInlineUpdate, which you can copy into your solution. Example usage:

    var users = new SourceCache<User, int>(user => user.Id);
    
    var transformed =  users.Connect()
        .TransformWithInlineUpdate(u => new UserViewModel(u), (previousViewModel, updatedUser) =>
            {
                previousViewModel.User = updatedUser;
            });
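
    The usage above presupposes that UserViewModel exposes a settable User property. A minimal sketch of such a view model follows, using only INotifyPropertyChanged from the base class library. The shape of UserViewModel (its constructor, the Name and Status pass-through properties) is an assumption for illustration, not something the question specifies. In a ReactiveUI project you would more likely derive from ReactiveObject instead.

    ```csharp
    using System.ComponentModel;
    using System.Runtime.CompilerServices;

    // Immutable source item, mirroring the User class from the question.
    public sealed class User
    {
        public User(int id, string name, string status)
        {
            Id = id;
            Name = name;
            Status = status;
        }

        public int Id { get; }
        public string Name { get; }
        public string Status { get; }
    }

    // Hypothetical view model: one long-lived instance per user Id.
    public sealed class UserViewModel : INotifyPropertyChanged
    {
        private User _user;

        public UserViewModel(User user) => _user = user;

        public event PropertyChangedEventHandler PropertyChanged;

        // Swapped by the update callback when a newer User arrives.
        public User User
        {
            get => _user;
            set
            {
                if (ReferenceEquals(_user, value)) return;
                _user = value;
                OnPropertyChanged();
                // Notify bindings that read the pass-through properties.
                OnPropertyChanged(nameof(Name));
                OnPropertyChanged(nameof(Status));
            }
        }

        public string Name => _user.Name;
        public string Status => _user.Status;

        private void OnPropertyChanged([CallerMemberName] string propertyName = null) =>
            PropertyChanged?.Invoke(this, new PropertyChangedEventArgs(propertyName));
    }
    ```

    Because the instance survives the update, UI state that lives on the view model (selection, expansion and so on) is preserved, and the bound controls only re-read the properties that were notified.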
    

    For completeness of the answer, here is the code:

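        // Requires: using System; using System.Reactive.Linq; using DynamicData; using DynamicData.Kernel;
        // Declare this inside a static class, since it is an extension method.
        public static IObservable<IChangeSet<TDestination, TKey>> TransformWithInlineUpdate<TObject, TKey, TDestination>(
            this IObservable<IChangeSet<TObject, TKey>> source,
            Func<TObject, TDestination> transformFactory,
            Action<TDestination, TObject> updateAction = null)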
        {
            return source.Scan((ChangeAwareCache<TDestination, TKey>)null, (cache, changes) =>
                {
                    //The change aware cache captures a history of all changes so downstream operators can replay the changes
                    if (cache == null)
                        cache = new ChangeAwareCache<TDestination, TKey>(changes.Count);
    
                    foreach (var change in changes)
                    {
                        switch (change.Reason)
                        {
                            case ChangeReason.Add:
                                cache.AddOrUpdate(transformFactory(change.Current), change.Key);
                                break;
                            case ChangeReason.Update:
                            {
                                if (updateAction == null) continue;
    
                                var previous = cache.Lookup(change.Key)
                                    .ValueOrThrow(()=> new MissingKeyException($"{change.Key} is not found."));
                                //callback when an update has been received
                                updateAction(previous, change.Current);
    
                                //send a refresh as this will force downstream operators to filter, sort, group etc
                                cache.Refresh(change.Key);
                                break;
                            }
                            case ChangeReason.Remove:
                                cache.Remove(change.Key);
                                break;
                            case ChangeReason.Refresh:
                                cache.Refresh(change.Key);
                                break;
                            case ChangeReason.Moved:
                                //Do nothing !
                                break;
                        }
                    }
                    return cache;
                    
                }).Select(cache => cache.CaptureChanges()); //invoke capture changes to return the changeset
        }
    

    Update: this operator has since been included in the DynamicData library, so you no longer need to copy it into your own code.
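
    To check the behaviour end to end, the sketch below feeds two snapshots through EditDiff and verifies that the bound collection still holds the same view model instance afterwards. It assumes the DynamicData NuGet package is referenced and that TransformWithInlineUpdate is in scope (either the version shipped with the library or the copy above, but not both, to avoid an ambiguous call). The User record, UserViewModel and InlineUpdateDemo are hypothetical names used only for this illustration.

    ```csharp
    using System;
    using System.Collections.ObjectModel;
    using DynamicData;

    // Minimal stand-ins for the types in the question.
    public sealed record User(int Id, string Name, string Status);

    public sealed class UserViewModel
    {
        public UserViewModel(User user) => User = user;
        public User User { get; set; }
    }

    public static class InlineUpdateDemo
    {
        // Returns true when the view model instance survives a status change.
        public static bool ReusesViewModelOnUpdate()
        {
            using var users = new SourceCache<User, int>(u => u.Id);

            using var subscription = users.Connect()
                .TransformWithInlineUpdate(
                    u => new UserViewModel(u),      // called once per new key
                    (vm, u) => vm.User = u)         // called for later updates
                .Bind(out ReadOnlyObservableCollection<UserViewModel> viewModels)
                .Subscribe();

            // First snapshot: user 1 is added, so a view model is created.
            users.EditDiff(new[] { new User(1, "Ann", "online") },
                           (a, b) => a.Status == b.Status);
            var before = viewModels[0];

            // Second snapshot: same Id, different Status, so EditDiff emits an update.
            users.EditDiff(new[] { new User(1, "Ann", "away") },
                           (a, b) => a.Status == b.Status);

            return ReferenceEquals(before, viewModels[0])
                   && viewModels[0].User.Status == "away";
        }
    }
    ```

    With a plain Transform in place of TransformWithInlineUpdate, the same check would be expected to fail, because the update would replace the item in the bound collection with a freshly constructed view model.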