scala, apache-spark, apache-spark-sql, rdf

Spark Scala: output each dataset as a single row of a DataFrame


I have multiple .nt (N-Triples) files in a directory. I want to read each dataset and store its respective output values in a single row of a DataFrame.

Let's say I have dataset1.nt, dataset2.nt, ..., datasetn.nt. I read each dataset using the following code:

val input = "src/main/resources/dataset1.nt"
val triplesRDD = NTripleReader.load(spark, JavaURI.create(input))
//NTripleReader reads the .nt file and splits each line of the dataset into subject, predicate and object
/* My code to output number of distinct subjects, predicates and blank subjects in a dataset */
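
For illustration, these counts could be computed along the following lines (a minimal sketch, assuming triplesRDD is an RDD of Jena Triple objects, which is what SANSA's NTripleReader returns; my actual counting code is omitted here):

    val subjects = triplesRDD.map(_.getSubject)
    // Number of distinct subjects
    val distinctSubjects = subjects.distinct().count()
    // Number of distinct predicates
    val distinctPredicates = triplesRDD.map(_.getPredicate).distinct().count()
    // Number of (distinct) blank-node subjects
    val blankSubjects = subjects.filter(_.isBlank).distinct().count()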

Let's say dataset1 gives the following output:

  • Number of distinct subjects: xxxx
  • Number of distinct predicates: yy
  • Number of blank subjects: zzz

Let's say dataset2 gives the following output:

  • Number of distinct subjects: aaaaa
  • Number of distinct predicates: b
  • Number of blank subjects: cc

and so on...

When I use the following code to read all files in my directory:

val input = "src/main/resources/*"
val triplesRDD = NTripleReader.load(spark, JavaURI.create(input))

it gives me the following output (all matching files are loaded into a single RDD, so the statistics are computed over the union of all datasets):

  • Number of distinct subjects: xxxx+aaaaa+... // adding all individual values of each dataset
  • Number of distinct predicates: yy+b+...
  • Number of blank subjects: zzz+cc+...

However, I want my output to be like this:

Distinct Subjects | Distinct Predicates | Blank Subjects
xxxx              | yy                  | zzz
aaaaa             | b                   | cc    
...               | ...                 | ...

Please let me know how I can achieve the desired output.

Thanks in advance.


Solution

  • I am answering my own question. I hope this might be helpful for others:

    import java.io.File
    import org.apache.spark.sql.{DataFrame, SparkSession}
    //import other necessary packages (NTripleReader, JavaURI, etc.)

    object abc {
      var df1: DataFrame = _
      var df2: DataFrame = _
      var df3: DataFrame = _

      def main(args: Array[String]): Unit = {
        // initialize the Spark session locally
        val spark = SparkSession.builder
          .master("local[*]")
          .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
          .appName("abc")
          .getOrCreate()

        // returns a list of all files in the given directory
        def getListOfFiles(dir: String): List[File] = {
          val path = new File(dir)
          if (path.exists && path.isDirectory)
            path.listFiles.filter(_.isFile).toList
          else
            List[File]()
        }

        val files = getListOfFiles("path/to/directory/")
        for (input <- files) {
          // println(input)
          val triplesRDD = NTripleReader.load(spark, JavaURI.create(input.toString()))

          /* code to generate the dataframe column values */

          import spark.implicits._

          if (input == files(0)) {
            // first file: create the result dataframe with a single row
            df3 = Seq(
              (column1_value, column2_value, column3_value, column4_value, column5_value, column6_value)
            ).toDF("column1_name", "column2_name", "column3_name", "column4_name", "column5_name", "column6_name")
          } else {
            // every other file: build a one-row dataframe and append it to the result
            df1 = Seq(
              (column1_value, column2_value, column3_value, column4_value, column5_value, column6_value)
            ).toDF("column1_name", "column2_name", "column3_name", "column4_name", "column5_name", "column6_name")
            df2 = df3.union(df1)
            df3 = df2
          }
        }
        df3.show()

        // export the dataframe to a .csv file (Spark writes a directory containing the part file)
        df3.coalesce(1).write
          .option("header", "true")
          .csv("path/to/directory/sample.csv")

        spark.stop
      }
    }
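
  • As a side note, the three mutable DataFrame variables can be avoided by computing one tuple per file and calling toDF once at the end, which builds the whole result without a union per file. This is only a sketch under the same placeholder setup: computeStats is a hypothetical helper standing in for the statistics code, and the column names mirror the desired output above.

    import java.io.File
    import spark.implicits._

    // hypothetical helper: (distinct subjects, distinct predicates, blank subjects) for one file
    def computeStats(file: File): (Long, Long, Long) = {
      val triplesRDD = NTripleReader.load(spark, JavaURI.create(file.toString))
      val subjects = triplesRDD.map(_.getSubject)
      (subjects.distinct().count(),
       triplesRDD.map(_.getPredicate).distinct().count(),
       subjects.filter(_.isBlank).distinct().count())
    }

    // one row per dataset, built in a single pass over the file list
    val resultDF = getListOfFiles("path/to/directory/")
      .map(computeStats)
      .toDF("Distinct Subjects", "Distinct Predicates", "Blank Subjects")

    resultDF.show()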