Search code examples
apache-sparkpysparkapache-kafkakafka-pythonspark-streaming-kafka

Kafka create stream running but not printing the processed output from Kafka Topic in Pyspark


I am using Kafka version 2.0, Spark version is 2.2.0.2.6.4.0-91 , Python version is 2.7.5 I am running this below code and it streams without any error but the count is not printing in the output.

import sys
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils
import os

if __name__ == "__main__":

    sc = SparkContext(appName="PythonStreamingKafkaWordCount")
    ssc = StreamingContext(sc, 60)
    print("spark cotext set")

    zkQuorum, topic = 'master.hdp:2181','streamit'
    kvs = KafkaUtils.createStream(ssc, zkQuorum, "console-consumer-68081", {topic: 1})
    print("connection set")
    lines = kvs.map(lambda x: x[1])
    counts = lines.flatMap(lambda line: line.split(" ")) \
        .map(lambda word: (word, 1)) \
        .reduceByKey(lambda a, b: a + b)
    counts.pprint()

    ssc.start()
    ssc.awaitTermination()

My Spark submit Command is

/usr/hdp/current/spark2-client/bin/spark-submit --principal hdfs-ivory@KDCAUTH.COM --keytab /etc/security/keytabs/hdfs.headless.keytab --master yarn --deploy-mode client --packages  org.apache.spark:spark-streaming-kafka-0-8_2.11:2.2.0 kstream.py

The last part of my output log shows. It gets stream but doesn't show the desired processed output.

-------------------------------------------
Time: 2020-01-22 19:29:00
-------------------------------------------

