I am trying to figure out how to use the DynamicData library. I need a method that handles changes from the source and passes on only the last n changes. It could be named Tail(). The package has a method named Top(), which uses IVirtualRequest. I have written an example based on this functionality:

    public static class Extensions
    {
        public static IObservable<IChangeSet<T>> Tail<T>([NotNull] this IObservable<IChangeSet<T>> source,
            int numberOfItems)
        {
            var request = new TailRequest<T>(source, numberOfItems);
            return source.Virtualise(Observable.Return(request));
        }
    }

    public class TailRequest<T> : IVirtualRequest, IDisposable
    {
        private readonly IDisposable subscription;
        private int _count;

        public int Size { get; }
        public int StartIndex => _count > Size ? _count - Size : 0;

        public TailRequest(IObservable<IChangeSet<T>> source, int numberOfItems)
        {
            //how to dispose this from outside???
            subscription = source.Subscribe(RefreshStartIndex);
            Size = numberOfItems;
        }

        private void RefreshStartIndex(IChangeSet<T> changeSet)
        {
            _count += changeSet.Adds;
            _count -= changeSet.Removes;
        }

        public void Dispose()
        {
            subscription.Dispose();
        }
    }

But I can't understand how I should dispose of this request from outside if I use the method like this:

    SourceList<Message> sourceList = new SourceList<Message>();
    var subscription = sourceList.Connect()
        .Tail(15)
        .ObserveOn(RxApp.MainThreadScheduler)
        .Bind(Messages)
        .Subscribe();

I think subscription.Dispose() will not dispose my TailRequest. Or is there a better solution?
This will do the trick:

    public static IObservable<IChangeSet<T>> Tail<T>([NotNull] this IObservable<IChangeSet<T>> source, int numberOfItems)
    {
        return Observable.Create<IChangeSet<T>>(observer =>
        {
            // A new request is created for each subscriber.
            var request = new TailRequest<T>(source, numberOfItems);

            // Disposing the composite disposes both the request and the virtualised subscription.
            return new CompositeDisposable
            (
                request,
                source.Virtualise(Observable.Return(request)).SubscribeSafe(observer)
            );
        });
    }

The function passed to Observable.Create returns a disposable, which is used to clean up any resources created for that subscription. When the consuming subscriber disposes its subscription, that inner disposable is disposed as well, so subscription.Dispose() also disposes the TailRequest.
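
The disposal behaviour can be checked in isolation with plain System.Reactive, without DynamicData. The following is a minimal sketch, not part of the original answer: `DisposalDemo`, `Run` and the `log` list are hypothetical names, and a `Disposable.Create` callback stands in for `TailRequest`.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Disposables;
using System.Reactive.Linq;
using System.Reactive.Subjects;

public static class DisposalDemo
{
    // Returns a log of events so the effect of disposing can be inspected.
    public static List<string> Run()
    {
        var log = new List<string>();
        var source = new Subject<int>();

        IObservable<int> wrapped = Observable.Create<int>(observer =>
        {
            // Hypothetical stand-in for TailRequest: a resource created per subscription.
            var resource = Disposable.Create(() => log.Add("resource disposed"));

            // Both the resource and the inner subscription are tied to the subscriber.
            return new CompositeDisposable(resource, source.SubscribeSafe(observer));
        });

        var subscription = wrapped.Subscribe(x => log.Add($"value {x}"));

        source.OnNext(1);          // reaches the subscriber
        subscription.Dispose();    // disposes the CompositeDisposable as well
        source.OnNext(2);          // nobody is listening any more

        return log;
    }
}
```

Under these assumptions, `Run()` should log "value 1" followed by "resource disposed", and nothing for the second value, which suggests that the same pattern releases `TailRequest` when the outer subscription is disposed.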