Tags: scala, akka, akka-stream, google-cloud-pubsub, alpakka

Possible encoding issue with Google PubSub


When running a subscription source from the Alpakka PubSub library, I receive data that appears to be encoded.

@Singleton
class Consumer @Inject()(config: Configuration, credentialsService: google.creds.Service)(implicit actorSystem: ActorSystem) {

  implicit val m: ActorMaterializer = ActorMaterializer.create(actorSystem)
  val logger = Logger(this.getClass)
  val subName: String = config.get[String]("google.pubsub.subname")
  val credentials: Credentials = credentialsService.getCredentials
  val pubSubConfig = PubSubConfig(credentials.projectId, credentials.clientEmail, credentials.privateKey)

  val subSource: Source[ReceivedMessage, NotUsed] = GooglePubSub.subscribe(subName, pubSubConfig)
  val ackSink: Sink[AcknowledgeRequest, Future[Done]] = GooglePubSub.acknowledge(subName, pubSubConfig)

  val computeGraph = Flow[ReceivedMessage].map {
    x =>
      logger.info(x.message.data)
      x
  }

  val ackGraph = Flow.fromFunction((msgs: Seq[ReceivedMessage]) => AcknowledgeRequest(msgs.map(_.ackId).toList))

  subSource
    .via(computeGraph)
    .groupedWithin(10, 5.minutes)
    .via(ackGraph)
    .to(ackSink)
    .run()
}

I publish the message from the PubSub console. I expect my test message to appear as published; however, when I publish test, I receive dGVzdA==. Is this the expected result? I have had issues importing the private key; could this be a result of that?

The consumer is bound eagerly with Guice.


Solution

  • Data received over the Pub/Sub REST API is base64 encoded, and dGVzdA== is the base64 encoding of test, so the private key is probably not the cause. My guess is that the Alpakka Pub/Sub library, which uses the REST API, does not decode the received data for you. It looks like Alpakka also has a library that uses the gRPC Pub/Sub client as the underlying layer, which may not suffer from this issue. You can also use the Cloud Pub/Sub Java client library directly from Scala.
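
If you stay with the REST-based library, one workaround is to decode the payload yourself. The following is a minimal sketch, not the library's own API: the DecodePubSubData object and decodeData helper are hypothetical names, and it assumes the data field arrives as a base64 String holding UTF-8 text, as the logging call in the question suggests.

```scala
import java.nio.charset.StandardCharsets
import java.util.Base64

// Hypothetical helper (not part of Alpakka): decodes the base64 string
// found in a Pub/Sub REST payload, assuming the bytes are UTF-8 text.
object DecodePubSubData {
  def decodeData(encoded: String): String =
    new String(Base64.getDecoder.decode(encoded), StandardCharsets.UTF_8)

  def main(args: Array[String]): Unit =
    println(decodeData("dGVzdA==")) // prints "test"
}
```

In the flow from the question, this would presumably be applied as logger.info(DecodePubSubData.decodeData(x.message.data)), provided data is a String in the Alpakka version you are using.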