Tags: scala, apache-spark, spark-streaming

Loop and process multiple HDFS files in Spark/Scala


I have multiple files in an HDFS folder and I want to loop over them and run my Scala transformation logic on each one.

I am using the script below, which works fine in my development environment with local files, but fails when I run it against HDFS. Any idea where I am going wrong?

import java.io.File

val files = new File("hdfs://172.X.X.X:8020/landing/").listFiles.map(_.getName).toList

files.foreach { file =>
  print(file)
  val event = spark.read.option("multiline", "true").json("hdfs://172.X.X.X:8020/landing/" + file)
  event.show(false)
}

Can someone correct it or suggest an alternative solution, please?


Solution

  • You should use the Hadoop FileSystem API to list files on HDFS; `java.io.File` only sees the local filesystem, which is why your script works locally but fails against HDFS.

    code:

    import java.net.URI
    import org.apache.hadoop.fs.{FileSystem, Path}
    import org.apache.spark.sql.SparkSession

    val spark = SparkSession.builder().master("local[*]").getOrCreate()

    // Obtain a FileSystem handle for the HDFS namenode
    val fs = FileSystem.get(new URI("hdfs://172.X.X.X:8020/"), spark.sparkContext.hadoopConfiguration)

    // List every entry under /landing and read each JSON file individually
    fs.globStatus(new Path("/landing/*")).toList.foreach { f =>
      val event = spark.read.option("multiline", "true").json("hdfs://172.X.X.X:8020/landing/" + f.getPath.getName)
      event.show(false)
    }