20/01/22 19:29:00 INFO JobScheduler: Finished job streaming job 1579701540000 ms.0 from job set of time 1579701540000 ms
20/01/22 19:29:00 INFO JobScheduler: Starting job streaming job 1579701540000 ms.1 from job set of time 1579701540000 ms
20/01/22 19:29:00 INFO SparkContext: Starting job: runJob at PythonRDD.scala:455
20/01/22 19:29:00 INFO DAGScheduler: Registering RDD 7 (call at /usr/hdp/current/spark2-client/python/lib/py4j-0.10.4-src.zip/py4j/java_gateway.py:2230)
20/01/22 19:29:00 INFO DAGScheduler: Got job 2 (runJob at PythonRDD.scala:455) with 1 output partitions
20/01/22 19:29:00 INFO DAGScheduler: Final stage: ResultStage 4 (runJob at PythonRDD.scala:455)
20/01/22 19:29:00 INFO DAGScheduler: Parents of final stage: List(ShuffleMapStage 3)
20/01/22 19:29:00 INFO DAGScheduler: Missing parents: List()
20/01/22 19:29:00 INFO DAGScheduler: Submitting ResultStage 4 (PythonRDD[11] at RDD at PythonRDD.scala:48), which has no missing parents
20/01/22 19:29:00 INFO MemoryStore: Block broadcast_3 stored as values in memory (estimated size 8.1 KB, free 366.2 MB)
20/01/22 19:29:00 INFO MemoryStore: Block broadcast_3_piece0 stored as bytes in memory (estimated size 4.4 KB, free 366.2 MB)
20/01/22 19:29:00 INFO BlockManagerInfo: Added broadcast_3_piece0 in memory on 172.16.0.21:40801 (size: 4.4 KB, free: 366.3 MB)
20/01/22 19:29:00 INFO SparkContext: Created broadcast 3 from broadcast at DAGScheduler.scala:1006
20/01/22 19:29:00 INFO DAGScheduler: Submitting 1 missing tasks from ResultStage 4 (PythonRDD[11] at RDD at PythonRDD.scala:48) (first 15 tasks are for partitions Vector(0))
20/01/22 19:29:00 INFO YarnScheduler: Adding task set 4.0 with 1 tasks
20/01/22 19:29:00 INFO TaskSetManager: Starting task 0.0 in stage 4.0 (TID 71, master.hdp, executor 1, partition 0, PROCESS_LOCAL, 4632 bytes)
20/01/22 19:29:00 INFO BlockManagerInfo: Added broadcast_3_piece0 in memory on master.hdp:41184 (size: 4.4 KB, free: 366.3 MB)
20/01/22 19:29:00 INFO MapOutputTrackerMasterEndpoint: Asked to send map output locations for shuffle 1 to 172.16.0.21:51120
20/01/22 19:29:00 INFO MapOutputTrackerMaster: Size of output statuses for shuffle 1 is 83 bytes
20/01/22 19:29:00 INFO TaskSetManager: Finished task 0.0 in stage 4.0 (TID 71) in 473 ms on master.hdp (executor 1) (1/1)
20/01/22 19:29:00 INFO YarnScheduler: Removed TaskSet 4.0, whose tasks have all completed, from pool
20/01/22 19:29:00 INFO DAGScheduler: ResultStage 4 (runJob at PythonRDD.scala:455) finished in 0.476 s
20/01/22 19:29:00 INFO DAGScheduler: Job 2 finished: runJob at PythonRDD.scala:455, took 0.497775 s
20/01/22 19:29:00 INFO SparkContext: Starting job: runJob at PythonRDD.scala:455
20/01/22 19:29:00 INFO DAGScheduler: Got job 3 (runJob at PythonRDD.scala:455) with 1 output partitions
20/01/22 19:29:00 INFO DAGScheduler: Final stage: ResultStage 6 (runJob at PythonRDD.scala:455)
20/01/22 19:29:00 INFO DAGScheduler: Parents of final stage: List(ShuffleMapStage 5)
20/01/22 19:29:00 INFO DAGScheduler: Missing parents: List()
20/01/22 19:29:00 INFO DAGScheduler: Submitting ResultStage 6 (PythonRDD[12] at RDD at PythonRDD.scala:48), which has no missing parents
20/01/22 19:29:00 INFO MemoryStore: Block broadcast_4 stored as values in memory (estimated size 8.1 KB, free 366.1 MB)
20/01/22 19:29:00 INFO MemoryStore: Block broadcast_4_piece0 stored as bytes in memory (estimated size 4.4 KB, free 366.1 MB)
20/01/22 19:29:00 INFO BlockManagerInfo: Added broadcast_4_piece0 in memory on 172.16.0.21:40801 (size: 4.4 KB, free: 366.3 MB)
20/01/22 19:29:00 INFO SparkContext: Created broadcast 4 from broadcast at DAGScheduler.scala:1006
20/01/22 19:29:00 INFO DAGScheduler: Submitting 1 missing tasks from ResultStage 6 (PythonRDD[12] at RDD at PythonRDD.scala:48) (first 15 tasks are for partitions Vector(1))
20/01/22 19:29:00 INFO YarnScheduler: Adding task set 6.0 with 1 tasks
20/01/22 19:29:00 INFO TaskSetManager: Starting task 0.0 in stage 6.0 (TID 72, master.hdp, executor 1, partition 1, PROCESS_LOCAL, 4632 bytes)
20/01/22 19:29:00 INFO BlockManagerInfo: Added broadcast_4_piece0 in memory on master.hdp:41184 (size: 4.4 KB, free: 366.3 MB)
20/01/22 19:29:00 INFO TaskSetManager: Finished task 0.0 in stage 6.0 (TID 72) in 123 ms on master.hdp (executor 1) (1/1)
20/01/22 19:29:00 INFO YarnScheduler: Removed TaskSet 6.0, whose tasks have all completed, from pool
20/01/22 19:29:00 INFO DAGScheduler: ResultStage 6 (runJob at PythonRDD.scala:455) finished in 0.125 s
20/01/22 19:29:00 INFO DAGScheduler: Job 3 finished: runJob at PythonRDD.scala:455, took 0.136936 s
-------------------------------------------
Time: 2020-01-22 19:29:00
-------------------------------------------

