c# .net reactive-programming system.reactive rx.net

Observable.Buffer depending on buffered items


I would like to buffer the items of a sequence according to a condition, but the condition depends on the items being processed.

Let me give an example:

Given this:

new[] { 1, 3, 5, 7, 2, 4, 6, 8, 1 };

  • If n is odd, stop buffering (n is emitted in a buffer of its own)
  • If n is even, start buffering (consecutive even items share one buffer)

This way, the result sequence should be:

{ 1 }
{ 3 }
{ 5 }
{ 7 }
{ 2, 4, 6, 8 }
{ 1 }
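
The rule above can be pinned down with a plain, non-reactive sketch over IEnumerable<T>. It is only a reference for the expected grouping, not an Rx solution, and the names ChunkSketch, ChunkRuns and shouldBuffer are hypothetical, chosen for illustration:

```csharp
using System;
using System.Collections.Generic;

public static class ChunkSketch
{
    // Hypothetical helper: consecutive items that match shouldBuffer are
    // collected into one chunk; any other item closes the pending chunk
    // and is then yielded in a chunk of its own.
    public static IEnumerable<IList<T>> ChunkRuns<T>(
        IEnumerable<T> source, Func<T, bool> shouldBuffer)
    {
        var pending = new List<T>();
        foreach (var item in source)
        {
            if (shouldBuffer(item))
            {
                pending.Add(item);
                continue;
            }
            if (pending.Count > 0)
            {
                yield return pending;
                pending = new List<T>();
            }
            yield return new List<T> { item };
        }
        // Flush a run that is still open when the input ends.
        if (pending.Count > 0)
            yield return pending;
    }

    public static void Main()
    {
        var input = new[] { 1, 3, 5, 7, 2, 4, 6, 8, 1 };
        // Even numbers are buffered; odd numbers are emitted alone.
        foreach (var chunk in ChunkRuns(input, n => n % 2 == 0))
            Console.WriteLine("{ " + string.Join(", ", chunk) + " }");
    }
}
```

With the sample input, the even run 2, 4, 6, 8 ends up in a single chunk and every odd number in a chunk of its own, which is the grouping listed above.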

I've tried variations of this without success:

var boundaries = origin.Select(x => x % 2 != 0).DistinctUntilChanged();
var result = origin.Buffer(boundaries);

Solution

  • This might be close to what you want. Instead of the Buffer operator it uses the GroupByUntil operator, which I consider to be more reliable.

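    // Requires: using System; using System.Collections.Generic; using System.Reactive.Linq;
    // Extension methods must be declared inside a static class.
    // ArgumentNullException.ThrowIfNull is available from .NET 6 onwards.

    /// <summary>
    /// Splits the elements of a sequence into chunks, each starting with an
    /// element that satisfies the predicate.
    /// </summary>
    public static IObservable<IList<TSource>> BufferByPredicate<TSource>(
        this IObservable<TSource> source,
        Predicate<TSource> startNewBufferPredicate)
    {
        ArgumentNullException.ThrowIfNull(source);
        ArgumentNullException.ThrowIfNull(startNewBufferPredicate);
        return source
            // Tag every real element with HasValue = true.
            .SelectMany(x =>
            {
                var subSequence = Observable.Return((Value: x, HasValue: true));
                if (startNewBufferPredicate(x))
                    // Add a fake "boundary" element before the real element.
                    subSequence = subSequence.Prepend((default, false));
                return subSequence;
            })
            // One group at a time (constant key); a boundary element closes it.
            .GroupByUntil(_ => 0, g => g.SkipWhile(e => e.HasValue))
            // Drop the boundary markers and collect each group into an array.
            .SelectMany(g => g.Where(e => e.HasValue).Select(e => e.Value).ToArray())
            // Skip empty chunks, e.g. when the very first element is a boundary marker.
            .Where(w => w.Length > 0);
    }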
    

    Usage example:

     IObservable<IList<int>> result = origin.BufferByPredicate(x => x % 2 != 0);
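
For the exact output requested in the question, where every odd item gets a buffer of its own, a hand-rolled operator built on Observable.Create is one possible alternative. As far as I can tell, BufferByPredicate above keeps the 7 in the same chunk as the even run, because each chunk starts with a matching element. The sketch below is not part of the original answer: the names BufferRunsExtensions, BufferRuns and shouldBuffer are hypothetical, and it assumes the System.Reactive package is referenced.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;

public static class BufferRunsExtensions
{
    // Hypothetical operator: items matching shouldBuffer are accumulated;
    // any other item flushes the pending buffer and is emitted alone.
    public static IObservable<IList<T>> BufferRuns<T>(
        this IObservable<T> source, Func<T, bool> shouldBuffer)
    {
        if (source == null) throw new ArgumentNullException(nameof(source));
        if (shouldBuffer == null) throw new ArgumentNullException(nameof(shouldBuffer));

        return Observable.Create<IList<T>>(observer =>
        {
            // State is created per subscription, so subscribers do not share buffers.
            var pending = new List<T>();
            return source.Subscribe(
                item =>
                {
                    bool buffer;
                    try { buffer = shouldBuffer(item); }
                    catch (Exception ex) { observer.OnError(ex); return; }

                    if (buffer)
                    {
                        pending.Add(item);
                        return;
                    }
                    if (pending.Count > 0)
                    {
                        observer.OnNext(pending);
                        pending = new List<T>();
                    }
                    observer.OnNext(new List<T> { item });
                },
                observer.OnError,
                () =>
                {
                    // Flush a run that is still open when the source completes.
                    if (pending.Count > 0) observer.OnNext(pending);
                    observer.OnCompleted();
                });
        });
    }
}
```

Under these assumptions, origin.BufferRuns(x => x % 2 == 0) should yield the six buffers listed in the question. Note that the predicate is inverted compared to BufferByPredicate: here it selects the items to buffer rather than the items that start a new buffer.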