java, apache-spark, apache-spark-sql, emr

AWS EMR Spark - get CSV And use with SparkSql api


// download the CSV file
ByteArrayOutputStream downloadedFile = downloadFile();

// save the CSV file in a temp folder
java.io.File tmpCsvFile = save(downloadedFile);

// read it with the Spark SQL API
Dataset<Row> ds = session
        .read()
        .option("header", "true")
        .csv(tmpCsvFile.getAbsolutePath());

tmpCsvFile is saved at the following path:

/mnt/yarn/usercache/hadoop/appcache/application_1511379756333_0001/container_1511379756333_0001_02_000001/tmp/1OkYaovxMsmR7iPoPnb8mx45MWvwr6k1y9xIdh8g7K0Q3118887242212394029.csv

Exception on reading:

org.apache.spark.sql.AnalysisException: Path does not exist: hdfs://ip-33-33-33-33.ec2.internal:8020/mnt/yarn/usercache/hadoop/appcache/application_1511379756333_0001/container_1511379756333_0001_02_000001/tmp/1OkYaovxMsmR7iPoPnb8mx45MWvwr6k1y9xIdh8g7K0Q3118887242212394029.csv;

I think the problem is that the file is saved locally on the driver, so when I try to read it through the Spark SQL API the executors can't find it. I already tried sparkContext.addFile(), and it doesn't work.
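A minimal sketch of that addFile() attempt, assuming the usual addFile()/SparkFiles.get() pattern:

// ship the driver-local temp file to every node...
session.sparkContext().addFile(tmpCsvFile.getAbsolutePath());

// ...then try to read it back through the path SparkFiles resolves
// (as noted above, this still doesn't work for me)
Dataset<Row> ds = session
        .read()
        .option("header", "true")
        .csv(org.apache.spark.SparkFiles.get(tmpCsvFile.getName()));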

Any solutions?

Thanks


Solution

  • Spark supports a large number of filesystems for reading and writing, for example:

    • Local/Regular (file://)
    • S3 (s3://)
    • HDFS (hdfs://)

    By default, if no URI scheme is specified, Spark SQL resolves the path against the cluster's default filesystem, i.e. hdfs://namenode_address:port/path.
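    For illustration, the same read can target any of those filesystems just by qualifying the URI (the bucket and paths below are placeholders):

       // an explicit URI scheme selects the target filesystem
       Dataset<Row> fromLocal = session.read().option("header", "true")
               .csv("file:///tmp/myfile.csv");          // node-local disk
       Dataset<Row> fromS3 = session.read().option("header", "true")
               .csv("s3://my-bucket/myfile.csv");       // S3 (placeholder bucket)
       Dataset<Row> fromHdfs = session.read().option("header", "true")
               .csv("hdfs:///user/hadoop/myfile.csv");  // cluster HDFS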

    The solution of adding file:/// to the path can only work in client mode; in my case (cluster mode) it doesn't. When the driver creates the task that reads the file, the task is sent to an executor on a node that doesn't have the file.
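    Applied to the temp file from the question, that attempt would look roughly like this; it only resolves when the driver runs on the machine that actually holds the file (client mode):

       // only works if the driver is on the machine that holds the temp file
       Dataset<Row> ds = session
               .read()
               .option("header", "true")
               .csv("file://" + tmpCsvFile.getAbsolutePath());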

    What can we do? Write the file to HDFS.

       Configuration conf = new Configuration();
       ByteArrayOutputStream downloadedFile = downloadFile();
       // convert the output stream into an input stream
       InputStream is = Functions.FROM_BAOS_TO_IS.apply(downloadedFile);
       String myfile = "miofile.csv";
       // acquire the filesystem (the cluster's default one, i.e. HDFS on EMR)
       FileSystem fs = FileSystem.get(URI.create(myfile), conf);
       // open an output stream to Hadoop
       OutputStream outf = fs.create(new Path(myfile));
       // write the file
       IOUtils.copyBytes(is, outf, 4096, true);
       // now every executor can reach the file, so the read succeeds
       Dataset<Row> ds = session
               .read()
               .option("header", "true")
               .csv(myfile);
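
    A shorter variant, since the temp file already exists on the driver's local disk, is to let the Hadoop FileSystem API copy it into HDFS and then read it from there; the target path below is a placeholder.

       // copy the driver-local temp file into HDFS, then read it from there
       FileSystem fs = FileSystem.get(conf);
       Path target = new Path("/user/hadoop/miofile.csv"); // placeholder HDFS location
       fs.copyFromLocalFile(new Path(tmpCsvFile.getAbsolutePath()), target);

       Dataset<Row> ds2 = session
               .read()
               .option("header", "true")
               .csv(target.toString());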
    

    Thanks, any better solution is welcome.