Tags: r, apache-spark, google-hadoop

Accessing Google Storage with SparkR on bdutil deployed cluster


I've been using bdutil for a year now, with Hadoop and Spark, and it works perfectly! Now I have a little problem trying to get SparkR to work with Google Storage as HDFS.

Here is my setup:

- bdutil 1.2.1
- a deployed cluster with 1 master and 1 worker, with Spark 1.3.0 installed
- R and SparkR installed on both master and worker

When I run SparkR on the master node, I try to point it at a directory on my GS bucket in several ways:

1) By setting the gs Filesystem scheme

> file <- textFile(sc, "gs://xxxxx/dir/")
> count(file)
15/05/27 12:02:02 WARN LoadSnappy: Snappy native library is available
15/05/27 12:02:02 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
15/05/27 12:02:02 WARN LoadSnappy: Snappy native library not loaded
collect on 5 failed with java.lang.reflect.InvocationTargetException
java.lang.reflect.InvocationTargetException
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:606)
        at edu.berkeley.cs.amplab.sparkr.SparkRBackendHandler.handleMethodCall(SparkRBackendHandler.scala:111)
        at edu.berkeley.cs.amplab.sparkr.SparkRBackendHandler.channelRead0(SparkRBackendHandler.scala:58)
        at edu.berkeley.cs.amplab.sparkr.SparkRBackendHandler.channelRead0(SparkRBackendHandler.scala:19)
        at io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:333)
        at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:319)
        at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:103)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:333)
        at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:319)
        at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:163)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:333)
        at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:319)
        at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:787)
        at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:130)
        at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:511)
        at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468)
        at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382)
        at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354)
        at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:116)
        at io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:137)
        at java.lang.Thread.run(Thread.java:745)
Caused by: java.io.IOException: No FileSystem for scheme: gs
        at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:1383)
        at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:66)
        at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:1404)
        at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:254)
        at org.apache.hadoop.fs.Path.getFileSystem(Path.java:187)
        at org.apache.hadoop.mapred.FileInputFormat.listStatus(FileInputFormat.java:176)
        at org.apache.hadoop.mapred.FileInputFormat.getSplits(FileInputFormat.java:208)
        at org.apache.spark.rdd.HadoopRDD.getPartitions(HadoopRDD.scala:203)
        at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:219)
        at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:217)
        at scala.Option.getOrElse(Option.scala:120)
        at org.apache.spark.rdd.RDD.partitions(RDD.scala:217)
        at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:32)
        at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:219)
        at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:217)
        at scala.Option.getOrElse(Option.scala:120)
        at org.apache.spark.rdd.RDD.partitions(RDD.scala:217)
        at edu.berkeley.cs.amplab.sparkr.BaseRRDD.getPartitions(RRDD.scala:31)
        at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:219)
        at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:217)
        at scala.Option.getOrElse(Option.scala:120)
        at org.apache.spark.rdd.RDD.partitions(RDD.scala:217)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:1511)
        at org.apache.spark.rdd.RDD.collect(RDD.scala:813)
        at org.apache.spark.api.java.JavaRDDLike$class.collect(JavaRDDLike.scala:312)
        at org.apache.spark.api.java.JavaRDD.collect(JavaRDD.scala:32)
        ... 25 more
Error: returnStatus == 0 is not TRUE

2) With an HDFS URL

> file <- textFile(sc, "hdfs://hadoop-stage-m:8020/dir/")
> count(file)
collect on 10 failed with java.lang.reflect.InvocationTargetException
java.lang.reflect.InvocationTargetException
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:606)
        at edu.berkeley.cs.amplab.sparkr.SparkRBackendHandler.handleMethodCall(SparkRBackendHandler.scala:111)
        at edu.berkeley.cs.amplab.sparkr.SparkRBackendHandler.channelRead0(SparkRBackendHandler.scala:58)
        at edu.berkeley.cs.amplab.sparkr.SparkRBackendHandler.channelRead0(SparkRBackendHandler.scala:19)
        at io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:333)
        at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:319)
        at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:103)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:333)
        at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:319)
        at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:163)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:333)
        at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:319)
        at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:787)
        at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:130)
        at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:511)
        at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468)
        at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382)
        at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354)
        at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:116)
        at io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:137)
        at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.hadoop.mapred.InvalidInputException: Input path does not exist: hdfs://hadoop-stage-m:8020/dir
        at org.apache.hadoop.mapred.FileInputFormat.listStatus(FileInputFormat.java:197)
        at org.apache.hadoop.mapred.FileInputFormat.getSplits(FileInputFormat.java:208)
        at org.apache.spark.rdd.HadoopRDD.getPartitions(HadoopRDD.scala:203)
        at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:219)
        at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:217)
        at scala.Option.getOrElse(Option.scala:120)
        at org.apache.spark.rdd.RDD.partitions(RDD.scala:217)
        at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:32)
        at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:219)
        at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:217)
        at scala.Option.getOrElse(Option.scala:120)
        at org.apache.spark.rdd.RDD.partitions(RDD.scala:217)
        at edu.berkeley.cs.amplab.sparkr.BaseRRDD.getPartitions(RRDD.scala:31)
        at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:219)
        at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:217)
        at scala.Option.getOrElse(Option.scala:120)
        at org.apache.spark.rdd.RDD.partitions(RDD.scala:217)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:1511)
        at org.apache.spark.rdd.RDD.collect(RDD.scala:813)
        at org.apache.spark.api.java.JavaRDDLike$class.collect(JavaRDDLike.scala:312)
        at org.apache.spark.api.java.JavaRDD.collect(JavaRDD.scala:32)
        ... 25 more
Error: returnStatus == 0 is not TRUE

3) With a plain path, as I would use from Scala in my other Spark jobs: essentially the same error as in 2)

I'm sure I'm missing an obvious step. If anyone can help me with this, it would be great!

Thanks,

PS: I'm 100% sure that the gcs-connector works from a classic Scala job!


Solution

  • Short Answer

    You need core-site.xml, hdfs-site.xml, etc., and the gcs-connector-1.3.3-hadoop1.jar on your classpath. Accomplish this with:

    export YARN_CONF_DIR=/home/hadoop/hadoop-install/conf:/home/hadoop/hadoop-install/lib/gcs-connector-1.3.3-hadoop1.jar
    ./sparkR
    

    You may also want other spark-env.sh settings; consider additionally running the following before ./sparkR:

    source /home/hadoop/spark-install/conf/spark-env.sh
    

    If you're calling sparkR.init manually from R, sourcing spark-env.sh isn't as necessary, since you'll pass parameters like the master URL directly.
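
    For instance, here is a minimal sketch of that manual route. The spark:// master URL and the appName are placeholders (use whatever MASTER is set to in your spark-env.sh), and the conf and jar paths are the bdutil defaults used elsewhere in this answer:

    library(SparkR)

    # Put the Hadoop conf dir and the gcs-connector jar where sparkR.init will
    # pick them up for the backend classpath (see the long answer below).
    Sys.setenv(YARN_CONF_DIR = paste(
      "/home/hadoop/hadoop-install/conf",
      "/home/hadoop/hadoop-install/lib/gcs-connector-1.3.3-hadoop1.jar",
      sep = ":"))

    # Placeholder master URL -- substitute the MASTER value from
    # /home/hadoop/spark-install/conf/spark-env.sh on your cluster.
    sc <- sparkR.init(master = "spark://hadoop-stage-m:7077",
                      appName = "SparkR-gcs")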

    Other possible pitfalls:

    1. Make sure your default Java is Java 7. If it's Java 6, run sudo update-alternatives --config java and select Java 7 as default.
    2. When building SparkR, make sure to set the Spark version: SPARK_VERSION=1.3.0 ./install-dev.sh

    Long Answer

    Generally, the "No FileSystem for scheme" error means we need to make sure core-site.xml is on the classpath. A second error I ran into after fixing the classpath was "java.lang.ClassNotFoundException: com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem", which means we also need to add gcs-connector-1.3.3.jar to the classpath. Looking through the SparkR helper scripts, the main sparkR binary calls sparkR.init with the following:

      sc <- sparkR.init(Sys.getenv("MASTER", unset = ""))
    

    The MASTER environment variable is commonly set in the spark-env.sh script, and indeed bdutil populates it in /home/hadoop/spark-install/conf/spark-env.sh. Typically, this would suggest that simply running source /home/hadoop/spark-install/conf/spark-env.sh is enough to populate the necessary settings for SparkR, but if we peek inside the sparkR.init definition, we see this:

    #' Initialize a new Spark Context.
    #'
    #' This function initializes a new SparkContext.
    #'
    #' @param master The Spark master URL.
    #' @param appName Application name to register with cluster manager
    #' @param sparkHome Spark Home directory
    #' @param sparkEnvir Named list of environment variables to set on worker nodes.
    #' @param sparkExecutorEnv Named list of environment variables to be used when launching executors.
    #' @param sparkJars Character string vector of jar files to pass to the worker nodes.
    #' @param sparkRLibDir The path where R is installed on the worker nodes.
    #' @param sparkRBackendPort The port to use for SparkR JVM Backend.
    #' @export
    #' @examples
    #'\dontrun{
    #' sc <- sparkR.init("local[2]", "SparkR", "/home/spark")
    #' sc <- sparkR.init("local[2]", "SparkR", "/home/spark",
    #'                  list(spark.executor.memory="1g"))
    #' sc <- sparkR.init("yarn-client", "SparkR", "/home/spark",
    #'                  list(spark.executor.memory="1g"),
    #'                  list(LD_LIBRARY_PATH="/directory of JVM libraries (libjvm.so) on workers/"),
    #'                  c("jarfile1.jar","jarfile2.jar"))
    #'}
    
    sparkR.init <- function(
      master = "",
      appName = "SparkR",
      sparkHome = Sys.getenv("SPARK_HOME"),
      sparkEnvir = list(),
      sparkExecutorEnv = list(),
      sparkJars = "",
      sparkRLibDir = "") {
    
      <...>
      cp <- paste0(jars, collapse = collapseChar)
    
      yarn_conf_dir <- Sys.getenv("YARN_CONF_DIR", "")
      if (yarn_conf_dir != "") {
        cp <- paste(cp, yarn_conf_dir, sep = ":")
      }
      <...>
    
        if (Sys.getenv("SPARKR_USE_SPARK_SUBMIT", "") == "") {
          launchBackend(classPath = cp,
                        mainClass = "edu.berkeley.cs.amplab.sparkr.SparkRBackend",
                        args = path,
                        javaOpts = paste("-Xmx", sparkMem, sep = ""))
        } else {
          # TODO: We should deprecate sparkJars and ask users to add it to the
          # command line (using --jars) which is picked up by SparkSubmit
          launchBackendSparkSubmit(
              mainClass = "edu.berkeley.cs.amplab.sparkr.SparkRBackend",
              args = path,
              appJar = .sparkREnv$assemblyJarPath,
              sparkHome = sparkHome,
              sparkSubmitOpts = Sys.getenv("SPARKR_SUBMIT_ARGS", ""))
        }
    

    This tells us three things:

    1. The default sparkR script doesn't pass sparkJars through, so at the moment there doesn't appear to be a convenient way to pass libjars as flags.
    2. There's a TODO to deprecate the sparkJars param anyways.
    3. Aside from the sparkJars param, the only other thing going into the cp/classPath argument is YARN_CONF_DIR (unless I'm missing some other source of classpath additions, or I'm using a different version of sparkR than you are). Fortunately, it appears to use YARN_CONF_DIR even if you're not planning to run on YARN.

    All in all, this shows you probably want at least the variables from /home/hadoop/spark-install/conf/spark-env.sh, since at least some of the hooks look for environment variables commonly defined there, and second, that we should be able to hack YARN_CONF_DIR into supplying both the directory that puts core-site.xml on the classpath and the gcs-connector-1.3.3.jar itself.
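
    To make that concrete, here is a rough sketch in plain R of what the cp variable from the snippet above ends up holding once YARN_CONF_DIR carries both the Hadoop conf directory and the connector jar (assuming collapseChar is ":" and no sparkJars are passed, as with the default ./sparkR script):

    # Apply the YARN_CONF_DIR "hack" (paths are the bdutil defaults from above).
    Sys.setenv(YARN_CONF_DIR = paste(
      "/home/hadoop/hadoop-install/conf",
      "/home/hadoop/hadoop-install/lib/gcs-connector-1.3.3-hadoop1.jar",
      sep = ":"))

    # Mirror of the classpath assembly quoted from sparkR.init above.
    jars <- ""                                  # sparkJars never gets populated by ./sparkR
    cp <- paste0(jars, collapse = ":")
    yarn_conf_dir <- Sys.getenv("YARN_CONF_DIR", "")
    if (yarn_conf_dir != "") {
      cp <- paste(cp, yarn_conf_dir, sep = ":")
    }
    cp
    # ":/home/hadoop/hadoop-install/conf:/home/hadoop/hadoop-install/lib/gcs-connector-1.3.3-hadoop1.jar"

    So both core-site.xml (via the conf directory) and the connector jar should end up on the backend's classpath.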

    So, the answer to your question is:

    export YARN_CONF_DIR=/home/hadoop/hadoop-install/conf:/home/hadoop/hadoop-install/lib/gcs-connector-1.3.3-hadoop1.jar
    ./sparkR
    

    You may need to change the /home/hadoop/hadoop-install/lib/gcs-connector-1.3.3-hadoop1.jar part if you're using hadoop2 or a different gcs-connector version. That command fixes the HDFS access, lets Hadoop find the fs.gs.impl for the gcs-connector, and makes sure the actual gcs-connector jar is on the classpath. It doesn't pull in spark-env.sh, though, so you might find it defaulting to running with MASTER=local. Assuming your worker nodes also have SparkR properly installed, you may instead consider running:

    source /home/hadoop/spark-install/conf/spark-env.sh
    export YARN_CONF_DIR=/home/hadoop/hadoop-install/conf:/home/hadoop/hadoop-install/lib/gcs-connector-1.3.3-hadoop1.jar
    ./sparkR
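
    Once sparkR comes back up with those settings, the gs:// read from the original question should go through; for example, with the same placeholder bucket and path as in the question:

    # With core-site.xml and the gcs-connector jar on the classpath, this should
    # now return a line count instead of "No FileSystem for scheme: gs".
    file <- textFile(sc, "gs://xxxxx/dir/")
    count(file)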
    

    A couple additional caveats based on what I encountered:

    1. You may find that your R installation is picking up an older Java version. If you run into something like "unsupported major.minor version 51.0", run sudo update-alternatives --config java and make Java 7 the default.
    2. If you're using Spark 1.3.0 and built SparkR with its install-dev.sh, Spark may erroneously hang with "Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient memory", when in fact the dispatchers are fast-failing with serialVersionUID mismatches, which you can see in /hadoop/spark/logs/*Master*.out. The solution is to make sure you run install-dev.sh with the right Spark version set: SPARK_VERSION=1.3.0 ./install-dev.sh
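
    If you suspect that second failure mode, one quick check (sketched here using the bdutil log path mentioned above) is to grep the master log, e.g. from within R:

    # Look in the Spark master logs for the serialVersionUID mismatches behind
    # the fast-failing dispatchers described above.
    system("grep -i serialVersionUID /hadoop/spark/logs/*Master*.out")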