
Read a gRPC server stream in a C# client


I'm building an application that uses gRPC server streaming.

The problem is that the C# client never reads any data from the server stream.

This is my proto service:

service UserService {
    rpc GetData(Id) returns (stream DataResponse) {}
}
message Id {
    int32 id = 1;
}

message DataResponse {
    bytes data = 1;
}

The C# server looks like this:

public override async Task GetData(Id request, IServerStreamWriter<DataResponse> response, ServerCallContext context)
{
    var user = GetUser(request.Id_); // hypothetical helper; the original elides how the user is loaded
    foreach (var d in user.Data)
    {
        await response.WriteAsync(new DataResponse { Data = d });
    }
}

The server itself works: a Node.js client can call it and receives the stream correctly.

The Node.js client is:

let call = client.getData({id:1})
call.on('data', function (response) {
    // do things
})
call.on('end', function () {
    // do things
})

And the C# client is:

AsyncServerStreamingCall<DataResponse> response = client.GetData(new Id{Id_ = 1});
while(await response.ResponseStream.MoveNext())
{
    Console.WriteLine("Into while loop"); // <-- This is never executed
    DataResponse current = response.ResponseStream.Current;
    Console.WriteLine($"{current.Data}");
}

I've also added a try/catch, and it doesn't output anything, so it seems MoveNext() always returns false.

What is the problem here? Why does the Node.js client work while the C# client can't read the stream? Have I missed something?

Here is the full client.cs class:

class Program
{
    const int Port = 50051;
    static void Main(string[] args)
    {
        try
        {
            Channel channel = new Channel("127.0.0.1:50051", ChannelCredentials.Insecure);
            var client = new UserService.UserServiceClient(channel);
            GetDataStreaming(client);
        }
        catch (RpcException ex)
        {
            Console.WriteLine($"Error: {{Code: {ex.StatusCode}, Status: {ex.Status.Detail}}}");
        }
    }

    private static async void GetDataStreaming(UserService.UserServiceClient client)
    {
        AsyncServerStreamingCall<DataResponse> response = client.GetData(new Id { Id_ = 1 });
        while (await response.ResponseStream.MoveNext())
        {
            Console.WriteLine("Into while loop");
            DataResponse current = response.ResponseStream.Current;
            Console.WriteLine($"{current.Data.ToStringUtf8()}");
        }
    }
}

Solution

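  • The issue is that your client process exits before it receives the response. GetDataStreaming is declared async void, so the call in Main returns at the first await instead of waiting for the stream, and Main then finishes. An exception thrown inside an async void method also never reaches the try/catch in Main.

    To fix the issue, change async void GetDataStreaming to async Task GetDataStreaming.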

    private static async Task GetDataStreaming(UserService.UserServiceClient client)
    {
        AsyncServerStreamingCall<DataResponse> response = client.GetData(new Id { Id_ = 1 });
        while (await response.ResponseStream.MoveNext())
        {
            Console.WriteLine("Into while loop");
            DataResponse current = response.ResponseStream.Current;
            Console.WriteLine($"{current.Data.ToStringUtf8()}");
        }
    }
    

    Then change static void Main to static async Task Main (supported since C# 7.1) and await GetDataStreaming. You should also call channel.ShutdownAsync() at the end.

    static async Task Main(string[] args)
    {
        try
        {
            Channel channel = new Channel("127.0.0.1:50051", ChannelCredentials.Insecure);
            var client = new UserService.UserServiceClient(channel);
            await GetDataStreaming(client);
            await channel.ShutdownAsync();
        }
        catch (RpcException ex)
        {
            Console.WriteLine($"Error: {{Code: {ex.StatusCode}, Status: {ex.Status.Detail}}}");
        }
    }
    

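    Another option is to change async void GetDataStreaming to async Task GetDataStreaming and block in Main until the task completes. Use GetAwaiter().GetResult() rather than task.Wait(): Wait() wraps failures in an AggregateException, so the catch (RpcException) block would not run.

    static void Main(string[] args)
    {
        try
        {
            Channel channel = new Channel("127.0.0.1:50051", ChannelCredentials.Insecure);
            var client = new UserService.UserServiceClient(channel);
            // Block until the stream has been read; rethrows the original RpcException on failure.
            GetDataStreaming(client).GetAwaiter().GetResult();
            channel.ShutdownAsync().GetAwaiter().GetResult();
        }
        catch (RpcException ex)
        {
            Console.WriteLine($"Error: {{Code: {ex.StatusCode}, Status: {ex.Status.Detail}}}");
        }
    }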
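
The difference between async void and async Task can be reproduced without gRPC. The sketch below is only an illustration: AsyncVoidDemo, FireAndForget, Awaitable and the two flags are made-up names, and Task.Delay stands in for waiting on the response stream.

```csharp
using System;
using System.Threading.Tasks;

public static class AsyncVoidDemo
{
    // Flags set when each method runs to completion (hypothetical names).
    public static volatile bool VoidFinished;
    public static volatile bool TaskFinished;

    // async void: the caller gets nothing back that it could await.
    public static async void FireAndForget()
    {
        await Task.Delay(200); // stands in for MoveNext() waiting on the server
        VoidFinished = true;
    }

    // async Task: the caller can await the returned task.
    public static async Task Awaitable()
    {
        await Task.Delay(200);
        TaskFinished = true;
    }

    public static async Task Main()
    {
        FireAndForget(); // returns as soon as it reaches the first await
        Console.WriteLine($"async void finished right after the call? {VoidFinished}");

        await Awaitable(); // resumes only after the method has completed
        Console.WriteLine($"async Task finished after await? {TaskFinished}");
    }
}
```

The first line prints False because FireAndForget is still waiting when control comes back to Main, which is the state the original client was in when Main returned. The second prints True because the await holds Main until Awaitable has finished.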