This is a different issue: I am trying to filter the records in an RDD based on the number of columns, essentially file processing.
I wrote the same logic in PySpark and the records filter correctly there, but when I run the Java version, valid records end up in the Error folder.
I downloaded the error files and validated them with AWK: they have 996 columns, yet they are still filtered out as errors.
In Python, exactly the malformed records end up in the error files.
Here is the relevant snippet:
JavaRDD<String> inputDataRDD = sc.textFile(args[0] + "/" + args[1], 5000);
int columnLength = Integer.parseInt(args[3]);

// records with the expected number of columns
inputDataRDD
    .filter(filterData -> filterData.split("\t").length == columnLength)
    .coalesce(1)
    .saveAsTextFile(args[2] + "Valid/", GzipCodec.class);

// records with any other number of columns
inputDataRDD
    .filter(filterData -> filterData.split("\t").length != columnLength)
    .coalesce(1)
    .saveAsTextFile(args[2] + "Error/", GzipCodec.class);
End of snippet.
There are close to 10M records in the file.
Is there any difference in sc.textFile(fileName, numPartitions) between Java and Python, or am I missing something?
I need your assistance to find the mistake I have made.
Note: I did a Maven build using Eclipse and ran the following command on YARN.
spark-submit --class com.virtualpairprogrammers.ProcessFilesToHDFS --master yarn learningSpark-0.0.1-SNAPSHOT.jar "/input/ABFeeds/" "ABFeeds_2020-04-20.tsv.gz" "/output/ABFeeds/2020-05-06/" 996
Thanks in Advance
Regards
Sam
The issue was with the split call I was using. When the last column is empty, Java's String.split does not count it as a column: by default, split discards trailing empty strings. I referred to the following question, which discusses this behaviour:
Java String split removed empty values
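The behaviour is easy to reproduce outside Spark. A minimal sketch (plain Java, no Spark dependencies, with made-up sample data) showing how the default split drops trailing empty columns while a negative limit keeps them:

```java
public class SplitDemo {
    public static void main(String[] args) {
        // A record with 4 tab-separated columns, the last two empty.
        String record = "a\tb\t\t";

        // Default split (limit 0): trailing empty strings are removed.
        System.out.println(record.split("\t").length);      // prints 2

        // Negative limit: the pattern is applied as many times as
        // possible and trailing empty strings are kept.
        System.out.println(record.split("\t", -1).length);  // prints 4
    }
}
```

This is why records whose 996th column was empty were counted as having fewer than 996 columns and landed in the Error folder.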
Old snippet:
inputDataRDD
.filter(filterData -> filterData.split("\t").length == columnLength)
.coalesce(1)
.saveAsTextFile(args[2]+"Valid/", GzipCodec.class);
Modified snippet:
inputDataRDD
.filter(filterData -> filterData.split("\t",-1).length == columnLength)
.coalesce(1)
.saveAsTextFile(args[2]+"Valid/", GzipCodec.class);
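As a plain-Java sanity check (using java.util.stream in place of the RDD filters, with hypothetical sample records and a column count of 4 standing in for the real 996), the corrected predicate routes a record with an empty last column to the valid side:

```java
import java.util.List;
import java.util.stream.Collectors;

public class FilterCheck {
    public static void main(String[] args) {
        int columnLength = 4; // stand-in for args[3]
        List<String> records = List.of(
                "a\tb\tc\td",   // valid, all columns populated
                "a\tb\tc\t",    // valid, but last column empty
                "a\tb\tc"       // genuinely short record
        );

        // With the -1 limit, only the short record is treated as an error.
        List<String> valid = records.stream()
                .filter(r -> r.split("\t", -1).length == columnLength)
                .collect(Collectors.toList());
        List<String> error = records.stream()
                .filter(r -> r.split("\t", -1).length != columnLength)
                .collect(Collectors.toList());

        System.out.println(valid.size()); // prints 2
        System.out.println(error.size()); // prints 1
    }
}
```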
I have tested it and it is working. Note that the same -1 limit is needed in the Error filter as well; otherwise a record with a trailing empty column would be written to both the Valid and Error folders.
Thanks all for the assistance.
Regards