Tags: r, dplyr, disk.frame

Do I have to use collect with disk.frame?


This question is a follow-up to this thread.

I'd like to perform three actions on a disk.frame:

  1. Count the distinct values of the field id grouped by two columns (key_a and key_b)
  2. Count the distinct values of the field id grouped by the first of two columns (key_a)
  3. Add a column with the distinct count across both columns divided by the distinct count for the first column

This is my code:

      library(dplyr)
      library(disk.frame)
      setup_disk.frame()
      
      # convert the sample data to a disk.frame so the chunk_ verbs work
      my_df <-
        as.disk.frame(data.frame(
          key_a = rep(letters, 384),
          key_b = rep(rev(letters), 384),
          id = sample(1:10^6, 9984)
        ))
      
      my_df %>% 
        select(key_a, key_b, id) %>% 
        chunk_group_by(key_a, key_b) %>% 
        # stage one
        chunk_summarize(count = n_distinct(id)) %>% 
        collect %>% 
        group_by(key_a, key_b) %>% 
        # stage two
        mutate(count_summed = sum(count)) %>%
        group_by(key_a) %>% 
        mutate(count_all = sum(count)) %>% 
        ungroup() %>% 
        mutate(percent_of_total = count_summed / count_all)

My real data is a disk.frame, not a data.frame, and it has 100M rows and 8 columns.

I'm following the two-stage instructions described in this documentation.

I'm concerned that the collect will crash my machine, since it brings everything into RAM.

Do I have to use collect in order to use dplyr group-bys in disk.frame?


Solution

  • You should always use srckeep to load only those columns you need into memory.

    my_df %>% 
      srckeep(c("key_a", "key_b", "id")) %>%
      # select(key_a, key_b, id) %>% # no need if you use srckeep
      chunk_group_by(key_a, key_b) %>% 
      # stage one
      chunk_summarize(count = n_distinct(id)) %>% 
      collect %>% 
      group_by(key_a, key_b) %>% 
      # stage two
      mutate(count_summed = sum(count)) %>%
      group_by(key_a) %>% 
      mutate(count_all = sum(count)) %>% 
      ungroup() %>% 
      mutate(percent_of_total = count_summed / count_all)
    

    collect will only bring the results of computing chunk_group_by and chunk_summarize into RAM. It shouldn't crash your machine.
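    If you want to check this for yourself before running the full pipeline, here is a minimal sketch (the name intermediate is just for illustration) that collects only the chunk-level summaries and measures their size with base R's object.size:

    # collect just the per-chunk summaries: at most one row per
    # (key_a, key_b) pair per chunk, tiny compared with the raw rows
    intermediate <- my_df %>% 
      srckeep(c("key_a", "key_b", "id")) %>%
      chunk_group_by(key_a, key_b) %>% 
      chunk_summarize(count = n_distinct(id)) %>% 
      collect
    
    nrow(intermediate)
    format(object.size(intermediate), units = "Mb")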

    You do have to use collect, just as you would with other systems like Spark.

    But if you are computing n_distinct, that can be done in one stage anyway:

    my_df %>% 
      srckeep(c("key_a", "key_b", "id")) %>%
      # select(key_a, key_b, id) %>% 
      group_by(key_a, key_b) %>% 
      # one stage is enough: n_distinct is supported directly
      summarize(count = n_distinct(id)) %>% 
      collect
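    Since the one-stage result already has one row per (key_a, key_b) pair, the ratio column from your original pipeline can then be computed in memory with ordinary dplyr. A sketch along the lines of the question's own stage-two code (count_summed is no longer needed, because count is already the exact distinct count):

    my_df %>% 
      srckeep(c("key_a", "key_b", "id")) %>%
      group_by(key_a, key_b) %>% 
      summarize(count = n_distinct(id)) %>% 
      collect %>% 
      # the collected result is small: one row per (key_a, key_b) pair
      group_by(key_a) %>% 
      mutate(count_all = sum(count)) %>% 
      ungroup() %>% 
      mutate(percent_of_total = count / count_all)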
    

    If you are really concerned about RAM usage, you can reduce the number of workers to 1:

    setup_disk.frame(workers = 1)  # temporarily use a single worker
    
    my_df %>% 
      srckeep(c("key_a", "key_b", "id")) %>%
      group_by(key_a, key_b) %>% 
      summarize(count = n_distinct(id)) %>% 
      collect
    
    setup_disk.frame()  # restore the defaults
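
    With a single worker only one chunk is processed at a time, so peak RAM usage stays low at the cost of throughput.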