20/01/22 19:29:00 INFO JobScheduler: Finished job streaming job 1579701540000 ms.1 from job set of time 1579701540000 ms
20/01/22 19:29:00 INFO JobScheduler: Total delay: 0.811 s for time 1579701540000 ms (execution: 0.684 s)
20/01/22 19:29:00 INFO ReceivedBlockTracker: Deleting batches:
20/01/22 19:29:00 INFO InputInfoTracker: remove old batch metadata:
20/01/22 19:30:00 INFO JobScheduler: Added jobs for time 1579701600000 ms
20/01/22 19:30:00 INFO JobScheduler: Starting job streaming job 1579701600000 ms.0 from job set of time 1579701600000 ms
-------------------------------------------
Time: 2020-01-22 19:30:00
-------------------------------------------

20/01/22 19:30:00 INFO JobScheduler: Finished job streaming job 1579701600000 ms.0 from job set of time 1579701600000 ms
20/01/22 19:30:00 INFO JobScheduler: Starting job streaming job 1579701600000 ms.1 from job set of time 1579701600000 ms
20/01/22 19:30:00 INFO SparkContext: Starting job: runJob at PythonRDD.scala:455
20/01/22 19:30:00 INFO DAGScheduler: Registering RDD 16 (call at /usr/hdp/current/spark2-client/python/lib/py4j-0.10.4-src.zip/py4j/java_gateway.py:2230)
20/01/22 19:30:00 INFO DAGScheduler: Got job 4 (runJob at PythonRDD.scala:455) with 1 output partitions
20/01/22 19:30:00 INFO DAGScheduler: Final stage: ResultStage 8 (runJob at PythonRDD.scala:455)
20/01/22 19:30:00 INFO DAGScheduler: Parents of final stage: List(ShuffleMapStage 7)
20/01/22 19:30:00 INFO DAGScheduler: Missing parents: List()
20/01/22 19:30:00 INFO DAGScheduler: Submitting ResultStage 8 (PythonRDD[20] at RDD at PythonRDD.scala:48), which has no missing parents
20/01/22 19:30:00 INFO MemoryStore: Block broadcast_5 stored as values in memory (estimated size 8.1 KB, free 366.1 MB)
20/01/22 19:30:00 INFO MemoryStore: Block broadcast_5_piece0 stored as bytes in memory (estimated size 4.4 KB, free 366.1 MB)
20/01/22 19:30:00 INFO BlockManagerInfo: Added broadcast_5_piece0 in memory on 172.16.0.21:40801 (size: 4.4 KB, free: 366.2 MB)
20/01/22 19:30:00 INFO SparkContext: Created broadcast 5 from broadcast at DAGScheduler.scala:1006
20/01/22 19:30:00 INFO DAGScheduler: Submitting 1 missing tasks from ResultStage 8 (PythonRDD[20] at RDD at PythonRDD.scala:48) (first 15 tasks are for partitions Vector(0))
20/01/22 19:30:00 INFO YarnScheduler: Adding task set 8.0 with 1 tasks
20/01/22 19:30:00 INFO TaskSetManager: Starting task 0.0 in stage 8.0 (TID 73, master.hdp, executor 1, partition 0, PROCESS_LOCAL, 4632 bytes)
20/01/22 19:30:00 INFO BlockManagerInfo: Added broadcast_5_piece0 in memory on master.hdp:41184 (size: 4.4 KB, free: 366.3 MB)
20/01/22 19:30:00 INFO MapOutputTrackerMasterEndpoint: Asked to send map output locations for shuffle 2 to 172.16.0.21:51120
20/01/22 19:30:00 INFO MapOutputTrackerMaster: Size of output statuses for shuffle 2 is 83 bytes
20/01/22 19:30:00 INFO TaskSetManager: Finished task 0.0 in stage 8.0 (TID 73) in 120 ms on master.hdp (executor 1) (1/1)
20/01/22 19:30:00 INFO YarnScheduler: Removed TaskSet 8.0, whose tasks have all completed, from pool
20/01/22 19:30:00 INFO DAGScheduler: ResultStage 8 (runJob at PythonRDD.scala:455) finished in 0.121 s
20/01/22 19:30:00 INFO DAGScheduler: Job 4 finished: runJob at PythonRDD.scala:455, took 0.134627 s
20/01/22 19:30:00 INFO SparkContext: Starting job: runJob at PythonRDD.scala:455
20/01/22 19:30:00 INFO DAGScheduler: Got job 5 (runJob at PythonRDD.scala:455) with 1 output partitions
20/01/22 19:30:00 INFO DAGScheduler: Final stage: ResultStage 10 (runJob at PythonRDD.scala:455)
20/01/22 19:30:00 INFO DAGScheduler: Parents of final stage: List(ShuffleMapStage 9)
20/01/22 19:30:00 INFO DAGScheduler: Missing parents: List()
20/01/22 19:30:00 INFO DAGScheduler: Submitting ResultStage 10 (PythonRDD[21] at RDD at PythonRDD.scala:48), which has no missing parents
20/01/22 19:30:00 INFO MemoryStore: Block broadcast_6 stored as values in memory (estimated size 8.1 KB, free 366.1 MB)
20/01/22 19:30:00 INFO MemoryStore: Block broadcast_6_piece0 stored as bytes in memory (estimated size 4.4 KB, free 366.1 MB)
20/01/22 19:30:00 INFO BlockManagerInfo: Added broadcast_6_piece0 in memory on 172.16.0.21:40801 (size: 4.4 KB, free: 366.2 MB)
20/01/22 19:30:00 INFO SparkContext: Created broadcast 6 from broadcast at DAGScheduler.scala:1006
20/01/22 19:30:00 INFO DAGScheduler: Submitting 1 missing tasks from ResultStage 10 (PythonRDD[21] at RDD at PythonRDD.scala:48) (first 15 tasks are for partitions Vector(1))
20/01/22 19:30:00 INFO YarnScheduler: Adding task set 10.0 with 1 tasks
20/01/22 19:30:00 INFO TaskSetManager: Starting task 0.0 in stage 10.0 (TID 74, master.hdp, executor 1, partition 1, PROCESS_LOCAL, 4632 bytes)
20/01/22 19:30:00 INFO BlockManagerInfo: Added broadcast_6_piece0 in memory on master.hdp:41184 (size: 4.4 KB, free: 366.3 MB)
20/01/22 19:30:00 INFO TaskSetManager: Finished task 0.0 in stage 10.0 (TID 74) in 132 ms on master.hdp (executor 1) (1/1)
20/01/22 19:30:00 INFO YarnScheduler: Removed TaskSet 10.0, whose tasks have all completed, from pool
20/01/22 19:30:00 INFO DAGScheduler: ResultStage 10 (runJob at PythonRDD.scala:455) finished in 0.133 s
20/01/22 19:30:00 INFO DAGScheduler: Job 5 finished: runJob at PythonRDD.scala:455, took 0.143611 s

Solution

  • Give key.deserializer and value.deserializer in kafkaParams and use createDirectStream

    kafkaParams = {"metadata.broker.list": config['kstream']['broker'], "auto_offset_reset" : 'earliest',"auto.create.topics.enable": "true","key.deserializer": "org.springframework.kafka.support.serializer.JsonDeserializer","value.deserializer": "org.springframework.kafka.support.serializer.JsonDeserializer"}
    kvs = KafkaUtils.createDirectStream(ssc, [topic],kafkaParams,fromOffsets=None,messageHandler=None,keyDecoder=utf8_decoder,valueDecoder=utf8_decoder)