c#, system.reactive, dynamic-data, reactiveui

Tail method implementation with DynamicData by Roland Pheasant


I'm trying to figure out how to use the DynamicData library. I need a method that handles changes to the source and passes on only the last n changes. It could be named Tail(). The package already has a method named Top(), which uses IVirtualRequest. I wrote the following example based on that functionality:

    public static class Extensions
    {
        public static IObservable<IChangeSet<T>> Tail<T>([NotNull] this IObservable<IChangeSet<T>> source,
                                                         int numberOfItems)
        {    
            var request = new TailRequest<T>(source, numberOfItems);
            return source.Virtualise(Observable.Return(request));
        }
    }

    public class TailRequest<T> : IVirtualRequest, IDisposable
    {
        private readonly IDisposable subscription;
        private int _count;

        public int Size { get; }

        public int StartIndex => _count > Size ? _count - Size : 0;

        public TailRequest(IObservable<IChangeSet<T>> source, int numberOfItems)
        {
            //how to dispose this from outside???
            subscription = source.Subscribe(RefreshStartIndex);
            Size = numberOfItems;
        }

        private void RefreshStartIndex(IChangeSet<T> changeSet)
        {
            _count += changeSet.Adds;
            _count -= changeSet.Removes;
        }

        public void Dispose()
        {
            subscription.Dispose();
        }
    }

But I can't work out how to dispose of this request from outside when I use the method like this:

    var sourceList = new SourceList<Message>();
    var subscription = sourceList.Connect()
        .Tail(15)
        .ObserveOn(RxApp.MainThreadScheduler)
        .Bind(Messages)
        .Subscribe();

I think subscription.Dispose() will not dispose my TailRequest. Or is there a better solution?


Solution

  • This will do the trick

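    public static IObservable<IChangeSet<T>> Tail<T>([NotNull] this IObservable<IChangeSet<T>> source, int numberOfItems)
    {
        return Observable.Create<IChangeSet<T>>(observer =>
        {
            // Created inside Observable.Create, so each subscriber gets its own request.
            var request = new TailRequest<T>(source, numberOfItems);

            // Disposing the returned composite disposes both the request
            // and the subscription to the virtualised source.
            return new CompositeDisposable
            (
                request,
                source.Virtualise(Observable.Return(request)).SubscribeSafe(observer)
            );
        });
    }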

    The function passed to Observable.Create returns a disposable, which is used to clean up any resources created for that subscription. When the consuming subscriber disposes its subscription, that inner disposable is disposed as well, and the TailRequest with it.
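
    To see this disposal behaviour in isolation, here is a minimal sketch that uses only System.Reactive. The names DisposalDemo and Run are hypothetical, and the Disposable.Create resource is just a stand-in for TailRequest; it is not part of DynamicData. Observable.Never is used so that the stream never completes on its own and the only way the resource can be released is by disposing the subscription.

```csharp
using System;
using System.Reactive.Disposables;
using System.Reactive.Linq;

// Hypothetical demo class, for illustration only.
public static class DisposalDemo
{
    // Returns whether the resource was disposed before and after
    // the outer subscription is disposed.
    public static (bool Before, bool After) Run()
    {
        var resourceDisposed = false;

        IObservable<int> stream = Observable.Create<int>(observer =>
        {
            // Stand-in for TailRequest<T>: any IDisposable created per subscription.
            IDisposable resource = Disposable.Create(() => resourceDisposed = true);

            // Stand-in for source.Virtualise(...): a stream that never completes.
            IDisposable inner = Observable.Never<int>().SubscribeSafe(observer);

            // Both are released together when the subscriber disposes.
            return new CompositeDisposable(resource, inner);
        });

        IDisposable subscription = stream.Subscribe();
        var before = resourceDisposed;   // resource is still alive here

        subscription.Dispose();
        var after = resourceDisposed;    // resource has been disposed here

        return (before, after);
    }
}
```

    Calling DisposalDemo.Run() should return (false, true): the resource stays alive while the subscription is active and is released as soon as the caller disposes it. The same reasoning applies to the subscription returned by the Connect().Tail(15)...Subscribe() chain in the question.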