spark-submit --jars spark-streaming-kafka-0-8_2.11-2.4.4.jar direct_approach.py localhost:9092 new_topic
I ran the code above but I don't know why I got this error. I spent hours to fix but I cannot. I am using Spark 2.4.4 and Scala 2.13.0. I tried to set spark.executor.memory and spark.driver.memory in my Spark configuration file but i still could not solve the problem.
Here is the error:
(tutorial-env) (base) harry@harry-badass:~/Desktop/twitter_project$ spark-submit --jars spark-streaming-kafka-0-8_2.11-2.4.4.jar direct_approach.py localhost:9092 new_topic
19/12/14 14:27:23 WARN Utils: Your hostname, harry-badass resolves to a loopback address: 127.0.1.1; using 220.149.84.46 instead (on interface enp4s0)
19/12/14 14:27:23 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
WARNING: An illegal reflective access operation has occurred
WARNING: Illegal reflective access by org.apache.spark.unsafe.Platform (file:/home/harry/tutorial-env/lib/python3.7/site-packages/pyspark/jars/spark-unsafe_2.11-2.4.4.jar) to method java.nio.Bits.unaligned()
WARNING: Please consider reporting this to the maintainers of org.apache.spark.unsafe.Platform
WARNING: Use --illegal-access=warn to enable warnings of further illegal reflective access operations
WARNING: All illegal access operations will be denied in a future release
19/12/14 14:27:24 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
19/12/14 14:27:24 INFO SparkContext: Running Spark version 2.4.4
19/12/14 14:27:24 INFO SparkContext: Submitted application: PythonStreamingDirectKafkaWordCount
19/12/14 14:27:24 INFO SecurityManager: Changing view acls to: harry
19/12/14 14:27:24 INFO SecurityManager: Changing modify acls to: harry
19/12/14 14:27:24 INFO SecurityManager: Changing view acls groups to:
19/12/14 14:27:24 INFO SecurityManager: Changing modify acls groups to:
19/12/14 14:27:24 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(harry); groups with view permissions: Set(); users with modify permissions: Set(harry); groups with modify permissions: Set()
19/12/14 14:27:24 INFO Utils: Successfully started service 'sparkDriver' on port 41699.
19/12/14 14:27:24 INFO SparkEnv: Registering MapOutputTracker
19/12/14 14:27:24 INFO SparkEnv: Registering BlockManagerMaster
19/12/14 14:27:24 INFO BlockManagerMasterEndpoint: Using org.apache.spark.storage.DefaultTopologyMapper for getting topology information
19/12/14 14:27:24 INFO BlockManagerMasterEndpoint: BlockManagerMasterEndpoint up
19/12/14 14:27:24 INFO DiskBlockManager: Created local directory at /tmp/blockmgr-2067d2bb-4b7c-49d8-8f02-f20e8467b21e
19/12/14 14:27:24 INFO MemoryStore: MemoryStore started with capacity 434.4 MB
19/12/14 14:27:24 INFO SparkEnv: Registering OutputCommitCoordinator
19/12/14 14:27:24 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.
19/12/14 14:27:24 INFO Utils: Successfully started service 'SparkUI' on port 4041.
19/12/14 14:27:24 INFO SparkUI: Bound SparkUI to 0.0.0.0, and started at http://220.149.84.46:4041
19/12/14 14:27:24 INFO SparkContext: Added JAR file:///home/harry/Desktop/twitter_project/spark-streaming-kafka-0-8_2.11-2.4.4.jar at spark://220.149.84.46:41699/jars/spark-streaming-kafka-0-8_2.11-2.4.4.jar with timestamp 1576301244901
19/12/14 14:27:24 INFO Executor: Starting executor ID driver on host localhost
19/12/14 14:27:25 INFO Utils: Successfully started service 'org.apache.spark.network.netty.NettyBlockTransferService' on port 46637.
19/12/14 14:27:25 INFO NettyBlockTransferService: Server created on 220.149.84.46:46637
19/12/14 14:27:25 INFO BlockManager: Using org.apache.spark.storage.RandomBlockReplicationPolicy for block replication policy
19/12/14 14:27:25 INFO BlockManagerMaster: Registering BlockManager BlockManagerId(driver, 220.149.84.46, 46637, None)
19/12/14 14:27:25 INFO BlockManagerMasterEndpoint: Registering block manager 220.149.84.46:46637 with 434.4 MB RAM, BlockManagerId(driver, 220.149.84.46, 46637, None)
19/12/14 14:27:25 INFO BlockManagerMaster: Registered BlockManager BlockManagerId(driver, 220.149.84.46, 46637, None)
19/12/14 14:27:25 INFO BlockManager: Initialized BlockManager: BlockManagerId(driver, 220.149.84.46, 46637, None)
Exception in thread "Thread-5" java.lang.NoClassDefFoundError: kafka/common/TopicAndPartition
at java.base/java.lang.Class.getDeclaredMethods0(Native Method)
at java.base/java.lang.Class.privateGetDeclaredMethods(Class.java:3139)
at java.base/java.lang.Class.privateGetPublicMethods(Class.java:3164)
at java.base/java.lang.Class.getMethods(Class.java:1861)
at py4j.reflection.ReflectionEngine.getMethodsByNameAndLength(ReflectionEngine.java:345)
at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:305)
at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:326)
at py4j.Gateway.invoke(Gateway.java:274)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:238)
at java.base/java.lang.Thread.run(Thread.java:844)
Caused by: java.lang.ClassNotFoundException: kafka.common.TopicAndPartition
at java.base/java.net.URLClassLoader.findClass(URLClassLoader.java:466)
at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:563)
at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:496)
... 12 more
ERROR:root:Exception while sending command.
Traceback (most recent call last):
File "/home/harry/tutorial-env/lib/python3.7/site-packages/pyspark/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py", line 1159, in send_command
raise Py4JNetworkError("Answer from Java side is empty")
py4j.protocol.Py4JNetworkError: Answer from Java side is empty
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "/home/harry/tutorial-env/lib/python3.7/site-packages/pyspark/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py", line 985, in send_command
response = connection.send_command(command)
File "/home/harry/tutorial-env/lib/python3.7/site-packages/pyspark/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py", line 1164, in send_command
"Error while receiving", e, proto.ERROR_ON_RECEIVE)
py4j.protocol.Py4JNetworkError: Error while receiving
Traceback (most recent call last):
File "/home/harry/Desktop/twitter_project/direct_approach.py", line 9, in <module>
kvs = KafkaUtils.createDirectStream(ssc, [topic],{"metadata.broker.list": brokers})
File "/home/harry/tutorial-env/lib/python3.7/site-packages/pyspark/python/lib/pyspark.zip/pyspark/streaming/kafka.py", line 146, in createDirectStream
File "/home/harry/tutorial-env/lib/python3.7/site-packages/pyspark/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py", line 1257, in __call__
File "/home/harry/tutorial-env/lib/python3.7/site-packages/pyspark/python/lib/py4j-0.10.7-src.zip/py4j/protocol.py", line 336, in get_return_value
py4j.protocol.Py4JError: An error occurred while calling o24.createDirectStreamWithoutMessageHandler
19/12/14 14:27:25 INFO SparkContext: Invoking stop() from shutdown hook
19/12/14 14:27:25 INFO SparkUI: Stopped Spark web UI at http://220.149.84.46:4041
19/12/14 14:27:25 INFO MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped!
19/12/14 14:27:25 INFO MemoryStore: MemoryStore cleared
19/12/14 14:27:25 INFO BlockManager: BlockManager stopped
19/12/14 14:27:25 INFO BlockManagerMaster: BlockManagerMaster stopped
19/12/14 14:27:25 INFO OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped!
19/12/14 14:27:25 INFO SparkContext: Successfully stopped SparkContext
19/12/14 14:27:25 INFO ShutdownHookManager: Shutdown hook called
19/12/14 14:27:25 INFO ShutdownHookManager: Deleting directory /tmp/spark-8e271f94-bec9-4f7e-aad0-1f3b651e9b29
19/12/14 14:27:25 INFO ShutdownHookManager: Deleting directory /tmp/spark-747cc9ca-bca4-42a7-ad82-d6a055727394
19/12/14 14:27:25 INFO ShutdownHookManager: Deleting directory /tmp/spark-747cc9ca-bca4-42a7-ad82-d6a055727394/pyspark-83cc90cc-1aaa-4dea-b364-4b66487be18f
Memory doesn't help find a missing class. You need to download the kafka-clients
JAR as well
Note: You can use --packages
instead of downloading jars