I'm a beginner with Spark in Java. I want to take a sublist of a List in Java 8 and then convert it to an RDD. I tried it with the code below:
List<CSVRecord> inputRecords = readInputLayer(actorSystem, inputCatalog, inputCatalogVersion);
LOGGER.info("Number of partition " +inputRecords.size());
List<CSVRecord> inputRecordsTmp = inputRecords.stream().limit(100).collect(Collectors.toList());
JavaRDD<CSVRecord> inputRecordsJavaRDD = JavaSparkContext.emptyRDD();
for (List<CSVRecord> partition: inputRecordsTmp ){
    JavaRDD<CSVRecord> inputRecordsTmpRDD = (JavaRDD<CSVRecord>) JavaSparkContext.parallelize(partition);
    inputRecordsJavaRDD = JavaSparkContext.union(inputRecordsJavaRDD,inputRecordsTmpRDD);
}
LOGGER.info("Number of lines to insert JAVA RDD =" +inputRecordsJavaRDD.count());
But I get an error on the for loop; it does not accept List<CSVRecord> partition:
Incompatible types:
Required: org.apache.commons.csv.CSVRecord
Found: java.util.List<org.apache.commons.csv.CSVRecord>
How can I correct it? Thank you.
EDIT: I just had a look at the documentation. parallelize does in fact accept a List, but this only means that you do not need a for-each loop at all. You should be able to just pass your inputRecordsTmp right into parallelize and get your RDD out of it.
Still, since the for-each loop is written incorrectly, I'll leave this here for further information:
First of all, your for-each loop does not look entirely correct to me. When writing a for-each loop in Java, the part before the colon must have the type of the elements in the collection you're iterating over.
In your case, you have a List that contains CSVRecord objects. In a for-each you are basically saying "for each CSVRecord in this List of CSVRecord objects...". However, you have written "for each List of CSVRecord objects in this list of CSVRecord objects...", which doesn't make much sense.
Java already knows that inputRecordsTmp is a List of CSVRecords, so there is no need to specify that again anywhere. What you want to tell it instead is that, for each object in the list, you want to extract that object into a variable (partition in your case) so you can use that extracted object inside your loop.
So instead of writing
for (List<CSVRecord> partition: inputRecordsTmp ){
You would have to write
for (CSVRecord record: inputRecordsTmp ){
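To make the typing rule concrete without needing Spark or Commons CSV on the classpath, here is a minimal sketch that uses plain String values as a stand-in for CSVRecord. The class name ForEachSketch and the helpers firstN and countNonEmpty are made up for this illustration; they are not part of your code or of any library.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

public class ForEachSketch {
    // Keep at most `max` leading elements, like limit(100) in the question.
    static List<String> firstN(List<String> records, int max) {
        return records.stream().limit(max).collect(Collectors.toList());
    }

    // The loop variable is a String because the list is a List<String>.
    // Declaring it as List<String> here would give the same
    // "incompatible types" compile error as in the question.
    static int countNonEmpty(List<String> records) {
        int count = 0;
        for (String record : records) {
            if (!record.isEmpty()) {
                count++;
            }
        }
        return count;
    }

    public static void main(String[] args) {
        // Hypothetical sample data standing in for CSV records.
        List<String> all = Arrays.asList("a,1", "", "b,2", "c,3");
        List<String> firstThree = firstN(all, 3);
        System.out.println(firstThree.size());
        System.out.println(countNonEmpty(firstThree));
    }
}
```

The same rule applies to your loop: a List<CSVRecord> yields one CSVRecord per iteration, never a List<CSVRecord>.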
This might solve your problem. If not, could you please add information on what line exactly is failing?