Tags: apache-spark, pyspark, s3distcp

Adding S3DistCp to PySpark


I'm trying to add S3DistCp to my local, standalone Spark install. I've downloaded S3DistCp:

aws s3 cp s3://elasticmapreduce/libs/s3distcp/1.latest/s3distcp.jar . 

And the AWS SDK as well:

wget http://sdk-for-java.amazonwebservices.com/latest/aws-java-sdk.zip

I extracted the AWS SDK:

unzip aws-java-sdk.zip

Then I added s3distcp.jar to my spark-defaults.conf:

spark.driver.extraClassPath /Users/mark.miller/.ivy2/jars/s3distcp.jar
spark.executor.extraClassPath /Users/mark.miller/.ivy2/jars/s3distcp.jar

Then I added the AWS SDK and all its dependencies to $LIBJARS and $HADOOP_CLASSPATH:

export LIBJARS=/Users/mark.miller/Downloads/aws-java-sdk-1.11.86/lib/aws-java-sdk-1.11.86.jar,\
/Users/mark.miller/Downloads/aws-java-sdk-1.11.86/third-party/lib/aspectjrt-1.8.2.jar,\
/Users/mark.miller/Downloads/aws-java-sdk-1.11.86/third-party/lib/aspectjweaver.jar,\
/Users/mark.miller/Downloads/aws-java-sdk-1.11.86/third-party/lib/commons-codec-1.9.jar,\
/Users/mark.miller/Downloads/aws-java-sdk-1.11.86/third-party/lib/commons-logging-1.1.3.jar,\
/Users/mark.miller/Downloads/aws-java-sdk-1.11.86/third-party/lib/freemarker-2.3.9.jar,\
/Users/mark.miller/Downloads/aws-java-sdk-1.11.86/third-party/lib/httpclient-4.5.2.jar,\
/Users/mark.miller/Downloads/aws-java-sdk-1.11.86/third-party/lib/httpcore-4.4.4.jar,\
/Users/mark.miller/Downloads/aws-java-sdk-1.11.86/third-party/lib/ion-java-1.0.1.jar,\
/Users/mark.miller/Downloads/aws-java-sdk-1.11.86/third-party/lib/jackson-annotations-2.6.0.jar,\
/Users/mark.miller/Downloads/aws-java-sdk-1.11.86/third-party/lib/jackson-core-2.6.6.jar,\
/Users/mark.miller/Downloads/aws-java-sdk-1.11.86/third-party/lib/jackson-databind-2.6.6.jar,\
/Users/mark.miller/Downloads/aws-java-sdk-1.11.86/third-party/lib/jackson-dataformat-cbor-2.6.6.jar,\
/Users/mark.miller/Downloads/aws-java-sdk-1.11.86/third-party/lib/javax.mail-api-1.4.6.jar,\
/Users/mark.miller/Downloads/aws-java-sdk-1.11.86/third-party/lib/jmespath-java-1.11.86.jar,\
/Users/mark.miller/Downloads/aws-java-sdk-1.11.86/third-party/lib/joda-time-2.8.1.jar,\
/Users/mark.miller/Downloads/aws-java-sdk-1.11.86/third-party/lib/json-path-2.2.0.jar,\
/Users/mark.miller/Downloads/aws-java-sdk-1.11.86/third-party/lib/slf4j-api-1.7.16.jar,\
/Users/mark.miller/Downloads/aws-java-sdk-1.11.86/third-party/lib/spring-beans-3.0.7.RELEASE.jar,\
/Users/mark.miller/Downloads/aws-java-sdk-1.11.86/third-party/lib/spring-context-3.0.7.RELEASE.jar,\
/Users/mark.miller/Downloads/aws-java-sdk-1.11.86/third-party/lib/spring-core-3.0.7.RELEASE.jar,\
/Users/mark.miller/Downloads/aws-java-sdk-1.11.86/third-party/lib/spring-test-3.0.7.RELEASE.jar
export HADOOP_CLASSPATH=$LIBJARS
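
That jar list is painful to maintain by hand; a small helper can generate it instead. This is only a sketch, assuming the SDK really is unzipped under ~/Downloads/aws-java-sdk-1.11.86 as above:

# Emit an "export LIBJARS=..." line you can paste into your shell profile.
import glob
import os

sdk_home = os.path.expanduser("~/Downloads/aws-java-sdk-1.11.86")
jars = sorted(glob.glob(os.path.join(sdk_home, "lib", "*.jar")) +
              glob.glob(os.path.join(sdk_home, "third-party", "lib", "*.jar")))
print("export LIBJARS=" + ",".join(jars))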

But when I try to start the pyspark shell:

$ pyspark

I get the following error:

Python 2.7.13 (default, Dec 18 2016, 07:03:39)
[GCC 4.2.1 Compatible Apple LLVM 8.0.0 (clang-800.0.42.1)] on darwin
Type "help", "copyright", "credits" or "license" for more information.
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
17/02/06 17:48:50 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
17/02/06 17:48:50 WARN SparkContext: Another SparkContext is being constructed (or threw an exception in its constructor).  This may indicate an error, since only one SparkContext may be running in this JVM (see SPARK-2243). The other SparkContext was created at:
org.apache.spark.api.java.JavaSparkContext.<init>(JavaSparkContext.scala:58)
sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
java.lang.reflect.Constructor.newInstance(Constructor.java:423)
py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:247)
py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
py4j.Gateway.invoke(Gateway.java:236)
py4j.commands.ConstructorCommand.invokeConstructor(ConstructorCommand.java:80)
py4j.commands.ConstructorCommand.execute(ConstructorCommand.java:69)
py4j.GatewayConnection.run(GatewayConnection.java:214)
java.lang.Thread.run(Thread.java:745)
Traceback (most recent call last):
  File "/usr/local/Cellar/apache-spark/2.1.0/libexec/python/pyspark/shell.py", line 47, in <module>
    spark = SparkSession.builder.getOrCreate()
  File "/usr/local/Cellar/apache-spark/2.1.0/libexec/python/pyspark/sql/session.py", line 169, in getOrCreate
    sc = SparkContext.getOrCreate(sparkConf)
  File "/usr/local/Cellar/apache-spark/2.1.0/libexec/python/pyspark/context.py", line 307, in getOrCreate
    SparkContext(conf=conf or SparkConf())
  File "/usr/local/Cellar/apache-spark/2.1.0/libexec/python/pyspark/context.py", line 118, in __init__
    conf, jsc, profiler_cls)
  File "/usr/local/Cellar/apache-spark/2.1.0/libexec/python/pyspark/context.py", line 179, in _do_init
    self._jsc = jsc or self._initialize_context(self._conf._jconf)
  File "/usr/local/Cellar/apache-spark/2.1.0/libexec/python/pyspark/context.py", line 246, in _initialize_context
    return self._jvm.JavaSparkContext(jconf)
  File "/usr/local/Cellar/apache-spark/2.1.0/libexec/python/lib/py4j-0.10.4-src.zip/py4j/java_gateway.py", line 1401, in __call__
  File "/usr/local/Cellar/apache-spark/2.1.0/libexec/python/lib/py4j-0.10.4-src.zip/py4j/protocol.py", line 319, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling None.org.apache.spark.api.java.JavaSparkContext.
: java.lang.NoClassDefFoundError: Could not initialize class com.google.common.cache.LocalCache
        at com.google.common.cache.LocalCache$LocalLoadingCache.<init>(LocalCache.java:4867)
        at com.google.common.cache.CacheBuilder.build(CacheBuilder.java:785)
        at org.apache.hadoop.security.Groups.<init>(Groups.java:101)
        at org.apache.hadoop.security.Groups.<init>(Groups.java:74)
        at org.apache.hadoop.security.Groups.getUserToGroupsMappingService(Groups.java:303)
        at org.apache.hadoop.security.UserGroupInformation.initialize(UserGroupInformation.java:284)
        at org.apache.hadoop.security.UserGroupInformation.ensureInitialized(UserGroupInformation.java:261)
        at org.apache.hadoop.security.UserGroupInformation.loginUserFromSubject(UserGroupInformation.java:791)
        at org.apache.hadoop.security.UserGroupInformation.getLoginUser(UserGroupInformation.java:761)
        at org.apache.hadoop.security.UserGroupInformation.getCurrentUser(UserGroupInformation.java:634)
        at org.apache.spark.util.Utils$$anonfun$getCurrentUserName$1.apply(Utils.scala:2373)
        at org.apache.spark.util.Utils$$anonfun$getCurrentUserName$1.apply(Utils.scala:2373)
        at scala.Option.getOrElse(Option.scala:121)
        at org.apache.spark.util.Utils$.getCurrentUserName(Utils.scala:2373)
        at org.apache.spark.SparkContext.<init>(SparkContext.scala:295)
        at org.apache.spark.api.java.JavaSparkContext.<init>(JavaSparkContext.scala:58)
        at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
        at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
        at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
        at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
        at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:247)
        at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
        at py4j.Gateway.invoke(Gateway.java:236)
        at py4j.commands.ConstructorCommand.invokeConstructor(ConstructorCommand.java:80)
        at py4j.commands.ConstructorCommand.execute(ConstructorCommand.java:69)
        at py4j.GatewayConnection.run(GatewayConnection.java:214)
        at java.lang.Thread.run(Thread.java:745)

If I remove s3distcp.jar from spark-defaults.conf, the error goes away. There just doesn't seem to be much documentation on how to deploy S3DistCp outside of EMR, since it's normally provided as part of EMR.
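
The failure happens inside Guava (com.google.common.cache.LocalCache), and only when s3distcp.jar is on the driver classpath, so a clash with a Guava copy bundled inside the fat s3distcp.jar seems like a plausible suspect. A minimal way to check what the jar actually bundles (a sketch, assuming s3distcp.jar is in the current directory):

# Count bundled Guava class files that could shadow Spark's own Guava
# once s3distcp.jar is prepended to the driver classpath.
import zipfile

with zipfile.ZipFile("s3distcp.jar") as jar:
    guava = [n for n in jar.namelist() if n.startswith("com/google/common/")]
print("bundled Guava class files: %d" % len(guava))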


Solution

  • I was able to get this working by passing --driver-class-path to pyspark:

    $ pyspark \
      --driver-class-path \
      "$HOME/Downloads/aws-java-sdk-1.11.86/lib/aws-java-sdk-1.11.86.jar:$HOME/Downloads/aws-java-sdk-1.11.86/third-party/lib/*"
    

    To set this up in spark-defaults.conf I had to do it this way:

    spark.jars /Users/mark.miller/.ivy2/jars/s3distcp.jar
    spark.driver.extraClassPath /Users/mark.miller/Downloads/aws-java-sdk-1.11.86/lib/aws-java-sdk-1.11.86.jar:/Users/mark.miller/Downloads/aws-java-sdk-1.11.86/third-party/lib/*
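
    As a quick sanity check from the pyspark prompt (the shell creates the spark and sc objects for you, and sc._conf is the SparkConf the context was built with):

    # Confirm the spark-defaults.conf entries were picked up by the driver.
    print(sc._conf.get("spark.jars", "not set"))
    print(sc._conf.get("spark.driver.extraClassPath", "not set"))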
    

    I also learned that spark.executor.extraClassPath is only needed for backwards compatibility with older versions of Spark (https://spark.apache.org/docs/latest/configuration.html#runtime-environment).
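
    With the jar on the classpath, S3DistCp can also be driven from PySpark itself through the py4j gateway. The sketch below rests on assumptions rather than documented API: the fully-qualified class name differs between S3DistCp builds (com.amazon.external.elasticmapreduce.s3distcp.S3DistCp is assumed here; check yours with jar tf s3distcp.jar), the class is assumed to expose a Hadoop-Tool-style run(String[]) method, and the bucket paths are placeholders.

    # Sketch: invoke S3DistCp in-process via the JVM gateway (assumptions above).
    from pyspark.sql import SparkSession

    spark = SparkSession.builder.appName("s3distcp-sketch").getOrCreate()
    sc = spark.sparkContext
    jvm = sc._jvm

    # py4j hands Python lists to Java as collections, so build a real String[].
    args = ["--src", "s3://my-bucket/input/",      # placeholder paths
            "--dest", "s3://my-bucket/output/"]
    java_args = sc._gateway.new_array(jvm.java.lang.String, len(args))
    for i, value in enumerate(args):
        java_args[i] = value

    # Assumed class name -- verify with: jar tf s3distcp.jar | grep S3DistCp
    s3distcp = jvm.com.amazon.external.elasticmapreduce.s3distcp.S3DistCp()
    exit_code = s3distcp.run(java_args)
    print("S3DistCp exit code: %s" % exit_code)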