I would like to buffer items in a sequence according to a condition. The problem is that this condition depends on the items that are processed.
Let me give an example.
Given this input:
new[] { 1, 3, 5, 7, 2, 4, 6, 8, 1 };
The resulting sequence should be:
{ 1 }
{ 3 }
{ 5 }
{ 7 }
{ 2, 4, 6, 8 }
{ 1 }
I've tried variations of this without success:
var boundaries = origin.Select(x => x % 2 != 0).DistinctUntilChanged();
var result = origin.Buffer(boundaries);
This might be close to what you want. Instead of the Buffer operator, it uses the GroupByUntil operator, which I consider to be more reliable.
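// Requires: using System; using System.Collections.Generic; using System.Reactive.Linq;
// Extension methods must be declared inside a static class.

/// <summary>
/// Splits the elements of a sequence into chunks that are starting with
/// elements that satisfy the predicate.
/// </summary>
public static IObservable<IList<TSource>> BufferByPredicate<TSource>(
    this IObservable<TSource> source,
    Predicate<TSource> startNewBufferPredicate)
{
    ArgumentNullException.ThrowIfNull(source);
    ArgumentNullException.ThrowIfNull(startNewBufferPredicate);
    return source
        .SelectMany(x =>
        {
            var subSequence = Observable.Return((Value: x, HasValue: true));
            if (startNewBufferPredicate(x))
            {
                // Add a fake "boundary" element before the real element.
                subSequence = subSequence.Prepend((default, false));
            }
            return subSequence;
        })
        // All elements share one key; the current group closes as soon as
        // a boundary element (HasValue == false) arrives in it.
        .GroupByUntil(_ => 0, g => g.SkipWhile(e => e.HasValue))
        // Drop the boundary markers and collect each group's real values.
        .SelectMany(g => g.Where(e => e.HasValue).Select(e => e.Value).ToArray())
        // A group that contained only a boundary marker yields an empty array.
        .Where(w => w.Length > 0);
}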
Usage example:
IObservable<IList<int>> result = origin.BufferByPredicate(x => x % 2 != 0);
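
To make the chunking rule concrete, here is a minimal synchronous sketch of the same idea on a plain IEnumerable. ChunkByPredicate and ChunkDemo are hypothetical names used only for illustration. They are not part of Rx, and the sketch assumes the operator above behaves as its summary describes (a new chunk starts at every element that satisfies the predicate).

```csharp
using System;
using System.Collections.Generic;

public static class ChunkDemo
{
    // Hypothetical synchronous analogue of BufferByPredicate (not an Rx API):
    // a new chunk starts at every element that satisfies the predicate.
    public static IEnumerable<IList<T>> ChunkByPredicate<T>(
        IEnumerable<T> source, Predicate<T> startNewChunk)
    {
        var chunk = new List<T>();
        foreach (var item in source)
        {
            // Close the current chunk before a "starter" element,
            // unless the chunk is still empty.
            if (startNewChunk(item) && chunk.Count > 0)
            {
                yield return chunk;
                chunk = new List<T>();
            }
            chunk.Add(item);
        }
        // Emit whatever is left when the source ends.
        if (chunk.Count > 0) yield return chunk;
    }

    public static void Main()
    {
        var origin = new[] { 1, 3, 5, 7, 2, 4, 6, 8, 1 };
        foreach (var chunk in ChunkByPredicate(origin, x => x % 2 != 0))
            Console.WriteLine("{ " + string.Join(", ", chunk) + " }");
        // Prints:
        // { 1 }
        // { 3 }
        // { 5 }
        // { 7, 2, 4, 6, 8 }
        // { 1 }
    }
}
```

Note that under this rule the 7 stays in the same chunk as the even numbers that follow it, because only odd numbers start a new chunk. That is where the result differs from the output requested in the question, and why this is only close to what was asked.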