I have a producer-consumer scenario where the producer is an enumerable sequence of items (IEnumerable<Item>). I want to process these items in chunks/batches of 10 items each. So I decided to use the new (.NET 6) Chunk LINQ operator, as suggested in this question: Create batches in LINQ.
My problem is that sometimes the producer fails, and in this case the consumer of the chunkified sequence receives the error without first receiving a chunk with the last items that were produced before the error. So if, for example, the producer generates 15 items and then fails, the consumer will get a chunk with items 1-10 and then an exception. Items 11-15 will be lost! Here is a minimal example that demonstrates this undesirable behavior:
static IEnumerable<int> Produce()
{
    int i = 0;
    while (true)
    {
        i++;
        Console.WriteLine($"Producing #{i}");
        yield return i;
        if (i == 15) throw new Exception("Oops!");
    }
}

// Consume
foreach (int[] chunk in Produce().Chunk(10))
{
    Console.WriteLine($"Consumed: [{String.Join(", ", chunk)}]");
}
Output:
Producing #1
Producing #2
Producing #3
Producing #4
Producing #5
Producing #6
Producing #7
Producing #8
Producing #9
Producing #10
Consumed: [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
Producing #11
Producing #12
Producing #13
Producing #14
Producing #15
Unhandled exception. System.Exception: Oops!
   at Program.<Main>g__Produce|0_0()+MoveNext()
   at System.Linq.Enumerable.ChunkIterator[TSource](IEnumerable`1 source, Int32 size)+MoveNext()
   at Program.Main()
The desirable behavior would be to get a chunk with the values [11, 12, 13, 14, 15] before getting the exception.
My question is: Is there any way to configure the Chunk operator so that it prioritizes emitting data over propagating exceptions? If not, how can I implement a custom LINQ operator, named for example ChunkNonDestructive, with the desirable behavior?
public static IEnumerable<TSource[]> ChunkNonDestructive<TSource>(
    this IEnumerable<TSource> source, int size);
Note: Apart from the System.Linq.Chunk operator, I also experimented with the Buffer operator from the System.Interactive package, as well as the Batch operator from the MoreLinq package. Apparently they all behave the same way (destructively).
Update: Here is the desirable output of the above example:
Producing #1
Producing #2
Producing #3
Producing #4
Producing #5
Producing #6
Producing #7
Producing #8
Producing #9
Producing #10
Consumed: [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
Producing #11
Producing #12
Producing #13
Producing #14
Producing #15
Consumed: [11, 12, 13, 14, 15]
Unhandled exception. System.Exception: Oops!
   at Program.<Main>g__Produce|0_0()+MoveNext()
   at System.Linq.Enumerable.ChunkIterator[TSource](IEnumerable`1 source, Int32 size)+MoveNext()
   at Program.Main()
The difference is the line Consumed: [11, 12, 13, 14, 15], which is not present in the actual output.
If you preprocess your source so that it stops when it encounters an exception, then you can use Chunk() as-is.
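public static class Extensions
{
    // Passes items through until the source completes or throws. If it throws,
    // the exception is handed to exceptionCallback and the sequence simply ends.
    public static IEnumerable<T> UntilFirstException<T>(this IEnumerable<T> source, Action<Exception>? exceptionCallback = null)
    {
        using var enumerator = source.GetEnumerator();
        while (true)
        {
            T current;
            // C# doesn't allow yield return inside a try block that has a catch clause,
            // so only MoveNext/Current are guarded and the yield happens afterwards.
            try
            {
                if (!enumerator.MoveNext())
                {
                    break;
                }
                current = enumerator.Current;
            }
            catch (Exception e)
            {
                exceptionCallback?.Invoke(e);
                break;
            }
            yield return current;
        }
    }
}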
Exception? e = null;
foreach (int[] chunk in Produce().UntilFirstException(thrown => e = thrown).Chunk(10))
{
    Console.WriteLine($"Consumed: [{String.Join(", ", chunk)}]");
}
I feel like that keeps responsibilities separated nicely. If you want a helper that throws an exception instead of having to capture it yourself, you can use this as a component to simplify writing that helper:
public static IEnumerable<T[]> ChunkUntilFirstException<T>(this IEnumerable<T> source, int size)
{
    Exception? e = null;
    var result = source.UntilFirstException(thrown => e = thrown).Chunk(size);
    foreach (var element in result)
    {
        yield return element;
    }
    // Every chunk (including the last, partial one) has been emitted; now report the failure.
    if (e != null)
    {
        throw new InvalidOperationException("source threw an exception", e);
    }
}
Note that this will throw a different exception than the one emitted by the producer. Wrapping the original as the InnerException lets you keep the stack trace associated with it, whereas throw e would overwrite that stack trace.
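If you'd rather surface the producer's original exception instead of wrapping it, while still keeping its stack trace, ExceptionDispatchInfo (in System.Runtime.ExceptionServices) can capture an exception and rethrow it later. Here is a rough, single-pass sketch that uses the ChunkNonDestructive name and signature from the question. The class name ChunkExtensions, the private Iterator helper, and the size < 1 check are my own assumptions, and I haven't benchmarked this against the built-in Chunk:

```csharp
using System;
using System.Collections.Generic;
using System.Runtime.ExceptionServices;

public static class ChunkExtensions
{
    // Sketch of the question's ChunkNonDestructive: emits every full chunk, then the
    // partial chunk that was buffered when the source failed, then rethrows the
    // source's original exception with its original stack trace preserved.
    public static IEnumerable<TSource[]> ChunkNonDestructive<TSource>(
        this IEnumerable<TSource> source, int size)
    {
        // Validate eagerly (outside the iterator) so bad arguments fail at the call site.
        ArgumentNullException.ThrowIfNull(source);
        if (size < 1) throw new ArgumentOutOfRangeException(nameof(size));
        return Iterator(source, size);
    }

    private static IEnumerable<TSource[]> Iterator<TSource>(IEnumerable<TSource> source, int size)
    {
        var buffer = new List<TSource>(size);
        ExceptionDispatchInfo? error = null;
        using (var enumerator = source.GetEnumerator())
        {
            while (true)
            {
                // Only the source access is guarded; yield return can't live in a try with a catch.
                try
                {
                    if (!enumerator.MoveNext()) break;
                    buffer.Add(enumerator.Current);
                }
                catch (Exception ex)
                {
                    // Remember the failure together with its original stack trace.
                    error = ExceptionDispatchInfo.Capture(ex);
                    break;
                }
                if (buffer.Count == size)
                {
                    yield return buffer.ToArray();
                    buffer.Clear();
                }
            }
        }
        // Flush the partial chunk first, then surface the producer's exception (if any).
        if (buffer.Count > 0) yield return buffer.ToArray();
        error?.Throw();
    }
}
```

With this, the loop from the question becomes foreach (int[] chunk in Produce().ChunkNonDestructive(10)), and the exception that escapes is the producer's own System.Exception ("Oops!"), so a catch or when filter doesn't need to look at InnerException. Unlike the composition above, this puts buffering and error handling in one operator, so it gives up some of that separation of responsibilities.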
You can tweak this according to your needs. If you need to catch a specific type of exception that you're expecting your producer to emit, it's easy enough to use the when contextual keyword with some pattern matching.
try
{
    foreach (int[] chunk in Produce().ChunkUntilFirstException(10))
    {
        Console.WriteLine($"Consumed: [{String.Join(", ", chunk)}]");
    }
}
catch (InvalidOperationException e) when (e.InnerException is {Message: "Oops!"})
{
    Console.WriteLine(e.InnerException.ToString());
}