Tags: python, pyspark, cassandra, datastax, spark-cassandra-connector

pyspark not connecting to local cassandra


I am trying to read from a Cassandra table using pyspark. The code I'm using is shown below:

import os

from pyspark import SparkConf, SparkContext
from pyspark.sql import SQLContext

# Pull in the DataStax connector and point it at the local Cassandra node;
# this must be set before the SparkContext (and its JVM) is created.
os.environ['PYSPARK_SUBMIT_ARGS'] = ('--packages com.datastax.spark:spark-cassandra-connector_2.12:3.0.0 '
                                     '--conf spark.cassandra.connection.host=127.0.0.1 pyspark-shell')

conf = (SparkConf()
        .set("spark.cassandra.connection.host", "127.0.0.1")
        .set("spark.cassandra.connection.port", "9042")
        .setAppName("Sentinel")
        .setMaster("spark://Suraj-Vashistha-Laptop:7077"))
sc = SparkContext(conf=conf)
sqlContext = SQLContext(sc)

table_df = sqlContext.read \
    .format("org.apache.spark.sql.cassandra") \
    .options(table='movies', keyspace='practice') \
    .load()

I am using spark-cassandra-connector version 3.0.0 with Python 3.
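(For reference: on Spark 3, the SQLContext used above is deprecated, and the same read is usually expressed through a SparkSession. A minimal sketch with the same package, keyspace, and table names as above:)

from pyspark.sql import SparkSession

# SparkSession-based equivalent of the SQLContext read above (Spark 3.x).
spark = (SparkSession.builder
         .appName("Sentinel")
         .config("spark.jars.packages", "com.datastax.spark:spark-cassandra-connector_2.12:3.0.0")
         .config("spark.cassandra.connection.host", "127.0.0.1")
         .config("spark.cassandra.connection.port", "9042")
         .getOrCreate())

table_df = (spark.read
            .format("org.apache.spark.sql.cassandra")
            .options(table="movies", keyspace="practice")
            .load())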

Spark is up and running: the master web UI (screenshot omitted) shows a Spark master instance that has initialized and is in a waiting state, on the default localhost:8080.

Cassandra is also up and running (screenshot omitted), listening on localhost:9042.

So both are on the local machine itself. Despite all this, I am still unable to connect and run the code. I get the following error:

Py4JJavaError: An error occurred while calling o36.load.
: java.io.IOException: Failed to open native connection to Cassandra at {127.0.0.1:9042} :: Could not reach any contact point, make sure you've provided valid addresses 

There are similar questions on Stack Overflow, but they are all two or three years old and none of the answers worked for me, so I am asking again in the hope of a fresh perspective.

My Cassandra version details are [cqlsh 6.0.0 | Cassandra 4.0 | CQL spec 3.4.5 | Native protocol v5]. I have kept everything on the latest versions.

So, I am unable to figure out what went wrong: everything seems to be running, yet the error says no contact point could be reached. What went wrong?

EDIT: I was recently able to use the exact same code to connect to a Cassandra database hosted at a different IP (10.160.0.2), but for localhost it does not work.

EDIT2: I executed the same code (the localhost version, not the IP from the first edit) line by line in spark-shell, launched as spark-shell --packages with the DataStax Cassandra connector. I observed that everything works fine until I reach .load(), .show(), or .first(). I could do the following:

import org.apache.spark.{SparkConf, SparkContext}
import com.datastax.spark.connector._  // brings cassandraTable() into scope

val conf = new SparkConf(true).set("spark.cassandra.connection.host", cassandraHost)
val sc = new SparkContext("spark://" + sparkMasterHost + ":7077", "example", conf)
val rdd = sc.cassandraTable(keyspace, table)

At this point the output (shown on the next line) is just the RDD object; no error has occurred yet:

rdd: com.datastax.spark.connector.rdd.CassandraTableScanRDD[com.datastax.spark.connector.CassandraRow] = CassandraTableScanRDD[0] at RDD at CassandraRDD.scala:18

The issue happens next, when I try rdd.show(), rdd.first(), or rdd.collect(): I get the same error I posted in the question above (failed to open native connection, could not reach any contact point).


Solution

  • This exception indicates that you most likely have a network connectivity issue:

    java.io.IOException: Failed to open native connection to Cassandra at {127.0.0.1:9042} :: \
      Could not reach any contact point, make sure you've provided valid addresses
    

    Your pyspark app cannot connect to localhost on port 9042. Check that you don't have a software firewall such as iptables or firewalld preventing access to the Cassandra node.
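    A quick way to test that from the machine running pyspark is a plain TCP connect to the contact point; a minimal sketch, using the host and port from your configuration:

        import socket

        # Plain TCP connect to Cassandra's native transport port.
        # If this raises, the problem is networking/firewall, not Spark.
        with socket.create_connection(("127.0.0.1", 9042), timeout=5) as s:
            print("port 9042 is reachable:", s.getpeername())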

    One other thing to check is the IP you've configured for rpc_address in cassandra.yaml. As an alternative for your test, set the RPC address to your laptop's IP and restart Cassandra, then point spark.cassandra.connection.host at that same IP in your pyspark configuration and see if it works.
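    For illustration, assuming the laptop's IP were 192.168.1.20 (a made-up address; substitute your own), the two sides would look roughly like this:

        # In cassandra.yaml (then restart Cassandra):
        #   rpc_address: 192.168.1.20
        #
        # Matching pyspark side:
        from pyspark import SparkConf

        conf = (SparkConf()
                .set("spark.cassandra.connection.host", "192.168.1.20")
                .set("spark.cassandra.connection.port", "9042"))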

    As a side note, you stated this in your second edit:

    I launched it as spark-shell --packages with the DataStax Cassandra connector. I observed that everything works fine until I reach .load(), .show(), or .first().

    The reason is that evaluation in Spark is lazy: nothing is executed until you call an action. That is why you don't see any errors until you call an action like collect(); nothing runs until that point in your code.
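    A minimal, Cassandra-free sketch of that laziness:

        from pyspark import SparkContext

        sc = SparkContext.getOrCreate()

        # map() is a transformation, so the division below is not evaluated yet:
        rdd = sc.parallelize(range(5)).map(lambda x: 1 / 0)

        # The failure only surfaces once an action forces execution:
        # rdd.collect()  # raises ZeroDivisionError here, not at map() above

    Cheers!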