How to implement a gRPC/Proto endpoint with a repeated field in a message (i.e. nested types in request and response)

Goal: I want to build a microservice exposing an endpoint that receives and returns messages containing a repeated field. I tried to apply what I learned from the official Protocol Buffers guide and wrote this proto:

syntax = "proto3";

option java_multiple_files = true;
option java_package = "com.mybank.endpoint";
option java_outer_classname = "TransactionsProto";
option objc_class_prefix = "HLW";

package com.mybank.endpoint;

import "google/protobuf/wrappers.proto";

service TransactionsService {
  rpc PostTransactions(TransactionsRequest) returns (TransactionsReply);
}

message TransactionsRequest {
  string transactionDesc = 1;
  repeated Transaction transactions = 2;
}
message Transaction {
  string id = 1;
  string name = 2;
  string description = 3;
}

message TransactionsReply {
  string message = 1;
}

The Gradle build succeeds and generates this TransactionsServiceGrpcKt:

package com.mybank.endpoint

import com.mybank.endpoint.TransactionsServiceGrpc.getServiceDescriptor
import io.grpc.CallOptions
import io.grpc.CallOptions.DEFAULT
import io.grpc.Channel
import io.grpc.Metadata
import io.grpc.MethodDescriptor
import io.grpc.ServerServiceDefinition
import io.grpc.ServerServiceDefinition.builder
import io.grpc.ServiceDescriptor
import io.grpc.Status.UNIMPLEMENTED
import io.grpc.StatusException
import io.grpc.kotlin.AbstractCoroutineServerImpl
import io.grpc.kotlin.AbstractCoroutineStub
import io.grpc.kotlin.ClientCalls.unaryRpc
import io.grpc.kotlin.ServerCalls.unaryServerMethodDefinition
import io.grpc.kotlin.StubFor
import kotlin.coroutines.CoroutineContext
import kotlin.coroutines.EmptyCoroutineContext
import kotlin.jvm.JvmOverloads
import kotlin.jvm.JvmStatic

/**
 * Holder for Kotlin coroutine-based client and server APIs for
 * com.mybank.endpoint.TransactionsService.
 */
object TransactionsServiceGrpcKt {
  val serviceDescriptor: ServiceDescriptor
    get() = TransactionsServiceGrpc.getServiceDescriptor()

  val postTransactionsMethod: MethodDescriptor<TransactionsRequest, TransactionsReply>
    get() = TransactionsServiceGrpc.getPostTransactionsMethod()

  /**
   * A stub for issuing RPCs to a(n) com.mybank.endpoint.TransactionsService service as suspending
   * coroutines.
   */
  class TransactionsServiceCoroutineStub @JvmOverloads constructor(
    channel: Channel,
    callOptions: CallOptions = DEFAULT
  ) : AbstractCoroutineStub<TransactionsServiceCoroutineStub>(channel, callOptions) {
    override fun build(channel: Channel, callOptions: CallOptions): TransactionsServiceCoroutineStub
        = TransactionsServiceCoroutineStub(channel, callOptions)

    /**
     * Executes this RPC and returns the response message, suspending until the RPC completes
     * with [`Status.OK`][io.grpc.Status].  If the RPC completes with another status, a
     * corresponding
     * [StatusException] is thrown.  If this coroutine is cancelled, the RPC is also cancelled
     * with the corresponding exception as a cause.
     *
     * @param request The request message to send to the server.
     *
     * @return The single response from the server.
     */
    suspend fun postTransactions(request: TransactionsRequest): TransactionsReply = unaryRpc(
      channel,
      TransactionsServiceGrpc.getPostTransactionsMethod(),
      request,
      callOptions,
      Metadata()
    )
  }

  /**
   * Skeletal implementation of the com.mybank.endpoint.TransactionsService service based on Kotlin
   * coroutines.
   */
  abstract class TransactionsServiceCoroutineImplBase(
    coroutineContext: CoroutineContext = EmptyCoroutineContext
  ) : AbstractCoroutineServerImpl(coroutineContext) {
    /**
     * Returns the response to an RPC for com.mybank.endpoint.TransactionsService.PostTransactions.
     *
     * If this method fails with a [StatusException], the RPC will fail with the corresponding
     * [io.grpc.Status].  If this method fails with a [java.util.concurrent.CancellationException],
     * the RPC will fail
     * with status `Status.CANCELLED`.  If this method fails for any other reason, the RPC will
     * fail with `Status.UNKNOWN` with the exception as a cause.
     *
     * @param request The request from the client.
     */
    open suspend fun postTransactions(request: TransactionsRequest): TransactionsReply = throw
        StatusException(UNIMPLEMENTED.withDescription("Method com.mybank.endpoint.TransactionsService.PostTransactions is unimplemented"))

    final override fun bindService(): ServerServiceDefinition = builder(getServiceDescriptor())
      .addMethod(unaryServerMethodDefinition(
      context = this.context,
      descriptor = TransactionsServiceGrpc.getPostTransactionsMethod(),
      implementation = ::postTransactions
    )).build()
  }
}

So far so good. Now I want to implement it and I am completely stuck.

Here are all three attempts and their errors:

package com.mybank.endpoint

import io.grpc.stub.StreamObserver
import javax.inject.Singleton

class TransactionsEndpoint : TransactionsServiceGrpc.TransactionsServiceImplBase() {

    // First attempt
    // This complains "'postTransactions' overrides nothing", and IntelliJ suggests the second approach below
    //override fun postTransactions(request: TransactionsRequest?) : TransactionsReply {

    // Second attempt
//    override fun postTransactions(request: TransactionsRequest?, responseObserver: StreamObserver<TransactionsReply>?) {
//        // It complains "Type mismatch... Found: TransactionsReply"
//        return TransactionsReply.newBuilder().setMessage("teste").build()
//    }

    // Third attempt
    // This causes:
    // Return type is 'TransactionsReply', which is not a subtype of overridden public open
    // fun postTransactions(request: TransactionsRequest!, responseObserver: StreamObserver<TransactionsReply!>!):
    // Unit defined in com.mybank.endpoint.TransactionsServiceGrpc.TransactionsServiceImplBase
    //override fun postTransactions(request: TransactionsRequest?, responseObserver: StreamObserver<TransactionsReply>?) : TransactionsReply {
//        return TransactionsReply.newBuilder().setMessage("teste").build()
//    }
}

plugins {
    id "org.jetbrains.kotlin.jvm" version "1.3.72"
    id "org.jetbrains.kotlin.kapt" version "1.3.72"
    id "org.jetbrains.kotlin.plugin.allopen" version "1.3.72"
    id "application"
    id '' version '0.8.13'
}

version "0.2"
group "account-control"

repositories {
}

configurations {
    // for dependencies that are needed for development only
}

dependencies {

//    implementation("io.micronaut.grpc:micronaut-grpc-runtime")

    testImplementation enforcedPlatform("io.micronaut:micronaut-bom:$micronautVersion")
}

test.classpath += configurations.developmentOnly

mainClassName = "account-control.Application"

test {
}

allOpen {
}

compileKotlin {
    kotlinOptions {
        jvmTarget = '11'
        //Will retain parameter names for Java reflection
        javaParameters = true
    }
}

compileTestKotlin {
    kotlinOptions {
        jvmTarget = '11'
        javaParameters = true
    }
}

tasks.withType(JavaExec) {
    classpath += configurations.developmentOnly
    jvmArgs('-XX:TieredStopAtLevel=1', '')
}

sourceSets {
    main {
        java {
            srcDirs 'build/generated/source/proto/main/grpc'
            srcDirs 'build/generated/source/proto/main/grpckt'
            srcDirs 'build/generated/source/proto/main/java'
        }
    }
}

protobuf {
    protoc { artifact = "${protocVersion}" }
    plugins {
        grpc { artifact = "io.grpc:protoc-gen-grpc-java:${grpcVersion}" }
        grpckt { artifact = "io.grpc:protoc-gen-grpc-kotlin:${grpcKotlinVersion}" }
    }
    generateProtoTasks {
        all()*.plugins {
            grpc {}
            grpckt {}
        }
    }
}

I have successfully created my first gRPC endpoint with a very basic String-based request/reply, and now I want to move forward by sending a list of messages. As an analogy, let's say I want a DTO/POJO that contains a list of entities.

Honestly, I am totally stuck. So, my main question is: how do I implement a proto service with a repeated message?

A comment that would point me in the right direction: why does the autogenerated stub ask me to implement a method with "..., responseObserver: StreamObserver?" instead of simply returning "TransactionsReply", as I clearly specified in my proto? What is the relationship between StreamObserver and a repeated message?

Here is the whole project in my GitHub develop branch

You will find two protos: one successfully implemented with a simple request/reply, and another failing as explained above.

*** Edited after the first answer from Louis

After removing suspend

I am quite confused. With a simple proto such as

service Account {
  rpc SendDebit (DebitRequest) returns (DebitReply) {}
}

message DebitRequest {
  string name = 1;
}

message DebitReply {
  string message = 1;
}

I can implement it with

override suspend fun sendDebit(request: DebitRequest): DebitReply {
    return DebitReply.newBuilder().setMessage("teste").build()
}

Nevertheless, with

service TransactionsService {
  rpc PostTransactions(TransactionsRequest) returns (TransactionsReply);
}

message TransactionsRequest {
  string transactionDesc = 1;
  repeated Transaction transactions = 2;
}
message Transaction {
  string id = 1;
  string name = 2;
  string description = 3;
}

message TransactionsReply {
  string message = 1;
}

I can't override the method with the same kind of return type (note that the reply message is exactly the same).

Neither the implementation proposed by IntelliJ works

override fun postTransactions(request: TransactionsRequest?, responseObserver: StreamObserver<TransactionsReply>?) {
        super.postTransactions(request, responseObserver)
        return TransactionsReply.newBuilder().setMessage("testReply").build()
}

nor this one with a similar return approach

override fun postTransactions(request: TransactionsRequest?, responseObserver: StreamObserver<TransactionsReply>?) :TransactionsReply {
    super.postTransactions(request, responseObserver)
    return TransactionsReply.newBuilder().setMessage("testReply").build()
}

On top of that, if the reply is exactly the same, why didn't IntelliJ propose a StreamObserver in my first case?

*** Final solution, thanks to Louis' help: I was extending the wrong abstract class.

override suspend fun postTransactions(request: TransactionsRequest): TransactionsReply {
    return TransactionsReply.newBuilder().setMessage("testReply").build()
}


  • You're looking for

    class TransactionsEndpoint : TransactionsServiceGrpcKt.TransactionsServiceCoroutineImplBase(){
       override suspend fun postTransactions(request: TransactionsRequest) : TransactionsReply {
          // build and return the reply here
       }
    }

    ...with the suspend modifier and no ? on the request, extending TransactionsServiceCoroutineImplBase (note the Coroutine in the name).
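
    To make the difference between the two generated base classes concrete, here is a minimal sketch, assuming the classes generated from the proto in the question. The class names JavaStyleTransactionsEndpoint and CoroutineTransactionsEndpoint and the reply texts are hypothetical, chosen only for illustration. The Java-style base class returns Unit and delivers the reply through the StreamObserver, which is why IntelliJ proposed that signature; the coroutine base class simply returns the reply. In both cases the repeated field is read through the generated transactionsList and transactionsCount accessors.

    ```kotlin
    package com.mybank.endpoint

    import io.grpc.stub.StreamObserver

    // Sketch only: relies on the classes generated from the question's proto.

    // Java-style base class (from protoc-gen-grpc-java): the method returns Unit,
    // and the single reply is pushed through the observer.
    class JavaStyleTransactionsEndpoint : TransactionsServiceGrpc.TransactionsServiceImplBase() {
        override fun postTransactions(
            request: TransactionsRequest,
            responseObserver: StreamObserver<TransactionsReply>
        ) {
            val reply = TransactionsReply.newBuilder()
                .setMessage("received ${request.transactionsCount} transaction(s)")
                .build()
            responseObserver.onNext(reply)   // deliver the one reply of this unary call
            responseObserver.onCompleted()   // signal that the call is finished
        }
    }

    // Coroutine-style base class (from protoc-gen-grpc-kotlin): the reply is the return value.
    class CoroutineTransactionsEndpoint :
        TransactionsServiceGrpcKt.TransactionsServiceCoroutineImplBase() {
        override suspend fun postTransactions(request: TransactionsRequest): TransactionsReply {
            // `repeated Transaction transactions = 2;` is exposed as a read-only list.
            val names = request.transactionsList.joinToString { it.name }
            return TransactionsReply.newBuilder()
                .setMessage("received: $names")
                .build()
        }
    }
    ```

    Only one of the two implementations should be registered with the server for a given service; they are shown side by side purely for comparison.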

    Note that this has nothing to do with a repeated message. Your RPC has a single input proto, which gets sent only once -- which may well be what you want, unless you want a stream of requests, in which case your proto file should look like

    service TransactionsService {
      rpc PostTransactions(stream TransactionsRequest) returns (TransactionsReply);
    }

    ...and the Kotlin generated code will look different in that case.
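
    As a rough sketch of that streaming case: to my knowledge, grpc-kotlin generates a server method that takes a Flow of requests for a client-streaming RPC, but the exact signature depends on the grpc-kotlin version, so treat the one below as an assumption and check the regenerated TransactionsServiceGrpcKt. The class name StreamingTransactionsEndpoint and the reply text are hypothetical.

    ```kotlin
    package com.mybank.endpoint

    import kotlinx.coroutines.flow.Flow
    import kotlinx.coroutines.flow.fold

    // Sketch only: assumes the proto was changed to `stream TransactionsRequest`
    // and the code was regenerated.
    class StreamingTransactionsEndpoint :
        TransactionsServiceGrpcKt.TransactionsServiceCoroutineImplBase() {

        // Assumed signature: the stream of requests arrives as a Flow.
        override suspend fun postTransactions(requests: Flow<TransactionsRequest>): TransactionsReply {
            // Each request in the stream can itself carry a repeated list of transactions.
            val total = requests.fold(0) { count, request -> count + request.transactionsCount }
            return TransactionsReply.newBuilder()
                .setMessage("received $total transaction(s)")
                .build()
        }
    }
    ```

    This keeps the two ideas separate: stream controls how many request messages are sent over one call, while repeated controls how many Transaction entries sit inside each message.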