Tags: scala, apache-spark, apache-spark-sql, rdd

Read data from multiple files into a single RDD or DataFrame


I have the HMP dataset. This dataset has 14 different folders (categories), and each category has multiple CSV files in it.

I want to read the data from all the CSV files into a single DataFrame. The schema for the data is:

import org.apache.spark.sql.types._

val Tschema = StructType(Array(
  StructField("X", IntegerType, nullable = true),
  StructField("Y", IntegerType, nullable = true),
  StructField("Z", IntegerType, nullable = true)
))

In addition, I want to add two more columns to the DataFrame: the first should contain the name of the folder (category) containing the current CSV, and the second the name of the CSV file itself.

I have tried the following code, but it did not work properly.

val path = System.getProperty("user.home") + "/Desktop/HMP/*"  // Path to all categories
val df = spark.sparkContext.wholeTextFiles(path)
df.toDF().show(5, false)

The output of my code is:

+----------------------------------------------------------------------+--------------------+
|                                                                    _1|                  _2|
+----------------------------------------------------------------------+--------------------+
|Climb_stairs/Accelerometer-2012-06-06-14-13-20-climb_stairs-m7.txt    |12 38 35            |
|Climb_stairs/Accelerometer-2012-06-06-14-13-20-climb_stairs-m7.txt    |23 56 34            |
|Climb_stairs/Accelerometer-2012-06-06-14-13-20-climb_stairs-m7.txt    |13 36 36            |
|Climb_stairs/Accelerometer-2012-06-06-14-13-20-climb_stairs-m7.txt    |39 57 42            |
|Climb_stairs/Accelerometer-2012-06-06-14-13-20-climb_stairs-m7.txt    |26 51 36            |
+----------------------------------------------------------------------+--------------------+

Here, in the first column (_1), the part before the / is what I want in a separate column class, and the remaining part should go in a column source. To the _2 part I want to apply the schema I defined.

I want the final output to look like the following:

+---+---+---+--------------+---------------------+
|  X|  Y|  Z|         class|               source|
+---+---+---+--------------+---------------------+
| 37| 34| 43|  Climb_stairs|Accelerometer-2011...|
| 05| 39| 34|  Climb_stairs|Accelerometer-2011...|
| 30| 53| 49|  Climb_stairs|Accelerometer-2011...|
+---+---+---+--------------+---------------------+

Solution

  • I think you're reading files from the local file system. Can you include details of what you get in df? Are you running Spark in local mode?

    If you want to try this on a Cloudera VM, you can put a couple of those CSV files into an HDFS location by following the steps below:

    hdfs dfs -mkdir /files
    hdfs dfs -put sample.csv sample2.csv /files/
    

    Then run Spark as follows:

    spark2-shell
    val df = spark.read.csv("/files/")
    df.show
    
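    By default, spark.read.csv reads every column as a string with generated names (_c0, _c1, ...). To get the typed columns from the question, you could pass the schema at read time. A minimal sketch, assuming the same Tschema as in the question and that the values are space-separated as in the sample output (hence the "sep" option):

    import org.apache.spark.sql.types._

    // Same schema as in the question.
    val Tschema = StructType(Array(
      StructField("X", IntegerType, nullable = true),
      StructField("Y", IntegerType, nullable = true),
      StructField("Z", IntegerType, nullable = true)
    ))

    // Apply the schema while reading; "sep" matches the space-separated rows.
    val typed = spark.read.schema(Tschema).option("sep", " ").csv("/files/")
    typed.printSchema()
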

    To extract the file name and directory, you may need to combine the split and input_file_name functions, depending on the exact location of the files on HDFS.

    You could add something like the following:

    import org.apache.spark.sql.functions.{input_file_name, split}
    import org.apache.spark.sql.types.StringType

    // getItem(7) assumes a fixed path depth; adjust the index to your layout.
    val df2 = df.withColumn("file_name", split(input_file_name(), "/").getItem(7).cast(StringType))
    

    Similarly, you can combine input_file_name with substr to grab the input directory, depending on which part you want.
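
    Putting the pieces together, here is a minimal end-to-end sketch of one way to produce the desired class/source output. The path, the separator, and element_at (available from Spark 2.4) are assumptions to adjust for your layout and version:

    import org.apache.spark.sql.functions.{col, element_at, input_file_name, split}
    import org.apache.spark.sql.types._

    // Assumed layout: .../HMP/<class>/<file>, with space-separated values.
    val path = System.getProperty("user.home") + "/Desktop/HMP/*"

    val schema = StructType(Array(
      StructField("X", IntegerType, nullable = true),
      StructField("Y", IntegerType, nullable = true),
      StructField("Z", IntegerType, nullable = true)
    ))

    val result = spark.read
      .schema(schema)
      .option("sep", " ")
      .csv(path)
      .withColumn("parts", split(input_file_name(), "/"))
      // element_at with a negative index counts from the end of the array:
      // -1 is the file name, -2 its enclosing folder. On Spark < 2.4, use
      // getItem with a positive index as shown above.
      .withColumn("source", element_at(col("parts"), -1))
      .withColumn("class", element_at(col("parts"), -2))
      .drop("parts")

    result.show(5, false)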