Is there any way to synchronously consume raw byte messages from RabbitMQ using EasyNetQ?
I need to guarantee in-order processing and acking of messages coming from a system that does not publish in EasyNetQ format. I know the consumer runs on a single thread, but the IAdvancedBus
interface only offers one method to consume raw messages:
IDisposable Consume(IQueue queue, Func<byte[], MessageProperties, MessageReceivedInfo, Task> onMessage);
The Task
return type means that the consumer is running the callback asynchronously and hence may process the messages out of order.
If not, any ideas for changing the code to support this? I would make the interface method:
IDisposable Consume(IQueue queue, Action<byte[], MessageProperties, MessageReceivedInfo> onMessage);
and implement it in RabbitAdvancedBus
, but I am not sure where the code would go exactly.
I received a response that works in the EasyNetQ Google Group:
To execute synchronously you can do this:
bus.Advanced.Consume(queue, (bytes, properties, info) =>
{
// do your synchronous work.....
return Task.CompletedTask;
});
or add an extension:
using System;
using System.Threading.Tasks;
using EasyNetQ;
using EasyNetQ.Consumer;
using EasyNetQ.Loggers;
using EasyNetQ.Topology;
namespace ConsoleApplication4
{
public static class RabbitAdvancedBusConsumeExtension
{
public static IDisposable Consume(this IAdvancedBus bus, IQueue queue, Action<byte[], MessageProperties, MessageReceivedInfo> onMessage)
{
return bus.Consume(queue, (bytes, properties, info) => ExecuteSynchronously(() => onMessage(bytes, properties, info)));
}
public static IDisposable Consume(this IAdvancedBus bus, IQueue queue, Action<byte[], MessageProperties, MessageReceivedInfo> onMessage, Action<IConsumerConfiguration> configure)
{
return bus.Consume(queue, (bytes, properties, info) => ExecuteSynchronously(() => onMessage(bytes, properties, info)), configure);
}
private static Task ExecuteSynchronously(Action action)
{
var tcs = new TaskCompletionSource<object>();
try
{
action();
tcs.SetResult(null);
}
catch (Exception e)
{
tcs.SetException(e);
}
return tcs.Task;
}
}
class Program
{
static void Main(string[] args)
{
var bus = RabbitHutch.CreateBus("host=localhost", x => x.Register<IEasyNetQLogger>(s => new ConsoleLogger()));
var queue = bus.Advanced.QueueDeclare();
bus.Advanced.Consume(queue, (bytes, properties, info) =>
{
// .....
});
}
}
}
UPDATE: This functionality was added in release 0.52.0.410: