Search code examples
scalaakkaprotocol-buffersscalapb

How to I convert a HttpEntity into a Protobuf object in Scala using Akka-http?


I'm using Scala with akka-http. I have a protobuf marshaller that works just fine on incoming routes:

trait ProtobufMarshalling[T <: GeneratedMessage, E <: GeneratedMessage] {
  implicit def protobufMarshaller: ToEntityMarshaller[E] = PredefinedToEntityMarshallers.ByteArrayMarshaller.compose[E](r => r.toByteArray)

  implicit def protobufUnmarshaller(implicit companion: GeneratedMessageCompanion[T]): FromEntityUnmarshaller[T] = {
    Unmarshaller.byteArrayUnmarshaller.map[T](bytes => companion.parseFrom(bytes))
  }
}

I can then use this in my Routes object using with ProtobufMarshalling and:

post {
  entity(as[PROTO_IN]) { proto =>
    complete(PROTO_OUT())
  }
}

The server also acts as a client, with communications between itself and another instance. I have a section of code that sends a HttpRequest to a registered instance:

val request2 = HttpRequest(
        method = HttpMethods.POST,
        uri = s"http://${server.getServerIP}/api/registration",
        entity = HttpEntity(ContentTypes.`application/octet-stream`, ApiCall().withRegistrationRequest(RegistrationRequest(SERVER_ID, "127.0.0.1")).toByteArray)
      )
        .withHeaders(
          Authorization(OAuth2BearerToken(token))
        )

      val responseFuture2 = Http().singleRequest(request)
      val apiResponse = Await.result(
        responseFuture
          .flatMap { resp => ??? }
        timeout
      )

The server it is requesting from responds with a Proto of type ApiResponse At the line .flatMap { resp => ??? }, how would I go about correctly converting the sent byte array into an ApiResponse proto?

My original attempt was as follows:

          .flatMap { resp => resp.entity.toStrict(timeout) }
          .map { strictEntity => ApiResponse.parseFrom(strictEntity.data.asByteBuffer.array()) },

But this gave me the error:

ERROR[system-actor-akka.actor.default-dispatcher-5] OneForOneStrategy - null
java.nio.ReadOnlyBufferException: null
    at java.base/java.nio.ByteBuffer.array(ByteBuffer.java:1513)
    at server.RegistrationActor.$anonfun$4(RegistrationActor.scala:82)
    at scala.concurrent.impl.Promise$Transformation.run(Promise.scala:467)
    at java.base/java.util.concurrent.ForkJoinTask$RunnableExecuteAction.compute(ForkJoinTask.java:1726)
    at java.base/java.util.concurrent.ForkJoinTask$RunnableExecuteAction.compute(ForkJoinTask.java:1717)
    at java.base/java.util.concurrent.ForkJoinTask$InterruptibleTask.exec(ForkJoinTask.java:1641)
    at java.base/java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:507)
    at java.base/java.util.concurrent.ForkJoinPool$WorkQueue.topLevelExec(ForkJoinPool.java:1458)
    at java.base/java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:2034)
    at java.base/java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:189)

I tried looking into how the marshaller does it in the background, and implemented the following solution:

val responseFuture2 = Http().singleRequest(request)
      val apiResponse = Await.result(
        responseFuture
          .flatMap { resp => entityToBytes(resp.entity) }
          .map { arrayBytes => ApiResponse.parseFrom(arrayBytes) },
        timeout
      )

  private def entityToBytes(entity: HttpEntity): Future[Array[Byte]] = {
    entity.dataBytes.runFold(ByteString.empty)(_ ++ _)
      .map { value => value.toArray[Byte] }
  }

But I get the error:

ERROR[system-actor-akka.actor.default-dispatcher-5] OneForOneStrategy - Protocol message tag had invalid wire type: 3
com.google.protobuf.InvalidProtocolBufferException: Protocol message tag had invalid wire type: 3
    at scalapb.UnknownFieldSet$Field$Builder.parseField(UnknownFieldSet.scala:106)
    at scalapb.UnknownFieldSet$Builder.parseField(UnknownFieldSet.scala:140)
    at api.ApiResponse.ApiResponse$.parseFrom(ApiResponse.scala:112)
    at api.ApiResponse.ApiResponse$.parseFrom(ApiResponse.scala:94)
    at scalapb.GeneratedMessageCompanion.parseFrom(GeneratedMessageCompanion.scala:186)
    at scalapb.GeneratedMessageCompanion.parseFrom$(GeneratedMessageCompanion.scala:164)
    at api.ApiResponse.ApiResponse$.parseFrom(ApiResponse.scala:92)
    at server.RegistrationActor.$anonfun$4(RegistrationActor.scala:83)
    at scala.concurrent.impl.Promise$Transformation.run(Promise.scala:467)
    at java.base/java.util.concurrent.ForkJoinTask$RunnableExecuteAction.compute(ForkJoinTask.java:1726)
    at java.base/java.util.concurrent.ForkJoinTask$RunnableExecuteAction.compute(ForkJoinTask.java:1717)
    at java.base/java.util.concurrent.ForkJoinTask$InterruptibleTask.exec(ForkJoinTask.java:1641)
    at java.base/java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:507)
    at java.base/java.util.concurrent.ForkJoinPool$WorkQueue.topLevelExec(ForkJoinPool.java:1458)
    at java.base/java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:2034)
    at java.base/java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:189)

Any suggestions on how to do this correctly?

UPDATE:

After some reading, seems I should be able to use my ProtobufMarshaller. But for some reason it isn't picking them up within the call.

I have:

class ApiFetcher ... with ProtobufMarshalling[ApiCall, ApiResponse] {
    ...
    def receive = {
        case HttpResponse(StatusCodes.OK, headers, entity, _) =>
      Unmarshal(entity).to[ApiResponse].map { apiResponse =>

      }
    }
}

Which gives the error:

No given instance of type akka.http.scaladsl.unmarshalling.Unmarshaller[akka.http.scaladsl.model.ResponseEntity, api.ApiResponse.ApiResponse] was found. 

I found:  akka.http.scaladsl.unmarshalling.Unmarshaller.identityUnmarshaller[T]
 
 But method identityUnmarshaller in object Unmarshaller does not match type akka.http.scaladsl.unmarshalling.Unmarshaller[akka.http.scaladsl.model.ResponseEntity, api.ApiResponse.ApiResponse].

Solution

  • Ok so after playing around I found a solution where I pass the proto marshaller/unmarshaller manually.

    For example, when sending a request:

    Marshal(ApiCall())
          .to[RequestEntity](PredefinedToEntityMarshallers.ByteArrayMarshaller.compose[ApiCall](r => r.toByteArray))
          .map({ entity =>
    
          })
    

    And when receiving a response:

    Unmarshal(entity)
          .to[ApiResponse](Unmarshaller.byteArrayUnmarshaller.map[ApiResponse](bytes => ApiResponse.parseFrom(bytes)))
          .map { apiResponse =>
              
          }
    

    But what I'm confused about is how in my ServerRoute class, I just use with ProtobufMarshalling[ApiCall, ApiResponse] and scala picks up on the marshallers inside the trait. Here, I have had to explicitly pass the marshalling function inside to[<type>](<marshaller/unmarshaller>).

    Could someone explain this please?