Hi i am trying to establish bidirectional communication using gRPC. Client to server communication works fine but on the other hand server to client streaming does not work.
Here is my .proto file:
syntax = "proto3";
option csharp_namespace = "GrpcService";
import "google/protobuf/empty.proto";
package Server;
service ApplicationService
{
rpc SendText (TextMessage) returns (google.protobuf.Empty);
rpc SubscribeText (google.protobuf.Empty) returns (stream OnTextChangedEventArgs);
}
message TextMessage
{
string Text = 1;
}
message OnTextChangedEventArgs
{
string Text = 1;
}
Here is ServerApplicationService.cs file:
namespace GrpcService.Services
{
public class ServerApplicationService : ApplicationService.ApplicationServiceBase
{
private readonly ILogger<ServerApplicationService> logger;
private Text text;
public ServerApplicationService(ILogger<ServerApplicationService> logger, Text text)
{
this.logger = logger;
this.text = text;
}
public override Task<Empty> SendText(TextMessage request, ServerCallContext context)
{
this.logger.Log(LogLevel.Information, nameof(SendText));
this.text.Value = request.Text;
return Task.FromResult(new Empty());
}
public override Task SubscribeText(Empty request, IServerStreamWriter<OnTextChangedEventArgs> responseStream, ServerCallContext context)
{
this.text.ValueChanged += (s, e) =>
{
this.logger.Log(LogLevel.Information, nameof(SubscribeText));
responseStream.WriteAsync(new OnTextChangedEventArgs()
{
Text = this.text.Value
});
};
return Task.CompletedTask;
}
}
}
And the last one is Client.cs:
namespace GrpcClient
{
public class TextChangedEventArgs : EventArgs
{
public string Text { get; }
public TextChangedEventArgs(string text)
{
this.Text = text;
}
}
class GrpcCallbacks
{
private bool isRunning;
private ApplicationService.ApplicationServiceClient client;
public event EventHandler<TextChangedEventArgs> TextChanged;
public GrpcCallbacks(ApplicationService.ApplicationServiceClient client)
{
this.client = client;
}
public void RunCallbacks()
{
if (this.isRunning)
{
throw new InvalidOperationException();
}
this.isRunning = true;
Task.Run(this.RunCallbacksInternal);
}
private async Task RunCallbacksInternal()
{
using (AsyncServerStreamingCall<OnTextChangedEventArgs> response = this.client.SubscribeText(new Empty()))
{
while (true)
{
Task timer = Task.Delay(10);
while (await response.ResponseStream.MoveNext())
{
OnTextChangedEventArgs args = response.ResponseStream.Current;
this.TextChanged?.Invoke(this, new TextChangedEventArgs(args.Text));
}
await timer;
}
}
}
}
public class Program
{
public static void Main(string[] args)
{
using var channel = GrpcChannel.ForAddress("https://localhost:7050");
var client = new ApplicationService.ApplicationServiceClient(channel);
var callbacks = new GrpcCallbacks(client);
callbacks.TextChanged += (sender, args) => Console.WriteLine(args.Text);
callbacks.RunCallbacks();
Thread.Sleep(200);
client.SendText(new TextMessage { Text = "a" });
Console.ReadKey();
}
}
}
In the client side in method RunCallbacksInternal
line await response.ResponseStream.MoveNext()
always returns false and response.ResponseStream.Current
is always null so no data is incomming into buffer from server, but the event on the server side ValueChanged
is fired so the responseStream.WriteAsync(new OnTextChangedEventArgs() { Text = this.text.Value });
is called.
What am I doing wrong?
There's two problems that I can see server-side (I haven't looked client-side); the first is that you don't await WriteAsync
, the second is that you return Task.Completed
from SubscribeText
- but that means "I'm done, end of stream". The idea here is that you perform some number of await ... WriteAsync
calls before indicating that you're done, perhaps using Channel<T>
or similar as an async buffer.