
Incompatible types: List CSVRecords java


I'm a beginner with Spark in Java. I want to take a sublist of a List in Java 8 and then convert it to an RDD. I tried it with the code below:

            List<CSVRecord> inputRecords = readInputLayer(actorSystem, inputCatalog, inputCatalogVersion);
            LOGGER.info("Number of partition " +inputRecords.size());

            List<CSVRecord> inputRecordsTmp = inputRecords.stream().limit(100).collect(Collectors.toList());


            JavaRDD<CSVRecord> inputRecordsJavaRDD = JavaSparkContext.emptyRDD();
            for (List<CSVRecord> partition: inputRecordsTmp ){
                JavaRDD<CSVRecord> inputRecordsTmpRDD = (JavaRDD<CSVRecord>) JavaSparkContext.parallelize(partition);
                inputRecordsJavaRDD = JavaSparkContext.union(inputRecordsJavaRDD,inputRecordsTmpRDD);

            }

        LOGGER.info("Number of lines to insert JAVA RDD =" +inputRecordsJavaRDD.count());

But I get an error in the for loop; it does not accept List<CSVRecord> partition:

Incompatible types:
Required: org.apache.commons.csv.CSVRecord
Found: java.util.List<org.apache.commons.csv.CSVRecord>

How can I correct it? Thank you.


Solution

  • EDIT: I just had a look at the documentation. parallelize does in fact accept a List, which means you do not need a for-each loop at all. You should be able to pass inputRecordsTmp straight into parallelize and get your RDD back.
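
    A minimal sketch of that approach, assuming Spark is on the classpath and a local master is acceptable for a quick check. The class name, app name and the String records are placeholders for illustration; in the question the list would hold CSVRecord objects. Note that parallelize is called on a JavaSparkContext instance, not on the class itself.

    ```java
    import java.util.Arrays;
    import java.util.List;
    import java.util.stream.Collectors;

    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.api.java.JavaSparkContext;

    public class ParallelizeSketch {
        public static void main(String[] args) {
            // Assumption: local mode, only for trying the example out.
            SparkConf conf = new SparkConf()
                    .setAppName("parallelize-sketch")
                    .setMaster("local[*]");

            try (JavaSparkContext sc = new JavaSparkContext(conf)) {
                // Stand-in data; replace with the List<CSVRecord> from the question.
                List<String> inputRecords = Arrays.asList("r1", "r2", "r3", "r4");

                // Keep at most the first two elements, as limit(100) does in the question.
                List<String> inputRecordsTmp = inputRecords.stream()
                        .limit(2)
                        .collect(Collectors.toList());

                // The whole list goes into parallelize in one call; no loop, no union.
                JavaRDD<String> rdd = sc.parallelize(inputRecordsTmp);

                System.out.println("Number of lines in the RDD = " + rdd.count());
            }
        }
    }
    ```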

    Still, since the for-each loop is written incorrectly, I'll leave the original answer here for reference:

    First of all, your for-each loop does not look entirely correct to me. In a Java for-each loop, the declaration before the colon must have the type of the elements of the collection you are iterating over.

    In your case, you have a List that contains CSVRecord objects. In a for-each you are basically saying "for each CSVRecord in this List of CSVRecord objects...". However, you have written "for each List of CSVRecord objects in this list of CSVRecord objects...", which doesn't make much sense.

    Java already knows that inputRecordsTmp is a List of CSVRecord objects, so there is no need to specify that again. What you want to tell it instead is that, for each object in the list, you want to bind that object to a variable (partition in your case) so you can use it inside the loop.

    So instead of writing

    for (List<CSVRecord> partition: inputRecordsTmp ){
    

    You would have to write

    for (CSVRecord record: inputRecordsTmp ){
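
    The rule can be illustrated with a small self-contained sketch that uses String in place of CSVRecord (the class and method names are made up for this example). The loop variable is a List only when the collection being iterated is itself a list of lists.

    ```java
    import java.util.Arrays;
    import java.util.List;

    public class ForEachSketch {

        // The elements are String, so the loop variable is declared as String.
        static int totalLength(List<String> records) {
            int total = 0;
            for (String record : records) {
                total += record.length();
            }
            return total;
        }

        // Here the elements are themselves lists, so the loop variable is a List<String>.
        static int countRecords(List<List<String>> partitions) {
            int count = 0;
            for (List<String> partition : partitions) {
                count += partition.size();
            }
            return count;
        }

        public static void main(String[] args) {
            List<String> records = Arrays.asList("alice,30", "bob,25");
            System.out.println(totalLength(records));

            List<List<String>> partitions =
                    Arrays.asList(records, Arrays.asList("carol,41"));
            System.out.println(countRecords(partitions));
        }
    }
    ```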
    

    This might solve your problem. If not, could you please add information on what line exactly is failing?