my streaming app not saving data to cas,
I tried for different ways, to use foreachRDD
and stream.print
to find out why it does not work, but it does not print anything.
For input data i use kafka-console-producer.sh
object letsRun extends App {
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark._
import com.datastax.spark.connector._
import org.apache.spark.sql._
import com.datastax.spark.connector.writer._
import org.apache.spark.streaming._
import org.apache.spark.streaming.StreamingContext._
import com.datastax.spark.connector.streaming._
import org.apache.spark.streaming.kafka010._
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
val conf = new SparkConf().setMaster("local[*]").setAppName("test").set("spark.cassandra.connection.host", "192.168.1.44")
//val sc = new SparkContext(conf)
val ssc = new StreamingContext(conf, Seconds(5))
ssc.sparkContext.setLogLevel("WARN")
val kafkaParams = Map[String, Object](
"bootstrap.servers" -> "192.168.1.46:9092",
"key.deserializer" -> classOf[StringDeserializer],
"value.deserializer" -> classOf[StringDeserializer],
"group.id" -> "use_a_separate_group_id_for_each_stream",
"auto.offset.reset" -> "latest",
"enable.auto.commit" -> (false: java.lang.Boolean)
)
val topics = Set[String]("testTopic")
val stream = KafkaUtils.createDirectStream[String, String](
ssc,
PreferConsistent,
Subscribe[String, String](topics, kafkaParams)
)
implicit val sqlRowWriter = SqlRowWriter.Factory
val messages = stream.map(record => (record.key, record.value))
messages.saveToCassandra("ks", "tb", SomeColumns("key", "value"))
ssc.start()
ssc.awaitTermination()
}
the out put in Eclipse :
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
17/06/12 20:23:25 INFO SparkContext: Running Spark version 2.1.1
17/06/12 20:23:26 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
17/06/12 20:23:26 WARN Utils: Your hostname, dev resolves to a loopback address: 127.0.0.1; using 192.168.1.41 instead (on interface wlo1)
17/06/12 20:23:26 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
17/06/12 20:23:26 INFO SecurityManager: Changing view acls to: dev
17/06/12 20:23:26 INFO SecurityManager: Changing modify acls to: dev
17/06/12 20:23:26 INFO SecurityManager: Changing view acls groups to:
17/06/12 20:23:26 INFO SecurityManager: Changing modify acls groups to:
17/06/12 20:23:26 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(dev); groups with view permissions: Set(); users with modify permissions: Set(dev); groups with modify permissions: Set()
17/06/12 20:23:26 INFO Utils: Successfully started service 'sparkDriver' on port 39585.
17/06/12 20:23:26 INFO SparkEnv: Registering MapOutputTracker
17/06/12 20:23:26 INFO SparkEnv: Registering BlockManagerMaster
17/06/12 20:23:26 INFO BlockManagerMasterEndpoint: Using org.apache.spark.storage.DefaultTopologyMapper for getting topology information
17/06/12 20:23:26 INFO BlockManagerMasterEndpoint: BlockManagerMasterEndpoint up
17/06/12 20:23:26 INFO DiskBlockManager: Created local directory at /tmp/blockmgr-8eb85c8e-216f-4b69-a567-3e7833cd675d
17/06/12 20:23:26 INFO MemoryStore: MemoryStore started with capacity 870.9 MB
17/06/12 20:23:26 INFO SparkEnv: Registering OutputCommitCoordinator
17/06/12 20:23:27 INFO Utils: Successfully started service 'SparkUI' on port 4040.
17/06/12 20:23:27 INFO SparkUI: Bound SparkUI to 0.0.0.0, and started at http://192.168.1.41:4040
17/06/12 20:23:27 INFO Executor: Starting executor ID driver on host localhost
17/06/12 20:23:27 INFO Utils: Successfully started service 'org.apache.spark.network.netty.NettyBlockTransferService' on port 43841.
17/06/12 20:23:27 INFO NettyBlockTransferService: Server created on 192.168.1.41:43841
17/06/12 20:23:27 INFO BlockManager: Using org.apache.spark.storage.RandomBlockReplicationPolicy for block replication policy
17/06/12 20:23:27 INFO BlockManagerMaster: Registering BlockManager BlockManagerId(driver, 192.168.1.41, 43841, None)
17/06/12 20:23:27 INFO BlockManagerMasterEndpoint: Registering block manager 192.168.1.41:43841 with 870.9 MB RAM, BlockManagerId(driver, 192.168.1.41, 43841, None)
17/06/12 20:23:27 INFO BlockManagerMaster: Registered BlockManager BlockManagerId(driver, 192.168.1.41, 43841, None)
17/06/12 20:23:27 INFO BlockManager: Initialized BlockManager: BlockManagerId(driver, 192.168.1.41, 43841, None)
17/06/12 20:23:27 WARN KafkaUtils: overriding enable.auto.commit to false for executor
17/06/12 20:23:27 WARN KafkaUtils: overriding auto.offset.reset to none for executor
17/06/12 20:23:27 WARN KafkaUtils: overriding executor group.id to spark-executor-use_a_separate_group_id_for_each_stream
17/06/12 20:23:27 WARN KafkaUtils: overriding receive.buffer.bytes to 65536 see KAFKA-3135
Sorry for question, i solved it by myself I set
ssc.sparkContext.setLogLevel("DEBUG")
spark tried to resolve hostname of my virtual machine, but in config kafka address i used IP, so added address to /etc/hosts and it works!