
Akka Streams for server streaming (gRPC, Scala)


I am new to Akka Streams and gRPC. I am trying to build an endpoint where the client sends a single request and the server sends multiple responses.

This is my protobuf:

syntax = "proto3";

option java_multiple_files = true;
option java_package = "customer.service.proto";

service CustomerService {

  rpc CreateCustomer(CustomerRequest) returns (stream CustomerResponse) {}

}

message CustomerRequest {
  string customerId = 1;
  string customerName = 2;
}

message CustomerResponse {
  enum Status {
    No_Customer = 0;
    Creating_Customer = 1;
    Customer_Created = 2;
  }

  string customerId = 1;
  Status status = 2;
}

What I want to achieve: the client sends a customer request, the server first checks and responds with No_Customer, then it sends Creating_Customer, and finally it sends Customer_Created.

I have no idea where to start with the implementation. I have searched for hours and am still clueless, so I would be very thankful if anyone could point me in the right direction.


Solution

  • The place to start is the Akka gRPC documentation and, in particular, the service walkthrough. Getting the samples working in a clean project is fairly straightforward.

    The relevant server sample method is this:

    override def itKeepsReplying(in: HelloRequest): Source[HelloReply, NotUsed] = {
      println(s"sayHello to ${in.name} with stream of chars...")
      Source(s"Hello, ${in.name}".toList).map(character => HelloReply(character.toString))
    }
    

    What remains is to build a Source that emits the right responses. How to do that depends on how you plan to implement the server, so it is hard to give a specific answer. The Akka Streams documentation describes the various ways to construct a Source.
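
    For the status sequence in the question, a minimal sketch could look like the following. The case classes and the Status enumeration are hand-written stand-ins for the classes Akka gRPC would generate from the .proto file, and CustomerStatusStream is a hypothetical name. A real service would put this logic in its implementation of the generated CustomerService trait.

    ```scala
    import akka.NotUsed
    import akka.stream.scaladsl.Source

    // Stand-ins (assumption) for the messages generated from the .proto file;
    // in a real project, import the generated classes instead.
    object Status extends Enumeration {
      val No_Customer, Creating_Customer, Customer_Created = Value
    }
    final case class CustomerRequest(customerId: String, customerName: String)
    final case class CustomerResponse(customerId: String, status: Status.Value)

    object CustomerStatusStream {
      // Emits one response per status, in order, then completes the stream.
      def createCustomer(in: CustomerRequest): Source[CustomerResponse, NotUsed] =
        Source(List(Status.No_Customer, Status.Creating_Customer, Status.Customer_Created))
          .map(status => CustomerResponse(in.customerId, status))
    }
    ```

    This only emits a fixed list. If each step involves real work (a database lookup, an insert), the list could be replaced by stages such as mapAsync that emit a response once the step has finished.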

    The client code is simpler: call runForeach on the Source returned by the client method generated for CreateCustomer (createCustomer in the Scala code), as in the sample:

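    // Needs akka.Done, scala.concurrent.Future and scala.util.{Failure, Success} imported,
    // plus an implicit materializer (or ActorSystem) and ExecutionContext in scope.
    def runStreamingReplyExample(): Unit = {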
      val responseStream = client.itKeepsReplying(HelloRequest("Alice"))
      val done: Future[Done] =
        responseStream.runForeach(reply => println(s"got streaming reply: ${reply.message}"))
    
      done.onComplete {
        case Success(_) =>
          println("streamingReply done")
        case Failure(e) =>
          println(s"Error streamingReply: $e")
      }
    }
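
    Applied to the question, the client side might be sketched as below. CustomerResponse is a simplified stand-in for the generated message (its status is a plain String here), and CustomerClientSketch and awaitFinalStatus are hypothetical names. The function takes the response Source as a parameter, so it does not depend on how the client itself is created.

    ```scala
    import akka.NotUsed
    import akka.actor.ActorSystem
    import akka.stream.scaladsl.{Sink, Source}
    import scala.concurrent.Future

    // Simplified stand-in (assumption) for the generated CustomerResponse message.
    final case class CustomerResponse(customerId: String, status: String)

    object CustomerClientSketch {
      // Prints every streamed reply and completes with the last one received.
      // The Future fails if the stream ends without emitting any reply.
      def awaitFinalStatus(responses: Source[CustomerResponse, NotUsed])(
          implicit system: ActorSystem): Future[CustomerResponse] =
        responses
          .wireTap(reply => println(s"customer ${reply.customerId}: ${reply.status}"))
          .runWith(Sink.last)
    }
    ```

    With a generated client, the argument would be the result of the streaming call, for example client.createCustomer(request).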