Tags: scala, apache-spark, google-cloud-pubsub, google-cloud-dataproc

Error publishing data to Pub/Sub from a Dataproc Spark job: No functional channel service provider found


I am running a Spark Scala job on a GCP Dataproc cluster. After processing the data, I need to publish messages to a Pub/Sub topic, but I get the following error:

No functional channel service provider found. Try adding a dependency on the grpc-okhttp, grpc-netty, or grpc-netty-shaded artifact

Everything works fine through the Spark processing. The error appears as soon as I publish a message to Pub/Sub. Here is the code:

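import java.util.ArrayList
import scala.collection.mutable.MutableList
import scala.util.Try
import com.google.api.gax.batching.BatchingSettings
import com.google.api.gax.core.FixedCredentialsProvider
import com.google.auth.oauth2.GoogleCredentials
import com.google.cloud.pubsub.v1.Publisher
import com.google.protobuf.ByteString
import com.google.pubsub.v1.{PubsubMessage, TopicName}

Try {

  val topicName = TopicName.of(projectName, pubSubTopicName)

  val scope = new ArrayList[String]()
  scope.add("https://www.googleapis.com/auth/pubsub")

  // Load the service-account key from the classpath and limit it to the Pub/Sub scope.
  val googleCredentials = GoogleCredentials
    .fromStream(getClass.getResourceAsStream("file path"))
    .createScoped(scope)

  val batchingSettings = BatchingSettings
    .newBuilder()
    .setElementCountThreshold(elementCountThreshold)
    .setRequestByteThreshold(requestByteThreshold)
    .setDelayThreshold(delayDuration)
    .build()

  val publisher = getPublisher(
    topicName,
    batchingSettings,
    googleCredentials
  )

  val publishedData: MutableList[String] = MutableList()

  for (pubMessage <- dataToBePublished) {
    val pubSubMessage =
      getPubSubMessage(
        ByteString.copyFromUtf8(pubMessage)
      )

    val messageIdFuture = publisher.publish(pubSubMessage)

    // Block until Pub/Sub returns the message ID.
    publishedData += messageIdFuture.get
  }
}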

def getPublisher(
    topicName: TopicName,
    batchingSettings: BatchingSettings,
    googleCredentials: GoogleCredentials
): Publisher = {

  Publisher
    .newBuilder(topicName)
    .setCredentialsProvider(
      FixedCredentialsProvider.create(googleCredentials)
    )
    .setBatchingSettings(batchingSettings)
    .build()

}

def getPubSubMessage(data: ByteString): PubsubMessage = {

  PubsubMessage
    .newBuilder()
    .setData(data)
    .build()

}

Since the error mentions the channel, I tried the following change to the Publisher, but got the same error:

Publisher
  .newBuilder(topicName)
  .setCredentialsProvider(
    FixedCredentialsProvider.create(googleCredentials)
  )
  .setChannelProvider(
    TopicAdminSettings
      .defaultGrpcTransportProviderBuilder()
      .build()
  )
  .build()

I also tried adding these dependencies in sbt, but still got the same error:

"com.google.cloud" % "google-cloud-pubsub" % "1.120.19",
"io.grpc" % "grpc-okhttp" % "1.49.2",
"io.grpc" % "grpc-netty" % "1.49.2"

All three suggested dependencies are present in the libraries, yet the error persists.

Any help with this issue would be appreciated. Thanks in advance.


Solution

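  • The issue is in how the fat jar is assembled once the Pub/Sub library is included. gRPC looks up its channel providers through java.util.ServiceLoader, that is, through files under META-INF/services, so a merge strategy that drops or overwrites those files can leave no provider registered at runtime even though the grpc artifacts are in the jar.

    Here are the changes required in build.sbt:

    • Add a dependency on grpc-netty only.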
    "io.grpc" % "grpc-netty" % "1.49.2"
    
    • Shade (repackage) the com.google libraries that cause problems at runtime when the publisher is created.
    assemblyShadeRules in assembly := Seq(
      ShadeRule
        .rename("com.google.common.**" -> "repackaged.com.google.common.@1")
        .inAll,
      ShadeRule
        .rename("com.google.protobuf.**" -> "repackaged.com.google.protobuf.@1")
        .inAll
    )
    
    • Also change the merge strategy used when assembling the jar, so that files under META-INF/services are merged rather than dropped.
    assemblyMergeStrategy in assembly := {
      case x if Assembly.isConfigFile(x) =>
        MergeStrategy.concat
      case PathList(ps @ _*) if Assembly.isReadme(ps.last) || Assembly.isLicenseFile(ps.last) =>
        MergeStrategy.rename
      case PathList("META-INF", xs @ _*) =>
        (xs map { _.toLowerCase }) match {
          case ("manifest.mf" :: Nil) | ("index.list" :: Nil) | ("dependencies" :: Nil) =>
            MergeStrategy.discard
          case ps @ (x :: xs) if ps.last.endsWith(".sf") || ps.last.endsWith(".dsa") =>
            MergeStrategy.discard
          case "plexus" :: xs =>
            MergeStrategy.discard
          case "services" :: xs =>
            MergeStrategy.filterDistinctLines
          case ("spring.schemas" :: Nil) | ("spring.handlers" :: Nil) =>
            MergeStrategy.filterDistinctLines
          case _ => MergeStrategy.first
        }
      case _ => MergeStrategy.first
    }
    

    With these changes, the job runs without the runtime error.
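
To check whether the assembled jar still registers a gRPC channel provider, one option is to list the service files visible on the classpath. The following is a minimal sketch using only the JDK and the Scala standard library. The object name `GrpcProviderCheck` and its helper methods are hypothetical, and it assumes gRPC registers providers in `META-INF/services/io.grpc.ManagedChannelProvider`, which is how grpc-netty is expected to do it; verify this against the jars you actually ship.

```scala
import java.net.URL
import scala.collection.JavaConverters._
import scala.io.Source

// Hypothetical diagnostic helper, not part of gRPC or the Pub/Sub client.
object GrpcProviderCheck {
  // Assumed location of the ServiceLoader file for gRPC channel providers.
  val ServiceFile = "META-INF/services/io.grpc.ManagedChannelProvider"

  // Strip comments and blank lines from the content of a service file.
  def parseProviders(lines: Seq[String]): Seq[String] =
    lines.map(_.takeWhile(_ != '#').trim).filter(_.nonEmpty)

  // Every copy of the service file visible to the given class loader.
  def serviceFiles(loader: ClassLoader): List[URL] =
    loader.getResources(ServiceFile).asScala.toList

  // Provider class names declared across all copies, de-duplicated.
  def declaredProviders(loader: ClassLoader): List[String] =
    serviceFiles(loader).flatMap { url =>
      val src = Source.fromInputStream(url.openStream(), "UTF-8")
      try parseProviders(src.getLines().toList) finally src.close()
    }.distinct

  def main(args: Array[String]): Unit = {
    val providers = declaredProviders(Thread.currentThread().getContextClassLoader)
    if (providers.isEmpty) println(s"No providers declared in $ServiceFile")
    else providers.foreach(p => println(s"Declared provider: $p"))
  }
}
```

Running `GrpcProviderCheck.main` from inside the fat jar (for example at the start of the Spark driver) shows whether the service file survived assembly. An empty result would be consistent with the "No functional channel service provider found" error, and would point at the merge strategy rather than at a missing dependency.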