c# .net publish-subscribe protobuf-net protobuf-net.grpc

Publish/Subscribe Notification Service Over gRPC Using protobuf-net.grpc


I'm trying to write a notification system between a server and multiple clients using gRPC server streaming in protobuf-net.grpc (.NET Framework 4.8).

I based my service off of this example. However, if I understand the example correctly, it is only able to handle a single subscriber (as _subscriber is a member variable of the StockTickerService class).

My test service looks like this:

private readonly INotificationService _notificationService;
private readonly Channel<Notification> _channel;

public ClientNotificationService(INotificationService notificationService)
{
    _notificationService = notificationService;
    _notificationService.OnNotification += OnNotification;
    _channel = Channel.CreateUnbounded<Notification>();
}

private async void OnNotification(object sender, Notification notification)
{
    await _channel.Writer.WriteAsync(notification);
}

public IAsyncEnumerable<Notification> SubscribeAsync(CallContext context = default)
{
    return _channel.AsAsyncEnumerable(context.CancellationToken);
}

INotificationService just has an event OnNotification, which is fired when calling its Notify method.

I then realized that System.Threading.Channels implements the Producer/Consumer pattern, but I need the Publisher/Subscriber pattern. When trying it out, indeed only one of the clients gets notified, instead of all of them. It would also be nice if the server knew when a client disconnects, which seems impossible when returning _channel.AsAsyncEnumerable.

So how can I modify this in order to

  1. serve multiple clients, with all of them being notified when OnNotification is called
  2. and log when a client disconnects?

Solution

  • For 1, you'd need an implementation of a publisher/subscriber API. Each call to SubscribeAsync always represents a single conversation between gRPC endpoints, so you'll need your own mechanism for broadcasting each notification to multiple consumers. Rx (Reactive Extensions) may be worth investigating there.

  • For 2, context.CancellationToken should be triggered when the client disconnects.
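
As a rough illustration of both points, here is a minimal sketch that gives each subscriber its own channel instead of sharing one, and uses a finally block to notice when a stream ends. It is not taken from the linked example. NotificationBroadcaster, Publish and SubscriberCount are hypothetical names, the Notification class is a stand-in for the type in the question, and the code assumes C# 8 async iterators are available in the project (on .NET Framework 4.8 that means the System.Threading.Channels and Microsoft.Bcl.AsyncInterfaces packages).

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Channels;

// Hypothetical stand-in for the Notification type from the question.
public sealed class Notification
{
    public string Message { get; set; }
}

// Hypothetical helper: fans each published notification out to one channel per subscriber.
public sealed class NotificationBroadcaster
{
    private readonly ConcurrentDictionary<Guid, Channel<Notification>> _subscribers =
        new ConcurrentDictionary<Guid, Channel<Notification>>();

    public int SubscriberCount => _subscribers.Count;

    // Intended to be called from the OnNotification event handler.
    public void Publish(Notification notification)
    {
        foreach (var subscriber in _subscribers.Values)
        {
            // TryWrite succeeds on an unbounded channel that has not been completed.
            subscriber.Writer.TryWrite(notification);
        }
    }

    // Intended to be called once per gRPC stream, passing context.CancellationToken.
    // The subscriber is registered when enumeration starts, not when this method is called.
    public async IAsyncEnumerable<Notification> SubscribeAsync(
        [EnumeratorCancellation] CancellationToken cancellationToken = default)
    {
        var id = Guid.NewGuid();
        var channel = Channel.CreateUnbounded<Notification>();
        _subscribers[id] = channel;
        try
        {
            // WaitToReadAsync throws OperationCanceledException once the token is cancelled.
            while (await channel.Reader.WaitToReadAsync(cancellationToken).ConfigureAwait(false))
            {
                while (channel.Reader.TryRead(out var notification))
                {
                    yield return notification;
                }
            }
        }
        finally
        {
            // Runs when the token is cancelled or the enumerator is disposed.
            _subscribers.TryRemove(id, out _);
            Console.WriteLine($"Subscriber {id} disconnected."); // swap in your own logger
        }
    }
}
```

Under these assumptions, the service from the question would keep a single shared broadcaster, call Publish from its OnNotification handler, and return broadcaster.SubscribeAsync(context.CancellationToken) from its own SubscribeAsync method. Unbounded channels keep the sketch short, but a slow client can then accumulate notifications without limit, so a bounded channel with a drop policy may be a better fit in practice.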