Tags: apache-spark, pyspark, containers, spark-submit

Spark on Fargate can't find local IP


I have a build job I'm trying to set up in a one-node AWS Fargate cluster. When I try to run Spark to build my data, I get an error that seems to be about Java not being able to resolve "localhost".

I set up the config by running a script that adds the spark-env.sh file, updates the /etc/hosts file and updates the spark-defaults.conf file.

In the $SPARK_HOME/conf/spark-env.sh file, I add (sketch below):

  • SPARK_LOCAL_IP
  • SPARK_MASTER_HOST
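
For reference, here is roughly what the generated spark-env.sh ends up containing (the IP is just a placeholder for whatever the metadata endpoint returns):

    # $SPARK_HOME/conf/spark-env.sh (sketch)
    SPARK_LOCAL_IP=10.0.41.190
    SPARK_MASTER_HOST=10.0.41.190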

In the $SPARK_HOME/conf/spark-defaults.conf file, I add (sketch below):

  • spark.jars.packages <comma separated jars>
  • spark.master <ip or URL>
  • spark.driver.bindAddress <IP or URL>
  • spark.driver.host <IP or URL>
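
For example (the package list and master URL match what shows up in the trace below; the bind/host values are placeholders):

    # $SPARK_HOME/conf/spark-defaults.conf (sketch)
    spark.jars.packages      com.amazonaws:aws-java-sdk:1.7.4,org.apache.hadoop:hadoop-aws:2.7.2
    spark.master             spark://10.0.41.190:7077
    spark.driver.bindAddress 10.0.41.190
    spark.driver.host        10.0.41.190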

In the /etc/hosts file, I append (sketch below):

  • <IP I get from http://169.254.170.2/v2/metadata> master
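
The append itself looks roughly like this (the jq path into the v2 task metadata response is my assumption about the format, not a line from the actual script):

    # sketch: grab the task IP from the Fargate task metadata endpoint and map it to "master"
    TASK_IP=$(curl -s http://169.254.170.2/v2/metadata | jq -r '.Containers[0].Networks[0].IPv4Addresses[0]')
    echo "${TASK_IP} master" >> /etc/hosts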

Invoking the spark-submit script with the --master <IP or URL> argument doesn't seem to help either.

I've tried local[*], spark://<ip from metadata>:<port from metadata>, <ip>, and <ip>:<port> variations, to no avail. Using 127.0.0.1 or localhost doesn't seem to make a difference compared to using things like master or the IP returned from the metadata endpoint.
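
For concreteness, the invocations have looked roughly like this (simplified; the full command with --jars and --packages is in the trace below):

    spark-submit --master spark://10.0.41.190:7077 --verbose ./build_phase.py
    spark-submit --master local[*] --verbose ./build_phase.py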

On the AWS side, the Fargate cluster is running in a private subnet with a NAT gateway attached, so it does have egress and ingress network routes, as far as I can tell. I've also tried using a public subnet and setting ENABLED on the ECS option that automatically attaches a public IP to the container. All the standard ports from the Spark docs are open on the container too.

It seems to run fine up until the point at which it tries to gather its own IP.

The error that I get back has this, in the stack:

spark.jars.packages com.amazonaws:aws-java-sdk:1.7.4,org.apache.hadoop:hadoop-aws:2.7.2
spark.master spark://10.0.41.190:7077
Spark Command: /docker-java-home/bin/java -cp /usr/spark/conf/:/usr/spark/jars/* -Xmx1gg org.apache.spark.deploy.SparkSubmit --master spark://10.0.41.190:7077 --verbose --jars lib/RedshiftJDBC42-1.2.12.1017.jar --packages org.apache.hadoop:hadoop-aws:2.7.3,com.amazonaws:aws-java-sdk:1.7.4,com.upplication:s3fs:2.2.1 ./build_phase.py
========================================
Using properties file: /usr/spark/conf/spark-defaults.conf
Exception in thread "main" java.lang.ExceptionInInitializerError
at org.apache.spark.util.Utils$.redact(Utils.scala:2653)
at org.apache.spark.deploy.SparkSubmitArguments$$anonfun$defaultSparkProperties$1.apply(SparkSubmitArguments.scala:93)
at org.apache.spark.deploy.SparkSubmitArguments$$anonfun$defaultSparkProperties$1.apply(SparkSubmitArguments.scala:86)
at scala.Option.foreach(Option.scala:257)
at org.apache.spark.deploy.SparkSubmitArguments.defaultSparkProperties$lzycompute(SparkSubmitArguments.scala:86)
at org.apache.spark.deploy.SparkSubmitArguments.defaultSparkProperties(SparkSubmitArguments.scala:82)
at org.apache.spark.deploy.SparkSubmitArguments.mergeDefaultSparkProperties(SparkSubmitArguments.scala:126)
at org.apache.spark.deploy.SparkSubmitArguments.<init>(SparkSubmitArguments.scala:110)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:112)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: java.net.UnknownHostException: d4771b650361: d4771b650361: Name or service not known
at java.net.InetAddress.getLocalHost(InetAddress.java:1505)
at org.apache.spark.util.Utils$.findLocalInetAddress(Utils.scala:891)
at org.apache.spark.util.Utils$.org$apache$spark$util$Utils$$localIpAddress$lzycompute(Utils.scala:884)
at org.apache.spark.util.Utils$.org$apache$spark$util$Utils$$localIpAddress(Utils.scala:884)
at org.apache.spark.util.Utils$$anonfun$localHostName$1.apply(Utils.scala:941)
at org.apache.spark.util.Utils$$anonfun$localHostName$1.apply(Utils.scala:941)
at scala.Option.getOrElse(Option.scala:121)
at org.apache.spark.util.Utils$.localHostName(Utils.scala:941)
at org.apache.spark.internal.config.package$.<init>(package.scala:204)
at org.apache.spark.internal.config.package$.<clinit>(package.scala)
... 10 more

The container has no problems when I run it locally, so I think this has something to do with the nature of Fargate.

Any help or pointers would be much appreciated!

Edit

Since posting, I've tried a few different things. I'm using images that run Spark 2.3, Hadoop 2.7, and Python 3, and I've tried adding OS packages and different variations of the config I mentioned above.

It all smells like I'm doing the spark-defaults.conf and friends wrong but I'm so new to this stuff that it could just be a bad alignment of Jupiter and Mars...

The current stack trace:

    Getting Spark Context...
    2018-06-08 22:39:40 INFO  SparkContext:54 - Running Spark version 2.3.0
    2018-06-08 22:39:40 INFO  SparkContext:54 - Submitted application: SmashPlanner
    2018-06-08 22:39:41 INFO  SecurityManager:54 - Changing view acls to: root
    2018-06-08 22:39:41 INFO  SecurityManager:54 - Changing modify acls to: root
    2018-06-08 22:39:41 INFO  SecurityManager:54 - Changing view acls groups to:
    2018-06-08 22:39:41 INFO  SecurityManager:54 - Changing modify acls groups to:
    2018-06-08 22:39:41 INFO  SecurityManager:54 - SecurityManager: authentication disabled; ui acls disabled; users  with view permissions: Set(root); groups with view permissions: Set(); users  with modify permissions: Set(root); groups with modify permissions: Set()
    2018-06-08 22:39:41 ERROR SparkContext:91 - Error initializing SparkContext.
    java.nio.channels.UnresolvedAddressException
        at sun.nio.ch.Net.checkAddress(Net.java:101)
        at sun.nio.ch.ServerSocketChannelImpl.bind(ServerSocketChannelImpl.java:218)
        at io.netty.channel.socket.nio.NioServerSocketChannel.doBind(NioServerSocketChannel.java:128)
        at io.netty.channel.AbstractChannel$AbstractUnsafe.bind(AbstractChannel.java:558)
        at io.netty.channel.DefaultChannelPipeline$HeadContext.bind(DefaultChannelPipeline.java:1283)
        at io.netty.channel.AbstractChannelHandlerContext.invokeBind(AbstractChannelHandlerContext.java:501)
        at io.netty.channel.AbstractChannelHandlerContext.bind(AbstractChannelHandlerContext.java:486)
        at io.netty.channel.DefaultChannelPipeline.bind(DefaultChannelPipeline.java:989)
        at io.netty.channel.AbstractChannel.bind(AbstractChannel.java:254)
        at io.netty.bootstrap.AbstractBootstrap$2.run(AbstractBootstrap.java:364)
        at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:163)
        at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:403)
        at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:463)
        at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:858)
        at io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:138)
        at java.lang.Thread.run(Thread.java:748)
    2018-06-08 22:39:41 INFO  SparkContext:54 - Successfully stopped SparkContext
    Traceback (most recent call last):
      File "/usr/local/smash_planner/build_phase.py", line 13, in <module>
        main()
      File "/usr/local/smash_planner/build_phase.py", line 9, in main
        build_all_data(pred_date)
      File "/usr/local/smash_planner/DataPiping/build_data.py", line 25, in build_all_data
        save_keyword(pred_date)
      File "/usr/local/smash_planner/DataPiping/build_data.py", line 52, in save_keyword
        df = get_dataframe(query)
      File "/usr/local/smash_planner/SparkUtil/data_piping.py", line 15, in get_dataframe
        sc = SparkCtx.get_sparkCtx()
      File "/usr/local/smash_planner/SparkUtil/context.py", line 20, in get_sparkCtx
        sc = SparkContext(conf=conf).getOrCreate()
      File "/usr/spark-2.3.0/python/lib/pyspark.zip/pyspark/context.py", line 118, in __init__
      File "/usr/spark-2.3.0/python/lib/pyspark.zip/pyspark/context.py", line 180, in _do_init
      File "/usr/spark-2.3.0/python/lib/pyspark.zip/pyspark/context.py", line 270, in _initialize_context
      File "/usr/local/lib/python3.4/dist-packages/py4j-0.10.6-py3.4.egg/py4j/java_gateway.py", line 1428, in __call__
        answer, self._gateway_client, None, self._fqn)
      File "/usr/local/lib/python3.4/dist-packages/py4j-0.10.6-py3.4.egg/py4j/protocol.py", line 320, in get_return_value
        format(target_id, ".", name), value)
    py4j.protocol.Py4JJavaError: An error occurred while calling None.org.apache.spark.api.java.JavaSparkContext.
    : java.nio.channels.UnresolvedAddressException
        at sun.nio.ch.Net.checkAddress(Net.java:101)
        at sun.nio.ch.ServerSocketChannelImpl.bind(ServerSocketChannelImpl.java:218)
        at io.netty.channel.socket.nio.NioServerSocketChannel.doBind(NioServerSocketChannel.java:128)
        at io.netty.channel.AbstractChannel$AbstractUnsafe.bind(AbstractChannel.java:558)
        at io.netty.channel.DefaultChannelPipeline$HeadContext.bind(DefaultChannelPipeline.java:1283)
        at io.netty.channel.AbstractChannelHandlerContext.invokeBind(AbstractChannelHandlerContext.java:501)
        at io.netty.channel.AbstractChannelHandlerContext.bind(AbstractChannelHandlerContext.java:486)
        at io.netty.channel.DefaultChannelPipeline.bind(DefaultChannelPipeline.java:989)
        at io.netty.channel.AbstractChannel.bind(AbstractChannel.java:254)
        at io.netty.bootstrap.AbstractBootstrap$2.run(AbstractBootstrap.java:364)
        at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:163)
        at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:403)
        at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:463)
        at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:858)
        at io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:138)
        at java.lang.Thread.run(Thread.java:748)

    2018-06-08 22:39:41 INFO  ShutdownHookManager:54 - Shutdown hook called
    2018-06-08 22:39:41 INFO  ShutdownHookManager:54 - Deleting directory /tmp/spark-80488ba8-2367-4fa6-8bb7-194b5ebf08cc
    Traceback (most recent call last):
      File "bin/smash_planner.py", line 76, in <module>
        raise RuntimeError("Spark hated your config and/or invocation...")
    RuntimeError: Spark hated your config and/or invocation...

Runtime code that triggers the SparkContext creation (the SparkConf itself is built inside SparkCtx.get_sparkCtx, sketched after the snippet):

def get_dataframe(query):
    ...
    sc = SparkCtx.get_sparkCtx()
    sql_context = SQLContext(sc)

    df = sql_context.read \
        .format("jdbc") \
        .option("driver", "com.amazon.redshift.jdbc42.Driver") \
        .option("url", os.getenv('JDBC_URL')) \
        .option("user", os.getenv('REDSHIFT_USER')) \
        .option("password", os.getenv('REDSHIFT_PASSWORD')) \
        .option("dbtable", "( " + query + " ) tmp ") \
        .load()

    return df
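
The SparkUtil/context.py helper isn't shown in full here, but per the traceback it boils down to building a SparkConf and calling SparkContext(conf=conf).getOrCreate(), roughly like this (a sketch, not the literal file; the conf values mirror spark-defaults.conf and the IP is a placeholder):

    # SparkUtil/context.py (sketch)
    from pyspark import SparkConf, SparkContext


    class SparkCtx:
        @staticmethod
        def get_sparkCtx():
            conf = (SparkConf()
                    .setAppName("SmashPlanner")
                    .setMaster("spark://10.0.41.190:7077")
                    .set("spark.driver.bindAddress", "10.0.41.190")
                    .set("spark.driver.host", "10.0.41.190"))
            return SparkContext(conf=conf).getOrCreate()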

Edit 2

Using only the spark-env configuration and running with the defaults from the gettyimages/docker-spark image gives this error in the browser (the Spark web UI).

java.util.NoSuchElementException
at java.util.Collections$EmptyIterator.next(Collections.java:4189)
at org.apache.spark.util.kvstore.InMemoryStore$InMemoryIterator.next(InMemoryStore.java:281)
at org.apache.spark.status.AppStatusStore.applicationInfo(AppStatusStore.scala:38)
at org.apache.spark.ui.jobs.AllJobsPage.render(AllJobsPage.scala:273)
at org.apache.spark.ui.WebUI$$anonfun$2.apply(WebUI.scala:82)
at org.apache.spark.ui.WebUI$$anonfun$2.apply(WebUI.scala:82)
at org.apache.spark.ui.JettyUtils$$anon$3.doGet(JettyUtils.scala:90)
at javax.servlet.http.HttpServlet.service(HttpServlet.java:687)
at javax.servlet.http.HttpServlet.service(HttpServlet.java:790)
at org.spark_project.jetty.servlet.ServletHolder.handle(ServletHolder.java:848)
at org.spark_project.jetty.servlet.ServletHandler.doHandle(ServletHandler.java:584)
at org.spark_project.jetty.server.handler.ContextHandler.doHandle(ContextHandler.java:1180)
at org.spark_project.jetty.servlet.ServletHandler.doScope(ServletHandler.java:512)
at org.spark_project.jetty.server.handler.ContextHandler.doScope(ContextHandler.java:1112)
at org.spark_project.jetty.server.handler.ScopedHandler.handle(ScopedHandler.java:141)
at org.spark_project.jetty.server.handler.gzip.GzipHandler.handle(GzipHandler.java:493)
at org.spark_project.jetty.server.handler.ContextHandlerCollection.handle(ContextHandlerCollection.java:213)
at org.spark_project.jetty.server.handler.HandlerWrapper.handle(HandlerWrapper.java:134)
at org.spark_project.jetty.server.Server.handle(Server.java:534)
at org.spark_project.jetty.server.HttpChannel.handle(HttpChannel.java:320)
at org.spark_project.jetty.server.HttpConnection.onFillable(HttpConnection.java:251)
at org.spark_project.jetty.io.AbstractConnection$ReadCallback.succeeded(AbstractConnection.java:283)
at org.spark_project.jetty.io.FillInterest.fillable(FillInterest.java:108)
at org.spark_project.jetty.io.SelectChannelEndPoint$2.run(SelectChannelEndPoint.java:93)
at org.spark_project.jetty.util.thread.strategy.ExecuteProduceConsume.executeProduceConsume(ExecuteProduceConsume.java:303)
at org.spark_project.jetty.util.thread.strategy.ExecuteProduceConsume.produceConsume(ExecuteProduceConsume.java:148)
at org.spark_project.jetty.util.thread.strategy.ExecuteProduceConsume.run(ExecuteProduceConsume.java:136)
at org.spark_project.jetty.util.thread.QueuedThreadPool.runJob(QueuedThreadPool.java:671)
at org.spark_project.jetty.util.thread.QueuedThreadPool$2.run(QueuedThreadPool.java:589)
at java.lang.Thread.run(Thread.java:748)

Solution

  • The solution is to avoid user error...

    This was a total face-palm situation but I hope my misunderstanding of the Spark system can help some poor fool, like myself, who has spent too much time stuck on the same type of problem.

    The answer for the last iteration (the gettyimages/docker-spark Docker image) was that I was trying to run the spark-submit command without having started a master or any workers. In the gettyimages/docker-spark repo, you can find a docker-compose file that shows the master and worker nodes being created before any Spark work is done. The way that image creates a master or a worker is by using the spark-class script and passing in the org.apache.spark.deploy.<master|worker>.<Master|Worker> class, respectively.

    So, putting it all together: I can keep the configuration I was using, but I have to create the master and worker(s) first, and then execute the spark-submit command the same way I already was.

    This is a quick-and-dirty sketch of one implementation, although I guarantee there are better ones, done by folks who actually know what they're doing:

    The first three steps happen in a cluster boot script. I do this in an AWS Lambda, triggered by API Gateway.

    1. create a cluster and a queue or some sort of message broker, like ZooKeeper/Kafka (I'm using API Gateway -> Lambda for this)
    2. pick a master node (logic in the Lambda)
    3. create a message with some basic information, like the master's IP or domain, and put it in the queue from step 1 (this happens in the Lambda; an example message follows the list)
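
    Concretely, the coordination message is just a small JSON blob, along the lines of the following (the values are placeholders; the shape mirrors the Lambda and startup-script code further down):

        {"type": "master", "count": 2}

    and, for the messages the master later fans out to the workers:

        {"type": "worker", "master": "spark://10.0.41.190:7077"}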

    Everything below this happens in the startup script on the Spark nodes:

    4. create a step in the startup script that has the nodes check the queue for the message from step 3
    5. add SPARK_MASTER_HOST and SPARK_LOCAL_IP to the $SPARK_HOME/conf/spark-env.sh file, using the information from the message you picked up in step 4
    6. add spark.driver.bindAddress to the $SPARK_HOME/conf/spark-defaults.conf file, using the information from the message you picked up in step 4
    7. use some logic in your startup script to decide whether "this" node is a master or a worker
    8. start the master or worker. In the gettyimages/docker-spark image, you can start a master with $SPARK_HOME/bin/spark-class org.apache.spark.deploy.master.Master -h <the master's IP or domain> and you can start a worker with $SPARK_HOME/bin/spark-class org.apache.spark.deploy.worker.Worker spark://<master's domain or IP>:7077 (the master URL is a positional argument)
    9. Now you can run the spark-submit command, which will deploy the work to the cluster (a combined shell sketch follows).
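
    In shell terms, steps 8 and 9 end up looking roughly like this (the IP is a placeholder and this is a sketch following the gettyimages/docker-spark pattern, not the exact startup script):

        # on the node elected master
        $SPARK_HOME/bin/spark-class org.apache.spark.deploy.master.Master -h 10.0.41.190 &

        # on each worker node
        $SPARK_HOME/bin/spark-class org.apache.spark.deploy.worker.Worker spark://10.0.41.190:7077 &

        # then submit the job as before
        $SPARK_HOME/bin/spark-submit --master spark://10.0.41.190:7077 --verbose ./build_phase.py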

    Edit (some code for reference): this is the Lambda handler before the change:

    def handler(event, context):
        config = BuildConfig(event)
        res = create_job(config)
        return build_response(res)
    

    and after the edit

    import json

    import boto3

    def handler(event, context):
        config = BuildConfig(event)
        coordination_queue = config.cluster + '-coordination'

        sqs = boto3.client('sqs')
        message_for_master_node = {'type': 'master', 'count': config.count}
        # list_queues omits 'QueueUrls' entirely when nothing matches the prefix
        queue_urls = sqs.list_queues(QueueNamePrefix=coordination_queue).get('QueueUrls', [])

        if not queue_urls:
            queue_url = sqs.create_queue(QueueName=coordination_queue)['QueueUrl']
        else:
            queue_url = queue_urls[0]

        # SQS message bodies must be strings, so serialize the dict
        sqs.send_message(QueueUrl=queue_url,
                         MessageBody=json.dumps(message_for_master_node))

        res = create_job(config)
        return build_response(res)
    

    and then I added a little to the script that the nodes in the Spark cluster run on startup:

    # addition to the "main" in the Spark node's startup script
    import json
    import os
    import subprocess

    import boto3

    sqs = boto3.client('sqs')
    boot_info_message = sqs.receive_message(
        QueueUrl=os.getenv('COORDINATION_QUEUE_URL'),
        MaxNumberOfMessages=1)['Messages'][0]
    # the message body arrives as a JSON string, so deserialize it
    boot_info = json.loads(boot_info_message['Body'])
    # self_url (this node's own address) is assumed to be resolved earlier in the script
    message_for_worker = {'type': 'worker', 'master': self_url}

    if boot_info['type'] == 'master':
        for i in range(int(boot_info['count'])):
            sqs.send_message(QueueUrl=os.getenv('COORDINATION_QUEUE_URL'),
                             MessageBody=json.dumps(message_for_worker))
    sqs.delete_message(QueueUrl=os.getenv('COORDINATION_QUEUE_URL'),
                       ReceiptHandle=boot_info_message['ReceiptHandle'])

    ...
    # starts a master or worker node via the spark-class script (see step 8)
    spark_class = os.path.join(os.environ['SPARK_HOME'], 'bin', 'spark-class')
    startup_class = "org.apache.spark.deploy.{}.{}".format(
        boot_info['type'], boot_info['type'].title())
    # a worker also needs the master URL (spark://<master>:7077) as an argument
    worker_args = [boot_info['master']] if boot_info['type'] == 'worker' else []
    subprocess.call([spark_class, startup_class] + worker_args)