Tags: r, amazon-redshift, databricks, sparkr, sparklyr

Connecting to Redshift using sparklyr or SparkR?


I am trying to understand how to connect R to Redshift using Spark. I can't connect with plain RPostgres because the dataset is huge and needs distributed computing.

So far I am able to read and write CSVs from S3 into a Spark DataFrame. Can someone please show how to configure the jars and other settings so that I can connect sparklyr (spark_read_jdbc()) or SparkR to Redshift?

It would also be helpful if you could show how to add jars to the SparkContext.

So far I have figured out that Databricks provides some jars that are needed to access the JDBC URL of a Redshift database.


Solution

  • rm(list=ls())
    library(sparklyr)
    #library(SparkR)
    #detach('SparkR')
    Sys.setenv("SPARK_MEM" = "15G")
    config <- spark_config()
    config$`sparklyr.shell.driver-memory` <- "8G"
    config$`sparklyr.shell.executor-memory` <- "8G"
config$`spark.yarn.executor.memoryOverhead` <- "6g"
    config$`spark.dynamicAllocation.enabled`   <- "true"
    ### Put the Redshift JDBC driver jar on the driver classpath
    config$`sparklyr.shell.driver-class-path` <- "/home/root/spark/spark-2.1.0-bin-hadoop2.7/jars/RedshiftJDBC4-no-awssdk-1.2.20.1043.jar"
    ### Use a dedicated temp dir for Spark's scratch files
    spark_dir <- "/tmp/spark_temp"
    config$`sparklyr.shell.driver-java-options` <- paste0("-Djava.io.tmpdir=", spark_dir)
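    ### Alternative way to attach jars (a hedged sketch; the path is illustrative):
    ### sparklyr.shell.jars is passed to spark-submit as --jars, so extra jars can
    ### also be shipped to the executors this way before connecting.
    # config$`sparklyr.shell.jars` <- "/home/root/spark/spark-2.1.0-bin-hadoop2.7/jars/RedshiftJDBC4-no-awssdk-1.2.20.1043.jar"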
    sc <- spark_connect(master = "local[*]", config = config)
    #sc <- spark_connect(master = "local")
    
###invoke the Spark context
    ctx <- sparklyr::spark_context(sc)
    ###wrap it in a Java Spark context ("org.apache.spark.api.java.JavaSparkContext")
    jsc <- sparklyr::invoke_static(sc, "org.apache.spark.api.java.JavaSparkContext", "fromSparkContext", ctx)
    ###invoke the Hadoop configuration
    hconf <- jsc %>% sparklyr::invoke("hadoopConfiguration")
    
    hconf %>%    sparklyr::invoke("set","fs.s3a.access.key","<your access key for s3>")  
    hconf %>% sparklyr::invoke("set","fs.s3a.secret.key", "<your secret key for s3>")   
hconf %>% sparklyr::invoke("set","fs.s3a.endpoint", "<your s3 endpoint, e.g. s3.<your region>.amazonaws.com>")
    
    hconf %>% sparklyr::invoke("set","com.amazonaws.services.s3.enableV4", "true")
    hconf %>% sparklyr::invoke("set","fs.s3a.impl","org.apache.hadoop.fs.s3a.S3AFileSystem")
    hconf %>% sparklyr::invoke("set","fs.s3a.impl.disable.cache", "true")
    
    ###reading from s3 buckets 
    spark_read_csv(sc=sc,name='sr',path="s3a://my-bucket/tmp/2district.csv",memory = TRUE)
    spark_read_csv(sc=sc,name='sr_disk3',path="s3a://my-bucket/tmp/changed/",memory = FALSE)
    ###reading from local drive 
    spark_read_csv(sc=sc,name='raw_data_loc_in3',path="/tmp/distance.csv",memory = TRUE)
spark_read_csv(sc=sc,name='raw_data_loc_in5',path="/tmp/distance.csv",memory = TRUE)
    
####reading from a Redshift table over JDBC
    t <- sparklyr::spark_read_jdbc(sc, "connection",  options = list(
      url = "jdbc:redshift://<URL>:<Port>/<dbName>",
      user = "<user_name>",
      password = "<password>",
      dbtable = '(select * from sales limit 1000) as t',
      tempS3Dir = "s3a://my-bucket/migration"), memory = TRUE, overwrite = TRUE, repartition = 3)
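    ### Note (hedged): tempS3Dir is an option of the Databricks spark-redshift data
    ### source rather than of the plain JDBC reader, so spark_read_jdbc() likely
    ### ignores it. A sketch of the spark-redshift route instead, assuming the
    ### com.databricks:spark-redshift_2.11:<version> package was added via
    ### config$`sparklyr.shell.packages` before spark_connect(); option names
    ### (url, dbtable, tempdir, forward_spark_s3_credentials) follow that package:
    # t2 <- sparklyr::spark_read_source(sc, name = "sales_rs",
    #   source = "com.databricks.spark.redshift",
    #   options = list(
    #     url = "jdbc:redshift://<URL>:<Port>/<dbName>?user=<user_name>&password=<password>",
    #     dbtable = "sales",
    #     tempdir = "s3a://my-bucket/migration",
    #     forward_spark_s3_credentials = "true"))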
    
####write the Spark DataFrame to a local CSV
    sparklyr::spark_write_csv(t, path = 'sample.csv')
    ####write the Spark DataFrame to CSV on S3
    sparklyr::spark_write_csv(t, path = 's3a://my-bucket/output/')
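
For the SparkR side of the question, here is a minimal sketch along the same lines, assuming the same Redshift JDBC driver jar and placeholder credentials as above (the driver class name com.amazon.redshift.jdbc4.Driver matches the JDBC4 jar; adjust it to the driver version you actually use):

    library(SparkR)
    ### Start a SparkR session with the Redshift JDBC driver jar on the classpath
    sparkR.session(master = "local[*]",
                   sparkJars = "/home/root/spark/spark-2.1.0-bin-hadoop2.7/jars/RedshiftJDBC4-no-awssdk-1.2.20.1043.jar",
                   sparkConfig = list(spark.driver.memory = "8g"))
    
    ### Read a subquery from Redshift into a SparkDataFrame over JDBC
    df <- read.jdbc(url = "jdbc:redshift://<URL>:<Port>/<dbName>",
                    tableName = "(select * from sales limit 1000) as t",
                    driver = "com.amazon.redshift.jdbc4.Driver",
                    user = "<user_name>",
                    password = "<password>")
    head(df)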