Search code examples
c#.netmultithreadingnservicebus

NServiceBus events lost when published in separate thread


I've been working on getting long running messages working with NServiceBus on an Azure transport. Based off this document, I thought I could get away with firing off the long process in a separate thread, marking the event handler task as complete and then listening for custom OperationStarted or OperationComplete events. I noticed the OperationComplete event is not received by my handlers most cases. In fact, the only time it is received is when I publish it immediately after the OperationStarted event is published. Any actual processing in between somehow prevents the completion event from being received. Here is my code:

Abstract class used for long running messages

public abstract class LongRunningOperationHandler<TMessage> : IHandleMessages<TMessage> where TMessage : class
{
    protected ILog _logger => LogManager.GetLogger<LongRunningOperationHandler<TMessage>>();

    public Task Handle(TMessage message, IMessageHandlerContext context)
    {
        var opStarted = new OperationStarted
        {
            OperationID = Guid.NewGuid(),
            OperationType = typeof(TMessage).FullName
        };
        var errors = new List<string>();
        // Fire off the long running task in a separate thread
        Task.Run(() =>
            {
                try
                {
                    _logger.Info($"Operation Started: {JsonConvert.SerializeObject(opStarted)}");
                    context.Publish(opStarted);
                    ProcessMessage(message, context);
                }
                catch (Exception ex)
                {
                    errors.Add(ex.Message);
                }
                finally
                {
                    var opComplete = new OperationComplete
                    {
                        OperationType = typeof(TMessage).FullName,
                        OperationID = opStarted.OperationID,
                        Errors = errors
                    };

                    context.Publish(opComplete);

                    _logger.Info($"Operation Complete: {JsonConvert.SerializeObject(opComplete)}");
                }
            });

        return Task.CompletedTask;
    }

    protected abstract void ProcessMessage(TMessage message, IMessageHandlerContext context);
}

Test Implementation

public class TestLongRunningOpHandler : LongRunningOperationHandler<TestCommand>
{
    protected override void ProcessMessage(TestCommand message, IMessageHandlerContext context)
    {
        // If I remove this, or lessen it to something like 200 milliseconds, the 
        // OperationComplete event gets handled
        Thread.Sleep(1000);
    }
}

Operation Events

public sealed class OperationComplete : IEvent
{
    public Guid OperationID { get; set; }
    public string OperationType { get; set; }
    public bool Success => !Errors?.Any() ?? true;
    public List<string> Errors { get; set; } = new List<string>();
    public DateTimeOffset CompletedOn { get; set; } = DateTimeOffset.UtcNow;
}

public sealed class OperationStarted : IEvent
{
    public Guid OperationID { get; set; }
    public string OperationType { get; set; }
    public DateTimeOffset StartedOn { get; set; } = DateTimeOffset.UtcNow;
}

Handlers

public class OperationHandler : IHandleMessages<OperationStarted>
, IHandleMessages<OperationComplete>
{
    static ILog logger = LogManager.GetLogger<OperationHandler>();

    public Task Handle(OperationStarted message, IMessageHandlerContext context)
    {
        return PrintJsonMessage(message);
    }

    public Task Handle(OperationComplete message, IMessageHandlerContext context)
    {
        // This is not hit if ProcessMessage takes too long
        return PrintJsonMessage(message);
    }

    private Task PrintJsonMessage<T>(T message) where T : class
    {
        var msgObj = new
        {
            Message = typeof(T).Name,
            Data = message
        };
        logger.Info(JsonConvert.SerializeObject(msgObj, Formatting.Indented));
        return Task.CompletedTask;
    }

}

I'm certain that the context.Publish() calls are being hit because the _logger.Info() calls are printing messages to my test console. I've also verified they are hit with breakpoints. In my testing, anything that runs longer than 500 milliseconds prevents the handling of the OperationComplete event.

If anyone can offer suggestions as to why the OperationComplete event is not hitting the handler when any significant amount of time has passed in the ProcessMessage implementation, I'd be extremely grateful to hear them. Thanks!

-- Update -- In case anyone else runs into this and is curious about what I ended up doing:

After an exchange with the developers of NServiceBus, I decided on using a watchdog saga that implemented the IHandleTimeouts interface to periodically check for job completion. I was using saga data, updated when the job was finished, to determine whether to fire off the OperationComplete event in the timeout handler. This presented an other issue: when using In-Memory Persistence, the saga data was not persisted across threads even when it was locked by each thread. To get around this, I created an interface specifically for long running, in-memory data persistence. This interface was injected into the saga as a singleton, and thus used to read/write saga data across threads for long running operations.

I know that In-Memory Persistence is not recommended, but for my needs configuring another type of persistence (like Azure tables) was overkill; I simply want the OperationComplete event to fire under normal circumstances. If a reboot happens during a running job, I don't need to persist the saga data. The job will be cut short anyway and the saga timeout will handle firing the OperationComplete event with an error if the job runs longer than a set maximum time.


Solution

  • The cause of this is that if ProcessMessage is fast enough, you might get the current context before it gets invalidated, such as being disposed.

    By returning from Handle successfully, you're telling NServiceBus: "I'm done with this message", so it may do what it wants with the context as well, such as invalidating it. In the background processor, you need an endpoint instance, not a message context.

    By the time the new task starts running, you don't know if Handle has returned or not, so you should just consider the message has already been consumed and is thus unrecoverable. If errors happen in your separate task, you can't retry them.

    Avoid long running processes without persistence. The sample you mention has a server that stores a work item from a message, and a process that polls this storage for work items. Perhaps not ideal, in case you scale out processors, but it won't lose messages.

    To avoid constant polling, merge the server and the processor, poll inconditionally once when it starts, and in Handle schedule a polling task. Take care for this task to only poll if no other polling task is running, otherwise it may become worse than constant polling. You may use a semaphore to control this.

    To scale out, you must have more servers. You need to measure if the cost of N processors polling is greater than sending to N servers in a round-robin fashion, for some N, to know which approach actually performs better. In practice, polling is good enough for a low N.

    Modifying the sample for multiple processors may require less deployment and configuration effort, you just add or take processors, while adding or removing servers needs changing their enpoints in all places (e.g. config files) that point to them.

    Another approach would be to break the long process into steps. NServiceBus has sagas. It's an approach usually implemented for a know or bounded amount of steps. For an unknown amount of steps, it's still feasible, although some might consider it an abuse of the seemingly intended purpose of sagas.