Search code examples
rapache-sparkamazon-s3sparklyr

Sparklyr connection to S3 bucket throwing up error


I am trying to connect to S3 buckets from R sparklyr . I am able to read local files into spark context. However trying to connect with s3 seems to be issue, throws up a big dump of errors . Here is a list of code used.

Note: A single s3 bucket has multiple csv files that follow the same schema.

library( sparklyr )
library( tidyverse )

sparklyr :: spark_install ( version = "2.0.2" , hadoop_version = "2.7" ) 
sparklyr::spark_install( version = "2.0.2" , hadoop_version = "2.7" ) 
Sys.setenv ( AWS_ACCESS_KEY_ID = "xxxx" )
Sys.setenv ( AWS_SECRET_ACCESS_KEY = "xxxx" )
Sys.setenv ( AWS_DEFAULT_REGION = "ap-southeast-1" )

Spark_config <- sparklyr :: spark_config ()
sc <- sparklyr :: spark_connect ( master = "local" ,config = Spark_config)
files = "s3n://temp-sg/MVC"
temp<-spark_read_csv(sc,name = "MVC",path=files,infer_schema = TRUE)
spark_disconnect(sc)

Any help here is greatly appreciated.

Here is the Error dump with use of s3a://

Error: java.lang.IllegalArgumentException: java.net.URISyntaxException: Expected scheme-specific part at index 4: s3a:
    at org.apache.hadoop.fs.Path.initialize(Path.java:206)
    at org.apache.hadoop.fs.Path.<init>(Path.java:172)
    at org.apache.hadoop.fs.Path.<init>(Path.java:94)
    at org.apache.hadoop.fs.Globber.glob(Globber.java:211)
    at org.apache.hadoop.fs.FileSystem.globStatus(FileSystem.java:1644)
    at org.apache.hadoop.mapred.FileInputFormat.singleThreadedListStatus(FileInputFormat.java:257)
    at org.apache.hadoop.mapred.FileInputFormat.listStatus(FileInputFormat.java:228)
    at org.apache.hadoop.mapred.FileInputFormat.getSplits(FileInputFormat.java:313)
    at org.apache.spark.rdd.HadoopRDD.getPartitions(HadoopRDD.scala:199)
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:239)
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:237)
    at scala.Option.getOrElse(Option.scala:120)
    at org.apache.spark.rdd.RDD.partitions(RDD.scala:237)
    at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:35)
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:239)
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:237)
    at scala.Option.getOrElse(Option.scala:120)
    at org.apache.spark.rdd.RDD.partitions(RDD.scala:237)
    at org.apache.spark.rdd.RDD$$anonfun$take$1.apply(RDD.scala:1307)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:111)
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:316)
    at org.apache.spark.rdd.RDD.take(RDD.scala:1302)
    at com.databricks.spark.csv.CsvRelation.firstLine$lzycompute(CsvRelation.scala:249)
    at com.databricks.spark.csv.CsvRelation.firstLine(CsvRelation.scala:245)
    at com.databricks.spark.csv.CsvRelation.inferSchema(CsvRelation.scala:223)
    at com.databricks.spark.csv.CsvRelation.<init>(CsvRelation.scala:72)
    at com.databricks.spark.csv.DefaultSource.createRelation(DefaultSource.scala:157)
    at com.databricks.spark.csv.DefaultSource.createRelation(DefaultSource.scala:44)
    at org.apache.spark.sql.execution.datasources.ResolvedDataSource$.apply(ResolvedDataSource.scala:158)
    at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:119)
    at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:109)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
    at java.lang.reflect.Method.invoke(Unknown Source)
    at sparklyr.Invoke$.invoke(invoke.scala:94)
    at sparklyr.StreamHandler$.handleMethodCall(stream.scala:89)
    at sparklyr.StreamHandler$.read(stream.scala:55)
    at sparklyr.BackendHandler.channelRead0(handler.scala:49)
    at sparklyr.BackendHandler.channelRead0(handler.scala:14)
    at io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:308)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:294)
    at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:103)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:308)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:294)
    at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:244)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:308)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:294)
    at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:846)
    at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:131)
    at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:511)
    at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468)
    at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382)
    at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354)
    at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:111)
    at io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:137)
    at java.lang.Thread.run(Unknown Source)
Caused by: java.net.URISyntaxException: Expected scheme-specific part at index 4: s3a:
    at java.net.URI$Parser.fail(Unknown Source)
    at java.net.URI$Parser.failExpecting(Unknown Source)
    at java.net.URI$Parser.parse(Unknown Source)
    at java.net.URI.<init>(Unknown Source)
    at org.apache.hadoop.fs.Path.initialize(Path.java:203)
    ... 58 more

Error dump with use of s3n://

