Tags: java, apache-spark, cassandra, amazon-emr, spark-cassandra-connector

Spark Job with Multiple Contexts is failing


In our Spark application we create two Spark contexts:

1) One for reading data from the file system.

2) One for connecting to Cassandra and loading data into it.

Since only one Spark context can run per application, we stop the first context before starting the second.

I was getting the following error.

Error 1)

    16/03/10 05:40:44 ERROR Utils: Uncaught exception in thread Thread-2
    java.io.IOException: Target log file already exists (hdfs:///var/log/spark/apps/application_1457586850134_0001_2)
        at org.apache.spark.scheduler.EventLoggingListener.stop(EventLoggingListener.scala:225)

This happened because the log directory/file does not need to exist when the first context starts the job, but by the time the second context started, the event-log file written by the first run was already present, hence the error above.

I solved the issue by setting spark.eventLog.overwrite=true


Error 2)

    WARN executor.CoarseGrainedExecutorBackend: An unknown (ip-10-93-141-13.ec2.internal:48849) driver disconnected.
    16/03/10 06:47:37 ERROR executor.CoarseGrainedExecutorBackend: Driver 10.93.141.13:48849 disassociated! Shutting down.

I tried increasing:

spark.yarn.driver.memoryOverhead=1024

spark.yarn.executor.memoryOverhead=1024

But the problem is still there.


Error 3)

Exception in thread "main" java.io.IOException: Failed to connect to /10.93.141.13:46008
    at org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:216)
    at org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:167)
    at org.apache.spark.rpc.netty.NettyRpcEnv.createClient(NettyRpcEnv.scala:200)
    at org.apache.spark.rpc.netty.Outbox$$anon$1.call(Outbox.scala:187)
    at org.apache.spark.rpc.netty.Outbox$$anon$1.call(Outbox.scala:183)
    at java.util.concurrent.FutureTask.run(FutureTask.java:262)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:745)

I have checked on the core node; it is not listening on port 46008.


Error 4 )

    WARN YarnAllocator: Container marked as failed: container_1457586850134_0006_01_000006 on host: ip-10-164-169-46.ec2.internal. Exit status: 1. Diagnostics: Exception from container-launch.
    Container id: container_1457586850134_0006_01_000006
    Exit code: 1
    Stack trace: ExitCodeException exitCode=1:
        at org.apache.hadoop.util.Shell.runCommand(Shell.java:545)
        at org.apache.hadoop.util.Shell.run(Shell.java:456)
        at org.apache.hadoop.util.Shell$ShellCommandExecutor.execute(Shell.java:722)
        at org.apache.hadoop.yarn.server.nodemanager.DefaultContainerExecutor.launchContainer(DefaultContainerExecutor.java:211)
        at org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch.call(ContainerLaunch.java:302)
        at org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch.call(ContainerLaunch.java:82)
        at java.util.concurrent.FutureTask.run(FutureTask.java:262)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        at java.lang.Thread.run(Thread.java:745)


Container exited with a non-zero exit code 1

    16/03/10 06:47:17 WARN ApplicationMaster: Reporter thread fails 1 time(s) in a row.
    java.lang.IllegalStateException: RpcEnv already stopped.
        at org.apache.spark.rpc.netty.Dispatcher.postMessage(Dispatcher.scala:159)
        at org.apache.spark.rpc.netty.Dispatcher.postOneWayMessage(Dispatcher.scala:131)
        at org.apache.spark.rpc.netty.NettyRpcEnv.send(NettyRpcEnv.scala:192)
        at org.apache.spark.rpc.netty.NettyRpcEndpointRef.send(NettyRpcEnv.scala:516)
        at org.apache.spark.deploy.yarn.YarnAllocator$$anonfun$processCompletedContainers$1$$anonfun$apply$7.apply(YarnAllocator.scala:531)
        at org.apache.spark.deploy.yarn.YarnAllocator$$anonfun$processCompletedContainers$1$$anonfun$apply$7.apply(YarnAllocator.scala:512)
        at scala.Option.foreach(Option.scala:236)
        at org.apache.spark.deploy.yarn.YarnAllocator$$anonfun$processCompletedContainers$1.apply(YarnAllocator.scala:512)
        at org.apache.spark.deploy.yarn.YarnAllocator$$anonfun$processCompletedContainers$1.apply(YarnAllocator.scala:442)
        at scala.collection.Iterator$class.foreach(Iterator.scala:727)
        at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
        at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
        at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
        at org.apache.spark.deploy.yarn.YarnAllocator.processCompletedContainers(YarnAllocator.scala:442)
        at org.apache.spark.deploy.yarn.YarnAllocator.allocateResources(YarnAllocator.scala:242)
        at org.apache.spark.deploy.yarn.ApplicationMaster$$anon$1.run(ApplicationMaster.scala:368)

The above error occurs because the container is failing continuously.


It seems the above Spark issues are due to the multiple contexts, as raised on the mailing list:

https://mail-archives.apache.org/mod_mbox/spark-issues/201602.mbox/%3CJIRA.12936774.1454603470000.316442.1454707659978@Atlassian.JIRA%3E

sparkContext.stop() is not able to free up the resources.


I'm running the job with the following options:

    --class com.mobi.vserv.driver.Query5kPids
    --conf spark.eventLog.overwrite=true
    --conf spark.yarn.executor.memoryOverhead=1024
    --conf spark.yarn.driver.memoryOverhead=1024
    --num-executors 4
    --executor-memory 3g
    --executor-cores 2
    --driver-memory 3g

I'm running on EMR with a master and 2 slave nodes; the master has 8 cores and 16 GB of memory, and each slave has 4 cores and 5120 MB of available memory.
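A quick back-of-the-envelope check on these numbers: each executor container asks YARN for `--executor-memory` plus `spark.yarn.executor.memoryOverhead`, i.e. roughly 4096 MB, so a 5120 MB slave can host only one executor and the cluster can never satisfy `--num-executors 4`. A standalone sketch of the arithmetic (plain Java, no Spark needed; the class name is mine, and YARN may additionally round container sizes up to its minimum-allocation multiple):

```java
public class ContainerSizing {
    // Memory YARN must allocate for one executor container (MB)
    static int containerMb(int executorMemoryMb, int overheadMb) {
        return executorMemoryMb + overheadMb;
    }

    public static void main(String[] args) {
        int containerMb = containerMb(3 * 1024, 1024); // --executor-memory 3g + 1024 MB overhead
        int perSlaveMb = 5120;                         // available memory per slave node
        int slaves = 2;
        int executorsThatFit = (perSlaveMb / containerMb) * slaves;

        System.out.println("Container request: " + containerMb + " MB"); // 4096 MB
        System.out.println("Executors that fit: " + executorsThatFit);   // 2, not the 4 requested
    }
}
```

If this holds on your cluster, YARN will keep trying (and failing) to place the remaining executors, which would fit the repeated container failures in Error 4.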

Below is my code.

    public class Query5kPids implements Serializable {

        static List<UserSetGet> ListFromS3 = new ArrayList<UserSetGet>();
        static final Logger logger = Logger.getLogger(Query5kPids.class); // log4j

        private final SparkConf conf;

        public Query5kPids(SparkConf conf) {
            this.conf = conf;
        }

        public static void main(String[] args) throws JSONException, IOException, InterruptedException, URISyntaxException {

            SparkConf conf = new SparkConf();
            conf.setAppName("Spark-Cassandra Integration");
            conf.setMaster("yarn-cluster");
            conf.set("spark.cassandra.connection.host", "12.16.193.19");
            conf.set("spark.cassandra.connection.port", "9042");

            SparkConf conf1 = new SparkConf().setAppName("SparkAutomation").setMaster("yarn-cluster");

            Query5kPids app1 = new Query5kPids(conf1);
            app1.run1(file); // 'file' is constructed elsewhere (omitted here)

            Query5kPids app = new Query5kPids(conf);
            System.out.println("Both RDD has been generated");
            app.run();
        }

        private void run() throws JSONException, IOException, InterruptedException {
            JavaSparkContext sc = new JavaSparkContext(conf);
            query(sc);
            sc.stop();
        }

        private void run1(File file) throws JSONException, IOException, InterruptedException {
            JavaSparkContext sc = new JavaSparkContext(conf);
            getData(sc, file);
            sc.stop();
        }

        private void getData(JavaSparkContext sc, File file) {
            JavaRDD<String> Data = sc.textFile(file.toString());
            System.out.println("RDD Count is " + Data.count());

            // Other map operations to convert to a UserSetGet RDD (omitted).
            ListFromS3 = Data.collect();
        }

        private void query(JavaSparkContext sc) {

            System.out.println("RDD Count is " + ListFromS3.size());
            // This gets printed, so the application is reaching the
            // second part of the program.

            StringBuilder sb = new StringBuilder();
            for (int i = 0; i < ListFromS3.size(); i++) {
                sb.append(ListFromS3.get(i).getApnid());
                sb.append("','"); // 3-char separator; matches setLength(-3) below
            }
            sb.setLength(sb.length() - 3); // drop the trailing "','"

            JavaRDD<CassandraRow> cassandraRDD = CassandraJavaUtil.javaFunctions(sc)
                .cassandraTable("dmp", "user_profile_spark_test")
                .select("app_day_count", "app_first_seen", "app_last_seen",
                        "app_usage_count", "total_day_count", "total_usage_count")
                .where("apnid IN ('" + sb + "')");

            if (cassandraRDD.isEmpty()) {
                JavaRDD<UserSetGet> rddFromGz = sc.parallelize(ListFromS3);
                CassandraJavaUtil.javaFunctions(rddFromGz)
                    .writerBuilder("dmp", "user_profile_spark_test", mapToRow(UserSetGet.class))
                    .saveToCassandra();
                logger.info("DataSaved");
            }
        }
    }
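One detail in query() worth flagging: sb.setLength(sb.length() - 3) removes a three-character suffix, so it only pairs correctly with appending the three-character separator "','" after each id, not a bare ','. The string building can be checked in isolation without Spark (class and method names here are mine):

```java
import java.util.Arrays;
import java.util.List;

public class InClauseDemo {
    // Joins ids as a','b','c so that "apnid IN ('" + joined + "')" forms valid CQL.
    static String joinForInClause(List<String> ids) {
        StringBuilder sb = new StringBuilder();
        for (String id : ids) {
            sb.append(id);
            sb.append("','");              // 3-char separator: quote, comma, quote
        }
        sb.setLength(sb.length() - 3);     // drop the trailing "','"
        return sb.toString();
    }

    public static void main(String[] args) {
        String joined = joinForInClause(Arrays.asList("a1", "b2", "c3"));
        System.out.println("apnid IN ('" + joined + "')"); // apnid IN ('a1','b2','c3')
    }
}
```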

Below is my POM

    <dependencies>
      <dependency>
        <groupId>org.apache-extras.cassandra-jdbc</groupId>
        <artifactId>cassandra-jdbc</artifactId>
        <version>1.2.5</version>
      </dependency>
      <dependency>
        <groupId>junit</groupId>
        <artifactId>junit</artifactId>
        <version>3.8.1</version>
        <scope>test</scope>
      </dependency>
      <dependency>
        <groupId>org.codehaus.jettison</groupId>
        <artifactId>jettison</artifactId>
        <version>1.3.7</version>
      </dependency>
      <dependency>
        <groupId>log4j</groupId>
        <artifactId>log4j</artifactId>
        <version>1.2.17</version>
      </dependency>
      <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_2.10</artifactId>
        <version>1.6.0</version>
      </dependency>
      <dependency>
        <groupId>com.datastax.spark</groupId>
        <artifactId>spark-cassandra-connector_2.10</artifactId>
        <version>1.5.0-M1</version>
      </dependency>
      <dependency>
        <groupId>com.datastax.cassandra</groupId>
        <artifactId>cassandra-driver-core</artifactId>
        <version>2.1.6</version>
      </dependency>
      <dependency>
        <groupId>com.datastax.spark</groupId>
        <artifactId>spark-cassandra-connector-java_2.10</artifactId>
        <version>1.5.0-M3</version>
      </dependency>
      <dependency>
        <groupId>org.apache.commons</groupId>
        <artifactId>commons-collections4</artifactId>
        <version>4.1</version>
      </dependency>
    </dependencies>

Solution

  • We run tests with a local Spark context and use the following "hack" to work around collision problems:

    sc.stop()
    // To avoid Akka rebinding to the same port, since it doesn't unbind 
    // immediately on shutdown
    
    System.clearProperty("spark.driver.port")
    

    Any reason you are using 2 different spark contexts? Why can't you use only 1?
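The clearProperty hack above is Scala, but the same call works from Java, since spark.driver.port is just a JVM system property. A minimal standalone illustration (the port value is made up, and Spark itself is not involved here; in a real job the clear would follow sc.stop()):

```java
public class DriverPortHackDemo {
    public static void main(String[] args) {
        // Simulate what an Akka-based Spark driver leaves behind:
        System.setProperty("spark.driver.port", "46008");

        // sc.stop() would go here in a real job; the property can linger
        // because Akka does not unbind immediately on shutdown.

        // The hack: clear it so the next SparkContext binds a fresh port.
        System.clearProperty("spark.driver.port");
        System.out.println(System.getProperty("spark.driver.port")); // prints "null"
    }
}
```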