Tags: apache-spark, cassandra, rdd, spark-cassandra-connector

Spark RDD map 1 to many


I'm new to Spark and I have a problem. I'm processing an RDD created with textFile() from a CSV file. For each line I want to emit multiple objects into a single new RDD (one RDD in total, not one per line). This is my code:

JavaRDD<LinearAccelerationEvent> linearAccelerationEventJavaRDD = csvFile.filter(
            new Function<String, Boolean>() {
                public Boolean call(String line) {
                    return line.contains("LinearAccelerationEvent");
                }
            }).map(
            new Function<String, LinearAccelerationEvent>() {
                public LinearAccelerationEvent call(String line) throws Exception {
                    String[] fields = line.split(",");
                    LinearAccelerationEvent linearAccelerationEvent = new LinearAccelerationEvent(Long.valueOf(fields[4]), Float.valueOf(fields[1]), Float.valueOf(fields[2]), Float.valueOf(fields[3]));
                    return linearAccelerationEvent;
                }
            }).cache();

What I'm doing here is filtering the initial CSV to keep only the LinearAccelerationEvent lines, then mapping each line to a LinearAccelerationEvent object to produce a new RDD of LinearAccelerationEvent objects. However, for each line of the initial CSV file I need to generate multiple LinearAccelerationEvent objects, and I don't know how to do that. The reason this matters is that the RDD will later be pushed to Cassandra like this:

javaFunctions(linearAccelerationEventJavaRDD).writerBuilder("d300ea832fe462598f473f76939452283de495a1", "linearaccelerationevent", mapToRow(LinearAccelerationEvent.class)).saveToCassandra();

So the ideal solution would be something like:

JavaRDD<LinearAccelerationEvent> linearAccelerationEventJavaRDD = csvFile.filter(
                new Function<String, Boolean>() {
                    public Boolean call(String line) {
                        return line.contains("LinearAccelerationEvent");
                    }
                }).map(
                new Function<String, LinearAccelerationEvent>() {
                    public LinearAccelerationEvent call(String line) throws Exception {
                        String[] fields = line.split(",");
                        // pseudocode: I want to emit one event per iteration,
                        // but map() can only return a single value per input line
                        for (/* each group of fields in the line */) {
                           LinearAccelerationEvent linearAccelerationEvent = new LinearAccelerationEvent(Long.valueOf(fields[4]), Float.valueOf(fields[1]), Float.valueOf(fields[2]), Float.valueOf(fields[3]));
                           return linearAccelerationEvent; // can't return more than once
                        }
                }
            }).cache();

I could use the foreachPartition() function and push each event from the for loop to Cassandra myself, but I've seen that this approach is much slower. Is it possible to do what I want without using foreach? Thank you.


Solution

  • If I'm understanding you correctly, return a collection (e.g. a List) of LinearAccelerationEvent objects and call flatMap instead of map. This will produce one value in the resulting RDD for each acceleration event in the collection.

    flatMap is equivalent to calling map followed by flatten. If you're familiar with Hive, it's similar to using the explode UDTF available in HiveQL.
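
    As a sketch of what the flatMap callback's body could look like (the event class and the rule for how many events one line yields are assumptions here, since the question doesn't show the CSV layout), the parsing logic returns a collection instead of a single object. In Spark 1.x this would be the call() of a FlatMapFunction<String, LinearAccelerationEvent> returning an Iterable; in Spark 2.x call() returns an Iterator (e.g. events.iterator()).

    ```java
    import java.util.ArrayList;
    import java.util.List;

    public class FlatMapSketch {
        // Minimal stand-in for the asker's LinearAccelerationEvent POJO.
        static class LinearAccelerationEvent {
            final long timestamp;
            final float x, y, z;
            LinearAccelerationEvent(long timestamp, float x, float y, float z) {
                this.timestamp = timestamp;
                this.x = x;
                this.y = y;
                this.z = z;
            }
        }

        // Body of the flatMap callback: one input line in, many events out.
        static List<LinearAccelerationEvent> parseLine(String line) {
            String[] fields = line.split(",");
            List<LinearAccelerationEvent> events = new ArrayList<>();
            // Hypothetical layout: after the event tag in fields[0], values come
            // in repeating groups of (x, y, z, timestamp) -- adjust the stride
            // and offsets to the real CSV layout.
            for (int i = 1; i + 3 < fields.length; i += 4) {
                events.add(new LinearAccelerationEvent(
                        Long.valueOf(fields[i + 3]),
                        Float.valueOf(fields[i]),
                        Float.valueOf(fields[i + 1]),
                        Float.valueOf(fields[i + 2])));
            }
            return events;
        }

        public static void main(String[] args) {
            // Two groups of four values after the tag -> two events from one line.
            List<LinearAccelerationEvent> events =
                    parseLine("LinearAccelerationEvent,0.1,0.2,0.3,100,0.4,0.5,0.6,200");
            System.out.println(events.size());
        }
    }
    ```

    Wrapped in a FlatMapFunction and passed to csvFile.filter(...).flatMap(...), this produces one RDD element per event, so the existing writerBuilder(...).saveToCassandra() call works unchanged.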