scala, google-cloud-platform, akka, akka-stream, alpakka

Google Pub/Sub Subscriber does not receive messages


First off, I'm not experienced with Akka, so I'm not good at debugging this on my own. I tried the example from here, and publishing messages works (which means the credentials work), but the subscriber never emits any messages. The service account has been granted all permissions.

My code is essentially the same as the example:

package com.example.google.pubsub

import java.io.FileInputStream
import java.security.spec.PKCS8EncodedKeySpec
import java.security.{KeyFactory, PrivateKey}
import java.util.Base64

import akka.actor.ActorSystem
import akka.stream.{ActorMaterializer, Attributes, Outlet, SourceShape}
import akka.stream.alpakka.googlecloud.pubsub.scaladsl.GooglePubSub
import akka.stream.alpakka.googlecloud.pubsub._
import akka.stream.scaladsl.{Sink, Source}
import akka.stream.stage.{GraphStage, GraphStageLogic}
import akka.{Done, NotUsed}
import com.google.api.client.googleapis.auth.oauth2.GoogleCredential

import scala.concurrent.duration._
import scala.concurrent.Future

object SubscriberMain extends App {
  println("#### SUBSCRIBER ####")

  val privateKey: PrivateKey = {
    // FileInputStream is already imported at the top of the file
    val credential = GoogleCredential.fromStream(new FileInputStream("mycredentials.json"))
    credential.getServiceAccountPrivateKey
  }
  val clientEmail = "main-19@weirdproject.iam.gserviceaccount.com"
  val projectId = "weirdproject"
  val apiKey = "xxxx"
  val subscription = "somesubscription"

  implicit val system = ActorSystem()
  implicit val mat = ActorMaterializer()

  val subscriptionSource: Source[ReceivedMessage, NotUsed] =
    GooglePubSub.subscribe(projectId, apiKey, clientEmail, privateKey, subscription)

  val ackSink: Sink[AcknowledgeRequest, Future[Done]] =
    GooglePubSub.acknowledge(projectId, apiKey, clientEmail, privateKey, subscription)

  subscriptionSource
    .map { message =>
      val data = message.message.data
      println(s"received a message: $data")
      message.ackId
    }
    .groupedWithin(1000, 1.minute)
    .map(AcknowledgeRequest.apply)
    .to(ackSink)

}

I found out that akka.stream.alpakka.googlecloud.pubsub.GooglePubSubSource.createLogic is never executed, which seems to be the reason why no messages are fetched.


Solution

  • What you have is only a definition (a blueprint) of a stream; you never run it, so nothing is materialized and no messages are pulled. Call run():

    subscriptionSource
      .map { message =>
        val data = message.message.data
        println(s"received a message: $data")
        message.ackId
      }
      .groupedWithin(1000, 1.minute)
      .map(AcknowledgeRequest.apply)
      .to(ackSink)
      .run() // <---
    

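    Alternatively, use runWith(), a convenience method that runs the stream and returns the materialized value of the Sink (here a Future[Done]), whereas to(ackSink).run() returns the materialized value of the Source (here NotUsed):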

    val result: Future[Done] =
      subscriptionSource
        .map { message =>
          val data = message.message.data
          println(s"received a message: $data")
          message.ackId
        }
        .groupedWithin(1000, 1.minute)
        .map(AcknowledgeRequest.apply)
        .runWith(ackSink)
    

    More about defining and running streams can be found here.
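
To see the difference between defining and running a stream without touching Pub/Sub, here is a minimal sketch. It assumes akka-stream is on the classpath and uses ActorMaterializer as in the question (it is deprecated in Akka 2.6 but still compiles). The names MaterializationSketch, demo and seen are made up for illustration, and Source(1 to 3) and Sink.ignore are stand-ins for the Pub/Sub source and the acknowledge sink. A counter in the map stage shows that no element flows until the stream is run.

```scala
import java.util.concurrent.atomic.AtomicInteger

import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{RunnableGraph, Sink, Source}
import akka.{Done, NotUsed}

import scala.concurrent.duration._
import scala.concurrent.{Await, Future}

// Hypothetical demo object, not part of Alpakka or the question's code.
object MaterializationSketch {

  // Returns (elements seen before running, elements seen after running).
  def demo(): (Int, Int) = {
    implicit val system: ActorSystem = ActorSystem("materialization-sketch")
    implicit val mat: ActorMaterializer = ActorMaterializer()

    // Counts how many elements have passed through the map stage.
    val seen = new AtomicInteger(0)

    // Stand-ins (assumptions) for the Pub/Sub source and the acknowledge sink.
    val source: Source[Int, NotUsed] =
      Source(1 to 3).map { n => seen.incrementAndGet(); n }
    val sink: Sink[Int, Future[Done]] = Sink.ignore

    // Only a blueprint: nothing is materialized, so no element flows.
    val blueprint: RunnableGraph[NotUsed] = source.to(sink)
    val before = seen.get()

    // runWith materializes the stream and returns the sink's Future[Done].
    val done: Future[Done] = source.runWith(sink)
    Await.result(done, 5.seconds)
    val after = seen.get()

    Await.result(system.terminate(), 5.seconds)
    (before, after)
  }

  def main(args: Array[String]): Unit = {
    val (before, after) = demo()
    println(s"before run: $before, after run: $after") // before run: 0, after run: 3
  }
}
```

The Future[Done] returned by runWith is also a convenient place to hook in completion or failure handling, which the NotUsed returned by to(...).run() does not offer.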