c#, .net, system.reactive, rx.net

Rx.NET Buffer emit all items on Cancellation


I am using the Rx.NET library and its Buffer method in the simplest way:

observable.Buffer(TimeSpan.FromSeconds(5), 10);

It works great except when a cancellation token is triggered. When that happens, I would like Buffer to emit all the items it holds at that moment instead of waiting for the timer to tick. Is that possible?

Example: items 1, 2, 3 are emitted within 2 seconds, so neither the 10-item limit nor the 5-second limit is reached. Cancellation is then requested, and I would like to get all buffered items, to at least "see" them before the request/process ends, without waiting an additional 3 seconds for the timer.


Solution

  • If I understand the question correctly, one possible solution is:

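        using System;
        using System.Reactive;
        using System.Reactive.Linq;
        using System.Threading;

        public static class Program
        {
            // Exposes the token as a sequence that emits once and completes
            // when cancellation is requested.
            public static IObservable<Unit> ToObservable(this CancellationToken ct) =>
                Observable.Create<Unit>(observer => ct.Register(() =>
                    {
                        observer.OnNext(Unit.Default);
                        observer.OnCompleted();
                    })
                );

            static void Main()
            {
                // The token is cancelled automatically after 6 seconds.
                var cts = new CancellationTokenSource(6000);

                Observable.Interval(TimeSpan.FromSeconds(0.5))
                    // Completes the source on cancellation, so Buffer flushes what it holds.
                    .TakeUntil(cts.Token.ToObservable())
                    .Buffer(10)
                    .Subscribe(b =>
                    {
                        // ... processing
                    });
                Console.ReadLine();
            }
        }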
    

    The main idea is to complete Buffer's source sequence when cancellation is requested. That makes Buffer immediately release the items it has collected and finish its work. In Subscribe you can then check whether cancellation was requested and decide what to do with the data that arrived.
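
The same idea should carry over to the time-and-count overload from the question. Below is a minimal sketch, not a definitive implementation: the `Subject<int>` source and the batch labels are hypothetical stand-ins added for illustration, and the `ToObservable` helper is the one from the answer above. It pushes three items, cancels well before either the 5-second or the 10-item limit is reached, and checks `IsCancellationRequested` inside Subscribe.

```csharp
using System;
using System.Reactive;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using System.Threading;

public static class Program
{
    // Helper from the answer: emits once and completes when the token is cancelled.
    public static IObservable<Unit> ToObservable(this CancellationToken ct) =>
        Observable.Create<Unit>(observer => ct.Register(() =>
        {
            observer.OnNext(Unit.Default);
            observer.OnCompleted();
        }));

    public static void Main()
    {
        var cts = new CancellationTokenSource();

        // Hypothetical stand-in for the real event source.
        var source = new Subject<int>();

        var subscription = source
            // Complete the source as soon as cancellation is requested.
            .TakeUntil(cts.Token.ToObservable())
            // Same limits as in the question: 5 seconds or 10 items.
            .Buffer(TimeSpan.FromSeconds(5), 10)
            .Subscribe(batch =>
            {
                // Distinguish a batch flushed by cancellation from a regular one.
                var reason = cts.IsCancellationRequested
                    ? "flushed on cancellation"
                    : "regular batch";
                Console.WriteLine($"{reason}: [{string.Join(", ", batch)}]");
            });

        source.OnNext(1);
        source.OnNext(2);
        source.OnNext(3);

        // Neither limit has been reached; cancelling completes the source,
        // which should make Buffer release the pending items right away.
        cts.Cancel();

        subscription.Dispose();
        cts.Dispose();
    }
}
```

Because TakeUntil sits before Buffer, cancellation ends the upstream sequence rather than the subscription itself, so the final batch still reaches Subscribe. Disposing the subscription directly would instead drop whatever Buffer was holding.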