Tags: java, apache-spark, rdd

Iterate through a Java RDD by row


I would like to iterate through an RDD of strings and "do something" to each string. The output should be a double[][]. I understand I (probably) need to use the foreach function for Java RDDs, but I cannot make sense of the syntax, and the documentation is not particularly helpful. I do not have Java 8.

Here is an example of what I would like to do if I could use a regular for loop.

public class PCA {

    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setAppName("PCA Example");
        SparkContext sc = new SparkContext(conf);

        RDD<String> data = sc.textFile("my/directory/my/dataset.txt", 0);

        // here is the "type" of code I would like to execute
        // 30 because I have 30 variables
        double[][] vals = new double[data.count()][30];

        double[] temp;
        for (int i = 0; i < data.count(); i++) {
            temp = splitStringtoDoubles(data[i]);
            vals[i] = temp;
        }
    }

    private static double[] splitStringtoDoubles(String s) {
        String[] splitVals = s.split("\\t");
        double[] vals = new double[splitVals.length];
        for (int i = 0; i < splitVals.length; i++) {
            vals[i] = Double.parseDouble(splitVals[i]);
        }
        return vals;
    }

}

I understand that foreach seems to require a function with a void return type, and I am not sure how to work with that. Here is what I have attempted so far (obviously the syntax is wrong):

    double[][] matrix = new double[data.count()][30];
    foreach(String s : data) {
        String[] splitvals = s.split("\\t");
        double[] vals = Double.parseDouble(splitvals);
        matrix[s] = vals; 
    }

Solution

  • As mattinbits said in the comments, you want a map instead of a foreach, since you want to return values. A map transforms your data: for each row of your RDD you perform an operation and return one value per row. What you need can be achieved like this:

    import org.apache.spark.SparkConf;
    import org.apache.spark.SparkContext;
    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.api.java.function.Function;
    import java.util.List;
    
    ...
    
    SparkConf conf = new SparkConf().setAppName("PCA Example");
    SparkContext sc = new SparkContext(conf);
    
    JavaRDD<String> data = sc.textFile("clean-sl-mix-with-labels.txt",0).toJavaRDD();
    // map is a transformation: it applies the Function below to every line of the RDD
    JavaRDD<double[]> whatYouWantRdd = data.map(new Function<String, double[]>() {
        @Override
        public double[] call(String row) throws Exception {
            return splitStringtoDoubles(row);
        }
    
        private double[] splitStringtoDoubles(String s) {
            String[] splitVals = s.split("\\t");
            double[] vals = new double[splitVals.length];
            for(int i=0; i < splitVals.length; i++) {
                vals[i] = Double.parseDouble(splitVals[i]);
            }
            return vals;
        }
    });
    
    // collect() is an action: it runs the job and returns the results to the driver program
    List<double[]> whatYouWant = whatYouWantRdd.collect();
    

    So that you know how Spark works: you perform actions or transformations on your RDD. Here, for instance, we transform the RDD with a map. Since you cannot use Java 8 lambdas, you create this function yourself as an anonymous org.apache.spark.api.java.function.Function, which forces you to override the call method; it receives one row of your RDD and returns a value.
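
    If you specifically need the double[][] from your original snippet rather than a List<double[]>, a minimal follow-up sketch (not part of the original answer, and assuming the collected data fits in driver memory) is to copy the list into an array after collect():

    // Hypothetical follow-up on the driver: turn the collected
    // List<double[]> into the double[][] you originally asked for.
    double[][] matrix = new double[whatYouWant.size()][];
    for (int i = 0; i < whatYouWant.size(); i++) {
        matrix[i] = whatYouWant.get(i);
    }

    Note that collect() pulls the whole result set back to the driver, so this only makes sense for datasets small enough to hold in memory there.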