I'm using Scala with akka-http. I have a protobuf marshaller that works just fine on incoming routes:
trait ProtobufMarshalling[T <: GeneratedMessage, E <: GeneratedMessage] {
implicit def protobufMarshaller: ToEntityMarshaller[E] = PredefinedToEntityMarshallers.ByteArrayMarshaller.compose[E](r => r.toByteArray)
implicit def protobufUnmarshaller(implicit companion: GeneratedMessageCompanion[T]): FromEntityUnmarshaller[T] = {
Unmarshaller.byteArrayUnmarshaller.map[T](bytes => companion.parseFrom(bytes))
}
}
I can then use this in my Routes
object using with ProtobufMarshalling
and:
post {
entity(as[PROTO_IN]) { proto =>
complete(PROTO_OUT())
}
}
The server also acts as a client, with communications between itself and another instance. I have a section of code that sends a HttpRequest
to a registered instance:
val request2 = HttpRequest(
method = HttpMethods.POST,
uri = s"http://${server.getServerIP}/api/registration",
entity = HttpEntity(ContentTypes.`application/octet-stream`, ApiCall().withRegistrationRequest(RegistrationRequest(SERVER_ID, "127.0.0.1")).toByteArray)
)
.withHeaders(
Authorization(OAuth2BearerToken(token))
)
val responseFuture2 = Http().singleRequest(request)
val apiResponse = Await.result(
responseFuture
.flatMap { resp => ??? }
timeout
)
The server it is requesting from responds with a Proto of type ApiResponse
At the line .flatMap { resp => ??? }
, how would I go about correctly converting the sent byte array into an ApiResponse
proto?
My original attempt was as follows:
.flatMap { resp => resp.entity.toStrict(timeout) }
.map { strictEntity => ApiResponse.parseFrom(strictEntity.data.asByteBuffer.array()) },
But this gave me the error:
ERROR[system-actor-akka.actor.default-dispatcher-5] OneForOneStrategy - null
java.nio.ReadOnlyBufferException: null
at java.base/java.nio.ByteBuffer.array(ByteBuffer.java:1513)
at server.RegistrationActor.$anonfun$4(RegistrationActor.scala:82)
at scala.concurrent.impl.Promise$Transformation.run(Promise.scala:467)
at java.base/java.util.concurrent.ForkJoinTask$RunnableExecuteAction.compute(ForkJoinTask.java:1726)
at java.base/java.util.concurrent.ForkJoinTask$RunnableExecuteAction.compute(ForkJoinTask.java:1717)
at java.base/java.util.concurrent.ForkJoinTask$InterruptibleTask.exec(ForkJoinTask.java:1641)
at java.base/java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:507)
at java.base/java.util.concurrent.ForkJoinPool$WorkQueue.topLevelExec(ForkJoinPool.java:1458)
at java.base/java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:2034)
at java.base/java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:189)
I tried looking into how the marshaller does it in the background, and implemented the following solution:
val responseFuture2 = Http().singleRequest(request)
val apiResponse = Await.result(
responseFuture
.flatMap { resp => entityToBytes(resp.entity) }
.map { arrayBytes => ApiResponse.parseFrom(arrayBytes) },
timeout
)
private def entityToBytes(entity: HttpEntity): Future[Array[Byte]] = {
entity.dataBytes.runFold(ByteString.empty)(_ ++ _)
.map { value => value.toArray[Byte] }
}
But I get the error:
ERROR[system-actor-akka.actor.default-dispatcher-5] OneForOneStrategy - Protocol message tag had invalid wire type: 3
com.google.protobuf.InvalidProtocolBufferException: Protocol message tag had invalid wire type: 3
at scalapb.UnknownFieldSet$Field$Builder.parseField(UnknownFieldSet.scala:106)
at scalapb.UnknownFieldSet$Builder.parseField(UnknownFieldSet.scala:140)
at api.ApiResponse.ApiResponse$.parseFrom(ApiResponse.scala:112)
at api.ApiResponse.ApiResponse$.parseFrom(ApiResponse.scala:94)
at scalapb.GeneratedMessageCompanion.parseFrom(GeneratedMessageCompanion.scala:186)
at scalapb.GeneratedMessageCompanion.parseFrom$(GeneratedMessageCompanion.scala:164)
at api.ApiResponse.ApiResponse$.parseFrom(ApiResponse.scala:92)
at server.RegistrationActor.$anonfun$4(RegistrationActor.scala:83)
at scala.concurrent.impl.Promise$Transformation.run(Promise.scala:467)
at java.base/java.util.concurrent.ForkJoinTask$RunnableExecuteAction.compute(ForkJoinTask.java:1726)
at java.base/java.util.concurrent.ForkJoinTask$RunnableExecuteAction.compute(ForkJoinTask.java:1717)
at java.base/java.util.concurrent.ForkJoinTask$InterruptibleTask.exec(ForkJoinTask.java:1641)
at java.base/java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:507)
at java.base/java.util.concurrent.ForkJoinPool$WorkQueue.topLevelExec(ForkJoinPool.java:1458)
at java.base/java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:2034)
at java.base/java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:189)
Any suggestions on how to do this correctly?
UPDATE:
After some reading, seems I should be able to use my ProtobufMarshaller
. But for some reason it isn't picking them up within the call.
I have:
class ApiFetcher ... with ProtobufMarshalling[ApiCall, ApiResponse] {
...
def receive = {
case HttpResponse(StatusCodes.OK, headers, entity, _) =>
Unmarshal(entity).to[ApiResponse].map { apiResponse =>
}
}
}
Which gives the error:
No given instance of type akka.http.scaladsl.unmarshalling.Unmarshaller[akka.http.scaladsl.model.ResponseEntity, api.ApiResponse.ApiResponse] was found.
I found: akka.http.scaladsl.unmarshalling.Unmarshaller.identityUnmarshaller[T]
But method identityUnmarshaller in object Unmarshaller does not match type akka.http.scaladsl.unmarshalling.Unmarshaller[akka.http.scaladsl.model.ResponseEntity, api.ApiResponse.ApiResponse].
Ok so after playing around I found a solution where I pass the proto marshaller/unmarshaller manually.
For example, when sending a request:
Marshal(ApiCall())
.to[RequestEntity](PredefinedToEntityMarshallers.ByteArrayMarshaller.compose[ApiCall](r => r.toByteArray))
.map({ entity =>
})
And when receiving a response:
Unmarshal(entity)
.to[ApiResponse](Unmarshaller.byteArrayUnmarshaller.map[ApiResponse](bytes => ApiResponse.parseFrom(bytes)))
.map { apiResponse =>
}
But what I'm confused about is how in my ServerRoute
class, I just use with ProtobufMarshalling[ApiCall, ApiResponse]
and scala picks up on the marshallers inside the trait. Here, I have had to explicitly pass the marshalling function inside to[<type>](<marshaller/unmarshaller>)
.
Could someone explain this please?