Tags: amazon-web-services, amazon-s3, apache-spark, avro

Spark program running infinitely


Scenario: I am trying to read an Avro file stored in S3 and create a DataFrame from it using the Databricks spark-avro library. This is the code I am using:

package org.myorg.dataframe;

import org.apache.hadoop.conf.Configuration;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.SQLContext;

public class S3DataFrame {

    public static void main(String[] args) {

        System.out.println("START...");
        SparkConf conf = new SparkConf().setAppName("S3DataFrame").setMaster("local");
        JavaSparkContext sc = new JavaSparkContext(conf);
        Configuration config = sc.hadoopConfiguration();

        //FOR s3a
        config.set("fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem");
        config.set("fs.s3a.access.key","****************");
        config.set("fs.s3a.secret.key","********************");
        config.set("fs.s3a.endpoint", "s3-us-west-2.amazonaws.com");
        SQLContext sqlContext = new SQLContext(sc);
        DataFrame df = sqlContext.load("s3a://bucket-name/employees.avro", "com.databricks.spark.avro");
        df.show();
        df.printSchema();
        df.select("name").show();
        System.out.println("DONE");
//      df.save("/Users/miqbal1/avvvvvv.avro/", "com.databricks.spark.avro");
    }
}

Problem: The program seems to run forever. It doesn't throw any exception, but keeps running continuously with the following trace:

...
15/05/18 17:35:44 INFO HttpServer: Starting HTTP Server
15/05/18 17:35:44 INFO Server: jetty-8.y.z-SNAPSHOT
15/05/18 17:35:44 INFO AbstractConnector: Started SocketConnector@0.0.0.0:60316
15/05/18 17:35:44 INFO Utils: Successfully started service 'HTTP file server' on port 60316.
15/05/18 17:35:44 INFO SparkEnv: Registering OutputCommitCoordinator
15/05/18 17:35:44 INFO Server: jetty-8.y.z-SNAPSHOT
15/05/18 17:35:44 INFO AbstractConnector: Started SelectChannelConnector@0.0.0.0:4040
15/05/18 17:35:44 INFO Utils: Successfully started service 'SparkUI' on port 4040.
15/05/18 17:35:44 INFO SparkUI: Started SparkUI at http://172.28.210.74:4040
15/05/18 17:35:44 INFO Executor: Starting executor ID <driver> on host localhost
15/05/18 17:35:44 INFO AkkaUtils: Connecting to HeartbeatReceiver: akka.tcp://sparkDriver@172.28.210.74:60315/user/HeartbeatReceiver
15/05/18 17:35:44 INFO NettyBlockTransferService: Server created on 60317
15/05/18 17:35:44 INFO BlockManagerMaster: Trying to register BlockManager
15/05/18 17:35:44 INFO BlockManagerMasterActor: Registering block manager localhost:60317 with 66.9 MB RAM, BlockManagerId(<driver>, localhost, 60317)
15/05/18 17:35:44 INFO BlockManagerMaster: Registered BlockManager
15/05/18 17:35:45 WARN AmazonHttpClient: Detected a possible problem with the current JVM version (1.6.0_65).  If you experience XML parsing problems using the SDK, try upgrading to a more recent JVM update.
15/05/18 17:35:47 INFO S3AFileSystem: Getting path status for s3a://bucket-name/avro_data/episodes.avro (avro_data/episodes.avro)
15/05/18 17:35:47 INFO S3AFileSystem: Getting path status for s3a://bucket-name/avro_data/episodes.avro (avro_data/episodes.avro)
15/05/18 17:35:47 INFO S3AFileSystem: Getting path status for s3a://bucket-name/avro_data/episodes.avro (avro_data/episodes.avro)
15/05/18 17:35:48 INFO S3AFileSystem: Opening 's3a://bucket-name/avro_data/episodes.avro' for reading
15/05/18 17:35:48 INFO S3AFileSystem: Getting path status for s3a://bucket-name/avro_data/episodes.avro (avro_data/episodes.avro)
15/05/18 17:35:48 INFO S3AFileSystem: Actually opening file avro_data/episodes.avro at pos 0
15/05/18 17:35:48 INFO S3AFileSystem: Reopening avro_data/episodes.avro to seek to new offset -4
15/05/18 17:35:48 INFO S3AFileSystem: Actually opening file avro_data/episodes.avro at pos 0
15/05/18 17:35:50 INFO MemoryStore: ensureFreeSpace(230868) called with curMem=0, maxMem=70177259
15/05/18 17:35:50 INFO MemoryStore: Block broadcast_0 stored as values in memory (estimated size 225.5 KB, free 66.7 MB)
15/05/18 17:35:50 INFO MemoryStore: ensureFreeSpace(31491) called with curMem=230868, maxMem=70177259
15/05/18 17:35:50 INFO MemoryStore: Block broadcast_0_piece0 stored as bytes in memory (estimated size 30.8 KB, free 66.7 MB)
15/05/18 17:35:50 INFO BlockManagerInfo: Added broadcast_0_piece0 in memory on localhost:60317 (size: 30.8 KB, free: 66.9 MB)
15/05/18 17:35:50 INFO BlockManagerMaster: Updated info of block broadcast_0_piece0
15/05/18 17:35:50 INFO SparkContext: Created broadcast 0 from hadoopFile at AvroRelation.scala:82
15/05/18 17:35:50 INFO S3AFileSystem: Getting path status for s3a://bucket-name/avro_data/episodes.avro (avro_data/episodes.avro)
15/05/18 17:35:50 INFO FileInputFormat: Total input paths to process : 1
15/05/18 17:35:50 INFO SparkContext: Starting job: runJob at SparkPlan.scala:122
15/05/18 17:35:50 INFO DAGScheduler: Got job 0 (runJob at SparkPlan.scala:122) with 1 output partitions (allowLocal=false)
15/05/18 17:35:50 INFO DAGScheduler: Final stage: Stage 0(runJob at SparkPlan.scala:122)
15/05/18 17:35:50 INFO DAGScheduler: Parents of final stage: List()
15/05/18 17:35:50 INFO DAGScheduler: Missing parents: List()
15/05/18 17:35:50 INFO DAGScheduler: Submitting Stage 0 (MapPartitionsRDD[2] at map at SparkPlan.scala:97), which has no missing parents
15/05/18 17:35:50 INFO MemoryStore: ensureFreeSpace(3448) called with curMem=262359, maxMem=70177259
15/05/18 17:35:50 INFO MemoryStore: Block broadcast_1 stored as values in memory (estimated size 3.4 KB, free 66.7 MB)
15/05/18 17:35:50 INFO MemoryStore: ensureFreeSpace(2386) called with curMem=265807, maxMem=70177259
15/05/18 17:35:50 INFO MemoryStore: Block broadcast_1_piece0 stored as bytes in memory (estimated size 2.3 KB, free 66.7 MB)
15/05/18 17:35:50 INFO BlockManagerInfo: Added broadcast_1_piece0 in memory on localhost:60317 (size: 2.3 KB, free: 66.9 MB)
15/05/18 17:35:50 INFO BlockManagerMaster: Updated info of block broadcast_1_piece0
15/05/18 17:35:50 INFO SparkContext: Created broadcast 1 from broadcast at DAGScheduler.scala:839
15/05/18 17:35:50 INFO DAGScheduler: Submitting 1 missing tasks from Stage 0 (MapPartitionsRDD[2] at map at SparkPlan.scala:97)
15/05/18 17:35:50 INFO TaskSchedulerImpl: Adding task set 0.0 with 1 tasks
15/05/18 17:35:50 INFO TaskSetManager: Starting task 0.0 in stage 0.0 (TID 0, localhost, PROCESS_LOCAL, 1306 bytes)
15/05/18 17:35:50 INFO Executor: Running task 0.0 in stage 0.0 (TID 0)
15/05/18 17:35:50 INFO HadoopRDD: Input split: s3a://bucket-name/avro_data/episodes.avro:0+1
15/05/18 17:35:50 INFO deprecation: mapred.tip.id is deprecated. Instead, use mapreduce.task.id
15/05/18 17:35:50 INFO deprecation: mapred.task.id is deprecated. Instead, use mapreduce.task.attempt.id
15/05/18 17:35:50 INFO deprecation: mapred.task.is.map is deprecated. Instead, use mapreduce.task.ismap
15/05/18 17:35:50 INFO deprecation: mapred.task.partition is deprecated. Instead, use mapreduce.task.partition
15/05/18 17:35:50 INFO deprecation: mapred.job.id is deprecated. Instead, use mapreduce.job.id
15/05/18 17:35:50 INFO S3AFileSystem: Getting path status for s3a://bucket-name/avro_data/episodes.avro (avro_data/episodes.avro)
15/05/18 17:35:51 INFO S3AFileSystem: Opening 's3a://bucket-name/avro_data/episodes.avro' for reading
15/05/18 17:35:51 INFO S3AFileSystem: Getting path status for s3a://bucket-name/avro_data/episodes.avro (avro_data/episodes.avro)
15/05/18 17:35:51 INFO S3AFileSystem: Actually opening file avro_data/episodes.avro at pos 0
15/05/18 17:35:51 INFO S3AFileSystem: Reopening avro_data/episodes.avro to seek to new offset -4
15/05/18 17:35:51 INFO S3AFileSystem: Actually opening file avro_data/episodes.avro at pos 0
15/05/18 17:35:53 INFO S3AFileSystem: Reopening avro_data/episodes.avro to seek to new offset -597
15/05/18 17:35:53 INFO S3AFileSystem: Actually opening file avro_data/episodes.avro at pos 0
15/05/18 17:35:53 INFO Executor: Finished task 0.0 in stage 0.0 (TID 0). 1800 bytes result sent to driver
15/05/18 17:35:53 INFO TaskSetManager: Finished task 0.0 in stage 0.0 (TID 0) in 2782 ms on localhost (1/1)
15/05/18 17:35:53 INFO TaskSchedulerImpl: Removed TaskSet 0.0, whose tasks have all completed, from pool 
15/05/18 17:35:53 INFO DAGScheduler: Stage 0 (runJob at SparkPlan.scala:122) finished in 2.797 s
15/05/18 17:35:53 INFO DAGScheduler: Job 0 finished: runJob at SparkPlan.scala:122, took 2.974724 s
15/05/18 17:35:53 INFO SparkContext: Starting job: runJob at SparkPlan.scala:122
15/05/18 17:35:53 INFO DAGScheduler: Got job 1 (runJob at SparkPlan.scala:122) with 596 output partitions (allowLocal=false)
15/05/18 17:35:53 INFO DAGScheduler: Final stage: Stage 1(runJob at SparkPlan.scala:122)
15/05/18 17:35:53 INFO DAGScheduler: Parents of final stage: List()
15/05/18 17:35:53 INFO DAGScheduler: Missing parents: List()
15/05/18 17:35:53 INFO DAGScheduler: Submitting Stage 1 (MapPartitionsRDD[2] at map at SparkPlan.scala:97), which has no missing parents
15/05/18 17:35:53 INFO MemoryStore: ensureFreeSpace(3448) called with curMem=268193, maxMem=70177259
15/05/18 17:35:53 INFO MemoryStore: Block broadcast_2 stored as values in memory (estimated size 3.4 KB, free 66.7 MB)
15/05/18 17:35:53 INFO MemoryStore: ensureFreeSpace(2386) called with curMem=271641, maxMem=70177259
15/05/18 17:35:53 INFO MemoryStore: Block broadcast_2_piece0 stored as bytes in memory (estimated size 2.3 KB, free 66.7 MB)
15/05/18 17:35:53 INFO BlockManagerInfo: Added broadcast_2_piece0 in memory on localhost:60317 (size: 2.3 KB, free: 66.9 MB)
15/05/18 17:35:53 INFO BlockManagerMaster: Updated info of block broadcast_2_piece0
15/05/18 17:35:53 INFO SparkContext: Created broadcast 2 from broadcast at DAGScheduler.scala:839
15/05/18 17:35:53 INFO DAGScheduler: Submitting 596 missing tasks from Stage 1 (MapPartitionsRDD[2] at map at SparkPlan.scala:97)
15/05/18 17:35:53 INFO TaskSchedulerImpl: Adding task set 1.0 with 596 tasks
15/05/18 17:35:53 INFO TaskSetManager: Starting task 0.0 in stage 1.0 (TID 1, localhost, PROCESS_LOCAL, 1306 bytes)
15/05/18 17:35:53 INFO Executor: Running task 0.0 in stage 1.0 (TID 1)
15/05/18 17:35:53 INFO HadoopRDD: Input split: s3a://bucket-name/avro_data/episodes.avro:1+1
15/05/18 17:35:53 INFO S3AFileSystem: Getting path status for s3a://bucket-name/avro_data/episodes.avro (avro_data/episodes.avro)
15/05/18 17:35:54 INFO S3AFileSystem: Opening 's3a://bucket-name/avro_data/episodes.avro' for reading
15/05/18 17:35:54 INFO S3AFileSystem: Getting path status for s3a://bucket-name/avro_data/episodes.avro (avro_data/episodes.avro)
15/05/18 17:35:54 INFO S3AFileSystem: Actually opening file avro_data/episodes.avro at pos 0
15/05/18 17:35:54 INFO S3AFileSystem: Reopening avro_data/episodes.avro to seek to new offset -4
15/05/18 17:35:54 INFO S3AFileSystem: Actually opening file avro_data/episodes.avro at pos 0
15/05/18 17:35:55 INFO S3AFileSystem: Reopening avro_data/episodes.avro to seek to new offset -596
15/05/18 17:35:55 INFO S3AFileSystem: Actually opening file avro_data/episodes.avro at pos 1
15/05/18 17:35:56 INFO Executor: Finished task 0.0 in stage 1.0 (TID 1). 1800 bytes result sent to driver
15/05/18 17:35:56 INFO TaskSetManager: Starting task 1.0 in stage 1.0 (TID 2, localhost, PROCESS_LOCAL, 1306 bytes)
15/05/18 17:35:56 INFO Executor: Running task 1.0 in stage 1.0 (TID 2)
15/05/18 17:35:56 INFO TaskSetManager: Finished task 0.0 in stage 1.0 (TID 1) in 2224 ms on localhost (1/596)
15/05/18 17:35:56 INFO HadoopRDD: Input split: s3a://bucket-name/avro_data/episodes.avro:2+1
15/05/18 17:35:56 INFO S3AFileSystem: Getting path status for s3a://bucket-name/avro_data/episodes.avro (avro_data/episodes.avro)
15/05/18 17:35:56 INFO BlockManager: Removing broadcast 1
15/05/18 17:35:56 INFO BlockManager: Removing block broadcast_1_piece0
15/05/18 17:35:56 INFO MemoryStore: Block broadcast_1_piece0 of size 2386 dropped from memory (free 69905618)
15/05/18 17:35:56 INFO BlockManagerInfo: Removed broadcast_1_piece0 on localhost:60317 in memory (size: 2.3 KB, free: 66.9 MB)
15/05/18 17:35:56 INFO BlockManagerMaster: Updated info of block broadcast_1_piece0
15/05/18 17:35:56 INFO BlockManager: Removing block broadcast_1
15/05/18 17:35:56 INFO MemoryStore: Block broadcast_1 of size 3448 dropped from memory (free 69909066)
15/05/18 17:35:56 INFO ContextCleaner: Cleaned broadcast 1
15/05/18 17:35:56 INFO S3AFileSystem: Opening 's3a://bucket-name/avro_data/episodes.avro' for reading
15/05/18 17:35:56 INFO S3AFileSystem: Getting path status for s3a://bucket-name/avro_data/episodes.avro (avro_data/episodes.avro)
15/05/18 17:35:56 INFO S3AFileSystem: Actually opening file avro_data/episodes.avro at pos 0
15/05/18 17:35:57 INFO S3AFileSystem: Reopening avro_data/episodes.avro to seek to new offset -4
15/05/18 17:35:57 INFO S3AFileSystem: Actually opening file avro_data/episodes.avro at pos 0
15/05/18 17:35:58 INFO S3AFileSystem: Reopening avro_data/episodes.avro to seek to new offset -595
15/05/18 17:35:58 INFO S3AFileSystem: Actually opening file avro_data/episodes.avro at pos 2
15/05/18 17:35:58 INFO Executor: Finished task 1.0 in stage 1.0 (TID 2). 1800 bytes result sent to driver
15/05/18 17:35:58 INFO TaskSetManager: Starting task 2.0 in stage 1.0 (TID 3, localhost, PROCESS_LOCAL, 1306 bytes)
15/05/18 17:35:58 INFO Executor: Running task 2.0 in stage 1.0 (TID 3)
15/05/18 17:35:58 INFO TaskSetManager: Finished task 1.0 in stage 1.0 (TID 2) in 2655 ms on localhost (2/596)
15/05/18 17:35:58 INFO HadoopRDD: Input split: s3a://bucket-name/avro_data/episodes.avro:3+1
15/05/18 17:35:58 INFO S3AFileSystem: Getting path status for s3a://bucket-name/avro_data/episodes.avro (avro_data/episodes.avro)
15/05/18 17:35:58 INFO S3AFileSystem: Opening 's3a://bucket-name/avro_data/episodes.avro' for reading
15/05/18 17:35:58 INFO S3AFileSystem: Getting path status for s3a://bucket-name/avro_data/episodes.avro (avro_data/episodes.avro)
15/05/18 17:35:59 INFO S3AFileSystem: Actually opening file avro_data/episodes.avro at pos 0
...

The code behaves as if it's stuck in an infinite loop (without doing anything useful). I tried Googling it but found nothing helpful, and I couldn't get any help through the Spark mailing list either. The same code runs perfectly fine against a file stored locally, as the sketch below shows. I'm not sure what I'm missing, since this is how the API documentation says to do it. Could anyone point me in the right direction?
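
For reference, the only change for the local run is the load path; a minimal sketch of that variant (the local path below is a placeholder, not the actual file I used):

// Same program, but loading from the local filesystem instead of S3.
DataFrame df = sqlContext.load("/path/to/episodes.avro", "com.databricks.spark.avro");
df.show();   // completes normally against the local copy of the file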

I would really appreciate some help. Thank you so much for your time.


Solution

  • Apparently the s3a connector doesn't work with this setup. Note in the trace above that the file was broken into 596 one-byte input splits, each run as a separate task, which would explain why the job appears to run forever rather than actually hanging. Switching to the s3n (native) filesystem is what made it work:

        SparkConf conf = new SparkConf().setAppName("S3DataFrame").setMaster("local");
        JavaSparkContext sc = new JavaSparkContext(conf);
        Configuration config = sc.hadoopConfiguration();

        // Configure the s3n (native S3) filesystem instead of s3a
        config.set("fs.s3n.impl", "org.apache.hadoop.fs.s3native.NativeS3FileSystem");
        config.set("fs.s3n.awsAccessKeyId", "************");
        config.set("fs.s3n.awsSecretAccessKey", "*************");

        SQLContext sqlContext = new SQLContext(sc);
        DataFrame df = sqlContext.load("s3n://input/avro_data/part-00000.avro",
                "com.databricks.spark.avro");