I'm getting familiar with Spark and Scala, and my current task is to "sum" these two DataFrames:
+---+--------+-------------------+
|cyl|avg(mpg)| var_samp(mpg)|
+---+--------+-------------------+
| 8| 15.8| 1.0200000000000014|
| 6| 20.9|0.48999999999999966|
| 4| 33.9| 0.0|
+---+--------+-------------------+
+---+------------------+------------------+
|cyl| avg(mpg)| var_samp(mpg)|
+---+------------------+------------------+
| 8| 13.75| 6.746999999999998|
| 6| 21.4| NaN|
+---+------------------+------------------+
In this case the "key" is cyl and the "values" are avg(mpg) and var_samp(mpg).
The (approximate) result for these two would be:
+---+--------+-------------------+
|cyl|avg(mpg)| var_samp(mpg)|
+---+--------+-------------------+
| 8| 29.55| 7.76712|
| 6| 42.3|0.48999999999999966|
| 4| 33.9| 0.0|
+---+--------+-------------------+
Notice how NaN is treated as zero, and also how some "keys" might be missing from some DataFrames (key 4 is missing in the second).
I suspect reduceByKey is the way to go here, but I can't make it work.
Here is my code so far:
import org.apache.spark.{SparkConf, SparkContext}

case class Cars(car: String, mpg: String, cyl: String, disp: String, hp: String,
                drat: String, wt: String, qsec: String, vs: String, am: String, gear: String, carb: String)

object Bootstrapping {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("Spark and SparkSql").setMaster("local")
    val sc = new SparkContext(conf)
    sc.setLogLevel("WARN")

    // Exploring SparkSQL
    // Initialize an SQLContext
    val sqlContext = new org.apache.spark.sql.SQLContext(sc)
    import sqlContext._
    import sqlContext.implicits._

    // Load a CSV file
    val csv = sc.textFile("mtcars.csv")

    // Create a Spark DataFrame
    val headerAndRows = csv.map(line => line.split(",").map(_.trim))
    val header = headerAndRows.first
    val mtcdata = headerAndRows.filter(_(0) != header(0))
    val mtcars = mtcdata
      .map(p => Cars(p(0), p(1), p(2), p(3), p(4), p(5), p(6), p(7), p(8), p(9), p(10), p(11)))
      .toDF

    // Aggregate data after grouping by columns
    import org.apache.spark.sql.functions._
    mtcars.sort($"cyl").show()
    mtcars.groupBy("cyl").agg(avg("mpg"), var_samp("mpg")).sort($"cyl").show()

    // Sample 25% of the population without replacement
    val sampledData = mtcars.sample(false, 0.25)

    // Bootstrapping loop
    for (a <- 1 to 5) {
      // Get a bootstrap sample (100%, with replacement)
      val bootstrapSample = sampledData.sample(true, 1)
      //HERE!!! I WANT TO SAVE THE AGGREGATED SUM OF THE FOLLOWING:
      bootstrapSample.groupBy("cyl").agg(avg("mpg"), var_samp("mpg"))
    }
  }
}
This is the data I'm using: Motor Trend Car Road Tests
One approach would be to union the two DataFrames, use when/otherwise to translate NaN to 0.0, and perform groupBy to aggregate the sums of the columns, as shown below:
import org.apache.spark.sql.functions._
import spark.implicits._  // assumes a SparkSession named `spark` (Spark 2.x or later)

val df1 = Seq(
  (8, 15.8, 1.0200000000000014),
  (6, 20.9, 0.48999999999999966),
  (4, 33.9, 0.0)
).toDF("cyl", "avg_mpg", "var_samp_mpg")

val df2 = Seq(
  (8, 13.75, 6.746999999999998),
  (6, 21.4, Double.NaN)
).toDF("cyl", "avg_mpg", "var_samp_mpg")

(df1 union df2).
  withColumn("var_samp_mpg", when($"var_samp_mpg".isNaN, 0.0).otherwise($"var_samp_mpg")).
  groupBy("cyl").agg(sum("avg_mpg"), sum("var_samp_mpg")).
  show
// +---+------------+-------------------+
// |cyl|sum(avg_mpg)| sum(var_samp_mpg)|
// +---+------------+-------------------+
// | 6| 42.3|0.48999999999999966|
// | 4| 33.9| 0.0|
// | 8| 29.55| 7.7669999999999995|
// +---+------------+-------------------+
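To carry this over to the bootstrapping loop in the question, one option is to keep the aggregated DataFrame from each iteration, union them all, and sum once at the end. The following is only a minimal sketch under a few assumptions: a SparkSession is used instead of the question's SQLContext, the aggregate columns are aliased to avg_mpg and var_samp_mpg, the small sampledData here is made-up stand-in data rather than the real mtcars sample, and sumByCyl is a hypothetical helper name. It uses na.fill instead of when/otherwise because na.fill replaces both NaN and null in numeric columns, and newer Spark versions may return null rather than NaN for var_samp over a single row.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions._

object BootstrapSum {
  // Hypothetical helper: union the per-iteration aggregates,
  // treat NaN/null as 0.0, then sum each value column per key.
  def sumByCyl(aggregates: Seq[DataFrame]): DataFrame =
    aggregates.reduce(_ union _)
      .na.fill(0.0, Seq("avg_mpg", "var_samp_mpg"))
      .groupBy("cyl")
      .agg(sum("avg_mpg").as("avg_mpg"), sum("var_samp_mpg").as("var_samp_mpg"))

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder
      .appName("BootstrapSum")
      .master("local[*]")
      .getOrCreate()
    import spark.implicits._

    // Made-up stand-in for the question's sampledData (assumption, not real mtcars rows)
    val sampledData = Seq((8, 15.2), (8, 16.4), (6, 21.0), (6, 19.7), (4, 33.9))
      .toDF("cyl", "mpg")

    // One aggregated DataFrame per bootstrap iteration
    val aggregates = (1 to 5).map { _ =>
      sampledData.sample(true, 1.0)
        .groupBy("cyl")
        .agg(avg("mpg").as("avg_mpg"), var_samp("mpg").as("var_samp_mpg"))
    }

    sumByCyl(aggregates).sort($"cyl").show()
    spark.stop()
  }
}
```

Because union matches columns by position, every per-iteration DataFrame needs the same column order, which the shared agg call guarantees here. A key that is absent from some bootstrap samples simply contributes fewer rows to its group, which matches the "missing key" behaviour asked for in the question.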