Environment: Kafka 0.10, Spark 2.1
I am trying to store Kafka offsets in external storage. After going through the Apache Spark website and some online research, I was able to write the following code. Now I am getting this error:
"Error:(190, 7) object Assign in package kafka010 cannot be accessed in package org.apache.spark.streaming.kafka010
Assign[String, String](fromOffsets.keys.toList, kafkaParams, fromOffsets)"
My code:
import com.typesafe.config.{ConfigFactory, ConfigValueFactory}
import org.apache.kafka.clients.consumer.{ConsumerRecord, KafkaConsumer}
import org.apache.kafka.common.TopicPartition
import org.apache.spark.{SparkConf, SparkContext, TaskContext}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka010._
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.{StringType, StructField, StructType}
object offTest {
  def main(args: Array[String]) {
    val Array(impalaHost, brokers, topics, consumerGroupId, ssl, truststoreLocation, truststorePassword, wInterval) = args
    val sparkSession = SparkSession.builder
      .config("spark.hadoop.parquet.enable.summary-metadata", "false")
      .enableHiveSupport()
      .getOrCreate
    val ssc = new StreamingContext(sparkSession.sparkContext, Seconds(wInterval.toInt))
    val isUsingSsl = ssl.toBoolean

    // Create direct kafka stream with brokers and topics
    val topicsSet = topics.split(",").toSet
    val commonParams = Map[String, Object](
      "bootstrap.servers" -> brokers,
      "security.protocol" -> (if (isUsingSsl) "SASL_SSL" else "SASL_PLAINTEXT"),
      "sasl.kerberos.service.name" -> "kafka",
      "auto.offset.reset" -> "latest",
      "key.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer",
      "value.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer",
      "group.id" -> consumerGroupId,
      "enable.auto.commit" -> (false: java.lang.Boolean)
    )
    val additionalSslParams = if (isUsingSsl) {
      Map(
        "ssl.truststore.location" -> truststoreLocation,
        "ssl.truststore.password" -> truststorePassword
      )
    } else {
      Map.empty
    }
    val kafkaParams = commonParams ++ additionalSslParams

    val fromOffsets = Map[Object, Long](new TopicPartition(topics, 4) -> 4807048129L)
    val stream = KafkaUtils.createDirectStream[String, String](
      ssc,
      LocationStrategies.PreferConsistent,
      Assign[String, String](fromOffsets.keys.toList, kafkaParams, fromOffsets)
    )

    stream.foreachRDD { rdd =>
      val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
      rdd.foreachPartition { iter =>
        val o: OffsetRange = offsetRanges(TaskContext.get.partitionId)
        println(s"${o.topic} ${o.partition} ${o.fromOffset} ${o.untilOffset}")
        // I will insert these values into another database later
      }
    }

    val data = stream.map(record => (record.key, record.value))
    data.foreachRDD(rdd1 => {
      val value = rdd1.map(x => x._2)
      if (!value.isEmpty()) {
        value.foreach(println)
      } else {
        println("no data")
      }
    })

    ssc.start()
    ssc.awaitTermination()
  }
}
The error is on the following line:
Assign[String, String](fromOffsets.keys.toList, kafkaParams, fromOffsets)
I did some research, and it seems the reason behind this is a conflict among the imported packages. However, I could not resolve it. Any kind of help or a code sample would be highly appreciated.
Thanks, Raisha
You need to create an Assign instance via the ConsumerStrategies object, like this:
val stream = KafkaUtils.createDirectStream[String, String](
  ssc,
  LocationStrategies.PreferConsistent,
  ConsumerStrategies.Assign[String, String](fromOffsets.keys.toList, kafkaParams, fromOffsets)
)
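One more thing worth checking: in your code fromOffsets is declared as Map[Object, Long], but the Scala overload of ConsumerStrategies.Assign takes an Iterable[TopicPartition] for the partitions and a Map[TopicPartition, Long] for the starting offsets, so keying the map by Object will likely give a type mismatch once the access error is fixed. A minimal sketch of the declaration, reusing the same single partition and offset from your code:

import org.apache.kafka.common.TopicPartition

// Key the offset map by TopicPartition so that fromOffsets.keys.toList
// yields the Iterable[TopicPartition] that ConsumerStrategies.Assign expects.
val fromOffsets = Map[TopicPartition, Long](
  new TopicPartition(topics, 4) -> 4807048129L
)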