Tags: apache-spark, cassandra, spark-cassandra-connector

spark worker insufficient memory


I have a Spark/Cassandra setup where I am using the Spark Cassandra Java connector to query a table. So far, I have 1 Spark master node (2 cores) and 1 worker node (4 cores). Both of them have the following spark-env.sh under conf/:

#!/usr/bin/env bash
export SPARK_LOCAL_IP=127.0.0.1
export SPARK_MASTER_IP="192.168.4.134"
export SPARK_WORKER_MEMORY=1G
export SPARK_EXECUTOR_MEMORY=2G

Here is my spark execution code:

    SparkConf conf = new SparkConf();
    conf.setAppName("Testing");
    conf.setMaster("spark://192.168.4.134:7077");
    conf.set("spark.cassandra.connection.host", "192.168.4.129");
    conf.set("spark.logConf", "true");
    conf.set("spark.driver.maxResultSize", "50m");
    conf.set("spark.executor.memory", "200m");
    conf.set("spark.eventLog.enabled", "true");
    conf.set("spark.eventLog.dir", "/tmp/");
    conf.set("spark.executor.extraClassPath", "/home/enlighted/ebd.jar");
    conf.set("spark.cores.max", "1");
    JavaSparkContext sc = new JavaSparkContext(conf);


    JavaRDD<String> cassandraRowsRDD = CassandraJavaUtil.javaFunctions(sc).cassandraTable("testing", "ec")
    .map(new Function<CassandraRow, String>() {
        private static final long serialVersionUID = -6263533266898869895L;
        @Override
        public String call(CassandraRow cassandraRow) throws Exception {
            return cassandraRow.toString();
        }
    });
    System.out.println("Data as CassandraRows: \n" + StringUtils.join(cassandraRowsRDD.toArray(), "\n"));
    sc.close();

Now I start the Spark master on the first node and the worker on the second node, and then I run the above code. It creates an executor thread on the worker, but I see the following message in the application-side logs:

[Timer-0] WARN org.apache.spark.scheduler.TaskSchedulerImpl  - Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient resources

Now, keeping the same setup, when I run spark/sbin/start-all.sh on the master server, it creates a master instance as well as a worker instance on the first node. When I run the same code again and the assigned worker is this new worker, it works perfectly fine.

What could be the issue with my original worker, which runs on a different node from the master?


Solution

  • Figured out the root cause: the master was assigning a random port for communication with the worker. Because of the firewall on the master node, the worker couldn't send messages back to the master (presumably including its resource details), so the master never saw the worker's resources. Strangely, the worker didn't throw any error about this.
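One way to avoid the random-port problem is to pin the ports Spark would otherwise choose at random and then open exactly those ports in the firewall. The sketch below assumes a standalone deployment with firewalld; the property names (spark.driver.port, spark.blockManager.port, spark.port.maxRetries) are standard Spark configuration keys, but the specific port numbers are arbitrary examples, not Spark defaults:

```shell
# Pin Spark's otherwise-random communication ports in conf/spark-defaults.conf
# (port numbers below are arbitrary example choices)
cat >> conf/spark-defaults.conf <<'EOF'
spark.driver.port        7078
spark.blockManager.port  7079
spark.port.maxRetries    16
EOF

# Then open the fixed master port plus the pinned range on the firewalled node,
# e.g. with firewalld (sketch; adapt to your firewall tooling):
firewall-cmd --permanent --add-port=7077/tcp        # standalone master RPC port
firewall-cmd --permanent --add-port=7078-7095/tcp   # pinned ports + retry range
firewall-cmd --reload
```

With maxRetries set, Spark will try successive ports above the pinned ones if a port is busy, which is why the opened range is wider than the two base ports.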