I'm trying to write a notification system between a server and multiple clients using gRPC server streaming in protobuf-net.grpc (.NET Framework 4.8).
I based my service off of this example. However, if I understand the example correctly, it is only able to handle a single subscriber (as _subscriber is a member variable of the StockTickerService class).
My test service looks like this:
    private readonly INotificationService _notificationService;
    private readonly Channel<Notification> _channel;

    public ClientNotificationService(INotificationService notificationService)
    {
        _notificationService = notificationService;
        _notificationService.OnNotification += OnNotification;
        _channel = Channel.CreateUnbounded<Notification>();
    }

    private async void OnNotification(object sender, Notification notification)
    {
        await _channel.Writer.WriteAsync(notification);
    }

    public IAsyncEnumerable<Notification> SubscribeAsync(CallContext context = default)
    {
        return _channel.AsAsyncEnumerable(context.CancellationToken);
    }
INotificationService just has an event, OnNotification, which is fired when its Notify method is called.
I then realized that System.Threading.Channels implements the Producer/Consumer pattern, but I need the Publisher/Subscriber pattern. When trying it out, indeed only one of the clients gets notified, instead of all of them.
It would also be nice if the server knew when a client disconnects, which seems impossible when returning _channel.AsAsyncEnumerable.
So how can I modify this in order to
1. notify all connected clients when OnNotification is called
2. detect when a client disconnects?

For 1, you'd need an implementation of a publisher/subscriber API; each call to SubscribeAsync will always represent a single conversation between gRPC endpoints, so you'll need your own mechanism for broadcasting each notification to multiple consumers. Rx (Reactive Extensions) may be worth investigating there.
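As a rough illustration of what such a broadcasting mechanism could look like without Rx, here is a minimal sketch that gives every subscriber its own channel and copies each published item into all of them. Broadcaster<T>, Subscription, Publish and Subscribe are hypothetical names invented for this example; they are not part of protobuf-net.Grpc or System.Threading.Channels.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading.Channels;

// Hypothetical helper (not a library type): fans each published item
// out to one unbounded channel per subscriber.
public sealed class Broadcaster<T>
{
    private readonly ConcurrentDictionary<Guid, Channel<T>> _subscribers =
        new ConcurrentDictionary<Guid, Channel<T>>();

    public int SubscriberCount => _subscribers.Count;

    // Registers a new subscriber; disposing the returned handle unsubscribes it.
    public Subscription Subscribe()
    {
        var id = Guid.NewGuid();
        var channel = Channel.CreateUnbounded<T>();
        _subscribers[id] = channel;
        return new Subscription(this, id, channel.Reader);
    }

    // Copies the item into every currently registered subscriber's channel.
    public void Publish(T item)
    {
        foreach (var pair in _subscribers)
        {
            // TryWrite only fails here if the channel was already completed.
            pair.Value.Writer.TryWrite(item);
        }
    }

    public sealed class Subscription : IDisposable
    {
        private readonly Broadcaster<T> _owner;
        private readonly Guid _id;

        public ChannelReader<T> Reader { get; }

        internal Subscription(Broadcaster<T> owner, Guid id, ChannelReader<T> reader)
        {
            _owner = owner;
            _id = id;
            Reader = reader;
        }

        // Removes the subscriber and completes its channel so readers finish.
        public void Dispose()
        {
            if (_owner._subscribers.TryRemove(_id, out var channel))
            {
                channel.Writer.TryComplete();
            }
        }
    }
}
```

In the service from the question, the OnNotification handler would then call Publish instead of writing to a single shared channel, and each SubscribeAsync call would create its own Subscription and stream from its Reader. Unbounded per-subscriber channels are the simplest choice but let a slow client accumulate items without limit; a bounded channel with a drop policy is an alternative.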
For 2, context.CancellationToken should be triggered when the client disconnects.
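One way to act on that, sketched here under assumptions, is to stream through an async iterator with a try/finally block rather than returning the channel's enumerable directly: the finally block runs when the token is cancelled, when the consumer disposes the enumerator, or when the source completes. StreamingSketch, StreamUntilEnded and onStreamEnded are hypothetical names for this example. On .NET Framework 4.8 this assumes C# 8 is enabled and the Microsoft.Bcl.AsyncInterfaces package is referenced.

```csharp
using System;
using System.Collections.Generic;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Channels;

public static class StreamingSketch
{
    // Yields items from the reader until the channel completes or the token
    // is cancelled. onStreamEnded is a hypothetical cleanup hook; it runs
    // however the stream ends, so it also fires on normal completion.
    public static async IAsyncEnumerable<T> StreamUntilEnded<T>(
        ChannelReader<T> reader,
        Action onStreamEnded,
        [EnumeratorCancellation] CancellationToken cancellationToken = default)
    {
        try
        {
            // WaitToReadAsync throws OperationCanceledException once the
            // token is cancelled, which unwinds through the finally block.
            while (await reader.WaitToReadAsync(cancellationToken).ConfigureAwait(false))
            {
                while (reader.TryRead(out var item))
                {
                    yield return item;
                }
            }
        }
        finally
        {
            onStreamEnded();
        }
    }
}
```

In SubscribeAsync, the token passed in would be context.CancellationToken, and the callback is where a per-client subscription could be removed or the disconnect logged.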