Error: java.lang.IllegalArgumentException: java.net.URISyntaxException: Expected scheme-specific part at index 4: s3n:
        at org.apache.hadoop.fs.Path.initialize(Path.java:206)
        at org.apache.hadoop.fs.Path.<init>(Path.java:172)
        at org.apache.hadoop.fs.Path.<init>(Path.java:94)
        at org.apache.hadoop.fs.Globber.glob(Globber.java:211)
        at org.apache.hadoop.fs.FileSystem.globStatus(FileSystem.java:1644)
        at org.apache.hadoop.mapred.FileInputFormat.singleThreadedListStatus(FileInputFormat.java:257)
        at org.apache.hadoop.mapred.FileInputFormat.listStatus(FileInputFormat.java:228)
        at org.apache.hadoop.mapred.FileInputFormat.getSplits(FileInputFormat.java:313)
        at org.apache.spark.rdd.HadoopRDD.getPartitions(HadoopRDD.scala:199)
        at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:239)
        at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:237)
        at scala.Option.getOrElse(Option.scala:120)
        at org.apache.spark.rdd.RDD.partitions(RDD.scala:237)
        at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:35)
        at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:239)
        at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:237)
        at scala.Option.getOrElse(Option.scala:120)
        at org.apache.spark.rdd.RDD.partitions(RDD.scala:237)
        at org.apache.spark.rdd.RDD$$anonfun$take$1.apply(RDD.scala:1307)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:111)
        at org.apache.spark.rdd.RDD.withScope(RDD.scala:316)
        at org.apache.spark.rdd.RDD.take(RDD.scala:1302)
        at com.databricks.spark.csv.CsvRelation.firstLine$lzycompute(CsvRelation.scala:249)
        at com.databricks.spark.csv.CsvRelation.firstLine(CsvRelation.scala:245)
        at com.databricks.spark.csv.CsvRelation.inferSchema(CsvRelation.scala:223)
        at com.databricks.spark.csv.CsvRelation.<init>(CsvRelation.scala:72)
        at com.databricks.spark.csv.DefaultSource.createRelation(DefaultSource.scala:157)
        at com.databricks.spark.csv.DefaultSource.createRelation(DefaultSource.scala:44)
        at org.apache.spark.sql.execution.datasources.ResolvedDataSource$.apply(ResolvedDataSource.scala:158)
        at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:119)
        at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:109)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
        at java.lang.reflect.Method.invoke(Unknown Source)
        at sparklyr.Invoke$.invoke(invoke.scala:94)
        at sparklyr.StreamHandler$.handleMethodCall(stream.scala:89)
        at sparklyr.StreamHandler$.read(stream.scala:55)
        at sparklyr.BackendHandler.channelRead0(handler.scala:49)
        at sparklyr.BackendHandler.channelRead0(handler.scala:14)
        at io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:308)
        at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:294)
        at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:103)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:308)
        at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:294)
        at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:244)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:308)
        at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:294)
        at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:846)
        at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:131)
        at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:511)
        at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468)
        at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382)
        at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354)
        at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:111)
        at io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:137)
        at java.lang.Thread.run(Unknown Source)
    Caused by: java.net.URISyntaxException: Expected scheme-specific part at index 4: s3n:
        at java.net.URI$Parser.fail(Unknown Source)
        at java.net.URI$Parser.failExpecting(Unknown Source)
        at java.net.URI$Parser.parse(Unknown Source)
        at java.net.URI.<init>(Unknown Source)
        at org.apache.hadoop.fs.Path.initialize(Path.java:203)


... 58 more

Solution

  • Have the issue fixed. Here is code snippet . Note: Need to verify the correct JVM is running . I used a 32 bit jvm on a 64 bit machine, as the 64bit did not work . - spark version - 2.0 - hadoop version - 2.7

    # install.packages("devtools")
    # devtools::install_github("rstudio/sparklyr") 
    
    library(sparklyr)
    library(dplyr)
    
    # conf$sparklyr.defaultPackages <- "org.apache.hadoop:hadoop-aws:2.7.3"
    # config$spark.executor.memory <- "4g"
    sc <- spark_connect(master = "local",config = conf)
    
    #Get spark context  
    ctx <- sparklyr::spark_context(sc)
    
    #Use below to set the java spark context
    jsc <- invoke_static(  
      sc,
      "org.apache.spark.api.java.JavaSparkContext",
      "fromSparkContext",
      ctx
    )
    #set the s3 configs:  
    hconf <- jsc %>% invoke("hadoopConfiguration")  
    hconf %>% invoke("set","fs.s3a.access.key", "xxxx")  
    hconf %>% invoke("set","fs.s3a.secret.key", "xxxx")  
    
    # check if spar session is active
    sparklyr::spark_connection_is_open(sc=sc)
    
    
    small_file = "s3a://temp-sg/MVC"
    
    temp<-spark_read_csv(sc,name = "MVC",path=small_file,infer_schema = TRUE)
    spark_disconnect(sc)