Let's say I want to execute something as follows:
library(SparkR)
...
df <- read.parquet(<some_address>)
result <- gapply(
  df,
  df$column1,
  function(key, x) {
    return(data.frame(x, newcol1 = f1(x), newcol2 = f2(x)))
  },
  schema  # a structType describing the output rows
)
where the return of the function has multiple rows. To be clear, the examples in the documentation (which, like much of the Spark documentation, sadly offers only trivially simple examples) don't help me determine whether this will be handled as I expect.
I would expect that, for k groups created in the DataFrame with n_k output rows per group, the result of the gapply() call would have n_1 + ... + n_k rows, with the key value replicated across the n_k rows of its group. However, the schema argument suggests to me that this is not how it will be handled; in fact it suggests that the result is expected to be pushed into a single row.
Hopefully this is clear, albeit theoretical (I'm sorry I can't share my actual code example). Can someone verify or explain how such a function will actually be treated?
Exact expectations regarding input and output are clearly stated in the official documentation:
Apply a function to each group of a SparkDataFrame. The function is to be applied to each group of the SparkDataFrame and should have only two parameters: grouping key and R data.frame corresponding to that key. The groups are chosen from SparkDataFrame's column(s). The output of function should be a data.frame. Schema specifies the row format of the resulting SparkDataFrame. It must represent R function's output schema on the basis of Spark data types. The column names of the returned data.frame are set by user.
In other words, your function should take a key and a data.frame of rows corresponding to that key, and return a data.frame that can be represented using Spark SQL types, with the schema provided as the schema argument. There are no restrictions regarding the number of rows. You could, for example, apply an identity transformation as follows:
df <- as.DataFrame(iris)
gapply(df, "Species", function(k, x) x, schema(df))
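To address the row-count expectation from the question directly, a quick sanity check (a minimal sketch, assuming a local session started with sparkR.session()) shows that the identity version returns exactly as many rows as went in, i.e. the sum of the per-group counts:
library(SparkR)
sparkR.session()  # assumes a local Spark installation

df  <- as.DataFrame(iris)
out <- gapply(df, "Species", function(k, x) x, schema(df))

# both print 150; gapply concatenates the data.frames
# returned for each group into one SparkDataFrame
count(df)
count(out)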
In the same way, you can run aggregations:
gapply(df, "Species",
function(k, x) {
dplyr::summarize(dplyr::group_by(x, Species), max(Sepal_Width))
},
structType(
structField("species", "string"),
structField("max_s_width", "double"))
)
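Nothing forces a single row per group either. To mirror the scenario from the question, here is a minimal sketch (the stat and value column names are made up for illustration) where each group emits two rows and the key value is replicated across them:
gapply(df, "Species",
  function(k, x) {
    # k holds the grouping key; data.frame() recycles it
    # across both output rows for this group
    data.frame(
      k,
      stat  = c("min", "max"),
      value = c(min(x$Sepal_Width), max(x$Sepal_Width))
    )
  },
  structType(
    structField("species", "string"),
    structField("stat", "string"),
    structField("value", "double")
  )
)
With three species and two rows each, the result has six rows, which matches the sum-over-groups behaviour the question expects.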
although in practice you should prefer aggregations directly on a DataFrame (groupBy %>% agg).
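For completeness, the direct aggregation alluded to above would look roughly like this; the name = "function" string form of agg is the documented SparkR way to request an aggregate:
# equivalent result without shipping the data to R workers
agg(groupBy(df, "Species"), Sepal_Width = "max")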