I've been using bdutil for a year now with Hadoop and Spark, and it has been pretty much perfect! Now I have a little problem getting SparkR to work with Google Cloud Storage as HDFS.
Here is my setup:
- bdutil 1.2.1
- I have deployed a cluster with 1 master and 1 worker with Spark 1.3.0 installed
- Installed R and SparkR on both master and worker
When I run SparkR on the master node, I try to point to a directory in my GS bucket in several ways:
1) By setting the gs Filesystem scheme
> file <- textFile(sc, "gs://xxxxx/dir/")
> count(file)
15/05/27 12:02:02 WARN LoadSnappy: Snappy native library is available
15/05/27 12:02:02 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
15/05/27 12:02:02 WARN LoadSnappy: Snappy native library not loaded
collect on 5 failed with java.lang.reflect.InvocationTargetException
java.lang.reflect.InvocationTargetException
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at edu.berkeley.cs.amplab.sparkr.SparkRBackendHandler.handleMethodCall(SparkRBackendHandler.scala:111)
at edu.berkeley.cs.amplab.sparkr.SparkRBackendHandler.channelRead0(SparkRBackendHandler.scala:58)
at edu.berkeley.cs.amplab.sparkr.SparkRBackendHandler.channelRead0(SparkRBackendHandler.scala:19)
at io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:333)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:319)
at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:103)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:333)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:319)
at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:163)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:333)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:319)
at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:787)
at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:130)
at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:511)
at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468)
at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354)
at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:116)
at io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:137)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.io.IOException: No FileSystem for scheme: gs
at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:1383)
at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:66)
at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:1404)
at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:254)
at org.apache.hadoop.fs.Path.getFileSystem(Path.java:187)
at org.apache.hadoop.mapred.FileInputFormat.listStatus(FileInputFormat.java:176)
at org.apache.hadoop.mapred.FileInputFormat.getSplits(FileInputFormat.java:208)
at org.apache.spark.rdd.HadoopRDD.getPartitions(HadoopRDD.scala:203)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:219)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:217)
at scala.Option.getOrElse(Option.scala:120)
at org.apache.spark.rdd.RDD.partitions(RDD.scala:217)
at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:32)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:219)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:217)
at scala.Option.getOrElse(Option.scala:120)
at org.apache.spark.rdd.RDD.partitions(RDD.scala:217)
at edu.berkeley.cs.amplab.sparkr.BaseRRDD.getPartitions(RRDD.scala:31)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:219)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:217)
at scala.Option.getOrElse(Option.scala:120)
at org.apache.spark.rdd.RDD.partitions(RDD.scala:217)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1511)
at org.apache.spark.rdd.RDD.collect(RDD.scala:813)
at org.apache.spark.api.java.JavaRDDLike$class.collect(JavaRDDLike.scala:312)
at org.apache.spark.api.java.JavaRDD.collect(JavaRDD.scala:32)
... 25 more
Error: returnStatus == 0 is not TRUE
2) With an HDFS URL
> file <- textFile(sc, "hdfs://hadoop-stage-m:8020/dir/")
> count(file)
collect on 10 failed with java.lang.reflect.InvocationTargetException
java.lang.reflect.InvocationTargetException
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at edu.berkeley.cs.amplab.sparkr.SparkRBackendHandler.handleMethodCall(SparkRBackendHandler.scala:111)
at edu.berkeley.cs.amplab.sparkr.SparkRBackendHandler.channelRead0(SparkRBackendHandler.scala:58)
at edu.berkeley.cs.amplab.sparkr.SparkRBackendHandler.channelRead0(SparkRBackendHandler.scala:19)
at io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:333)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:319)
at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:103)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:333)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:319)
at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:163)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:333)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:319)
at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:787)
at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:130)
at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:511)
at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468)
at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354)
at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:116)
at io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:137)
at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.hadoop.mapred.InvalidInputException: Input path does not exist: hdfs://hadoop-stage-m:8020/dir
at org.apache.hadoop.mapred.FileInputFormat.listStatus(FileInputFormat.java:197)
at org.apache.hadoop.mapred.FileInputFormat.getSplits(FileInputFormat.java:208)
at org.apache.spark.rdd.HadoopRDD.getPartitions(HadoopRDD.scala:203)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:219)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:217)
at scala.Option.getOrElse(Option.scala:120)
at org.apache.spark.rdd.RDD.partitions(RDD.scala:217)
at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:32)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:219)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:217)
at scala.Option.getOrElse(Option.scala:120)
at org.apache.spark.rdd.RDD.partitions(RDD.scala:217)
at edu.berkeley.cs.amplab.sparkr.BaseRRDD.getPartitions(RRDD.scala:31)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:219)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:217)
at scala.Option.getOrElse(Option.scala:120)
at org.apache.spark.rdd.RDD.partitions(RDD.scala:217)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1511)
at org.apache.spark.rdd.RDD.collect(RDD.scala:813)
at org.apache.spark.api.java.JavaRDDLike$class.collect(JavaRDDLike.scala:312)
at org.apache.spark.api.java.JavaRDD.collect(JavaRDD.scala:32)
... 25 more
Error: returnStatus == 0 is not TRUE
3) With a path as I would use in Scala for my other Spark jobs: pretty much the same error as 2)
I'm sure I'm missing an obvious step. If anyone can help me with this, it would be great!
Thanks,
PS: I'm 100% sure that the GCS connector works in a classic Scala job!
Short Answer
You need core-site.xml, hdfs-site.xml, etc., and the gcs-connector-1.3.3-hadoop1.jar on your classpath. Accomplish this with:
export YARN_CONF_DIR=/home/hadoop/hadoop-install/conf:/home/hadoop/hadoop-install/lib/gcs-connector-1.3.3-hadoop1.jar
./sparkR
You may also want the other spark-env.sh settings; consider additionally running:
source /home/hadoop/spark-install/conf/spark-env.sh
before ./sparkR. If you're calling sparkR.init manually in R, this isn't as necessary, since you'll pass params like master directly.
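If you'd rather have the launch fail early than hit the stack traces from the question, here's a minimal shell sketch, assuming the bdutil 1.2.1 paths from the short answer. It checks that the conf dir's core-site.xml defines fs.gs.impl and that the connector jar contains GoogleHadoopFileSystem, then prints the colon-separated value to use for YARN_CONF_DIR. The function names are hypothetical, and the jar check is a rough byte search rather than a real zip listing.

```shell
#!/usr/bin/env bash
# Hypothetical helpers: build a YARN_CONF_DIR value for SparkR containing the
# Hadoop conf dir and the GCS connector jar, after sanity-checking both.

# True if core-site.xml in the given dir maps the gs:// scheme (fs.gs.impl).
has_gs_impl() {
  grep -q '<name>[[:space:]]*fs\.gs\.impl[[:space:]]*</name>' "$1/core-site.xml" 2>/dev/null
}

# True if the jar contains the connector's FileSystem class. Zip entry names
# are stored uncompressed, so a byte search is a quick (rough) check;
# "unzip -l" is the thorough alternative.
jar_has_gcs_fs_class() {
  grep -aqF 'com/google/cloud/hadoop/fs/gcs/GoogleHadoopFileSystem.class' "$1" 2>/dev/null
}

sparkr_yarn_conf_dir() {
  local conf_dir="$1" connector_jar="$2"
  if ! has_gs_impl "$conf_dir"; then
    echo "fs.gs.impl not found in $conf_dir/core-site.xml; gs:// paths would fail with 'No FileSystem for scheme: gs'" >&2
    return 1
  fi
  if ! jar_has_gcs_fs_class "$connector_jar"; then
    echo "GoogleHadoopFileSystem not found in $connector_jar; expect ClassNotFoundException" >&2
    return 1
  fi
  # SparkR appends YARN_CONF_DIR to its backend classpath as-is, so a
  # colon-separated "dir:jar" value puts both on the classpath.
  printf '%s:%s\n' "$conf_dir" "$connector_jar"
}

# Usage (bdutil 1.2.1 layout; adjust for hadoop2 or another connector version):
#   YARN_CONF_DIR=$(sparkr_yarn_conf_dir /home/hadoop/hadoop-install/conf \
#       /home/hadoop/hadoop-install/lib/gcs-connector-1.3.3-hadoop1.jar) || exit 1
#   export YARN_CONF_DIR
#   ./sparkR
```

Assigning and exporting in separate steps matters here: "export VAR=$(cmd)" returns export's status and would hide a failed check.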
Other possible pitfalls:
- Make Java 7 the default by running
sudo update-alternatives --config java
and selecting Java 7.
- Build SparkR against the Spark version installed on the cluster:
SPARK_VERSION=1.3.0 ./install-dev.sh
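To check the Java pitfall before launching, here's a small sketch with hypothetical helper names that parses the output of java -version. It only reports the version; switching still happens through update-alternatives as described.

```shell
#!/usr/bin/env bash
# Hypothetical helpers for checking which Java is the default.

# Reads "java -version" output on stdin and prints the leading "major.minor"
# part of the quoted version string, e.g. 1.7 for "1.7.0_79".
java_version_from_output() {
  sed -n 's/.*version "\([0-9]*\.[0-9]*\).*/\1/p' | head -n 1
}

# Fails (non-zero) unless the java on PATH reports 1.7.
require_java7() {
  local v
  # java -version writes to stderr, hence 2>&1.
  v=$(java -version 2>&1 | java_version_from_output)
  if [ "$v" != "1.7" ]; then
    echo "default java reports '$v'; run: sudo update-alternatives --config java" >&2
    return 1
  fi
}

# Usage: require_java7 && ./sparkR
```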
Long Answer
Generally, the "No FileSystem for scheme" error means we need to make sure core-site.xml is on the classpath. A second error I ran into after fixing the classpath was "java.lang.ClassNotFoundException: com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem", which means we also need to add gcs-connector-1.3.3.jar to the classpath.
Looking through the SparkR helper scripts, the main sparkR binary calls sparkR.init with the following:
sc <- sparkR.init(Sys.getenv("MASTER", unset = ""))
The MASTER environment variable is commonly set in the spark-env.sh script, and indeed bdutil populates MASTER in /home/hadoop/spark-install/conf/spark-env.sh. This would normally suggest that simply adding source /home/hadoop/spark-install/conf/spark-env.sh should be enough to populate the necessary settings for SparkR, but if we peek inside the sparkR.init definition, we see this:
#' Initialize a new Spark Context.
#'
#' This function initializes a new SparkContext.
#'
#' @param master The Spark master URL.
#' @param appName Application name to register with cluster manager
#' @param sparkHome Spark Home directory
#' @param sparkEnvir Named list of environment variables to set on worker nodes.
#' @param sparkExecutorEnv Named list of environment variables to be used when launching executors.
#' @param sparkJars Character string vector of jar files to pass to the worker nodes.
#' @param sparkRLibDir The path where R is installed on the worker nodes.
#' @param sparkRBackendPort The port to use for SparkR JVM Backend.
#' @export
#' @examples
#'\dontrun{
#' sc <- sparkR.init("local[2]", "SparkR", "/home/spark")
#' sc <- sparkR.init("local[2]", "SparkR", "/home/spark",
#' list(spark.executor.memory="1g"))
#' sc <- sparkR.init("yarn-client", "SparkR", "/home/spark",
#' list(spark.executor.memory="1g"),
#' list(LD_LIBRARY_PATH="/directory of JVM libraries (libjvm.so) on workers/"),
#' c("jarfile1.jar","jarfile2.jar"))
#'}
sparkR.init <- function(
master = "",
appName = "SparkR",
sparkHome = Sys.getenv("SPARK_HOME"),
sparkEnvir = list(),
sparkExecutorEnv = list(),
sparkJars = "",
sparkRLibDir = "") {
<...>
cp <- paste0(jars, collapse = collapseChar)
yarn_conf_dir <- Sys.getenv("YARN_CONF_DIR", "")
if (yarn_conf_dir != "") {
cp <- paste(cp, yarn_conf_dir, sep = ":")
}
<...>
if (Sys.getenv("SPARKR_USE_SPARK_SUBMIT", "") == "") {
launchBackend(classPath = cp,
mainClass = "edu.berkeley.cs.amplab.sparkr.SparkRBackend",
args = path,
javaOpts = paste("-Xmx", sparkMem, sep = ""))
} else {
# TODO: We should deprecate sparkJars and ask users to add it to the
# command line (using --jars) which is picked up by SparkSubmit
launchBackendSparkSubmit(
mainClass = "edu.berkeley.cs.amplab.sparkr.SparkRBackend",
args = path,
appJar = .sparkREnv$assemblyJarPath,
sparkHome = sparkHome,
sparkSubmitOpts = Sys.getenv("SPARKR_SUBMIT_ARGS", ""))
}
This tells us three things:
1) The sparkR script fails to pass sparkJars, so there doesn't appear to be a convenient way to pass libjars as flags at the moment.
2) There's a TODO to deprecate the sparkJars param anyway.
3) Aside from the sparkJars param, the only other thing going into the cp/classPath argument is YARN_CONF_DIR (unless I'm missing some other source of classpath additions, or I'm using a different version of SparkR than you). Fortunately, it appears to use YARN_CONF_DIR even if you're not planning to run on YARN.
In all, this shows you probably want at least the variables in /home/hadoop/spark-install/conf/spark-env.sh, since at least some of the hooks appear to look for environment variables commonly defined there. Secondly, we should be able to hack YARN_CONF_DIR so that it both puts the directory containing core-site.xml on the classpath and adds gcs-connector-1.3.3.jar to it.
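To make the YARN_CONF_DIR hack concrete, here's a rough shell mirror of the classpath lines quoted from sparkR.init. It's an illustration, not SparkR code: the function name is hypothetical, the arguments stand in for whatever jar list the elided part of sparkR.init assembles, and collapseChar is assumed to be ":" on Linux.

```shell
#!/usr/bin/env bash
# Rough shell mirror of the quoted sparkR.init lines:
#   cp <- paste0(jars, collapse = collapseChar)
#   if YARN_CONF_DIR is non-empty: cp <- paste(cp, yarn_conf_dir, sep = ":")
# Assumes collapseChar is ":"; the jar list is passed in as arguments.
sparkr_backend_classpath() {
  local IFS=:
  local cp="$*"    # join all jar arguments with ":" (first char of IFS)
  if [ -n "${YARN_CONF_DIR:-}" ]; then
    cp="$cp:$YARN_CONF_DIR"
  fi
  printf '%s\n' "$cp"
}

# With the hack, YARN_CONF_DIR lands verbatim at the end of the classpath
# (sparkr-assembly.jar is a hypothetical placeholder for the real jar list):
#   YARN_CONF_DIR=/home/hadoop/hadoop-install/conf:/home/hadoop/hadoop-install/lib/gcs-connector-1.3.3-hadoop1.jar \
#     sparkr_backend_classpath sparkr-assembly.jar
```

Because the value is appended as-is, any colon-separated mix of directories and jars works, which is why a conf directory plus a single jar is enough.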
So, the answer to your question is:
export YARN_CONF_DIR=/home/hadoop/hadoop-install/conf:/home/hadoop/hadoop-install/lib/gcs-connector-1.3.3-hadoop1.jar
./sparkR
You may need to change the /home/hadoop/hadoop-install/lib/gcs-connector-1.3.3-hadoop1.jar part if you're using hadoop2 or another gcs-connector version. That command fixes HDFS access, lets Hadoop find the fs.gs.impl setting for the gcs-connector, and makes sure the actual gcs-connector jar is on the classpath. It doesn't pull in spark-env.sh, so you might find it defaulting to running with MASTER=local. You may consider running the following instead, assuming SparkR is also properly installed on your worker nodes:
source /home/hadoop/spark-install/conf/spark-env.sh
export YARN_CONF_DIR=/home/hadoop/hadoop-install/conf:/home/hadoop/hadoop-install/lib/gcs-connector-1.3.3-hadoop1.jar
./sparkR
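Sourcing only helps if MASTER actually ends up in the environment SparkR reads with Sys.getenv("MASTER"). Here's a sketch of a helper that sources spark-env.sh and reports what SparkR will see. The function name is hypothetical, and the default path is the bdutil location mentioned above.

```shell
#!/usr/bin/env bash
# Hypothetical helper: source bdutil's spark-env.sh into the current shell and
# report the MASTER value that ./sparkR will read via Sys.getenv("MASTER").
load_spark_env() {
  local env_file="${1:-/home/hadoop/spark-install/conf/spark-env.sh}"
  if [ ! -r "$env_file" ]; then
    echo "cannot read $env_file" >&2
    return 1
  fi
  # Sourcing (not executing) keeps the variables in this shell.
  . "$env_file"
  if [ -n "${MASTER:-}" ]; then
    # Export in case the file only assigns MASTER without exporting it.
    export MASTER
    echo "MASTER=$MASTER"
  else
    echo "MASTER is not set; SparkR may fall back to local mode" >&2
  fi
}

# Usage, in the same shell that will launch SparkR:
#   load_spark_env
#   export YARN_CONF_DIR=/home/hadoop/hadoop-install/conf:/home/hadoop/hadoop-install/lib/gcs-connector-1.3.3-hadoop1.jar
#   ./sparkR
```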
A couple of additional caveats based on what I encountered:
- Run
sudo update-alternatives --config java
and make Java 7 the default.
- If you run install-dev.sh without the right Spark version, Spark may erroneously hang with "Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient memory" when in fact dispatchers are fast-failing with serialVersionUID mismatches, which you can see in /hadoop/spark/logs/*Master*.out. The solution is to make sure you run install-dev.sh with the right Spark version set:
SPARK_VERSION=1.3.0 ./install-dev.sh
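To tell that misleading hang apart from a real resource shortage, here's a small sketch that lists Master logs mentioning serialVersionUID, assuming the /hadoop/spark/logs location from the caveat. The function name is hypothetical.

```shell
#!/usr/bin/env bash
# Hypothetical helper: list Spark Master logs that mention serialVersionUID,
# the symptom described above when SparkR was built for a different Spark version.
find_serialversion_mismatch() {
  local log_dir="${1:-/hadoop/spark/logs}"
  # -l prints only the names of matching files; errors (e.g. no logs) are hidden.
  grep -l 'serialVersionUID' "$log_dir"/*Master*.out 2>/dev/null
}

# Usage:
#   if find_serialversion_mismatch; then
#     # rebuild SparkR against the cluster's Spark version, e.g.
#     SPARK_VERSION=1.3.0 ./install-dev.sh
#   fi
```

A non-zero exit means no Master log mentions the mismatch (or no logs were found), so the hang is more likely a genuine resource problem.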