Tags: java, apache-spark, guava, multimap

Spark - Can a Multimap be converted to a DataFrame in Java?


I'm trying to convert a Multimap holding billions of data values into a Spark DataFrame so that I can run calculations on it and then write the results to a Cassandra table.

I generate the multimap from the following Cassandra query and loop. If there is a better way to fetch this data and get it into a DataFrame than my loop, I'd be happy to take suggestions.

Code Updated With Answer:

        // Build a ResultSet from the Cassandra query for data manipulation.
        Statement stmt = new SimpleStatement("SELECT \"Power\",\"Bandwidth\",\"Start_Frequency\" FROM \"SB1000_49552019\".\"Measured_Value\";");
        //Statement stmt = new SimpleStatement("SELECT power, bandwidth, start_frequency FROM model.reports;");
        stmt.setFetchSize(1000);
        ResultSet results = session.execute(stmt);

        // Read the columns from each row of Cassandra data.
        Multimap<Double, Float> data = LinkedListMultimap.create();
        for (Row row : results) {
            // Column names in Cassandra (case sensitive)
            start_frequency = row.getDouble("Start_Frequency");
            power = row.getFloat("Power");
            bandwidth = row.getDouble("Bandwidth");

            // Create channel power buckets: record this row's power under every
            // channel that falls inside [start_frequency, start_frequency + bandwidth].
            for (channel = 1.6000E8; channel <= channel_end; channel += increment) {
                if ((channel >= start_frequency) && (channel <= (start_frequency + bandwidth))) {
                    data.put(channel, power);
                }
            }
        } // end "row" for

        // Flatten the multimap into a List of Value beans for the DataFrame
        List<Value> values = data.asMap().entrySet()
            .stream()
            .flatMap(x -> x.getValue()
                    .stream()
                    .map(y -> new Value(x.getKey(), y)))
            .collect(Collectors.toList());

        // Create the DataFrame, calculate the results, and write them to Cassandra
        sqlContext.createDataFrame(sc.parallelize(values), Value.class)
            .groupBy(col("channel"))
            .agg(min("power"), max("power"), avg("power"))
            .write().mode(SaveMode.Append)
            .option("table", "results")
            .option("keyspace", "model")
            .format("org.apache.spark.sql.cassandra").save();

    } // end session
} // End Compute 

public class Value implements Serializable {
    public Value(Double channel, Float power) {
        this.channel = channel;
        this.power = power;
    }
    Double channel;
    Float power;

    public void setChannel(Double channel) {
        this.channel = channel;
    }
    public void setPower(Float power) {
        this.power = power;
    }
    public Double getChannel() {
        return channel;
    }
    public Float getPower() {
        return power;
    }

    @Override
    public String toString() {
        return "[" +channel +","+power+"]";
    }
}

The sample multimap has the types {Double}=[Float], where there may be multiple Float items for each Double.

EXAMPLE

{1.50E8=[10, 20], 1.51E8=[-10, -13, -14, -15], 1.52E8=[-10, -11]}

I need to use Spark to get the min, max, and average for each of these keys. For example, the first key, 1.50E8, would give min 10, max 20, avg 15.
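To pin down the expected numbers before Spark is involved, the same per-channel min/max/average can be reproduced with plain Java streams. This is only a cross-check sketch, not the Spark solution: it assumes a `java.util.Map<Double, List<Float>>` as a stand-in for the Guava Multimap, and the class name `ChannelStatsSketch` is made up for illustration.

```java
import java.util.Arrays;
import java.util.DoubleSummaryStatistics;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper, not part of the original code: a plain-JDK cross-check.
final class ChannelStatsSketch {

    // Returns the summary statistics (min, max, average, ...) of the powers for each channel.
    public static Map<Double, DoubleSummaryStatistics> summarize(Map<Double, List<Float>> data) {
        Map<Double, DoubleSummaryStatistics> stats = new LinkedHashMap<>();
        for (Map.Entry<Double, List<Float>> entry : data.entrySet()) {
            stats.put(entry.getKey(),
                    entry.getValue().stream()
                            .mapToDouble(Float::doubleValue)
                            .summaryStatistics());
        }
        return stats;
    }

    public static void main(String[] args) {
        // Map<Double, List<Float>> stands in for the Guava Multimap<Double, Float>.
        Map<Double, List<Float>> data = new LinkedHashMap<>();
        data.put(1.50E8, Arrays.asList(10f, 20f));
        data.put(1.51E8, Arrays.asList(-10f, -13f, -14f, -15f));
        data.put(1.52E8, Arrays.asList(-10f, -11f));

        summarize(data).forEach((channel, s) ->
                System.out.println(channel + " min=" + s.getMin()
                        + " max=" + s.getMax() + " avg=" + s.getAverage()));
    }
}
```

This only works when the data fits in one JVM, so it is useful for checking a small sample against the Spark output rather than for the full data set.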

I already have code that does this once the data is in a temp table and can be operated on as a DataFrame:

queryMV.groupBy(col("channel"))
.agg(min("power"), max("power"), avg("power"))
.write().mode(SaveMode.Append)      
.option("table", "results")
.option("keyspace", "model")
.format("org.apache.spark.sql.cassandra").save();

I would be grateful for some tips on how to convert the multimap into a DataFrame using Java. I haven't been able to find any documentation about using multimaps with Spark.

I'm currently using a solution that runs the initial query and, in the for loop, writes the raw data to a new table that I can then map directly to a temp table / DataFrame. That takes too much time, because I have to write billions of rows to Cassandra before calculating anything. I'd like to use a multimap or something similar and hand it directly to Spark for the calculation.


Solution

  • Alas, the Java parallelize method takes a List<T>, and parallelizePairs takes a List<Tuple2<K, V>>, so you will need to convert. createDataFrame, meanwhile, only works on RDDs and Scala Seqs and needs a schema (either a bean class or a StructType).

    To make it even more fun, com.google.common.collect.ImmutableEntry is not serializable, so the conversion has to happen in Java first. A Java-fied version of @Pankaj Arora's solution would therefore not work unless you moved the conversion logic into Java, i.e.:

    public class Value implements Serializable {
        public Value(Double a, Float b) {
            this.a = a;
            this.b = b;
        }
        Double a;
        Float b;
    
        public void setA(Double a) {
            this.a = a;
        }
        public void setB(Float b) {
            this.b = b;
        }
        public Double getA() {
            return a;
        }
        public Float getB() {
            return b;
        }
    
        public String toString() {
            return "[" +a +","+b+"]";
        }
    }
    
    
        Multimap<Double, Float> data = LinkedListMultimap.create();
        data.put(1d, 1f);
        data.put(1d, 2f);
        data.put(2d, 3f);
    
        List<Value> values = data.asMap().entrySet()
                .stream()
                .flatMap(x -> x.getValue()
                        .stream()
                        .map(y -> new Value(x.getKey(), y)))
                .collect(Collectors.toList());
    
        sqlContext.createDataFrame(sc.parallelize(values), Value.class).show();
    

    Given your edit, I'd look at creating the objects directly (rather than a multimap) from the start.
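A minimal sketch of what that might look like, assuming the bucket loop from the question: each matching channel is appended straight to a `List<Value>` instead of going through a multimap first. The class name `DirectValuesSketch`, the method `addBuckets`, and its parameter names are hypothetical, and the nested `Value` here is a trimmed stand-in for the bean above (for `createDataFrame` the bean would still need to be a public class with getters and setters, as in the original).

```java
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch: build the bean list directly, with no intermediate multimap.
final class DirectValuesSketch {

    // Trimmed stand-in for the Value bean, using the channel/power column names.
    public static final class Value implements Serializable {
        private final double channel;
        private final float power;

        public Value(double channel, float power) {
            this.channel = channel;
            this.power = power;
        }
        public double getChannel() { return channel; }
        public float getPower() { return power; }
    }

    // Appends one Value for every channel bucket that lies inside
    // [startFrequency, startFrequency + bandwidth], mirroring the question's inner loop.
    public static void addBuckets(List<Value> out, double startFrequency, double bandwidth,
                                  float power, double channelStart, double channelEnd,
                                  double increment) {
        for (double channel = channelStart; channel <= channelEnd; channel += increment) {
            if (channel >= startFrequency && channel <= startFrequency + bandwidth) {
                out.add(new Value(channel, power));
            }
        }
    }

    public static void main(String[] args) {
        List<Value> values = new ArrayList<>();
        // In the real code this call would sit inside the loop over the Cassandra rows.
        addBuckets(values, 2.0, 2.0, -10f, 1.0, 5.0, 1.0);
        System.out.println(values.size() + " values collected");
    }
}
```

The resulting list could then be handed to `sc.parallelize(values)` exactly as in the snippet above. Note that this still collects everything on the driver, so it removes the flattening step but not the memory pressure of billions of values.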