I have a build job I'm trying to set up in a one-node AWS Fargate cluster. When I try to run Spark to build my data, I get an error that seems to be about Java not being able to resolve "localHost".
I set up the config by running a script that adds the spark-env.sh file, updates the /etc/hosts file, and updates the spark-defaults.conf file.
In the $SPARK_HOME/conf/spark-env.sh file, I add:
SPARK_LOCAL_IP
SPARK_MASTER_HOST
In the $SPARK_HOME/conf/spark-defaults.conf file, I set:
spark.jars.packages <comma separated jars>
spark.master <IP or URL>
spark.driver.bindAddress <IP or URL>
spark.driver.host <IP or URL>
In the /etc/hosts file, I append:
<IP I get from http://169.254.170.2/v2/metadata> master
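After the script runs, the relevant entries end up looking roughly like this (the values are placeholders for whatever the metadata endpoint returns):

# $SPARK_HOME/conf/spark-env.sh
SPARK_LOCAL_IP=<IP from task metadata>
SPARK_MASTER_HOST=<IP from task metadata>

# $SPARK_HOME/conf/spark-defaults.conf
spark.jars.packages        <comma separated jars>
spark.master               spark://<IP from task metadata>:7077
spark.driver.bindAddress   <IP from task metadata>
spark.driver.host          <IP from task metadata>

# appended to /etc/hosts
<IP from task metadata>    master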
Invoking the spark-submit script with the --master <IP or URL> argument doesn't seem to help. I've tried the local[*], spark://<ip from metadata>:<port from metadata>, <ip>, and <ip>:<port> variations, to no avail.
Using 127.0.0.1 and localhost doesn't seem to make a difference compared to using things like master and the IP returned from the metadata endpoint.
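For concreteness, the invocations look something like this (build_phase.py is the job script, and the IP/port come from the task metadata), with the bare <ip> and <ip>:<port> variations tried as well:

spark-submit --master local[*] ./build_phase.py
spark-submit --master spark://<ip from metadata>:<port from metadata> ./build_phase.py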
On the AWS side, the Fargate cluster is running in a private subnet with a NAT gateway attached, so it does have egress and ingress network routes, as far as I can tell. I've also tried using a public network and setting the ECS option that automatically attaches a public IP to the container to ENABLED.
All the standard ports from the Spark docs are opened up on the container too.
It seems to run fine up until the point at which it tries to gather its own IP.
The error that I get back has this, in the stack:
spark.jars.packages com.amazonaws:aws-java-sdk:1.7.4,org.apache.hadoop:hadoop-aws:2.7.2
spark.master spark://10.0.41.190:7077
Spark Command: /docker-java-home/bin/java -cp /usr/spark/conf/:/usr/spark/jars/* -Xmx1gg org.apache.spark.deploy.SparkSubmit --master spark://10.0.41.190:7077 --verbose --jars lib/RedshiftJDBC42-1.2.12.1017.jar --packages org.apache.hadoop:hadoop-aws:2.7.3,com.amazonaws:aws-java-sdk:1.7.4,com.upplication:s3fs:2.2.1 ./build_phase.py
========================================
Using properties file: /usr/spark/conf/spark-defaults.conf
Exception in thread "main" java.lang.ExceptionInInitializerError
at org.apache.spark.util.Utils$.redact(Utils.scala:2653)
at org.apache.spark.deploy.SparkSubmitArguments$$anonfun$defaultSparkProperties$1.apply(SparkSubmitArguments.scala:93)
at org.apache.spark.deploy.SparkSubmitArguments$$anonfun$defaultSparkProperties$1.apply(SparkSubmitArguments.scala:86)
at scala.Option.foreach(Option.scala:257)
at org.apache.spark.deploy.SparkSubmitArguments.defaultSparkProperties$lzycompute(SparkSubmitArguments.scala:86)
at org.apache.spark.deploy.SparkSubmitArguments.defaultSparkProperties(SparkSubmitArguments.scala:82)
at org.apache.spark.deploy.SparkSubmitArguments.mergeDefaultSparkProperties(SparkSubmitArguments.scala:126)
at org.apache.spark.deploy.SparkSubmitArguments.<init>(SparkSubmitArguments.scala:110)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:112)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: java.net.UnknownHostException: d4771b650361: d4771b650361: Name or service not known
at java.net.InetAddress.getLocalHost(InetAddress.java:1505)
at org.apache.spark.util.Utils$.findLocalInetAddress(Utils.scala:891)
at org.apache.spark.util.Utils$.org$apache$spark$util$Utils$$localIpAddress$lzycompute(Utils.scala:884)
at org.apache.spark.util.Utils$.org$apache$spark$util$Utils$$localIpAddress(Utils.scala:884)
at org.apache.spark.util.Utils$$anonfun$localHostName$1.apply(Utils.scala:941)
at org.apache.spark.util.Utils$$anonfun$localHostName$1.apply(Utils.scala:941)
at scala.Option.getOrElse(Option.scala:121)
at org.apache.spark.util.Utils$.localHostName(Utils.scala:941)
at org.apache.spark.internal.config.package$.<init>(package.scala:204)
at org.apache.spark.internal.config.package$.<clinit>(package.scala)
... 10 more
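The Caused by line is java.net.InetAddress.getLocalHost() failing to resolve the container's own hostname (d4771b650361 here). If you want to poke at that from inside the container, a rough Python stand-in for what the JVM is doing (not what Spark actually runs) is:

import socket

# mimic java.net.InetAddress.getLocalHost(): look up the container's own hostname
hostname = socket.gethostname()
print(socket.gethostbyname(hostname))  # raises socket.gaierror if the hostname doesn't resolve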
The container has no problems when I run it locally, so I think it has something to do with the nature of Fargate.
Any help or pointers would be much appreciated!
Edit
Since posting, I've tried a few different things. I am using images that run Spark 2.3, Hadoop 2.7, and Python 3, and I've tried adding OS packages and different variations of the config I mentioned already.
It all smells like I'm doing the spark-defaults.conf and friends wrong, but I'm so new to this stuff that it could just be a bad alignment of Jupiter and Mars...
The current stack trace:
Getting Spark Context...
2018-06-08 22:39:40 INFO SparkContext:54 - Running Spark version 2.3.0
2018-06-08 22:39:40 INFO SparkContext:54 - Submitted application: SmashPlanner
2018-06-08 22:39:41 INFO SecurityManager:54 - Changing view acls to: root
2018-06-08 22:39:41 INFO SecurityManager:54 - Changing modify acls to: root
2018-06-08 22:39:41 INFO SecurityManager:54 - Changing view acls groups to:
2018-06-08 22:39:41 INFO SecurityManager:54 - Changing modify acls groups to:
2018-06-08 22:39:41 INFO SecurityManager:54 - SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(root); groups with view permissions: Set(); users with modify permissions: Set(root); groups with modify permissions: Set()
2018-06-08 22:39:41 ERROR SparkContext:91 - Error initializing SparkContext.
java.nio.channels.UnresolvedAddressException
at sun.nio.ch.Net.checkAddress(Net.java:101)
at sun.nio.ch.ServerSocketChannelImpl.bind(ServerSocketChannelImpl.java:218)
at io.netty.channel.socket.nio.NioServerSocketChannel.doBind(NioServerSocketChannel.java:128)
at io.netty.channel.AbstractChannel$AbstractUnsafe.bind(AbstractChannel.java:558)
at io.netty.channel.DefaultChannelPipeline$HeadContext.bind(DefaultChannelPipeline.java:1283)
at io.netty.channel.AbstractChannelHandlerContext.invokeBind(AbstractChannelHandlerContext.java:501)
at io.netty.channel.AbstractChannelHandlerContext.bind(AbstractChannelHandlerContext.java:486)
at io.netty.channel.DefaultChannelPipeline.bind(DefaultChannelPipeline.java:989)
at io.netty.channel.AbstractChannel.bind(AbstractChannel.java:254)
at io.netty.bootstrap.AbstractBootstrap$2.run(AbstractBootstrap.java:364)
at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:163)
at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:403)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:463)
at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:858)
at io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:138)
at java.lang.Thread.run(Thread.java:748)
2018-06-08 22:39:41 INFO SparkContext:54 - Successfully stopped SparkContext
Traceback (most recent call last):
  File "/usr/local/smash_planner/build_phase.py", line 13, in <module>
    main()
  File "/usr/local/smash_planner/build_phase.py", line 9, in main
    build_all_data(pred_date)
  File "/usr/local/smash_planner/DataPiping/build_data.py", line 25, in build_all_data
    save_keyword(pred_date)
  File "/usr/local/smash_planner/DataPiping/build_data.py", line 52, in save_keyword
    df = get_dataframe(query)
  File "/usr/local/smash_planner/SparkUtil/data_piping.py", line 15, in get_dataframe
    sc = SparkCtx.get_sparkCtx()
  File "/usr/local/smash_planner/SparkUtil/context.py", line 20, in get_sparkCtx
    sc = SparkContext(conf=conf).getOrCreate()
  File "/usr/spark-2.3.0/python/lib/pyspark.zip/pyspark/context.py", line 118, in __init__
  File "/usr/spark-2.3.0/python/lib/pyspark.zip/pyspark/context.py", line 180, in _do_init
  File "/usr/spark-2.3.0/python/lib/pyspark.zip/pyspark/context.py", line 270, in _initialize_context
  File "/usr/local/lib/python3.4/dist-packages/py4j-0.10.6-py3.4.egg/py4j/java_gateway.py", line 1428, in __call__
    answer, self._gateway_client, None, self._fqn)
  File "/usr/local/lib/python3.4/dist-packages/py4j-0.10.6-py3.4.egg/py4j/protocol.py", line 320, in get_return_value
    format(target_id, ".", name), value)
py4j.protocol.Py4JJavaError: An error occurred while calling None.org.apache.spark.api.java.JavaSparkContext.
: java.nio.channels.UnresolvedAddressException
at sun.nio.ch.Net.checkAddress(Net.java:101)
at sun.nio.ch.ServerSocketChannelImpl.bind(ServerSocketChannelImpl.java:218)
at io.netty.channel.socket.nio.NioServerSocketChannel.doBind(NioServerSocketChannel.java:128)
at io.netty.channel.AbstractChannel$AbstractUnsafe.bind(AbstractChannel.java:558)
at io.netty.channel.DefaultChannelPipeline$HeadContext.bind(DefaultChannelPipeline.java:1283)
at io.netty.channel.AbstractChannelHandlerContext.invokeBind(AbstractChannelHandlerContext.java:501)
at io.netty.channel.AbstractChannelHandlerContext.bind(AbstractChannelHandlerContext.java:486)
at io.netty.channel.DefaultChannelPipeline.bind(DefaultChannelPipeline.java:989)
at io.netty.channel.AbstractChannel.bind(AbstractChannel.java:254)
at io.netty.bootstrap.AbstractBootstrap$2.run(AbstractBootstrap.java:364)
at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:163)
at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:403)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:463)
at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:858)
at io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:138)
at java.lang.Thread.run(Thread.java:748)
2018-06-08 22:39:41 INFO ShutdownHookManager:54 - Shutdown hook called
2018-06-08 22:39:41 INFO ShutdownHookManager:54 - Deleting directory /tmp/spark-80488ba8-2367-4fa6-8bb7-194b5ebf08cc
Traceback (most recent call last):
  File "bin/smash_planner.py", line 76, in <module>
    raise RuntimeError("Spark hated your config and/or invocation...")
RuntimeError: Spark hated your config and/or invocation...
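This time the failure is java.nio.channels.UnresolvedAddressException while Netty tries to bind the driver's server socket, which I read as whatever ends up in spark.driver.bindAddress / SPARK_LOCAL_IP not being an address the container can resolve or bind. A roughly analogous check from inside the container (the IP is a placeholder for whatever value Spark is being handed):

import socket

# try to bind the address being handed to spark.driver.bindAddress;
# an OSError here means the container can't resolve or bind it either
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.bind(("10.0.41.190", 0))  # placeholder: substitute the configured bind address
s.close()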
SparkConf runtime configuration:
def get_dataframe(query):
    ...
    sc = SparkCtx.get_sparkCtx()
    sql_context = SQLContext(sc)
    df = sql_context.read \
        .format("jdbc") \
        .option("driver", "com.amazon.redshift.jdbc42.Driver") \
        .option("url", os.getenv('JDBC_URL')) \
        .option("user", os.getenv('REDSHIFT_USER')) \
        .option("password", os.getenv('REDSHIFT_PASSWORD')) \
        .option("dbtable", "( " + query + " ) tmp ") \
        .load()
    return df
Edit 2
Using only the spark-env configuration and running with the defaults from the gettyimages/docker-spark image gives this error in the browser:
java.util.NoSuchElementException
at java.util.Collections$EmptyIterator.next(Collections.java:4189)
at org.apache.spark.util.kvstore.InMemoryStore$InMemoryIterator.next(InMemoryStore.java:281)
at org.apache.spark.status.AppStatusStore.applicationInfo(AppStatusStore.scala:38)
at org.apache.spark.ui.jobs.AllJobsPage.render(AllJobsPage.scala:273)
at org.apache.spark.ui.WebUI$$anonfun$2.apply(WebUI.scala:82)
at org.apache.spark.ui.WebUI$$anonfun$2.apply(WebUI.scala:82)
at org.apache.spark.ui.JettyUtils$$anon$3.doGet(JettyUtils.scala:90)
at javax.servlet.http.HttpServlet.service(HttpServlet.java:687)
at javax.servlet.http.HttpServlet.service(HttpServlet.java:790)
at org.spark_project.jetty.servlet.ServletHolder.handle(ServletHolder.java:848)
at org.spark_project.jetty.servlet.ServletHandler.doHandle(ServletHandler.java:584)
at org.spark_project.jetty.server.handler.ContextHandler.doHandle(ContextHandler.java:1180)
at org.spark_project.jetty.servlet.ServletHandler.doScope(ServletHandler.java:512)
at org.spark_project.jetty.server.handler.ContextHandler.doScope(ContextHandler.java:1112)
at org.spark_project.jetty.server.handler.ScopedHandler.handle(ScopedHandler.java:141)
at org.spark_project.jetty.server.handler.gzip.GzipHandler.handle(GzipHandler.java:493)
at org.spark_project.jetty.server.handler.ContextHandlerCollection.handle(ContextHandlerCollection.java:213)
at org.spark_project.jetty.server.handler.HandlerWrapper.handle(HandlerWrapper.java:134)
at org.spark_project.jetty.server.Server.handle(Server.java:534)
at org.spark_project.jetty.server.HttpChannel.handle(HttpChannel.java:320)
at org.spark_project.jetty.server.HttpConnection.onFillable(HttpConnection.java:251)
at org.spark_project.jetty.io.AbstractConnection$ReadCallback.succeeded(AbstractConnection.java:283)
at org.spark_project.jetty.io.FillInterest.fillable(FillInterest.java:108)
at org.spark_project.jetty.io.SelectChannelEndPoint$2.run(SelectChannelEndPoint.java:93)
at org.spark_project.jetty.util.thread.strategy.ExecuteProduceConsume.executeProduceConsume(ExecuteProduceConsume.java:303)
at org.spark_project.jetty.util.thread.strategy.ExecuteProduceConsume.produceConsume(ExecuteProduceConsume.java:148)
at org.spark_project.jetty.util.thread.strategy.ExecuteProduceConsume.run(ExecuteProduceConsume.java:136)
at org.spark_project.jetty.util.thread.QueuedThreadPool.runJob(QueuedThreadPool.java:671)
at org.spark_project.jetty.util.thread.QueuedThreadPool$2.run(QueuedThreadPool.java:589)
at java.lang.Thread.run(Thread.java:748)
The solution is to avoid user error...
This was a total face-palm situation but I hope my misunderstanding of the Spark system can help some poor fool, like myself, who has spent too much time stuck on the same type of problem.
The answer for the last iteration (the gettyimages/docker-spark Docker image) was that I was trying to run the spark-submit command without having a master or worker(s) started.
In the gettyimages/docker-spark repo, you can find a docker-compose file that shows you that it creates the master and the worker nodes before any Spark work is done. The way that image creates a master or a worker is by using the spark-class script and passing in the org.apache.spark.deploy.<master|worker>.<Master|Worker> class, respectively.
So, putting it all together: I can use the configuration I was using, but I have to create the master and worker(s) first, and then execute the spark-submit command the same as I was already doing.
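Concretely, the boot order ends up being something like the following (the paths and port 7077 assume the stock image defaults, and build_phase.py is just the job script from earlier):

# on the node that should become the master
$SPARK_HOME/bin/spark-class org.apache.spark.deploy.master.Master -h <master's IP or domain>

# on each node that should become a worker
$SPARK_HOME/bin/spark-class org.apache.spark.deploy.worker.Worker spark://<master's IP or domain>:7077

# then submit the job against that master
spark-submit --master spark://<master's IP or domain>:7077 ./build_phase.py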
This is a quick and dirty sketch of one implementation, although I guarantee there are better ones, done by folks who actually know what they're doing:
1. Something asks for a Spark cluster. In my case that's a request hitting an APIGateway, which triggers an AWS Lambda (the cluster boot script).
2. The boot script creates (or finds) an SQS coordination queue and puts a message on it saying that a master is needed, along with how many workers it should get.
3. The boot script kicks off the Fargate tasks that will become the Spark nodes. (The first 3 steps happen in the cluster boot script, i.e. the AWS Lambda triggered by the APIGateway; everything below happens in the startup script on the Spark nodes.)
4. On startup, each node picks a message off the coordination queue. The node that gets the "master" message becomes the master and posts "worker" messages, containing its own address, for the rest.
5. Add SPARK_MASTER_HOST and SPARK_LOCAL_IP to the $SPARK_HOME/conf/spark-env.sh file, using the information from the message you picked up in step 4.
6. Add spark.driver.bindAddress to the $SPARK_HOME/conf/spark-defaults.conf file, using the information from the message you picked up in step 4.
7. Using the gettyimages/docker-spark image, you can start a master with $SPARK_HOME/bin/spark-class org.apache.spark.deploy.master.Master -h <the master's IP or domain> and you can start a worker with $SPARK_HOME/bin/spark-class org.apache.spark.deploy.worker.Worker spark://<master's domain or IP>:7077.
8. Run the spark-submit command, which will deploy the work to the cluster.

Edit: (some code for reference) This is the addition to the lambda:
def handler(event, context):
    config = BuildConfig(event)
    res = create_job(config)
    return build_response(res)
and after the edit
import json

import boto3


def handler(event, context):
    config = BuildConfig(event)
    coordination_queue = config.cluster + '-coordination'
    sqs = boto3.client('sqs')

    message_for_master_node = {'type': 'master', 'count': config.count}

    # find the coordination queue for this cluster, creating it if it doesn't exist yet
    queue_urls = sqs.list_queues(QueueNamePrefix=coordination_queue).get('QueueUrls', [])
    if not queue_urls:
        queue_url = sqs.create_queue(QueueName=coordination_queue)['QueueUrl']
    else:
        queue_url = queue_urls[0]

    # SQS message bodies are strings, so JSON-encode the message
    sqs.send_message(QueueUrl=queue_url,
                     MessageBody=json.dumps(message_for_master_node))

    res = create_job(config)
    return build_response(res)
and then I added a little to the script that the nodes in the Spark cluster run on startup:
# addition to the "main" in the Spark node's startup script
sqs = boto3.client('sqs')
boot_info_message = sqs.receive_message(
    QueueUrl=os.getenv('COORDINATION_QUEUE_URL'),
    MaxNumberOfMessages=1)['Messages'][0]
# the body comes back as a JSON string, so decode it before using it
boot_info = json.loads(boot_info_message['Body'])
message_for_worker = {'type': 'worker', 'master': self_url}

if boot_info['type'] == 'master':
    # the master node fans out one "worker" message per requested worker
    for i in range(int(boot_info['count'])):
        sqs.send_message(QueueUrl=os.getenv('COORDINATION_QUEUE_URL'),
                         MessageBody=json.dumps(message_for_worker))

sqs.delete_message(QueueUrl=os.getenv('COORDINATION_QUEUE_URL'),
                   ReceiptHandle=boot_info_message['ReceiptHandle'])
...
# start a master or worker by handing the right deploy class to spark-class
startup_class = "org.apache.spark.deploy.{}.{}".format(
    boot_info['type'], boot_info['type'].title())
spark_class = os.path.join(os.environ['SPARK_HOME'], 'bin', 'spark-class')
if boot_info['type'] == 'worker':
    # workers also need the master's URL, which arrived in the queue message
    subprocess.call([spark_class, startup_class,
                     'spark://{}:7077'.format(boot_info['master'])])
else:
    subprocess.call([spark_class, startup_class, '-h', self_url])