Tags: java, apache-spark-sql, apache-spark-dataset

How can I print the content of rows in a Dataset using Java and Spark SQL?


I would like to write a simple Spark SQL program that reads a file called u.data containing movie ratings, creates a Dataset of Rows, and then prints the first rows of the Dataset.

My starting point is to read the file into a JavaRDD and map it to a ratings object with two fields, movieID and rating (a sketch of this class is shown after the code below). From there I just want to print the first rows of the Dataset.

I'm using the Java language and Spark SQL.

public static void main(String[] args){
    App obj = new App();
    SparkSession spark = SparkSession.builder().appName("Java Spark SQL basic example").getOrCreate();

    Map<Integer,String> movieNames = obj.loadMovieNames();
    JavaRDD<String> lines = spark.read().textFile("hdfs:///ml-100k/u.data").javaRDD();

    JavaRDD<MovieRatings> movies = lines.map(line -> {
        String[] parts = line.split(" ");
        MovieRatings ratingsObject = new MovieRatings();
        ratingsObject.setMovieID(Integer.parseInt(parts[1].trim()));
        ratingsObject.setRating(Integer.parseInt(parts[2].trim()));
        return ratingsObject;
    });

    Dataset<Row> movieDataset = spark.createDataFrame(movies, MovieRatings.class);

    Encoder<Integer> intEncoder = Encoders.INT();
    Dataset<Integer> HUE = movieDataset.map(
            new MapFunction<Row, Integer>(){

                private static final long serialVersionUID = -5982149277350252630L;

                @Override
                public Integer call(Row row) throws Exception{
                    return row.getInt(0);
                }
            }, intEncoder
    );

    HUE.show();


    //stop the session
    spark.stop();
}
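
For context, MovieRatings is assumed to be a plain bean holding movieID and rating, roughly along these lines (the actual class isn't shown here):

public class MovieRatings implements java.io.Serializable {

    private int movieID;
    private int rating;

    // No-arg constructor plus a convenience constructor
    public MovieRatings() {}

    public MovieRatings(int movieID, int rating) {
        this.movieID = movieID;
        this.rating = rating;
    }

    public int getMovieID() { return movieID; }
    public void setMovieID(int movieID) { this.movieID = movieID; }

    public int getRating() { return rating; }
    public void setRating(int rating) { this.rating = rating; }
}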

I've tried a lot of the possible solutions that I found, but all of them fail with the same error:

Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 0.0 failed 4 times, most recent failure: Lost task 0.3 in stage 0.0 (TID 3, localhost, executor 1): java.lang.ArrayIndexOutOfBoundsException: 1
at com.ericsson.SparkMovieRatings.App.lambda$main$1e634467$1(App.java:63)
at org.apache.spark.api.java.JavaPairRDD$$anonfun$toScalaFunction$1.apply(JavaPairRDD.scala:1040)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$10$$anon$1.hasNext(WholeStageCodegenExec.scala:614)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:253)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:247)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:830)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:830)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
at org.apache.spark.scheduler.Task.run(Task.scala:109)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)

And here is a sample of the u.data file:

196 242 3   881250949
186 302 3   891717742
22  377 1   878887116
244 51  2   880606923
166 346 1   886397596
298 474 4   884182806
115 265 2   881171488
253 465 5   891628467
305 451 3   886324817
6   86  3   883603013
62  257 2   879372434
286 1014    5   879781125
200 222 5   876042340
210 40  3   891035994
224 29  3   888104457
303 785 3   879485318
122 387 5   879270459
194 274 2   879539794

The first column is the UserID, the second the MovieID, the third the rating, and the last one the timestamp.


Solution

  • As mentioned before, your data is not space-separated but tab-separated, so line.split(" ") returns a single token and parts[1] throws the ArrayIndexOutOfBoundsException: 1 you are seeing. I'll show you two possible solutions: the first one based on the RDD API and the second one based on Spark SQL, which is, in general, the better choice in terms of performance.

    1. RDD (you should use built-in types to reduce overhead):

      import java.util.List;

      import org.apache.spark.SparkConf;
      import org.apache.spark.api.java.JavaPairRDD;
      import org.apache.spark.api.java.JavaSparkContext;

      import scala.Tuple2;

      public class SparkDriver {

          public static void main(String[] args) {
              // Create a configuration object and set the name of the application
              SparkConf conf = new SparkConf().setAppName("application_name");

              // Create a Spark context object
              JavaSparkContext context = new JavaSparkContext(conf);

              // Build a (movieID, rating) pair RDD. The file is tab separated,
              // so split on whitespace; the movieID is the second column and
              // the rating is the third one.
              JavaPairRDD<Integer, Integer> movieRatingRDD = context
                      .textFile("u.data.txt")
                      .mapToPair(line -> {
                          String[] tokens = line.split("\\s+");
                          int movieID = Integer.parseInt(tokens[1]);
                          int rating = Integer.parseInt(tokens[2]);
                          return new Tuple2<>(movieID, rating);
                      });

              // Keep in mind that take(n) returns the first n elements
              // in the order of the file.
              List<Tuple2<Integer, Integer>> list = movieRatingRDD.take(10);

              System.out.println("MovieID\tRating");

              for (Tuple2<Integer, Integer> tuple : list) {
                  System.out.println(tuple._1 + "\t" + tuple._2);
              }

              context.close();
          }
      }
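
      With the sample lines shown above, and assuming take(10) returns them in file order (as the comment notes), this prints:

          MovieID Rating
          242     3
          302     3
          377     1
          51      2
          346     1
          474     4
          265     2
          465     5
          451     3
          86      3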
      
    2. SQL

      import org.apache.spark.api.java.function.MapFunction;
      import org.apache.spark.sql.Dataset;
      import org.apache.spark.sql.Encoders;
      import org.apache.spark.sql.Row;
      import org.apache.spark.sql.SparkSession;

      public class SparkDriver {

          public static void main(String[] args) {

              // Create the Spark session
              SparkSession session = SparkSession.builder()
                      .appName("Spark app sql version")
                      .getOrCreate();

              // Read the tab-separated file as CSV and map each Row to a
              // MovieRatings bean (movieID is the second column, rating the third)
              Dataset<MovieRatings> ratingsDataset = session.read()
                      .format("csv")
                      .option("header", false)
                      .option("inferSchema", true)
                      .option("delimiter", "\t")
                      .load("u.data.txt")
                      .map((MapFunction<Row, MovieRatings>) row -> {
                          int movieID = row.getInt(1);
                          int rating = row.getInt(2);
                          return new MovieRatings(movieID, rating);
                      }, Encoders.bean(MovieRatings.class));

              // Print the first rows of the Dataset
              ratingsDataset.show(10);

              // Stop the session
              session.stop();
          }
      }
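
If the goal is only to print the first rows, a shorter variant of the SQL approach is to skip the bean entirely: read the file as CSV with a tab delimiter, name the columns, and call show(). This is just a sketch reusing the session object from the snippet above, and the column names are chosen here for illustration:

      Dataset<Row> df = session.read()
              .option("delimiter", "\t")
              .option("inferSchema", true)
              .csv("u.data.txt")
              .toDF("userID", "movieID", "rating", "timestamp");

      df.select("movieID", "rating").show(10);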