I'm working on a project with a background service that consumes messages from a RabbitMQ queue.
The service uses a background task queue, like this and here, to process tasks in parallel.
I would like to buffer the consumed messages, group them by ID, and send them to another RabbitMQ queue after a specified interval or once the buffer reaches a given size.
I already posted a question here, and the suggested solution was to implement Channel<T>.
But a worker can't produce and consume on the same thread.
So I thought about using two workers: one that consumes messages from RabbitMQ and writes them to the Channel, and another that reads from the Channel and groups the items to send them to another queue.
Is this the best way to achieve it?
Things are far simpler. And there's no leak, unless you seriously misuse Channels.
A BackgroundService or an IHostedService isn't a worker or a Task; it's a class created by the host when it starts. The host calls StartAsync on it when the application starts, and StopAsync when it shuts down. The BackgroundService base class simplifies things a bit, so you can put all your code in an overridden ExecuteAsync without having to handle Start and Stop yourself. You can start as many tasks as you want inside StartAsync or ExecuteAsync. You don't need to create multiple background/hosted services.
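A minimal sketch of that idea, assuming .NET 6 or later: one method starts a producer task and a consumer task that share a bounded Channel<int>, then awaits both. It's written as a plain static method (PipelineSketch.RunAsync is a hypothetical name) so it runs without Microsoft.Extensions.Hosting; in a real BackgroundService the body would live in ExecuteAsync, and the producer would be fed by the RabbitMQ consumer instead of a list of integers.

```csharp
using System.Collections.Generic;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;

public static class PipelineSketch
{
    // Hypothetical stand-in for ExecuteAsync: one service, two concurrent tasks.
    public static async Task<List<int>> RunAsync(IEnumerable<int> messages, CancellationToken ct = default)
    {
        // Bounded, so a slow consumer makes the producer wait instead of growing the buffer.
        var channel = Channel.CreateBounded<int>(new BoundedChannelOptions(100)
        {
            SingleWriter = true,
            SingleReader = true
        });
        var received = new List<int>();

        // Producer: in the real service this would be fed by the RabbitMQ consumer.
        Task producer = Task.Run(async () =>
        {
            try
            {
                foreach (var m in messages)
                {
                    await channel.Writer.WriteAsync(m, ct);
                }
            }
            finally
            {
                channel.Writer.Complete(); // lets the reader loop below finish
            }
        }, ct);

        // Consumer: reads until the writer completes and the channel is drained.
        Task consumer = Task.Run(async () =>
        {
            await foreach (var m in channel.Reader.ReadAllAsync(ct))
            {
                received.Add(m);
            }
        }, ct);

        await Task.WhenAll(producer, consumer);
        return received;
    }
}
```

The capacity of 100 is an arbitrary choice; with a bounded channel, WriteAsync waits when the consumer falls behind.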
In a hosted service, StartAsync(CancellationToken) can create your client, connect to RabbitMQ, and register an event handler that posts each message to whatever you use to process the messages. .NET offers multiple ways to process streams of events.
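```csharp
using Microsoft.Extensions.Hosting;
using RabbitMQ.Client.Events;

public class RabbitListener : IHostedService, IDisposable
{
    private EventingBasicConsumer? _consumer;

    private void OnReceived(object? sender, BasicDeliverEventArgs ea)
    {
        var body = ea.Body.ToArray();
        // Process that message
    }

    public Task StartAsync(CancellationToken cancellationToken)
    {
        // StartRabbitAndCreateConsumer: connect, open a channel, call BasicConsume
        _consumer = StartRabbitAndCreateConsumer();
        _consumer.Received += OnReceived;
        return Task.CompletedTask;
    }

    public Task StopAsync(CancellationToken cancellationToken)
    {
        if (_consumer != null)
        {
            _consumer.Received -= OnReceived;
        }
        return Task.CompletedTask;
    }

    public void Dispose()
    {
        // Close the RabbitMQ channel and connection here
        CleanupRabbitMqEtc();
    }
}
```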
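The OnReceived handler is synchronous, so it can't await ChannelWriter.WriteAsync. One option is TryWrite on an unbounded channel, which only fails once the writer has been completed. In this sketch FakeConsumer and EventToChannelBridge are hypothetical names; FakeConsumer stands in for EventingBasicConsumer, with a plain EventHandler<byte[]> in place of Received, so the example runs without RabbitMQ.Client.

```csharp
using System;
using System.Threading.Channels;

// Hypothetical stand-in for EventingBasicConsumer: raises an event per message.
public class FakeConsumer
{
    public event EventHandler<byte[]>? Received;
    public void Deliver(byte[] body) => Received?.Invoke(this, body);
}

public class EventToChannelBridge
{
    // Unbounded: TryWrite only returns false after the writer has been completed.
    private readonly Channel<byte[]> _channel = Channel.CreateUnbounded<byte[]>();
    private readonly FakeConsumer _consumer;

    public ChannelReader<byte[]> Reader => _channel.Reader;

    public EventToChannelBridge(FakeConsumer consumer)
    {
        _consumer = consumer;
        _consumer.Received += OnReceived; // what StartAsync would do
    }

    private void OnReceived(object? sender, byte[] body)
    {
        // Synchronous handler: no await available, so use TryWrite instead of WriteAsync.
        _channel.Writer.TryWrite(body);
    }

    public void Stop()
    {
        _consumer.Received -= OnReceived; // what StopAsync would do
        _channel.Writer.Complete();       // readers see the end of the stream
    }
}
```

With an unbounded channel nothing slows RabbitMQ down if processing falls behind; assuming manual acks, setting a prefetch limit with BasicQos is one way to cap how many unacknowledged messages the broker pushes to the consumer.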
There are many ways to construct processing pipelines for these events.
With Rx.NET, grouping by key and buffering by count and timeout could be done with a query similar to this one, borrowed from this question:
```csharp
eventSource
    .GroupBy(e => e.GroupingKey)
    .SelectMany(group => group.Buffer(TimeSpan.FromSeconds(60), 100))
    .Subscribe(list => SendToStorage(list));
```
Since you care about the grouping key (the ID), the query could be adjusted to:
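```csharp
eventSource
    .GroupBy(e => e.ID)
    .SelectMany(group => group
        .Buffer(TimeSpan.FromSeconds(60), 100)
        // Timed buffers can be empty; don't forward those
        .Where(items => items.Count > 0)
        .Select(items => new { ID = group.Key, Items = items }))
    .Subscribe(batch => SendToQueue(batch.ID, batch.Items));
```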
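For comparison, here is a rough Channel-based approximation of the same group-by-ID, flush-on-count-or-time behaviour, assuming .NET 6 or later. KeyedBatcher and BatchByKeyAsync are hypothetical names, and sendBatch stands in for SendToQueue. One simplification to be aware of: this sketch uses a single flush deadline for all groups, whereas in the Rx query each group has its own Buffer with its own timer.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;

public static class KeyedBatcher
{
    // Groups items by key and calls sendBatch when a group reaches maxBatchSize,
    // or for every non-empty group when flushInterval elapses or the writer completes.
    public static async Task BatchByKeyAsync<T, TKey>(
        ChannelReader<T> reader,
        Func<T, TKey> keySelector,
        int maxBatchSize,
        TimeSpan flushInterval,
        Func<TKey, List<T>, Task> sendBatch,
        CancellationToken ct = default) where TKey : notnull
    {
        var buffers = new Dictionary<TKey, List<T>>();
        var nextFlush = DateTime.UtcNow + flushInterval;
        var more = true;

        while (more)
        {
            // Wait for new items, but no longer than the time left until the next flush.
            var remaining = nextFlush - DateTime.UtcNow;
            if (remaining > TimeSpan.Zero)
            {
                using var timeout = CancellationTokenSource.CreateLinkedTokenSource(ct);
                timeout.CancelAfter(remaining);
                try
                {
                    more = await reader.WaitToReadAsync(timeout.Token);
                }
                catch (OperationCanceledException) when (!ct.IsCancellationRequested)
                {
                    // The interval elapsed: fall through to the timed flush below.
                }
            }

            // Drain everything that is available right now.
            while (reader.TryRead(out var item))
            {
                var key = keySelector(item);
                if (!buffers.TryGetValue(key, out var batch))
                {
                    batch = new List<T>();
                    buffers[key] = batch;
                }
                batch.Add(item);
                if (batch.Count >= maxBatchSize)
                {
                    buffers.Remove(key); // size-triggered flush for this key only
                    await sendBatch(key, batch);
                }
            }

            // Timed flush, plus a final flush once the writer has completed.
            if (!more || DateTime.UtcNow >= nextFlush)
            {
                foreach (var (key, batch) in buffers)
                {
                    await sendBatch(key, batch);
                }
                buffers.Clear();
                nextFlush = DateTime.UtcNow + flushInterval;
            }
        }
    }
}
```

Because a single task owns the dictionary, no locking is needed. The size check runs on every item, and the timed flush happens when the interval timeout cancels WaitToReadAsync or when the writer completes.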
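A gotcha with Rx.NET is that, unlike the other options, it's built for event stream processing, so by default it doesn't hop threads: most steps run on whichever thread pushed the event, and a subscriber receives one notification at a time. You can specify that the observable, the subscriptions and even individual steps run on different threads, though. SubscribeOn only controls where subscribing and unsubscribing happen; to run the Subscribe callback on thread-pool threads, use .ObserveOn(ThreadPoolScheduler.Instance) from System.Reactive.Concurrency:
```csharp
eventSource
    .GroupBy(e => e.ID)
    .SelectMany(group => group
        .Buffer(TimeSpan.FromSeconds(60), 100)
        .Where(items => items.Count > 0)
        .Select(items => new { ID = group.Key, Items = items }))
    // Deliver batches to the subscriber on thread-pool threads
    .ObserveOn(ThreadPoolScheduler.Instance)
    .Subscribe(batch => SendToQueue(batch.ID, batch.Items));
```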
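With Channels, concurrency is explicit instead: start several reader tasks over the same ChannelReader and each item is delivered to exactly one of them. A minimal sketch, assuming .NET 6 or later; ParallelReaders and ProcessAsync are hypothetical names, and the counter stands in for real per-message work. Items with the same ID can then be handled by different readers in any order, so grouping would have to happen before the fan-out or within each reader.

```csharp
using System.Linq;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;

public static class ParallelReaders
{
    // Starts `workers` reader tasks on one channel; each item goes to exactly one reader.
    public static async Task<int> ProcessAsync(ChannelReader<int> reader, int workers,
        CancellationToken ct = default)
    {
        int processed = 0;
        var tasks = Enumerable.Range(0, workers).Select(_ => Task.Run(async () =>
        {
            await foreach (var item in reader.ReadAllAsync(ct))
            {
                // Stand-in for real per-message work
                Interlocked.Increment(ref processed);
            }
        }, ct));
        await Task.WhenAll(tasks);
        return processed;
    }
}
```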
Creating an Observable from an event is already supported (Observable.FromEventPattern), but someone asked for explicit Rx.NET support in RabbitMQ and even provided a source. Unfortunately, the request was rejected: some commenters wanted Rx.NET operators and others wanted Dataflow operators, so it was decided not to include a specific implementation.
Using the implementation in that issue:
```csharp
public IObservable<DisposableValue<T>> Receive<T>(string exchangeName, string routingKey)
{
    if (exchangeName == null)
    {
        throw new ArgumentNullException(nameof(exchangeName));
    }
    if (routingKey == null)
    {
        throw new ArgumentNullException(nameof(routingKey));
    }
    // Server-named queue bound to the exchange
    var queueName = this.model.QueueDeclare().QueueName;
    this.model.QueueBind(queueName, exchangeName, routingKey);

    var consumer = new EventingBasicConsumer(this.model);
    // FromEventPattern attaches the handler on Subscribe, so messages delivered
    // before the first subscription are not observed (and stay unacked)
    var observable = Observable
        .FromEventPattern<BasicDeliverEventArgs>(
            x => consumer.Received += x,
            x => consumer.Received -= x)
        .Select(x => new DisposableValue<T>(
            Deserialize<T>(x.EventArgs.Body),
            // Disposing the value acks the message
            () => this.model.BasicAck(x.EventArgs.DeliveryTag, false)));
    // autoAck: false, so acks happen only when the DisposableValue is disposed
    this.model.BasicConsume(queueName, false, consumer);
    return observable;
}
```
```csharp
public class DisposableValue<T> : IDisposable
{
    private readonly Action disposeAction;

    public DisposableValue(T value, Action disposeAction)
    {
        // Validate the parameter, not the still-null field
        this.disposeAction = disposeAction ??
            throw new ArgumentNullException(nameof(disposeAction));
        this.Value = value;
    }

    public T Value { get; }

    // Runs the action passed in, e.g. BasicAck for the delivered message
    public void Dispose() => this.disposeAction();
}
```
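Acking the same delivery tag twice is a channel error in RabbitMQ (the broker closes the channel with PRECONDITION_FAILED), so if a value might be disposed from more than one place it can help to make Dispose idempotent. A sketch of such a variant, assuming it's fine to drop the action after its first run; AckOnceValue is a hypothetical name, not part of the code from the issue.

```csharp
using System;
using System.Threading;

// Hypothetical variant of DisposableValue<T>: the dispose action (e.g. BasicAck)
// runs at most once, even if Dispose is called repeatedly or from several threads.
public sealed class AckOnceValue<T> : IDisposable
{
    private Action? _disposeAction;

    public AckOnceValue(T value, Action disposeAction)
    {
        _disposeAction = disposeAction ?? throw new ArgumentNullException(nameof(disposeAction));
        Value = value;
    }

    public T Value { get; }

    public void Dispose()
    {
        // Interlocked.Exchange hands the action to exactly one caller; later calls see null.
        Interlocked.Exchange(ref _disposeAction, null)?.Invoke();
    }
}
```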
You could create an Rx.NET subscription in StartAsync and dispose it in StopAsync:
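```csharp
private IDisposable? _sub;

public Task StartAsync(CancellationToken cancellationToken)
{
    _sub = Receive<Foo>("Exchange", "RoutingKey")
        .GroupBy(e => e.Value.ID)                 // items are DisposableValue<Foo>
        .SelectMany(group => group
            .Buffer(TimeSpan.FromSeconds(60), 100)
            .Where(items => items.Count > 0)      // skip empty timed buffers
            .Select(items => new { ID = group.Key, Items = items }))
        .ObserveOn(ThreadPoolScheduler.Instance)  // run the callback on thread-pool threads
        .Subscribe(batch =>
        {
            SendToQueue(batch.ID, batch.Items.Select(i => i.Value).ToList());
            // Ack the original messages only after the batch has been forwarded
            foreach (var item in batch.Items)
            {
                item.Dispose();
            }
        });
    return Task.CompletedTask;
}

public Task StopAsync(CancellationToken cancellationToken)
{
    // Disposing the subscription detaches the Received handler
    _sub?.Dispose();
    return Task.CompletedTask;
}
```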
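If you use a Channel instead of Rx.NET, the same StartAsync/StopAsync split could look roughly like this: StartAsync starts the reader loop and returns immediately, and StopAsync completes the writer and waits for the loop to drain whatever is still buffered. DrainingService, Post and Sent are hypothetical names; the class only mirrors the IHostedService method shapes so it runs without Microsoft.Extensions.Hosting, and Task.WaitAsync assumes .NET 6 or later.

```csharp
using System.Collections.Generic;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;

public class DrainingService
{
    private readonly Channel<string> _channel = Channel.CreateUnbounded<string>();
    private Task? _loop;

    // Stand-in for the messages forwarded by SendToQueue
    public List<string> Sent { get; } = new();

    // Called from the RabbitMQ event handler; returns false once stopping has begun.
    public bool Post(string message) => _channel.Writer.TryWrite(message);

    public Task StartAsync(CancellationToken cancellationToken)
    {
        // Start the reader loop in the background and return right away.
        _loop = Task.Run(() => ReadLoopAsync());
        return Task.CompletedTask;
    }

    public async Task StopAsync(CancellationToken cancellationToken)
    {
        // No more writes; ReadAllAsync ends once the buffered items are consumed.
        _channel.Writer.TryComplete();
        if (_loop != null)
        {
            // Stop waiting if the host's shutdown token fires.
            await _loop.WaitAsync(cancellationToken);
        }
    }

    private async Task ReadLoopAsync()
    {
        await foreach (var message in _channel.Reader.ReadAllAsync())
        {
            Sent.Add(message);
        }
    }
}
```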