Search code examples
dartgrpcgrpc-dart

Dart issues with gRPC connections with streaming response api call


While wokring with gprc in dart, if the response type of the very first rpc call is a streaming response the client app fails to connect to the server when stream handler is envoked. I found the issue while building upon the helloworld example from the package.

Is there any way to ensure that the connection is established? Or is there is anything I am doing incorrectly?

I have tried it with await channel.getConnection(); but it makes no difference.

grpc version: 3.0.2

  1. helloworld.proto:
// Copyright 2015 gRPC authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

syntax = "proto3";

option java_multiple_files = true;
option java_package = "io.grpc.examples.helloworld";
option java_outer_classname = "HelloWorldProto";
option objc_class_prefix = "HLW";

package helloworld;

// The greeting service definition.
service Greeter {
  // Sends a greeting
  rpc SayHello (HelloRequest) returns (HelloReply) {}
  rpc SayHelloStream (HelloRequest) returns (stream HelloReply) {}
}

// The request message containing the user's name.
message HelloRequest {
  string name = 1;
}

// The response message containing the greetings
message HelloReply {
  string message = 1;
}
  1. server.dart:
// Copyright (c) 2018, the gRPC project authors. Please see the AUTHORS file
// for details. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

/// Dart implementation of the gRPC helloworld.Greeter server.
import 'package:grpc/grpc.dart';
import 'package:helloworld/src/generated/helloworld.pbgrpc.dart';

class GreeterService extends GreeterServiceBase {
  @override
  Stream<HelloReply> sayHelloStream(
      ServiceCall call, HelloRequest request) async* {
    for (var i = 0; i < 10; i++) {
      yield HelloReply()..message = 'Hello, ${request.name}!';
      await Future.delayed(Duration(seconds: 1));
    }
  }

  @override
  Future<HelloReply> sayHello(ServiceCall call, HelloRequest request) async {
    return HelloReply()..message = 'Hello, ${request.name}!';
  }
}

Future<void> main(List<String> args) async {
  final server = Server(
    [GreeterService()],
    const <Interceptor>[],
    CodecRegistry(codecs: const [GzipCodec(), IdentityCodec()]),
  );
  await server.serve(port: 50051);
  print('Server listening on port ${server.port}...');
}
  1. client.dart
// Copyright (c) 2018, the gRPC project authors. Please see the AUTHORS file
// for details. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

/// Dart implementation of the gRPC helloworld.Greeter client.
import 'package:grpc/grpc.dart';
import 'package:helloworld/src/generated/helloworld.pbgrpc.dart';

Future<void> main(List<String> args) async {
  final channel = ClientChannel(
    'localhost',
    port: 50051,
    options: ChannelOptions(
      credentials: ChannelCredentials.insecure(),
      codecRegistry:
          CodecRegistry(codecs: const [GzipCodec(), IdentityCodec()]),
    ),
  );
  final stub = GreeterClient(channel);

  final name = args.isNotEmpty ? args[0] : 'world';

  try {
    // works if this call is made first
    // final response = await stub.sayHello(HelloRequest()..name = name);
    // print('Greeter client received: ${response.message}');

    // this has no effect
    // await channel.getConnection();

    final responseStream = stub.sayHelloStream(
      HelloRequest()..name = name,
    );
    
    // This doesn't work standalone.
    responseStream
        .listen((value) => print('Greeter client received: ${value.message}'));

    // Works when using await for
    // await for (var value in responseStream) {
    //   print('Greeter client received: ${value.message}');
    // }
  } catch (e) {
    print('Caught error: $e');
  }
  await channel.shutdown();
}

Expected result: It should have worked correctly and printed Greeter client received: ${value.message}' 10 times at 1 second interval.

Actual result: On running client.dart the following error is recieved.

gRPC Error (code: 14, codeName: UNAVAILABLE, message: Error connecting: Connection shutting down., details: null, rawResponse: null, trailers: {})

On adding the following lines (as shown in comments) there are no issues and the result is printed 1 + 10 times as expected.

// final response = await stub.sayHello(HelloRequest()..name = name);
// print('Greeter client received: ${response.message}');

Solution

  • You should only shutdown the channel when you're done with the stream. In your case you shutdown the channel immediately so there's no way gRPC would keep updating the stream as you've already shutdown the connection.