Tags: r, apache-spark, machine-learning, data-science, sparklyr

What's the best way to subset a Spark dataframe (in sparklyr) based on the column data type?


I'm converting a bunch of columns into dummy variables and want to remove the original categorical variables from the dataframe afterwards. I'm struggling to figure out how to do this in sparklyr. It's straightforward in dplyr, but the dplyr functionality isn't working in sparklyr.

For example:

First create a spark dataframe:

### create dummy data to figure out how model matrix formulas work in sparklyr
v1 <- sample( LETTERS[1:4], 50000, replace=TRUE, prob=c(0.1, 0.2, 0.65, 0.05))
v2 <- sample( LETTERS[5:6], 50000, replace=TRUE, prob=c(0.7,0.3))
v3 <- sample( LETTERS[7:10], 50000, replace=TRUE, prob=c(0.3, 0.2, 0.4, 0.1))
v4 <- sample( LETTERS[11:15], 50000, replace=TRUE, prob=c(0.1, 0.1, 0.3, 0.05,.45))
v5 <- sample( LETTERS[16:17], 50000, replace=TRUE, prob=c(0.4,0.6))
v6 <- sample( LETTERS[18:21], 50000, replace=TRUE, prob=c(0.1, 0.1, 0.65, 0.15))
v7 <- sample( LETTERS[22:26], 50000, replace=TRUE, prob=c(0.1, 0.2, 0.65, 0.03,.02))
v8 <- rnorm(n=50000,mean=.5,sd=.1)
v9 <- rnorm(n=50000,mean=5,sd=3)
v10 <- rnorm(n=50000,mean=3,sd=.5)
response <- rnorm(n=50000,mean=10,sd=2)

dat <- data.frame(v1,v2,v3,v4,v5,v6,v7,v8,v9,v10,response)
write.csv(dat,file='fake_dat.csv',row.names = FALSE)

#push "fake_dat" to the hdfs
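# One way to do this step from R (a sketch, not from the original post; it
# assumes the hdfs client is on the PATH and uses the same target path that
# spark_read_csv() reads from below):
system("hdfs dfs -put fake_dat.csv /user/stc004/fake_dat.csv")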

library(dplyr)
library(sparklyr)
#configure the spark session and connect
config <- spark_config()
config$`sparklyr.shell.driver-memory` <- "2G" #change depending on the size of the data
config$`sparklyr.shell.executor-memory` <- "2G"

# sc <-  spark_connect(master='local', spark_home='/usr/hdp/2.5.0.0-1245/spark',config = config)
# sc

sc <-  spark_connect(master='yarn-client', spark_home='/usr/hdp/2.5.0.0-1245/spark',config = config)
sc

#can also set spark_home as '/usr/hdp/current/spark-client'

#read in the data from the hdfs
df <- spark_read_csv(sc,name='fdat',path='hdfs://pnhadoop/user/stc004/fake_dat.csv')

#create spark table
dat <- tbl(sc,'fdat')

Now create dummy variables:

for(i in 1:7){
  dat <- ml_create_dummy_variables(x=dat,colnames(dat)[i], reference = NULL)
}

I could simply drop the original categorical variables using

drop.cols <- colnames(dat)[1:7]
dat1 <-
  dat %>%
  select(-one_of(drop.cols))

However, the data that I'm actually working with has 300 categorical variables, so I need a quick way to identify which columns are character/factor. After converting those columns to dummy variables, I can then remove the original categorical variables. I've tried the following:

test <-
  dat %>%
  select_if(is.character)

I then get the following error:

Error: Selection with predicate currently require local sources

I've also tried:

cls <- sapply(dat, class)
cls

But I get:

> cls

         src         ops
    [1,] "src_spark" "op_base_remote"
    [2,] "src_sql"   "op_base"
    [3,] "src"       "op"

Any ideas on how to do this?


Solution

  • Calling this "the best" would be a stretch, but you can try something like this (purrr is used for convenience):

    columns_for_type <- function(sc, name, type="StringType") {
      spark_session(sc) %>% 
        invoke("table", name) %>% 
        # Get (name, type) tuples
        invoke("dtypes") %>%
        # Filter by type
        purrr::keep(function(x) invoke(x, "_2") == type) %>% 
        purrr::map(function(x) invoke(x, "_1"))
    }
    

    which can be used as follows:

    library(sparklyr)
    library(dplyr)
    
    sc <- spark_connect(master = "local[*]")
    iris_tbl <- copy_to(sc, iris, name="iris", overwrite=TRUE)
    
    columns_for_type(sc, "iris", "StringType")
    
    [[1]]
    [1] "Species"
    
    columns_for_type(sc, "iris", "DoubleType")
    
    [[1]]
    [1] "Sepal_Length"
    
    [[2]]
    [1] "Sepal_Width"
    
    [[3]]
    [1] "Petal_Length"
    
    [[4]]
    [1] "Petal_Width"
    

    The result can be passed to select_:

    iris_tbl %>% select_(.dots=columns_for_type(sc, "iris", "StringType"))
    
    Source:   query [150 x 1]
    Database: spark connection master=local[8] app=sparklyr local=TRUE
    
       Species
         <chr>
    1   setosa
    2   setosa
    3   setosa
    4   setosa
    5   setosa
    6   setosa
    7   setosa
    8   setosa
    9   setosa
    10  setosa
    # ... with 140 more rows
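
    Since the question is really about dropping the original categorical columns rather than selecting them, the same helper can drive a negative selection. A sketch, assuming the Spark table from the question is registered as "fdat" (the name used in the spark_read_csv call) and dat is the corresponding tbl:

    # string columns are the original categorical variables
    drop.cols <- unlist(columns_for_type(sc, "fdat", "StringType"))
    dat %>% select(-one_of(drop.cols))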
    

    You could use a similar approach by taking a single row as a data.frame:

    iris_tbl %>% head(n=1) %>% as.data.frame %>% lapply(class)
    

    but it requires an additional Spark action.
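
    Completing that idea (again just a sketch, using the iris column names from the example above): collect one row, take the column classes, and use the character columns for a negative selection. The extra collect is the additional Spark action mentioned above.

    # classes of each column, computed from a single collected row
    cls <- iris_tbl %>% head(n = 1) %>% as.data.frame() %>% sapply(class)
    # character columns correspond to StringType columns in Spark
    chr_cols <- names(cls)[cls == "character"]
    # drop them from the Spark table
    iris_tbl %>% select(-one_of(chr_cols))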