Tags: c#, linq, exception, producer-consumer, morelinq

How to chunkify an IEnumerable<T>, without losing/discarding items in case of failure?


I have a producer-consumer scenario where the producer is an enumerable sequence of items (IEnumerable<Item>). I want to process these items in chunks/batches of 10 items each. So I decided to use the new (.NET 6) Chunk LINQ operator, as suggested in this question: Create batches in LINQ.

My problem is that sometimes the producer fails, and in this case the consumer of the chunkified sequence receives the error without first receiving a chunk with the last items that were produced before the error. So if, for example, the producer generates 15 items and then fails, the consumer will get a chunk with items 1-10 and then an exception. Items 11-15 will be lost! Here is a minimal example that demonstrates this undesirable behavior:

static IEnumerable<int> Produce()
{
    int i = 0;
    while (true)
    {
        i++;
        Console.WriteLine($"Producing #{i}");
        yield return i;
        if (i == 15) throw new Exception("Oops!");
    }
}

// Consume
foreach (int[] chunk in Produce().Chunk(10))
{
    Console.WriteLine($"Consumed: [{String.Join(", ", chunk)}]");
}

Output:

Producing #1
Producing #2
Producing #3
Producing #4
Producing #5
Producing #6
Producing #7
Producing #8
Producing #9
Producing #10
Consumed: [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
Producing #11
Producing #12
Producing #13
Producing #14
Producing #15
Unhandled exception. System.Exception: Oops!
   at Program.<Main>g__Produce|0_0()+MoveNext()
   at System.Linq.Enumerable.ChunkIterator[TSource](IEnumerable`1 source, Int32 size)+MoveNext()
   at Program.Main()


The desirable behavior would be to get a chunk with the values [11, 12, 13, 14, 15] before getting the exception.

My question is: Is there any way to configure the Chunk operator so that it prioritizes emitting data over exceptions? If not, how can I implement a custom LINQ operator, named for example ChunkNonDestructive, with the desirable behavior?

public static IEnumerable<TSource[]> ChunkNonDestructive<TSource>(
    this IEnumerable<TSource> source, int size);

Note: Apart from the System.Linq.Chunk operator, I also experimented with the Buffer operator from the System.Interactive package and the Batch operator from the MoreLinq package. Apparently they all behave the same way (destructively).
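One possible direct implementation of the ChunkNonDestructive signature is sketched here. This is an assumption, not something taken from the question: it targets .NET 6 or later (for ArgumentNullException.ThrowIfNull), and the class name ChunkExtensions and the helper ChunkNonDestructiveIterator are illustrative names. It enumerates the source by hand, catches a failure from MoveNext/Current, emits the partially filled chunk, and then rethrows the original exception via ExceptionDispatchInfo so its stack trace is preserved.

```csharp
#nullable enable
using System;
using System.Collections.Generic;
using System.Runtime.ExceptionServices;

public static class ChunkExtensions
{
    // Sketch: like Enumerable.Chunk, but if the source throws, the partially filled
    // chunk is yielded first and the source's exception is rethrown afterwards.
    public static IEnumerable<TSource[]> ChunkNonDestructive<TSource>(
        this IEnumerable<TSource> source, int size)
    {
        // Validate eagerly (at call time), as the built-in operators do.
        ArgumentNullException.ThrowIfNull(source);
        if (size < 1) throw new ArgumentOutOfRangeException(nameof(size));
        return ChunkNonDestructiveIterator(source, size);
    }

    private static IEnumerable<TSource[]> ChunkNonDestructiveIterator<TSource>(
        IEnumerable<TSource> source, int size)
    {
        var buffer = new List<TSource>(size);
        ExceptionDispatchInfo? error = null;
        using (var enumerator = source.GetEnumerator())
        {
            while (true)
            {
                // 'yield return' is not allowed inside a try block that has a catch
                // clause, so only MoveNext/Current are guarded here.
                try
                {
                    if (!enumerator.MoveNext()) break;
                    buffer.Add(enumerator.Current);
                }
                catch (Exception ex)
                {
                    error = ExceptionDispatchInfo.Capture(ex);
                    break;
                }
                if (buffer.Count == size)
                {
                    yield return buffer.ToArray();
                    buffer.Clear();
                }
            }
        }
        // Emit the items collected before the source completed or failed.
        if (buffer.Count > 0) yield return buffer.ToArray();
        // Rethrow the source's exception (if any), keeping its original stack trace.
        error?.Throw();
    }
}
```

Keeping the argument checks in a separate non-iterator method makes invalid arguments fail when ChunkNonDestructive is called rather than on the first MoveNext, which matches how the built-in LINQ operators behave.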


Update: Here is the desirable output of the above example:

Producing #1
Producing #2
Producing #3
Producing #4
Producing #5
Producing #6
Producing #7
Producing #8
Producing #9
Producing #10
Consumed: [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
Producing #11
Producing #12
Producing #13
Producing #14
Producing #15
Consumed: [11, 12, 13, 14, 15]
Unhandled exception. System.Exception: Oops!
   at Program.<Main>g__Produce|0_0()+MoveNext()
   at System.Linq.Enumerable.ChunkIterator[TSource](IEnumerable`1 source, Int32 size)+MoveNext()
   at Program.Main()

The difference is the line Consumed: [11, 12, 13, 14, 15], which is missing from the actual output.


Solution

  • If you preprocess your source to make it stop when it encounters an exception, then you can use Chunk() as-is.

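    using System;
    using System.Collections.Generic;

    public static class Extensions
    {
        // Yields items from source until it completes or throws. If it throws, the
        // exception is passed to exceptionCallback (if any) and the sequence just ends.
        public static IEnumerable<T> UntilFirstException<T>(this IEnumerable<T> source, Action<Exception>? exceptionCallback = null)
        {
            using var enumerator = source.GetEnumerator();
            while (true)
            {
                T current;
                // C# forbids 'yield return' inside a try block that has a catch clause,
                // so only MoveNext/Current are guarded and the yield happens afterwards.
                try
                {
                    if (!enumerator.MoveNext())
                    {
                        break;
                    }
                    current = enumerator.Current;
                }
                catch (Exception e)
                {
                    exceptionCallback?.Invoke(e);
                    break;
                }
                yield return current;
            }
        }
    }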
    
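        Exception? e = null;
        foreach (int[] chunk in Produce().UntilFirstException(thrown => e = thrown).Chunk(10))
        {
            Console.WriteLine($"Consumed: [{String.Join(", ", chunk)}]");
        }
        // Every produced item has been consumed by now; e holds the producer's
        // exception, or null if the producer completed normally.
        if (e != null) Console.WriteLine($"Producer failed: {e.Message}");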
    

    I feel like that keeps responsibilities separated nicely. If you want a helper that throws an exception instead of having to capture it yourself, you can use this as a component to simplify writing that helper:

        public static IEnumerable<T[]> ChunkUntilFirstException<T>(this IEnumerable<T> source, int size)
        {
            Exception? e = null;
            var result = source.UntilFirstException(thrown => e = thrown).Chunk(size);
            foreach (var element in result)
            {
                yield return element;
            }
            if (e != null)
            {
                throw new InvalidOperationException("source threw an exception", e);
            }
        }
    

    Note that this throws a different exception than the one emitted by the producer: the producer's exception is wrapped as the InnerException of a new InvalidOperationException. Wrapping keeps the stack trace associated with the original exception intact, whereas throw e would overwrite that stack trace.
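    If you would rather have the consumer see the producer's own exception instead of a wrapper, System.Runtime.ExceptionServices.ExceptionDispatchInfo can rethrow the captured exception while preserving its original stack trace (the rethrow location is appended to the trace rather than replacing it). The following is a sketch of that variation under the same .NET 6 assumption; the name ChunkThenRethrow is hypothetical, and UntilFirstException is repeated from the answer only so that the snippet compiles on its own.

```csharp
#nullable enable
using System;
using System.Collections.Generic;
using System.Linq;
using System.Runtime.ExceptionServices;

public static class RethrowExtensions
{
    // Same idea as UntilFirstException in the answer: stop at the first exception
    // and report it through the callback instead of propagating it.
    public static IEnumerable<T> UntilFirstException<T>(
        this IEnumerable<T> source, Action<Exception>? exceptionCallback = null)
    {
        using var enumerator = source.GetEnumerator();
        while (true)
        {
            T current;
            try
            {
                if (!enumerator.MoveNext()) break;
                current = enumerator.Current;
            }
            catch (Exception e)
            {
                exceptionCallback?.Invoke(e);
                break;
            }
            yield return current;
        }
    }

    // Hypothetical variant of ChunkUntilFirstException: emits every chunk (including a
    // final partial one), then rethrows the producer's original exception unwrapped.
    public static IEnumerable<T[]> ChunkThenRethrow<T>(this IEnumerable<T> source, int size)
    {
        ExceptionDispatchInfo? captured = null;
        var chunks = source
            .UntilFirstException(thrown => captured = ExceptionDispatchInfo.Capture(thrown))
            .Chunk(size);
        foreach (var chunk in chunks)
        {
            yield return chunk;
        }
        // Throw() restores the captured stack trace and appends the current location.
        captured?.Throw();
    }
}
```

    With this variant the consumer's catch clause can match the producer's exception directly, without going through InnerException.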

    You can tweak this according to your needs. If you only want to handle a specific exception that you expect your producer to emit, an exception filter (the when contextual keyword) combined with pattern matching makes that easy:

        try
        {
            foreach (int[] chunk in Produce().ChunkUntilFirstException(10))
            {
                Console.WriteLine($"Consumed: [{String.Join(", ", chunk)}]");
            }
        }
        catch (InvalidOperationException e) when (e.InnerException is {Message: "Oops!"})
        {
            Console.WriteLine(e.InnerException.ToString());
        }