Tags: c#, .net, .net-4.0, timeout, rabbitmq

How to write a timeout for a short, frequently repeated action?


I would like to write a timeout function for the BasicPublish method of the RabbitMQ C# client. Sometimes the queue is blocked, RabbitMQ is down, or something else goes wrong, and I want to detect a failing publish right away. I do not want to block the site for any reason.

I'm worried that implementing a timeout with a Task or threads will add overhead to a simple publish, which we do millions of times in production.

Does anyone have an idea how to write a quick timeout for a fast blocking method such as BasicPublish?

Clarification: I'm working in .NET 4, so I do not have async/await.


Solution

  • Polly has a TimeoutPolicy aimed at exactly this scenario.

    Polly's TimeoutStrategy.Optimistic is close to @ThiagoCustodio's answer, but it also disposes the CancellationTokenSource correctly. It relies on the executed delegate honouring a CancellationToken, however, and RabbitMQ's C# client does not (at the time of writing) offer a BasicPublish() overload that takes one, so this strategy does not apply here.

    Polly's TimeoutStrategy.Pessimistic is aimed at scenarios such as BasicPublish(), where you want to impose a timeout on a delegate which doesn't have CancellationToken support.

    Polly's TimeoutStrategy.Pessimistic:

    [1] allows the calling thread to time-out on (walk away from waiting for) the execution, even when the executed delegate doesn't support cancellation.

    [2] does so at the cost of an extra task/thread (in synchronous executions), and manages this for you.

    [3] also captures the timed-out Task (the task you have walked away from). This can be valuable for logging, and is essential to avoid UnobservedTaskExceptions, particularly in .NET 4.0, where by default an unobserved task exception can bring down your entire process.
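To make the cost in [2] and the hazard in [3] concrete, here is a minimal hand-rolled sketch of the same idea using only the .NET 4.0 Task Parallel Library. It is not Polly's implementation; the names `PessimisticTimeout`, `TryExecute` and `onAbandonedFault` are hypothetical, and `Thread.Sleep` stands in for a blocked BasicPublish call.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

public static class PessimisticTimeout
{
    // Runs 'action' on a thread-pool task and waits at most 'timeout' for it.
    // Returns true if the action finished in time, false if the caller walked away.
    // If the action fails within the timeout, Wait rethrows it as an AggregateException.
    public static bool TryExecute(Action action, TimeSpan timeout,
                                  Action<AggregateException> onAbandonedFault)
    {
        // Task.Factory.StartNew rather than Task.Run, which only arrived in .NET 4.5.
        Task task = Task.Factory.StartNew(action);

        if (task.Wait(timeout))
        {
            return true;
        }

        // The caller is walking away, but the task may still be running.
        // Reading t.Exception in a continuation observes any later fault,
        // so it never becomes an unobserved task exception.
        task.ContinueWith(
            t => onAbandonedFault(t.Exception),
            TaskContinuationOptions.OnlyOnFaulted);

        return false;
    }
}

public static class Demo
{
    public static void Main()
    {
        bool published = PessimisticTimeout.TryExecute(
            () => Thread.Sleep(500),            // stand-in for a blocked publish
            TimeSpan.FromMilliseconds(50),
            ex => Console.Error.WriteLine("Abandoned publish failed: " + ex));

        Console.WriteLine(published ? "published" : "timed out");
    }
}
```

Every call pays for one queued work item and one wait, which is the overhead mentioned in [2]; whether that is acceptable at millions of publishes is something to measure rather than assume.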

    Simple example:

    Policy.Timeout(TimeSpan.FromSeconds(10), TimeoutStrategy.Pessimistic).Execute(() => BasicPublish(...));
    

    Full example properly avoiding UnobservedTaskExceptions:

    Policy timeoutPolicy = Policy.Timeout(TimeSpan.FromSeconds(10), TimeoutStrategy.Pessimistic, (context, timespan, task) => 
    {
        // ContinueWith is important: the abandoned task may well still be executing when the caller times out waiting for it.
        task.ContinueWith(t =>
        {
            if (t.IsFaulted) 
            {
                logger.Error($"{context.PolicyKey} at {context.ExecutionKey}: execution timed out after {timespan.TotalSeconds} seconds, eventually terminated with: {t.Exception}.");
            }
            else
            {
               // extra logic (if desired) for tasks which complete, despite the caller having 'walked away' earlier due to timeout.
            }
        });
    });
    
    timeoutPolicy.Execute(() => BasicPublish(...));
    

    To avoid building up too many concurrent pending tasks/threads in the case where RabbitMQ becomes unavailable, you can use a Bulkhead Isolation policy to limit parallelization and/or a CircuitBreaker to prevent putting calls through for a period, once you detect a certain level of failures. These can be combined with the TimeoutPolicy using PolicyWrap.
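A rough sketch of such a combination follows, assuming a Polly version that exposes the synchronous `Policy.Timeout`, `Policy.Bulkhead`, `CircuitBreaker` and `Policy.Wrap` methods (the Polly NuGet package is required). The class and method names, the thresholds and the `publish` delegate are all hypothetical, and the ordering is one reasonable choice rather than a recommendation from the answer: the breaker is outermost so that timeouts count towards its failure threshold.

```csharp
using System;
using Polly;
using Polly.Bulkhead;
using Polly.CircuitBreaker;
using Polly.Timeout;

public static class ResilientPublisher
{
    public static Policy BuildPolicy()
    {
        // Walk away from a publish that blocks for more than 2 seconds (illustrative value).
        Policy timeout = Policy.Timeout(TimeSpan.FromSeconds(2), TimeoutStrategy.Pessimistic);

        // Allow at most 10 executions through at once (illustrative value).
        // Where the bulkhead sits relative to the timeout changes what it limits.
        Policy bulkhead = Policy.Bulkhead(10);

        // After 5 consecutive failures, reject calls for 30 seconds.
        // Handling every Exception is an assumption; narrow it to the failures you care about.
        Policy breaker = Policy
            .Handle<Exception>()
            .CircuitBreaker(5, TimeSpan.FromSeconds(30));

        // Policies are listed from outermost to innermost.
        return Policy.Wrap(breaker, bulkhead, timeout);
    }

    // Returns false when a policy rejected or timed out the call.
    // Exceptions thrown by the publish itself still propagate to the caller.
    public static bool TryPublish(Policy policy, Action publish)
    {
        try
        {
            policy.Execute(publish);
            return true;
        }
        catch (TimeoutRejectedException)
        {
            return false; // the publish took too long
        }
        catch (BrokenCircuitException)
        {
            return false; // the breaker is open, the call was never attempted
        }
        catch (BulkheadRejectedException)
        {
            return false; // too many publishes already in flight
        }
    }
}
```

Build the wrapped policy once and share it across calls, for example `var policy = ResilientPublisher.BuildPolicy();` followed by `ResilientPublisher.TryPublish(policy, () => BasicPublish(...));`, because the breaker and bulkhead only work if their state is shared.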