
How to repartition a data frame in sparklyr


This is proving surprisingly difficult to find. I can easily find the repartition function in PySpark and in SparkR, but no such function seems to exist in sparklyr.

Does anyone know how to repartition a Spark DataFrame in sparklyr?
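
For reference, this is roughly the call I mean in SparkR (a sketch, assuming an active SparkSession; in Spark 2.x the SparkR repartition also accepts partitioning columns via the col argument):

    library(SparkR)
    
    # SparkR exposes repartition() directly on a SparkDataFrame
    sdf <- createDataFrame(iris)
    sdf2 <- repartition(sdf, numPartitions = 3L)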


Solution

  • You can try something like this:

    library(sparklyr)
    library(dplyr)
    library(stringi)
    
    
    #' @param df tbl_spark
    #' @param numPartitions numeric number of partitions
    #' @param ... character column names to partition by
    #' @note assumes an active spark_connection named sc in scope
    repartition <- function(df, numPartitions, ...) {
      # Build a unique name for the output temp view
      alias <- paste(deparse(substitute(df)), stri_rand_strings(1, 10), sep = "_")
    
      # Extract the underlying Spark DataFrame (Java object reference)
      sdf <- df %>% spark_dataframe()
    
      # Convert column names to Spark Column objects
      exprs <- lapply(
        list(...),
        function(x) invoke(sdf, "apply", x)
      )
    
      sdf %>%
        invoke("repartition", as.integer(numPartitions), exprs) %>%
        # Use "registerTempTable" with Spark 1.x
        invoke("createOrReplaceTempView", alias)
    
      tbl(sc, alias)
    }
    

    Example usage:

    df <- copy_to(sc, iris)
    
    repartition(df, 3, "Species") %>% optimizedPlan
    
    ## <jobj[182]>
    ##   class org.apache.spark.sql.catalyst.plans.logical.RepartitionByExpression
    ##   RepartitionByExpression [Species#775], 3
    ## +- InMemoryRelation [Sepal_Length#771, Sepal_Width#772, Petal_Length#773, Petal_Width#774, Species#775], true, 10000, StorageLevel(disk, memory, deserialized, 1 replicas), `iris`
    ##    :  +- *Scan csv [Sepal_Length#771,Sepal_Width#772,Petal_Length#773,Petal_Width#774,Species#775] Format: CSV, InputPaths: file:/tmp/Rtmpp150bt/spark_serialize_9fd0bf1861994b0d294634211269ec9e591b014b83a5683f179dd18e7e70..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<Sepal_Length:double,Sepal_Width:double,Petal_Length:double,Petal_Width:double,Species:string>
    
    repartition(df, 7) %>% optimizedPlan
    ## <jobj[69]>
    ##   class org.apache.spark.sql.catalyst.plans.logical.RepartitionByExpression
    ##   RepartitionByExpression 7
    ## +- InMemoryRelation [Sepal_Length#19, Sepal_Width#20, Petal_Length#21, Petal_Width#22, Species#23], true, 10000, StorageLevel(disk, memory, deserialized, 1 replicas), `iris`
    ##    :  +- *Scan csv [Sepal_Length#19,Sepal_Width#20,Petal_Length#21,Petal_Width#22,Species#23] Format: CSV, InputPaths: file:/tmp/RtmpSw6aPg/spark_serialize_9fd0bf1861994b0d294634211269ec9e591b014b83a5683f179dd18e7e70..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<Sepal_Length:double,Sepal_Width:double,Petal_Length:double,Petal_Width:double,Species:string>
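
    To sanity-check the result, you can read the partition count back from the underlying RDD. The snippet below is a sketch: sdf_repartition and sdf_num_partitions are assumptions about your sparklyr version (they ship only in later releases), but when available they make the helper above unnecessary.

    # Read the partition count back from the underlying RDD
    repartition(df, 7) %>%
      spark_dataframe() %>%
      invoke("rdd") %>%
      invoke("getNumPartitions")  # should return 7
    
    # Later sparklyr releases expose built-in equivalents (version assumption):
    df %>%
      sdf_repartition(partitions = 3, partition_by = "Species") %>%
      sdf_num_partitions()  # should return 3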
    

    The optimizedPlan helper used above is defined in the answer to Sparklyr: how to center a Spark table based on column?