Search code examples
java apache-spark cassandra datastax-java-driver spark-cassandra-connector

How to Replace JAVA loop with Direct Spark Cassandra Table Data Manipulation


I'm trying to make my code more efficient, since I have to process billions of rows of data in Cassandra. I currently use a Java loop within the DataStax Spark Cassandra Connector to pull out the data and put it into a format that I'm familiar with (a Multimap) in order to get Spark to do the manipulation. I'd like to replace this Multimap loop with direct Spark manipulation of the Cassandra table, to save time and make everything more efficient. I'd greatly appreciate any code suggestions to accomplish that. Here is my existing code:

        // Select the three measurement columns through the driver session.
        // Keyspace, table and column identifiers are double-quoted in the CQL text.
        Statement stmt = new SimpleStatement("SELECT \"Power\",\"Bandwidth\",\"Start_Frequency\" FROM \"SB1000_49552019\".\"Measured_Value\";");
        // Fetch size requested from the driver for this statement.
        stmt.setFetchSize(2000000);
        ResultSet results = session.execute(stmt);

// Get the Variables from each Row of Cassandra Data
// Every row is iterated in this Java loop; Spark only receives the data later,
// when the collected list is handed to sc.parallelize(...).
        // Maps a channel value to every power reading whose band contains that channel.
        Multimap<Double, Float> data = LinkedListMultimap.create();
        for (Row row : results){       
           // Column Names in Cassandra (Case Sensitive)
           // start_frequency, power and bandwidth are declared outside this excerpt.
           start_frequency = row.getDouble("Start_Frequency");
           power = row.getFloat("Power");
           bandwidth = row.getDouble("Bandwidth"); 

// Create Channel Power Buckets    
// Walk the channel grid from 1.6000E8 up to channel_end (inclusive) in steps of
// increment; channel, channel_end and increment are declared outside this excerpt.
                for(channel = 1.6000E8; channel <= channel_end;  ){ 
                    // Keep the reading when the channel lies inside
                    // [start_frequency, start_frequency + bandwidth] (both ends inclusive).
                    if( (channel >= start_frequency) && (channel <= (start_frequency + bandwidth)) ) {     
                     data.put(channel, power);
                    }  // end if
                    channel+=increment;
                }  // end for      
        } // end "row" for

// Create Spark List for DataFrame        
// Flatten the multimap into one Value per (channel, power) pair.
        List<Value> values = data.asMap().entrySet()
            .stream()
            .flatMap(x -> x.getValue()
                    .stream()
                    .map(y -> new Value(x.getKey(), y)))
            .collect(Collectors.toList());

// Create DataFrame and Calculate Results
// Group by channel, compute min/max/avg of power, and append the result to
// table "results" in keyspace "model" via the org.apache.spark.sql.cassandra source.
    sqlContext.createDataFrame(sc.parallelize(values), Value.class).groupBy(col("channel"))
        .agg(min("power"), max("power"), avg("power"))
        .write().mode(SaveMode.Append)      
        .option("table", "results")
        .option("keyspace", "model")
        .format("org.apache.spark.sql.cassandra").save();

    } // end session
} // End Compute 

Solution

  • JavaRDD<MeasuredValue> rdd = javaFunctions(sc).cassandraTable("SB1000_47130646", "Measured_Value", mapRowTo(MeasuredValue.class));
    // Reads table "Measured_Value" as an RDD, mapping each row to a MeasuredValue bean.
    // NOTE(review): the keyspace here is "SB1000_47130646" while the question's query uses "SB1000_49552019" — confirm which one is intended.
    // Expand every measured row into one Value per grid channel that falls inside the row's band.
    JavaRDD<Value> valueRdd = rdd.flatMap(new FlatMapFunction<MeasuredValue, Value>() {
        @Override
        public Iterable<Value> call(MeasuredValue row) throws Exception {
            // Channel grid: first channel, last channel (inclusive) and spacing between channels.
            final double firstChannel = 1.6000E8;
            final double lastChannel = 1.6159E8;
            final double step = 5000;

            // Band covered by this measurement: [bandStart, bandStart + bandwidth], both ends inclusive.
            final double bandStart = row.getStart_frequency();
            final double bandEnd = bandStart + row.getBandwidth();
            final float power = row.getPower();

            // One Value(channel, power) for each grid channel inside the band.
            List<Value> hits = new ArrayList<Value>();
            double channel = firstChannel;
            while (channel <= lastChannel) {
                boolean insideBand = channel >= bandStart && channel <= bandEnd;
                if (insideBand) {
                    hits.add(new Value(channel, power));
                }
                channel += step;
            }
            return hits;
        }
    });
    
        // Build a DataFrame from the Value beans, group it by "channel" and compute
        // the minimum, maximum and average of "power" for each channel.
        sqlContext.createDataFrame(valueRdd, Value.class).groupBy(col("channel"))
        .agg(min("power"), max("power"), avg("power"))
        // Append the aggregated rows to table "results" in keyspace "model"
        // through the org.apache.spark.sql.cassandra data source.
        .write().mode(SaveMode.Append)      
        .option("table", "results")
        .option("keyspace", "model")
        .format("org.apache.spark.sql.cassandra").save();
    
    } // end session
    
    /**
     * Mutable bean carrying one measurement: its start frequency, bandwidth and power.
     *
     * <p>Exposes a public no-argument constructor and a getter/setter pair for each
     * field, and is {@link Serializable}.
     */
    public static class MeasuredValue implements Serializable {

        private double start_frequency;
        private double bandwidth;
        private float power;

        /** Creates an instance with all fields at their default (zero) values. */
        public MeasuredValue() {
        }

        /** Returns the start frequency of the measurement. */
        public double getStart_frequency() {
            return this.start_frequency;
        }

        /** Sets the start frequency of the measurement. */
        public void setStart_frequency(double start_frequency) {
            this.start_frequency = start_frequency;
        }

        /** Returns the bandwidth of the measurement. */
        public double getBandwidth() {
            return this.bandwidth;
        }

        /** Sets the bandwidth of the measurement. */
        public void setBandwidth(double bandwidth) {
            this.bandwidth = bandwidth;
        }

        /** Returns the measured power. */
        public float getPower() {
            return this.power;
        }

        /** Sets the measured power. */
        public void setPower(float power) {
            this.power = power;
        }
    }