New to Spark (2.4.x) and using the Java API (not Scala!!!)
I have a Dataset that I've read in from a CSV file. It has a schema (named columns) like so:
id (integer) | name (string) | color (string) | price (double) | enabled (boolean)
An example row:
23 | "hotmeatballsoup" | "blue" | 3.95 | true
There are many (tens of thousands) rows in the dataset. I would like to write an expression using the proper Java/Spark API, that scrolls through each row and applies the following two operations on each row:
1. If the price is null, default it to 0.00; and then
2. If the color is "red", add 2.55 to the price

Since I'm so new to Spark I'm not even sure where to begin! My best attempt thus far is definitely wrong, but it's at least a starting point I guess:
Dataset csvData = sparkSession.read()
    .format("csv")
    .load(fileToLoad.getAbsolutePath());

// ??? get rows somehow
Seq<Seq<String>> csvRows = csvData.getRows(???, ???);

// now how to loop through rows???
for (Seq<String> row : csvRows) {
    // how apply two operations specified above???
    if (row["price"] == null) {
        row["price"] = 0.00;
    }
    if (row["color"].equals("red")) {
        row["price"] = row["price"] + 2.55;
    }
}
Can someone help nudge me in the right direction here?
You could use the Spark SQL API to achieve this. Null values can also be replaced with a default using .fill() from DataFrameNaFunctions. Otherwise you could convert the DataFrame to a typed Dataset and do these steps in .map, but the SQL API is better and more efficient in this case.
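For the .map route, the per-row rule itself is just plain Java. Here is a minimal sketch of that rule on its own, with no Spark involved; the class and method names (PriceRule, adjust) are made up for illustration, and wiring it into Dataset.map (which also needs an Encoder for the result type) is not shown:

```java
// Hypothetical helper: the two operations from the question as one pure function.
final class PriceRule {
    private PriceRule() {}

    static final double DEFAULT_PRICE = 0.00;  // used when price is null
    static final double RED_SURCHARGE = 2.55;  // added when color is "red"

    /**
     * Applies the rule to one row's values.
     * price may be null (missing in the CSV); color may be null as well.
     */
    static double adjust(Double price, String color) {
        // Step 1: default a missing price.
        double base = (price == null) ? DEFAULT_PRICE : price;
        // Step 2: add the surcharge for red items ("red".equals is null-safe).
        return "red".equals(color) ? base + RED_SURCHARGE : base;
    }
}
```

Keeping the rule in a small static method like this makes it easy to unit test, but Spark cannot optimize through it the way it can with column expressions, which is one reason to prefer the SQL API here.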
+---+---------------+-----+-----+-------+
| id|           name|color|price|enabled|
+---+---------------+-----+-----+-------+
| 23|hotmeatballsoup| blue| 3.95|   true|
| 24|            abc|  red|  1.0|   true|
| 24|            abc|  red| null|   true|
+---+---------------+-----+-----+-------+
Import the SQL functions before the class declaration:
import static org.apache.spark.sql.functions.*;
Using the SQL API:
df.select(
    col("id"), col("name"), col("color"),
    when(col("color").equalTo("red").and(col("price").isNotNull()), col("price").plus(2.55))
        .when(col("color").equalTo("red").and(col("price").isNull()), 2.55)
        .otherwise(col("price")).as("price"),
    col("enabled")
).show();
Or using a temp view and a SQL query:
df.createOrReplaceTempView("df");
spark.sql("select id,name,color, case when color = 'red' and price is not null then (price + 2.55) when color = 'red' and price is null then 2.55 else price end as price, enabled from df").show();
Output:
+---+---------------+-----+-----+-------+
| id|           name|color|price|enabled|
+---+---------------+-----+-----+-------+
| 23|hotmeatballsoup| blue| 3.95|   true|
| 24|            abc|  red| 3.55|   true|
| 24|            abc|  red| 2.55|   true|
+---+---------------+-----+-----+-------+
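Note that the expressions above leave a null price untouched when the color is not red. If you also want those rows defaulted to 0.00, as the question asks, one option is to fill the nulls first and then add the surcharge. A rough sketch, assuming the price column is already typed as double (na().fill with a numeric value only touches numeric columns, so a CSV read without a schema or inferSchema would not be affected); the class and method names (PriceAdjustments, adjustPrices) are made up for illustration:

```java
import static org.apache.spark.sql.functions.col;
import static org.apache.spark.sql.functions.when;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

// Hypothetical helper, not part of Spark.
final class PriceAdjustments {
    private PriceAdjustments() {}

    static Dataset<Row> adjustPrices(Dataset<Row> df) {
        return df
            // Step 1: replace null prices with 0.0 (assumes "price" is a double column).
            .na().fill(0.0, new String[] {"price"})
            // Step 2: add 2.55 for red rows; every other row keeps its price.
            // withColumn with an existing name replaces that column in place.
            .withColumn("price",
                when(col("color").equalTo("red"), col("price").plus(2.55))
                    .otherwise(col("price")));
    }
}
```

Usage would be something like PriceAdjustments.adjustPrices(df).show(). For the three sample rows this should give the same prices as the output above; the only difference is that a non-red row with a null price would come out as 0.0 instead of null.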