Orleans: two-way communication between a grain client and a grain


I would like to implement a callback to a grain client from inside a grain method. For example, while Grain.Method1 is being evaluated, a client method needs to be called to get some data.

I've tried to do this with streams, but when I subscribe to the stream on the client, the handler never fires.

Silo configuration and grain:

var config = ClusterConfiguration.LocalhostPrimarySilo();
config.AddMemoryStorageProvider();
config.Globals.RegisterStorageProvider<MemoryStorage>("PubSubStore");
config.Globals.RegisterStreamProvider<SimpleMessageStreamProvider>("MySMSProvider");
...
public override async Task OnActivateAsync() {
    var streamProvider = GetStreamProvider("MySMSProvider");
    var stream = streamProvider.GetStream<MyTypeMessage>(myGuid, "MyStream");

    // Publish a new message to the stream once per second.
    RegisterTimer(s => {
        return stream.OnNextAsync(new MyTypeMessage());
    }, null, TimeSpan.FromMilliseconds(1000), TimeSpan.FromMilliseconds(1000));
...

Client:

var clientConfiguration = ClientConfiguration.LocalhostSilo();
clientConfiguration.RegisterStreamProvider<SimpleMessageStreamProvider>("MySMSProvider");
GrainClient.Initialize(clientConfiguration);
...
var streamProvider = GrainClient.GetStreamProvider("MySMSProvider");
var stream = streamProvider.GetStream<MyTypeMessage>(myGuid, "MyStream");
await stream.SubscribeAsync(
    async (message, token) => { /* process the message; this handler never fires */ });

Solution

  • It looks like you are not keeping a reference to the StreamSubscriptionHandle<MyTypeMessage> that the subscription returns.

    Try something like this:

        var subscriptionHandle = await _factCalculationRequestStream
            .SubscribeAsync(async (message, token) => { /* process the message */ });

    and keep subscriptionHandle from being garbage collected, for example by storing it in a field of a long-lived object.

    In my case I ended up implementing IGrainObserver on my class (a WebSocketBehavior in a WebAPI project), and this subscription works well:

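    public class MySocketService : WebSocketBehavior, IGrainObserver, IDisposable
    {
      // Kept in a field so the handle stays referenced for the lifetime of the service.
      private StreamSubscriptionHandle<MyStreamEvent> _streamSubscriberHandle;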
      ...
    
      //inside some method invoked externally
      _streamSubscriberHandle = await requestedStream
         .SubscribeAsync(onStreamMessage, onStreamError, onStreamComplete);
    }
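
The same suggestion applied to the client from the question might look roughly like the sketch below. It assumes the legacy Orleans 1.x static GrainClient API used in the question, and that GrainClient.Initialize has already been called with "MySMSProvider" registered. The MyStreamSubscriber class, its member names, and the Text property on MyTypeMessage are hypothetical and only there for illustration.

```csharp
using System;
using System.Threading.Tasks;
using Orleans;
using Orleans.Streams;

// Stand-in for the MyTypeMessage type from the question; the Text property is hypothetical.
[Serializable]
public class MyTypeMessage
{
    public string Text { get; set; }
}

// Hypothetical long-lived client-side object that owns the stream subscription.
public class MyStreamSubscriber
{
    // The handle is stored in a field so it stays referenced and can be unsubscribed later.
    private StreamSubscriptionHandle<MyTypeMessage> _handle;

    public bool IsSubscribed => _handle != null;

    // Assumes GrainClient.Initialize(...) was already called with "MySMSProvider" registered.
    public async Task SubscribeAsync(Guid streamId)
    {
        IStreamProvider provider = GrainClient.GetStreamProvider("MySMSProvider");
        IAsyncStream<MyTypeMessage> stream =
            provider.GetStream<MyTypeMessage>(streamId, "MyStream");

        // Keep the returned handle instead of discarding it.
        _handle = await stream.SubscribeAsync(OnMessageAsync);
    }

    // Invoked for every message published to the stream.
    private Task OnMessageAsync(MyTypeMessage message, StreamSequenceToken token)
    {
        Console.WriteLine("Received: " + message.Text);
        return Task.CompletedTask;
    }

    // Releases the subscription; does nothing if SubscribeAsync was never called.
    public async Task UnsubscribeAsync()
    {
        if (_handle == null) return;
        await _handle.UnsubscribeAsync();
        _handle = null;
    }
}
```

The subscriber instance itself has to stay alive as long as messages are expected (for example as a field of the hosting service), and myGuid on the client must be the same stream id that the grain publishes to.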