Tags: hadoop, apache-spark, hive, dplyr, sparklyr

What is the most efficient way to create new Spark Tables or Data Frames in Sparklyr?


Using the sparklyr package on a Hadoop cluster (not a VM), I'm working with several types of tables that need to be joined, filtered, etc., and I'm trying to determine the most efficient way to use the dplyr commands along with the data-management functions in sparklyr to run the processing, store it in cache, and work with the intermediate data objects to produce downstream objects that also remain in cache. This question is superficial as posed above, but I'm hoping to get a little more information than pure efficiency, so if you'd like to edit my question, I'm OK with that...

I have some tables in Hive; let's call them Activity2016, Accounts2016, and Accounts2017. The "Accounts" tables also include address history. I want to start with the 2016 data, merging the two tables on name and present address, filter on some activity and account detail, then merge in two different ways with the 2017 account information, specifically to tally the number of people who remained at their address versus those who changed address. We've got millions of rows, so we're doing this with our Spark cluster.

So, to begin, this is what I'm doing now:

sc <- spark_connect()

# Activity2016 and Accounts2016 below are assumed to be tbl references to the Hive tables,
# e.g. Activity2016 <- tbl(sc, "HiveDB.Activity2016")
Activity2016 %>% filter(COL1 < Cut1 & COL1 > Cut2) %>% 
select(NAME,ADDRESS1) %>% 
inner_join(Accounts2016,c("NAME"="NAME","ADDRESS1"="ADDRESS1")) %>%
distinct(NAME,ADDRESS1) %>% sdf_register("JOIN2016")

tbl_cache(sc,"JOIN2016")
JOINED_2016 <- tbl(sc, "JOIN2016")

Acct2017 <- tbl(sc, "HiveDB.Accounts2017")

# Now, I can run:
JOINED_2016 %>% inner_join(Acct2017,c("NAME"="NAME","ADDRESS1"="ADDRESS2")) %>%
distinct(NAME,ADDRESS1.x) %>% sdf_register("JOIN2017")

# Rinse & Repeat
tbl_cache(sc,"JOIN2017")
JOINED_2017 <- tbl(sc,"JOIN2017")

Then I continue working with JOINED_2016 and JOINED_2017, using dplyr verbs, etc...
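
For context, the kind of downstream step I mean is a tally roughly like the following (a sketch only; the exact join keys and verbs are approximate):

# stayers: a 2016 name + address combination that still appears in the 2017 accounts
stayers <- JOINED_2016 %>%
  semi_join(Acct2017, by = c("NAME" = "NAME", "ADDRESS1" = "ADDRESS2")) %>%
  count()

# movers: the name appears in 2017, but not at the 2016 address
movers <- JOINED_2016 %>%
  anti_join(Acct2017, by = c("NAME" = "NAME", "ADDRESS1" = "ADDRESS2")) %>%
  semi_join(Acct2017, by = "NAME") %>%
  count()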

There seem to be multiple inefficiencies here. For example: 1) Shouldn't I be able to send the result straight to cache and refer to it as a variable? 2) Shouldn't I also be able to write it straight into a Hive table? 3) How can I cast the final object so I can run basic R commands like table(JOINED_2016$COL1), or are those unavailable? (I'm getting errors when trying %>% select(COL1) %>% table.)
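
To make 1) and 3) concrete, this is the kind of pattern I imagine should work, assuming dplyr's compute() materializes (and caches?) the result on the Spark side and that collect() is what actually brings rows back into R; I haven't verified either:

JOINED_2016 <- Activity2016 %>%
  filter(COL1 < Cut1 & COL1 > Cut2) %>%
  select(NAME, ADDRESS1) %>%
  inner_join(Accounts2016, c("NAME" = "NAME", "ADDRESS1" = "ADDRESS1")) %>%
  distinct(NAME, ADDRESS1) %>%
  compute("JOIN2016")   # materialize in Spark and get a tbl back, in one step

# base R functions like table() only work on local data, so pull a small slice down first:
local_slice <- JOINED_2016 %>% select(ADDRESS1) %>% head(1000) %>% collect()
table(local_slice$ADDRESS1)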

I'll lose the data if there's a downstream error and I don't write it out... but I feel like there are too many choices about how to write the data, and I'm not clear on them. When does the result end up as a cached object, versus an RDD, versus a Hive internal/external table, versus a Spark DataFrame, and what constraints does each have on R's ability to work with those data objects?

For example, what if I just run:

JOIN2016 <- Activity2016 %>% filter(COL1 < Cut1 & COL1 > Cut2) %>% 
select(NAME,ADDRESS1) %>% 
inner_join(Accounts2016,c("NAME"="NAME","ADDRESS1"="ADDRESS1")) %>%
distinct(NAME,ADDRESS1) 

Would this be an R data.frame? (That could potentially crash the RAM of my gateway node, which is why I'm reluctant to just try it; this is a cluster at the business.)
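
My working assumption, which I'd like confirmed, is that it would not be; the pipeline should stay a lazy Spark query until something forces it:

class(JOIN2016)   # presumably "tbl_spark" / "tbl_sql" / "tbl_lazy", not "data.frame"
JOIN2016 %>% head(10) %>% collect()   # only an explicit collect() pulls rows into R on the gateway node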

So summarizing: Should I bother with the tbl and tbl_cache commands at all or do I need them?

Should I use dbWriteTable and can I do it directly after, before, or in lieu of sdf_register... or do I need to use the tbl command before I can write anything to Hive? sdf_register almost seems pointless.

Should I use copy_to or db_copy_to instead of dbWriteTable? I don't want to turn Hive into a dumping ground, so I want to be careful about how I write the intermediate data and then be consistent about how R will work with it after I store it.

Which of these data.frame-type objects do I have to use in order to work with the data as if it were an in-memory R object, or am I restricted to dplyr commands?
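
For reference, here's my current, possibly wrong, mental model of the candidates, which is part of what I'd like this question to straighten out (some_local_df and the table names are placeholders):

copy_to(sc, some_local_df, "scratch_tbl")          # local R data.frame -> Spark (the opposite direction from what I need?)
spark_write_table(JOINED_2016, "join2016_saved")   # Spark tbl -> table persisted in the metastore
collect(JOINED_2016)                               # Spark tbl -> local R data.frame (risky at millions of rows)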

Sorry there's so much in this question, but I don't feel like there's clarity around these issues in the R-bloggers articles, the sparklyr tutorials, or the other questions on Stack Overflow.


Solution

  • sdf_register is not very useful when dealing with long-running queries. It basically registers an unmaterialized view, which means the underlying query is rerun each time you call it. Adding the following will write the data to Hive as a table.

    spark_dataframe %>% invoke("write") %>% invoke("saveAsTable", as.character("your_desired_table_name"))

    This uses saveAsTable, which will create the table in Hive and keep it even after the Spark session ends. Using createOrReplaceTempView does not persist the data when the Spark session ends.
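
    As a rough usage sketch (JOINED_2016 is the tbl_spark from the question, and the table name is made up):

    sdf <- spark_dataframe(JOINED_2016)          # get the underlying Spark DataFrame (jobj) from the tbl
    sdf %>%
      invoke("write") %>%
      invoke("mode", "overwrite") %>%            # optional: replace the table if it already exists
      invoke("saveAsTable", "join2016_persisted")

    # In a later session the persisted table can be picked up again with:
    JOINED_2016 <- tbl(sc, "join2016_persisted")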