Search code examples
c#dependency-injectionrabbitmq.net-7.0easynetq

Set up only synchronous blocking calls in IBus.PubSub.SubscribeAsync()?


The problem is that I need to handle each message sequentially so that one message should block the next message. No message should be handled asynchronously and no message should be handled at the same time as another one. But sometimes (like 0,8% of the cases, the upcoming message is handled asynchronously, even though all the methods are synchronous 'void' and **not 'Tasks' **).

// Steps to reproduce the behavior: Run this class and receive several messages:

public class Subscriber : ISubscriber
{
    private readonly IBus _bus;
    private readonly IMessageProcessor _myMessageProcessor;
    private readonly ILogger<ISubscriber> _logger;
    private const string _subscriptionId = "MySubs";

    public Subscriber(IBus bus, IMessageProcessor myMessageProcessor, ILogger<ISubscriber> logger)
    {
        _bus = bus;
        _myMessageProcessor = myMessageProcessor;
        _logger = logger;
    }

    public void Subscribe()
    {
        _bus.PubSub.SubscribeAsync<MyCustomMessage>(_subscriptionId, OnMyCustomMessage, c => c.WithAutoDelete(false));
        _bus.Advanced.Connected += Advanced_Connected;
    }

    private void Advanced_Connected(object sender, EventArgs e)
    {
        _bus.PubSub.SubscribeAsync<MyCustomMessage>(_subscriptionId, OnMyCustomMessage, c => c.WithAutoDelete(false));
    }

    private void OnMyCustomMessage(MyCustomMessage message)
    {
        try {
            if (_logger.IsEnabled(LogLevel.Debug))
                _logger.LogDebug($"Received message #{message.Id}. {Newtonsoft.Json.JsonConvert.SerializeObject(message)}");
            _myMessageProcessor.ProcessCustomMessage(message);
        }
        catch (Exception ex)
        {
            _logger.LogError(ex, $"{nameof(Subscriber)}.{nameof(OnMyCustomMessage)}: args: {Newtonsoft.Json.JsonConvert.SerializeObject(message)}");
        }
    }
}

Expected behavior

The expected behaviour is 100% synchronous handling of each message one by one without async, no async code is expected.

Versioning:

EasyNetQ version: [e.g. 7.3.6] RabbitMQ version [e.g. 25.1.1]

Additional context:

After the update to 7.4.* I downgraded because in 7.3.8 version there was an update for non-blocking calls, but I only do need the blocking calls. Can you guide me on how to use the message blocking handlers in the PubSub?

I tried to use _bus.PubSub.Subscribe instead of _bus.PubSub.SubscribeAsync with no luck.


Solution

  • As the EasyNetQ developers answer this, the solution right now is probably to use the WithPrefetchCount(1) method when declaring or subscribing to the Queue.

    await _bus.PubSub.SubscribeAsync<MyCustomMessage>("_subscriptionId",
         OnMyCustomMessage, c => 
    {
        c.WithAutoDelete(false);
        c.WithPrefetchCount(1);
    });
    
    

    src: https://github.com/EasyNetQ/EasyNetQ/issues/1654