
"object Assign in package kafka010 cannot be accessed " facing issue during kafka ofset mgt


Environment: Kafka 0.10, Spark 2.1

I am trying to store Kafka offsets in external storage. After going through the Apache Spark website and some online research, I was able to write the following code. Now I am getting this error:

"Error:(190, 7) object Assign in package kafka010 cannot be accessed in package org.apache.spark.streaming.kafka010
  Assign[String, String](fromOffsets.keys.toList, kafkaParams, fromOffsets)"

My code:

    import com.typesafe.config.{ConfigFactory, ConfigValueFactory}
    import org.apache.kafka.clients.consumer.{ConsumerRecord, KafkaConsumer}
    import org.apache.kafka.common.TopicPartition
    import org.apache.spark.{SparkConf, SparkContext, TaskContext}
    import org.apache.spark.rdd.RDD
    import org.apache.spark.sql.{Row, SparkSession}
    import org.apache.spark.streaming.dstream.InputDStream
    import org.apache.spark.streaming.kafka010._
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    import org.apache.spark.sql.functions._
    import org.apache.spark.sql.types.{StringType, StructField, StructType}

    object offTest {
      def main(args: Array[String]) {

        val Array(impalaHost, brokers, topics, consumerGroupId, ssl, truststoreLocation, truststorePassword, wInterval) = args

        val sparkSession = SparkSession.builder
          .config("spark.hadoop.parquet.enable.summary-metadata", "false")
          .enableHiveSupport()
          .getOrCreate

        val ssc = new StreamingContext(sparkSession.sparkContext, Seconds(wInterval.toInt))

        val isUsingSsl = ssl.toBoolean

        // Create a direct Kafka stream with brokers and topics
        val topicsSet = topics.split(",").toSet
        val commonParams = Map[String, Object](
          "bootstrap.servers" -> brokers,
          "security.protocol" -> (if (isUsingSsl) "SASL_SSL" else "SASL_PLAINTEXT"),
          "sasl.kerberos.service.name" -> "kafka",
          "auto.offset.reset" -> "latest",
          "key.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer",
          "value.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer",
          "group.id" -> consumerGroupId,
          "enable.auto.commit" -> (false: java.lang.Boolean)
        )

        val additionalSslParams = if (isUsingSsl) {
          Map(
            "ssl.truststore.location" -> truststoreLocation,
            "ssl.truststore.password" -> truststorePassword
          )
        } else {
          Map.empty
        }

        val kafkaParams = commonParams ++ additionalSslParams
        val fromOffsets = Map[Object, Long](new TopicPartition(topics, 4) -> 4807048129L)

        val stream = KafkaUtils.createDirectStream[String, String](
          ssc,
          LocationStrategies.PreferConsistent,
          Assign[String, String](fromOffsets.keys.toList, kafkaParams, fromOffsets)
        )

        stream.foreachRDD { rdd =>
          val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
          rdd.foreachPartition { iter =>
            val o: OffsetRange = offsetRanges(TaskContext.get.partitionId)
            println(s"${o.topic} ${o.partition} ${o.fromOffset} ${o.untilOffset}")
            // I will insert these values into another database later
          }
        }

        val data = stream.map(record => (record.key, record.value))
        data.foreachRDD { rdd1 =>
          val value = rdd1.map(x => x._2)
          if (!value.isEmpty()) {
            value.foreach(println)
          } else {
            println("no data")
          }
        }

        ssc.start()
        ssc.awaitTermination()
      }
    }

The error is on the following line:

Assign[String, String](fromOffsets.keys.toList, kafkaParams, fromOffsets)

I did some research, and it seems the reason behind this is a conflict among imported packages. However, I could not resolve it. Any kind of help or a code sample would be highly appreciated.

Thanks, Raisha


Solution

  • You need to create the Assign strategy via the ConsumerStrategies object. The Assign class itself is private to the org.apache.spark.streaming.kafka010 package (which is exactly what the "cannot be accessed" error is telling you), and ConsumerStrategies.Assign is the public factory method for it:

    val stream = KafkaUtils.createDirectStream[String, String](
      ssc,
      LocationStrategies.PreferConsistent,
      ConsumerStrategies.Assign[String, String](fromOffsets.keys.toList, kafkaParams, fromOffsets)
    )
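
    Note that for this call to compile, fromOffsets should also be keyed by TopicPartition rather than Object: the Scala overload of ConsumerStrategies.Assign takes an Iterable[TopicPartition], the Kafka params, and a Map[TopicPartition, Long]. A minimal sketch of the adjusted declaration, letting the compiler infer Map[TopicPartition, Long]:

        val fromOffsets = Map(new TopicPartition(topics, 4) -> 4807048129L)

    Since your goal is to keep offsets in external storage, here is a minimal sketch of persisting each batch's offset ranges from the driver after the batch is processed. saveOffset is a hypothetical helper standing in for whatever database write you use (for example, a JDBC upsert keyed on topic and partition), not a Spark API:

        stream.foreachRDD { rdd =>
          val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges

          // ... process the batch here ...

          // Runs on the driver once the batch completes: persist the end offset
          // of each partition. saveOffset is a hypothetical helper you supply.
          offsetRanges.foreach { o =>
            saveOffset(o.topic, o.partition, o.untilOffset)
          }
        }

    On restart, you would read those stored offsets back, build fromOffsets from them, and pass it to ConsumerStrategies.Assign exactly as above.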