First off, I'm not experienced with Akka so I'm really bad in debugging this on my own. I tried the example from here, and publishing messages works (which means credentials work), but no messages are emitted. The service account is granted all permissions.
My code looks like this, which is basically exactly the same as in the example:
package com.example.google.pubsub
import java.io.FileInputStream
import java.security.spec.PKCS8EncodedKeySpec
import java.security.{KeyFactory, PrivateKey}
import java.util.Base64
import akka.actor.ActorSystem
import akka.stream.{ActorMaterializer, Attributes, Outlet, SourceShape}
import akka.stream.alpakka.googlecloud.pubsub.scaladsl.GooglePubSub
import akka.stream.alpakka.googlecloud.pubsub._
import akka.stream.scaladsl.{Sink, Source}
import akka.stream.stage.{GraphStage, GraphStageLogic}
import akka.{Done, NotUsed}
import com.google.api.client.googleapis.auth.oauth2.GoogleCredential
import scala.concurrent.duration._
import scala.concurrent.Future
object SubscriberMain extends App {
println("#### SUBSCRIBER ####")
val privateKey: PrivateKey = {
import java.io.FileInputStream
val credential = GoogleCredential.fromStream(new FileInputStream("mycredentials.json"))
val privateKey = credential.getServiceAccountPrivateKey
privateKey
}
val clientEmail = "main-19@weirdproject.iam.gserviceaccount.com"
val projectId = "weirdproject"
val apiKey = "xxxx"
val subscription = "somesubscription"
implicit val system = ActorSystem()
implicit val mat = ActorMaterializer()
val subscriptionSource: Source[ReceivedMessage, NotUsed] =
GooglePubSub.subscribe(projectId, apiKey, clientEmail, privateKey, subscription)
val ackSink: Sink[AcknowledgeRequest, Future[Done]] =
GooglePubSub.acknowledge(projectId, apiKey, clientEmail, privateKey, subscription)
subscriptionSource
.map { message =>
val data = message.message.data
println(s"received a message: $data")
message.ackId
}
.groupedWithin(1000, 1.minute)
.map(AcknowledgeRequest.apply)
.to(ackSink)
}
I found out that akka.stream.alpakka.googlecloud.pubsub.GooglePubSubSource.createLogic
is never executed, which seems to be the reason why no messages are fetched.
What you have is a definition of a stream, but you're not running it. Call run()
:
subscriptionSource
.map { message =>
val data = message.message.data
println(s"received a message: $data")
message.ackId
}
.groupedWithin(1000, 1.minute)
.map(AcknowledgeRequest.apply)
.to(ackSink)
.run() // <---
Alternatively, use runWith()
, which is a convenience method that returns the materialized value of the Sink
:
val result: Future[Done] =
subscriptionSource
.map { message =>
val data = message.message.data
println(s"received a message: $data")
message.ackId
}
.groupedWithin(1000, 1.minute)
.map(AcknowledgeRequest.apply)
.runWith(ackSink)
More about defining and running streams is found here.