I am trying to connect spark and Cassandra using spark-cassandra-connector. the connection gets established but when i am trying to perform operations on JavaRDD i am facing.
java.io.IOException: Failed to open native connection to Cassandra at {10.0.21.92}:9042
Here is the configuration and code which i am trying to implement :
SparkConf sparkConf = new SparkConf().setAppName("Data Transformation").set("spark.serializer","org.apache.spark.serializer.KryoSerializer").setMaster("local[4]");
sparkConf.set("spark.cassandra.connection.host", server ip);
sparkConf.set("spark.cassandra.connection.port", "9042");
sparkConf.set("spark.cassandra.connection.timeout_ms", "5000");
sparkConf.set("spark.cassandra.read.timeout_ms", "200000");
sparkConf.set("spark.cassandra.auth.username", user_name);
sparkConf.set("spark.cassandra.auth.password", password);
JavaSparkContext sparkContext = new JavaSparkContext(sparkConf);
And below is the code where i am performing operation on javardd:
CassandraJavaRDD<CassandraRow> cassandraRDD = CassandraJavaUtil.javaFunctions(sparkContext).cassandraTable(keySpaceName, tableName);
JavaRDD<GenericTriggerEntity> rdd = cassandraRDD.map(new Function<CassandraRow, GenericTriggerEntity>() {
private static final long serialVersionUID = -165799649937652815L;
@Override
public GenericTriggerEntity call(CassandraRow row) throws Exception {
GenericTriggerEntity genericTriggerEntity = new GenericTriggerEntity();
if(row.getString("end") != null) genericTriggerEntity.setEnd(row.getString("end"));
if(row.getString("key") != null)
genericTriggerEntity.setKey(row.getString("key"));
genericTriggerEntity.setKeyspacename(row.getString("keyspacename"));
genericTriggerEntity.setPartitiondeleted(row.getString("partitiondeleted"));
genericTriggerEntity.setRowdeleted(row.getString("rowDeleted"));
genericTriggerEntity.setRows(row.getString("rows"));
genericTriggerEntity.setStart(row.getString("start"));
genericTriggerEntity.setTablename("tablename");
genericTriggerEntity.setTriggerdate(row.getString("triggerdate"));
genericTriggerEntity.setTriggertime(row.getString("triggertime"));
genericTriggerEntity.setUuid(row.getUUID("uuid"));
return genericTriggerEntity;
}
});
Here is the JavaRDD operation i am performing
JavaRDD<String> jsonDataRDDwords = rdd.flatMap(s -> Arrays.asList(SPACE.split((CharSequence) s)));
JavaPairRDD<String, Integer> jsonDataRDDones = jsonDataRDDwords.mapToPair(s -> new Tuple2<>(s, 1));
JavaPairRDD<String, Integer> jsonDataRDDcounts = jsonDataRDDones.reduceByKey((i1, i2) -> i1 + i2);
List<Tuple2<String, Integer>> jsonDatRDDoutput = jsonDataRDDcounts.collect();
I even tried telnet to Cassandra server the port is open.
I am able to establish the connection but then while performing reduceByKey getting the above exception.
I am not able to figure out what is the issue. Is something wrong in the javardd operation. Any help would be appreciated. Thanks In Advance.
The above error was due to some dependency issue of cassandra drive core. solved it by adding metric dependency in my pom.xml
<dependency>
<groupId>io.dropwizard.metrics</groupId>
<artifactId>metrics-core</artifactId>
<version>3.2.2</version>
</dependency>