I'm trying to convert a MultiMap of billions of data values to a Spark DataFrame, run calculations on it, and then write the results to a Cassandra table.
I generate the multimap from the following Cassandra query and loop. I'd be happy to take suggestions if there is a better way to get this data into a DataFrame than the loop I'm using.
Code Updated With Answer:
// Build ResultSet from cassandra query for data manipulation.
Statement stmt = new SimpleStatement("SELECT \"Power\",\"Bandwidth\",\"Start_Frequency\" FROM \"SB1000_49552019\".\"Measured_Value\";");
//Statement stmt = new SimpleStatement("SELECT power, bandwidth, start_frequency FROM model.reports;");
stmt.setFetchSize(1000);
ResultSet results = session.execute(stmt);

// Get the Variables from each Row of Cassandra Data
Multimap<Double, Float> data = LinkedListMultimap.create();
for (Row row : results){
    // Column Names in Cassandra (Case Sensitive)
    start_frequency = row.getDouble("Start_Frequency");
    power = row.getFloat("Power");
    bandwidth = row.getDouble("Bandwidth");

    // Create Channel Power Buckets, place information into prepared statement binding, write to cassandra.
    for(channel = 1.6000E8; channel <= channel_end; ){
        if( (channel >= start_frequency) && (channel <= (start_frequency + bandwidth)) ) {
            data.put(channel, power);
        } // end if
        channel+=increment;
    } // end for
} // end "row" for

// Create Spark List for DataFrame
List<Value> values = data.asMap().entrySet()
    .stream()
    .flatMap(x -> x.getValue()
        .stream()
        .map(y -> new Value(x.getKey(), y)))
    .collect(Collectors.toList());

// Create DataFrame and Calculate Results
sqlContext.createDataFrame(sc.parallelize(values), Value.class).groupBy(col("channel"))
    .agg(min("power"), max("power"), avg("power"))
    .write().mode(SaveMode.Append)
    .option("table", "results")
    .option("keyspace", "model")
    .format("org.apache.spark.sql.cassandra").save();
} // end session
} // End Compute
public class Value implements Serializable {
    public Value(Double channel, Float power) {
        this.channel = channel;
        this.power = power;
    }

    Double channel;
    Float power;

    public void setChannel(Double channel) {
        this.channel = channel;
    }
    public void setPower(Float power) {
        this.power = power;
    }
    public Double getChannel() {
        return channel;
    }
    public Float getPower() {
        return power;
    }

    @Override
    public String toString() {
        return "[" +channel +","+power+"]";
    }
}
The sample multimap has the types {Double}=[Float], where there may be multiple Float items for each Double.
EXAMPLE
{1.50E8=[10, 20], 1.51E8=[-10, -13, -14, -15], 1.52E8=[-10, -11]}
I need to use Spark to get the min, max, and average of each of these. For example, for the first one, 1.50E8 would be min 10, max 20, avg 15.
I already have the code that I can use once I can get the data into a temp table and operate on it as a DataFrame:
queryMV.groupBy(col("channel"))
    .agg(min("power"), max("power"), avg("power"))
    .write().mode(SaveMode.Append)
    .option("table", "results")
    .option("keyspace", "model")
    .format("org.apache.spark.sql.cassandra").save();
I would be grateful for some tips on how to convert the multimap into a DataFrame using Java. I haven't been able to find any documentation about using multimaps with Spark.
I'm currently using a solution that does the initial query and, with the for loop, writes the raw data to a new table that I can in turn map directly to a temp table / DataFrame. That takes too much time, since I have to write billions of rows to Cassandra before calculating. I'd like to use a multimap or something similar and convert directly to Spark for calculation.
Alas, the Java parallelize method takes either a list of T or, for parallelizePairs, a list of Tuple2<K, V>, so you will need to convert. Meanwhile, createDataFrame only works on RDDs and Scala Seq, and needs a schema (either a bean or a StructType).
To make it even more fun, com.google.common.collect.ImmutableEntry is not serializable, so you need to do the conversion in Java. A Java-ficated version of @Pankaj Arora's solution would therefore not work unless you moved the conversion logic into Java, i.e.:
public class Value implements Serializable {
    public Value(Double a, Float b) {
        this.a = a;
        this.b = b;
    }

    Double a;
    Float b;

    public void setA(Double a) {
        this.a = a;
    }
    public void setB(Float b) {
        this.b = b;
    }
    public Double getA() {
        return a;
    }
    public Float getB() {
        return b;
    }

    public String toString() {
        return "[" +a +","+b+"]";
    }
}

Multimap<Double, Float> data = LinkedListMultimap.create();
data.put(1d, 1f);
data.put(1d, 2f);
data.put(2d, 3f);

List<Value> values = data.asMap().entrySet()
    .stream()
    .flatMap(x -> x.getValue()
        .stream()
        .map(y -> new Value(x.getKey(), y)))
    .collect(Collectors.toList());

sqlContext.createDataFrame(sc.parallelize(values), Value.class).show();
Given your edit I'd look at creating objects (rather than a multimap) from the off.
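As a rough sketch of what "creating objects from the off" might look like: the row loop can append Value objects straight to a list, so the multimap and the later flatMap step disappear. This uses only plain Java so it can be run on its own. The class name DirectValues and the helper addRow are hypothetical names invented for this illustration, and the numbers in main are made-up stand-ins for one Cassandra row, not real data.

```java
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;

public class DirectValues {

    // Same shape as the Value bean in the question, trimmed to what this sketch needs.
    static class Value implements Serializable {
        private final Double channel;
        private final Float power;

        Value(Double channel, Float power) {
            this.channel = channel;
            this.power = power;
        }

        public Double getChannel() { return channel; }
        public Float getPower() { return power; }
    }

    // Hypothetical helper: appends one Value for every channel bucket that
    // falls inside [startFrequency, startFrequency + bandwidth].
    static void addRow(List<Value> out, double startFrequency, double bandwidth, float power,
                       double channelStart, double channelEnd, double increment) {
        for (double channel = channelStart; channel <= channelEnd; channel += increment) {
            if (channel >= startFrequency && channel <= startFrequency + bandwidth) {
                out.add(new Value(channel, power));
            }
        }
    }

    public static void main(String[] args) {
        List<Value> values = new ArrayList<>();

        // Stand-in for one row; in the real loop these would come from
        // row.getDouble("Start_Frequency"), row.getDouble("Bandwidth") and row.getFloat("Power").
        addRow(values, 1.6e8, 2.0e6, -10f, 1.6e8, 1.7e8, 1.0e6);

        for (Value v : values) {
            System.out.println(v.getChannel() + " -> " + v.getPower());
        }
    }
}
```

The resulting list can be handed to sc.parallelize(values) exactly as in the snippet above. Note that this still builds the whole list on the driver, so with billions of values it may well not fit in memory; treat it as an illustration of the shape of the data rather than a scaling fix.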