Search code examples
c#.nettask-parallel-librarytpl-dataflow

ActionBlock appears to complete before I want it to


I have an ActionBlock as part of my class:

private readonly ActionBlock<QueueMessage> block;

In the constructor, I initialize it like so:

block = new ActionBlock<QueueMessage>(async s => await Process(s),
    new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = maxParallelism });

My Process method is implemented as:

private async Task Process(QueueMessage message)
{
    using var tx = stateManager.CreateTransaction();

    var result = await processQueue.TryGetValueAsync(tx, message.Data.Checksum);
    if (!result.HasValue)
    {
        logger.Log(LogLevel.Error, $"Message {message.Data.Checksum} is being processed, but doesn't exist on the queue.");
        return;
    }

    try
    {
        await processorService.Process(message);
        logger.Log(LogLevel.Debug, $"Message {message.Data.Checksum} processed, removing from queue.");
        await processQueue.TryRemoveAsync(tx, message.Data.Checksum);
        await tx.CommitAsync();
    }
    catch (Exception e)
    {
        logger.Log(LogLevel.Error, $"Message {message.Data.Checksum} could not be processed, adding to retry queue.  Error: {e}");

        message.RetryCount++;
        await retryQueue.EnqueueAsync(tx, message);
    }
    finally
    {
        await tx.CommitAsync();
    }
}

When I get a message I want to process, I call:

// Process the message
if (!await block.SendAsync(message, cancellationToken))
{
    logger.Log(LogLevel.Error, $"Failed to enqueue data with checksum {message.Data.Checksum}");
}

This works the first time, and the message is processed perfectly. The second message I get (maybe a few seconds later, maybe a minute later, depends on the load of the system), then block.SendAsync immediately returns false. No code in Process is ran. When I look at this under the debugger, I notice block.IsCompleted is true. However, I never called block.Complete(); or anything.

Why is my block complete when I still might have more work for it to do? Thanks!

Update: I might have a clue.

I seem to be hitting this breakpoint (In the ActionBlock.cs source code)

enter image description here


Solution

  • Mystery solved. My Action was throwing an unhandled exception, which apparently marks the block as complete and doesn't allow any new messages.

    I'd be interested if there's any way to control this behavior, or if it's recommended to always have try/catch blocks around your action.