Tags: java, apache-spark, rdd

Java Spark - Issue in filtering records in RDD based on number of columns


This is a different issue: I am trying to filter the records in an RDD based on the number of columns. This is more of a file-processing task.

I have written the same job in PySpark and the records are filtered properly there. When I try it in Java, the valid records end up in the Error folder.

I downloaded the error files and validated them using AWK: they have 996 columns, yet they are still being filtered out as errors.

In Python, only the genuinely bad records end up in the error files.

Below is the relevant snippet.


// Read the input file, requesting a minimum of 5000 partitions
JavaRDD<String> inputDataRDD = sc.textFile(args[0] + "/" + args[1], 5000);

// Expected number of columns, passed as the fourth argument
int columnLength = Integer.parseInt(args[3]);

// Records with the expected column count go to the Valid folder
inputDataRDD
    .filter(filterData -> filterData.split("\t").length == columnLength)
    .coalesce(1)
    .saveAsTextFile(args[2] + "Valid/", GzipCodec.class);

// Everything else goes to the Error folder
inputDataRDD
    .filter(filterData -> filterData.split("\t").length != columnLength)
    .coalesce(1)
    .saveAsTextFile(args[2] + "Error/", GzipCodec.class);

End of snippet.

There are close to 10M records in that file.

Is there any difference in sc.textFile(filename, numPartitions) between Java and Python, or am I missing something?

I need your assistance to find the mistake I have made.

Note: I did a Maven build using Eclipse and ran the following command on YARN.

spark-submit --class com.virtualpairprogrammers.ProcessFilesToHDFS --master yarn learningSpark-0.0.1-SNAPSHOT.jar "/input/ABFeeds/" "ABFeeds_2020-04-20.tsv.gz" "/output/ABFeeds/2020-05-06/" 996

Thanks in Advance

Regards

Sam


Solution

  • The issue was with the split call I was using. When the last column is empty, Java's String.split drops the trailing empty string, so it is not counted as a column. I referred to the following post, which discusses this behavior:

    Java String split removed empty values
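
    The behavior is easy to reproduce outside Spark. Here is a minimal standalone sketch (the class name and the sample line are mine, not from the job):

    ```java
    public class SplitDemo {
        public static void main(String[] args) {
            // A three-column record whose last column is empty
            String line = "a\tb\t";

            // Default split drops trailing empty strings, so the
            // empty last column is lost and the length is 2
            System.out.println(line.split("\t").length);      // prints 2

            // A negative limit keeps trailing empty strings,
            // so the length is the expected 3
            System.out.println(line.split("\t", -1).length);  // prints 3
        }
    }
    ```

    Python's str.split("\t") keeps trailing empty strings by default, which is why the PySpark version of the job filtered correctly.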

    Old Snippet :

    inputDataRDD
        .filter(filterData -> filterData.split("\t").length == columnLength)
        .coalesce(1)
        .saveAsTextFile(args[2]+"Valid/", GzipCodec.class);
    

    Modified Snippet :

    inputDataRDD
        .filter(filterData -> filterData.split("\t",-1).length == columnLength)
        .coalesce(1)
        .saveAsTextFile(args[2]+"Valid/", GzipCodec.class);
    

    I have tested the change and it works.

    Thanks all for the assistance.

    Regards