I have an ActionBlock
as part of my class:
private readonly ActionBlock<QueueMessage> block;
In the constructor, I initialize it like so:
block = new ActionBlock<QueueMessage>(async s => await Process(s),
new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = maxParallelism });
My Process
method is implemented as:
private async Task Process(QueueMessage message)
{
using var tx = stateManager.CreateTransaction();
var result = await processQueue.TryGetValueAsync(tx, message.Data.Checksum);
if (!result.HasValue)
{
logger.Log(LogLevel.Error, $"Message {message.Data.Checksum} is being processed, but doesn't exist on the queue.");
return;
}
try
{
await processorService.Process(message);
logger.Log(LogLevel.Debug, $"Message {message.Data.Checksum} processed, removing from queue.");
await processQueue.TryRemoveAsync(tx, message.Data.Checksum);
await tx.CommitAsync();
}
catch (Exception e)
{
logger.Log(LogLevel.Error, $"Message {message.Data.Checksum} could not be processed, adding to retry queue. Error: {e}");
message.RetryCount++;
await retryQueue.EnqueueAsync(tx, message);
}
finally
{
await tx.CommitAsync();
}
}
When I get a message I want to process, I call:
// Process the message
if (!await block.SendAsync(message, cancellationToken))
{
logger.Log(LogLevel.Error, $"Failed to enqueue data with checksum {message.Data.Checksum}");
}
This works the first time, and the message is processed perfectly. The second message I get (maybe a few seconds later, maybe a minute later, depends on the load of the system), then block.SendAsync
immediately returns false. No code in Process
is ran. When I look at this under the debugger, I notice block.IsCompleted
is true. However, I never called block.Complete();
or anything.
Why is my block complete when I still might have more work for it to do? Thanks!
Update: I might have a clue.
I seem to be hitting this breakpoint (In the ActionBlock.cs source code)
Mystery solved. My Action was throwing an unhandled exception, which apparently marks the block as complete and doesn't allow any new messages.
I'd be interested if there's any way to control this behavior, or if it's recommended to always have try/catch
blocks around your action.