
SparkR DataFrame partitioning issue


In my R script, I have a SparkDataFrame with two columns (time, value) containing data for four different months. Because I need to apply my function to each month separately, I figured I would repartition it into four partitions, each of which would hold the data for a separate month.

I created an additional column called partition, holding an integer value 0-3, and then called the repartition method on this specific column.

Unfortunately, as described in this topic: Spark SQL - Difference between df.repartition and DataFrameWriter partitionBy?, with the repartition method we are only guaranteed that all the data with the same key will end up in the same partition; however, data with different keys can also end up in the same partition.

In my case, executing the code shown below results in 4 partitions being created but only 2 of them being populated with data.

I guess I should be using the partitionBy method; however, in the case of SparkR I have no idea how to do that. The official documentation states that this method is applied to something called a WindowSpec and not to a DataFrame.

I would really appreciate some help with this matter as I have no idea how to incorporate this method into my code.

sparkR.session(
  master = "local[*]",
  sparkConfig = list(spark.sql.shuffle.partitions = "4"))

df <- as.DataFrame(inputDat)  # input data with the added partition column
repartitionedDf <- repartition(df, col = df$partition)

# Schema of the data.frame returned by produceHourlyResults
schema <- structType(
  structField("time", "timestamp"),
  structField("value", "double"),
  structField("partition", "string"))

processedDf <- dapply(repartitionedDf,
  function(x) { data.frame(produceHourlyResults(x), stringsAsFactors = FALSE) },
  schema)

Solution

  • You are using the wrong method. If you

    need to apply my function to each month separately

    you should use gapply, which

    Groups the SparkDataFrame using the specified columns and applies the R function to each group.

    df %>% group_by("month") %>% gapply(fun, schema)
    

    or

    df %>% gapply("month", fun, schema)
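
    A minimal end-to-end sketch of this approach (%>% above is the magrittr pipe; the month column, the output schema, and produceHourlyResults are assumptions standing in for your own data and function):

    # Derive the grouping key from the time column
    df <- withColumn(df, "month", month(df$time))

    # Schema of the data.frame returned by the applied function
    schema <- structType(
      structField("time", "timestamp"),
      structField("value", "double"),
      structField("month", "integer"))

    processedDf <- gapply(
      df,
      "month",
      function(key, x) {
        # x is a plain R data.frame holding all rows for one month
        data.frame(produceHourlyResults(x), stringsAsFactors = FALSE)
      },
      schema)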
    

    In my case, executing the code shown below results in 4 partitions being created but only 2 of them being populated with data.

    This suggests hash collisions. Increasing the number of shuffle partitions reasonably above the number of unique keys should resolve the problem:

    spark.sql.shuffle.partitions 17
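
    In SparkR this can be set when the session is created, e.g. (17 is just an arbitrary value comfortably above the number of keys):

    sparkR.session(
      master = "local[*]",
      sparkConfig = list(spark.sql.shuffle.partitions = "17"))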
    

    I guess I should be using the partitionBy method, however

    No. partitionBy is used with window functions (SparkR window function documentation): it operates on a WindowSpec, not on a DataFrame.
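
    For illustration, windowPartitionBy builds such a WindowSpec, which is then used with over (a sketch only, column names follow your example):

    ws <- orderBy(windowPartitionBy("partition"), "time")
    ranked <- withColumn(df, "rn", over(row_number(), ws))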


    To address your comment:

    I decided to use dapply with separate partitions in order to be able to easily save each month into a separate CSV file

    That's not how a hash partitioner works; see How does HashPartitioner work?
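
    You can verify this yourself with spark_partition_id (a quick diagnostic sketch using your repartitionedDf):

    withPid <- withColumn(repartitionedDf, "pid", spark_partition_id())
    head(count(group_by(withPid, "pid")))  # row count per physical partition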

    You can try partitionBy in the writer, but I am not sure if it is directly supported in SparkR. It is supported in Structured Streaming; for batch processing you may have to call Java methods directly or use tables with a metastore:

    # Register the data as a temporary view, then create a metastore table
    # backed by CSV files, partitioned on disk by species
    createDataFrame(iris) %>% createOrReplaceTempView("iris_view")
    sql(
        "CREATE TABLE iris
        USING csv PARTITIONED BY (species)
        LOCATION '/tmp/iris' AS SELECT * FROM iris_view"
    )
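
    As a side note, if I remember correctly Spark 2.3.0 added a partitionBy argument to SparkR's write.df, so on a recent enough version you could write partitioned output directly (a sketch, assuming a month column as above):

    write.df(processedDf, path = "/tmp/results", source = "csv",
             mode = "overwrite", partitionBy = "month")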