Tags: python, apache-spark, apache-kafka, spark-structured-streaming

Structured Streaming - not writing records to console when using writeStream (batch seems to be working)


I have a simple Structured Streaming program which reads data from Kafka and writes to the console. This works in batch mode (i.e. spark.read / df.write), but it does not work in streaming mode.
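For reference, here is a minimal sketch of the batch-mode read that works (the SSL options are omitted here for brevity and the broker address is a placeholder, same as in the streaming job below):

df_batch = spark.read.format('kafka') \
    .option("kafka.bootstrap.servers", "<IP>:9094") \
    .option("subscribe", "versa-sase") \
    .option("startingOffsets", "earliest") \
    .option("endingOffsets", "latest") \
    .load()

# Printing works fine in batch mode.
df_batch.select("value", "topic", "partition", "timestamp").show(10, truncate=False)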

Command used to run the query:

spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.2.0 /Users/karanalang/PycharmProjects/Kafka/StructuredStreaming_GCP_Versa_Sase.py

Here is the code:

import sys, datetime, time, os
from pyspark.sql.functions import col, rank, dense_rank, to_date, to_timestamp, format_number, row_number, lead, lag, monotonically_increasing_id
from pyspark.sql import SparkSession, Window

spark = SparkSession.builder.appName('StructuredStreaming_Kafka_Streaming').getOrCreate()
# os.environ["SPARK_HOME"] = "/Users/karanalang/Documents/Technology/spark-3.2.0-bin-hadoop3.2"
# os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages org.apache.spark:spark-sql-kafka-0-10_2.13:3.2.0'

kafkaBrokers='<IP>:9094'
topic = "versa-sase"
security_protocol="SSL"
ssl_truststore_location="/Users/karanalang/Documents/Technology/strimzi/Install/versa_ssl_certs/dataproc-versa-sase/ca.p12"
ssl_truststore_password="<pwd>"
ssl_keystore_location="/Users/karanalang/Documents/Technology/strimzi/Install/versa_ssl_certs/dataproc-versa-sase/dataproc-versa-sase.p12"
ssl_keystore_password="<pwd>"
consumerGroupId = "versa-sase-grp"

print(" SPARK.SPARKCONTEXT -> ", spark.sparkContext)

spark.sparkContext.setLogLevel("ERROR")

df_stream = spark.readStream.format('kafka') \
    .option("kafka.security.protocol", "SSL") \
    .option("kafka.ssl.truststore.location", ssl_truststore_location) \
    .option("kafka.ssl.truststore.password", ssl_truststore_password) \
    .option("kafka.ssl.keystore.location", ssl_keystore_location) \
    .option("kafka.ssl.keystore.password", ssl_keystore_password) \
    .option("kafka.bootstrap.servers",kafkaBrokers)\
    .option("subscribe", topic) \
    .option("kafka.group.id", consumerGroupId)\
    .option("startingOffsets", "latest") \
    .option("maxOffsetsPerTrigger", 10) \
    .load()

print("df_stream -> ", df_stream, type(df_stream))
# df_stream ->  DataFrame[key: binary, value: binary, topic: string, partition: # int, offset: bigint, timestamp: timestamp, timestampType: int] <class #'pyspark.sql.dataframe.DataFrame'>

query = df_stream.select("value", "topic", "partition", "timestamp") \
    .writeStream \
    .format("console") \
    .outputMode("append") \
    .trigger(processingTime='30 seconds') \
    .option("numRows", 10) \
    .option("checkpointLocation", "checkpoint") \
    .start()

query.awaitTermination()

I am expecting the data to be written to the console, but nothing happens. When I enable debug logging (level DEBUG), here is what I see:

34.138.122.164:9094 (id: 1 rack: null)], epoch=20}}.
22/02/01 15:30:26 INFO SubscriptionState: [Consumer clientId=consumer-versa-sase-grp-2, groupId=versa-sase-grp] Seeking to LATEST offset of partition versa-sase-1
22/02/01 15:30:26 INFO SubscriptionState: [Consumer clientId=consumer-versa-sase-grp-2, groupId=versa-sase-grp] Resetting offset for partition versa-sase-1 to position FetchPosition{offset=1561146, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[34.138.122.164:9094 (id: 1 rack: null)], epoch=20}}.
22/02/01 15:30:26 INFO KafkaConsumer: [Consumer clientId=consumer-versa-sase-grp-2, groupId=versa-sase-grp] Seeking to offset 47160 for partition versa-sase-1
22/02/01 15:30:26 INFO SubscriptionState: [Consumer clientId=consumer-versa-sase-grp-2, groupId=versa-sase-grp] Seeking to EARLIEST offset of partition versa-sase-1
22/02/01 15:30:26 INFO SubscriptionState: [Consumer clientId=consumer-versa-sase-grp-2, groupId=versa-sase-grp] Resetting offset for partition versa-sase-1 to position FetchPosition{offset=0, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[34.138.122.164:9094 (id: 1 rack: null)], epoch=20}}.
22/02/01 15:30:26 INFO SubscriptionState: [Consumer clientId=consumer-versa-sase-grp-2, groupId=versa-sase-grp] Seeking to LATEST offset of partition versa-sase-1
22/02/01 15:30:26 INFO SubscriptionState: [Consumer clientId=consumer-versa-sase-grp-2, groupId=versa-sase-grp] Resetting offset for partition versa-sase-1 to position FetchPosition{offset=1561150, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[34.138.122.164:9094 (id: 1 rack: null)], epoch=20}}.
22/02/01 15:30:26 INFO KafkaConsumer: [Consumer clientId=consumer-versa-sase-grp-2, groupId=versa-sase-grp] Seeking to offset 47660 for partition versa-sase-1
22/02/01 15:30:26 INFO SubscriptionState: [Consumer clientId=consumer-versa-sase-grp-2, groupId=versa-sase-grp] Seeking to EARLIEST offset of partition versa-sase-1
22/02/01 15:30:26 INFO SubscriptionState: [Consumer clientId=consumer-versa-sase-grp-2, groupId=versa-sase-grp] Resetting offset for partition versa-sase-1 to position FetchPosition{offset=0, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[34.138.122.164:9094 (id: 1 rack: null)], epoch=20}}.
22/02/01 15:30:26 INFO SubscriptionState: [Consumer clientId=consumer-versa-sase-grp-2, groupId=versa-sase-grp] Seeking to LATEST offset of partition versa-sase-1
22/02/01 15:30:27 INFO SubscriptionState: [Consumer clientId=consumer-versa-sase-grp-2, groupId=versa-sase-grp] Resetting offset for partition versa-sase-1 to position FetchPosition{offset=1561152, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[34.138.122.164:9094 (id: 1 rack: null)], epoch=20}}.
22/02/01 15:30:27 INFO KafkaConsumer: [Consumer clientId=consumer-versa-sase-grp-2, groupId=versa-sase-grp] Seeking to offset 48160 for partition versa-sase-1
22/02/01 15:30:27 INFO SubscriptionState: [Consumer clientId=consumer-versa-sase-grp-2, groupId=versa-sase-grp] Seeking to EARLIEST offset of partition versa-sase-1
22/02/01 15:30:27 INFO SubscriptionState: [Consumer clientId=consumer-versa-sase-grp-2, groupId=versa-sase-grp] Resetting offset for partition versa-sase-1 to position FetchPosition{offset=0, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[34.138.122.164:9094 (id: 1 rack: null)], epoch=20}}.
22/02/01 15:30:27 INFO SubscriptionState: [Consumer clientId=consumer-versa-sase-grp-2, groupId=versa-sase-grp] Seeking to LATEST offset of partition versa-sase-1
22/02/01 15:30:27 INFO SubscriptionState: [Consumer clientId=consumer-versa-sase-grp-2, groupId=versa-sase-grp] Resetting offset for partition versa-sase-1 to position FetchPosition{offset=1561154, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[34.138.122.164:9094 (id: 1 rack: null)], epoch=20}}.
22/02/01 15:30:27 INFO KafkaConsumer: [Consumer clientId=consumer-versa-sase-grp-2, groupId=versa-sase-grp] Seeking to offset 48660 for partition versa-sase-1
22/02/01 15:30:27 INFO SubscriptionState: [Consumer clientId=consumer-versa-sase-grp-2, groupId=versa-sase-grp] Seeking to EARLIEST offset of partition versa-sase-1
22/02/01 15:30:27 INFO SubscriptionState: [Consumer clientId=consumer-versa-sase-grp-2, groupId=versa-sase-grp] Resetting offset for partition versa-sase-1 to position FetchPosition{offset=0, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[34.138.122.164:9094 (id: 1 rack: null)], epoch=20}}.
22/02/01 15:30:27 INFO SubscriptionState: [Consumer clientId=consumer-versa-sase-grp-2, groupId=versa-sase-grp] Seeking to LATEST offset of partition versa-sase-1
22/02/01 15:30:27 INFO SubscriptionState: [Consumer clientId=consumer-versa-sase-grp-3, groupId=versa-sase-grp] Resetting offset for partition versa-sase-0 to position FetchPosition{offset=0, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[34.75.249.190:9094 (id: 2 rack: null)], epoch=17}}.
22/02/01 15:30:27 INFO SubscriptionState: [Consumer clientId=consumer-versa-sase-grp-3, groupId=versa-sase-grp] Seeking to LATEST offset of partition versa-sase-0
22/02/01 15:30:27 INFO SubscriptionState: [Consumer clientId=consumer-versa-sase-grp-2, groupId=versa-sase-grp] Resetting offset for partition versa-sase-1 to position FetchPosition{offset=1561161, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[34.138.122.164:9094 (id: 1 rack: null)], epoch=20}}.
22/02/01 15:30:27 INFO KafkaConsumer: [Consumer clientId=consumer-versa-sase-grp-2, groupId=versa-sase-grp] Seeking to offset 49160 for partition versa-sase-1
22/02/01 15:30:27 INFO SubscriptionState: [Consumer clientId=consumer-versa-sase-grp-2, groupId=versa-sase-grp] Seeking to EARLIEST offset of partition versa-sase-1
22/02/01 15:30:27 INFO SubscriptionState: [Consumer clientId=consumer-versa-sase-grp-3, groupId=versa-sase-grp] Resetting offset for partition versa-sase-0 to position FetchPosition{offset=1560872, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[34.75.249.190:9094 (id: 2 rack: null)], epoch=17}}.
22/02/01 15:30:27 INFO KafkaConsumer: [Consumer clientId=consumer-versa-sase-grp-3, groupId=versa-sase-grp] Seeking to offset 49385 for partition versa-sase-0
22/02/01 15:30:27 INFO SubscriptionState: [Consumer clientId=consumer-versa-sase-grp-3, groupId=versa-sase-grp] Seeking to EARLIEST offset of partition versa-sase-0
22/02/01 15:30:27 INFO SubscriptionState: [Consumer clientId=consumer-versa-sase-grp-3, groupId=versa-sase-grp] Resetting offset for partition versa-sase-0 to position FetchPosition{offset=0, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[34.75.249.190:9094 (id: 2 rack: null)], epoch=17}}.
22/02/01 15:30:27 INFO SubscriptionState: [Consumer clientId=consumer-versa-sase-grp-3, groupId=versa-sase-grp] Seeking to LATEST offset of partition versa-sase-0
22/02/01 15:30:27 INFO SubscriptionState: [Consumer clientId=consumer-versa-sase-grp-3, groupId=versa-sase-grp] Resetting offset for partition versa-sase-0 to position FetchPosition{offset=1560877, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[34.75.249.190:9094 (id: 2 rack: null)], epoch=17}}.
22/02/01 15:30:27 INFO KafkaConsumer: [Consumer clientId=consumer-versa-sase-grp-3, groupId=versa-sase-grp] Seeking to offset 49885 for partition versa-sase-0
22/02/01 15:30:27 INFO SubscriptionState: [Consumer clientId=consumer-versa-sase-grp-3, groupId=versa-sase-grp] Seeking to EARLIEST offset of partition versa-sase-0
^C22/02/01 15:30:27 INFO SubscriptionState: [Consumer clientId=consumer-versa-sase-grp-3, groupId=versa-sase-grp] Resetting offset for partition versa-sase-0 to position FetchPosition{offset=0, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[34.75.249.190:9094 (id: 2 rack: null)], epoch=17}}.
22/02/01 15:30:27 INFO SubscriptionState: [Consumer clientId=consumer-versa-sase-grp-3, groupId=versa-sase-grp] Seeking to LATEST offset of partition versa-sase-0

ADDITIONAL LOGS:

22/02/01 15:39:21 DEBUG KafkaDataConsumer: Get versa-sase-grp versa-sase-2 nextOffset 20778 requested 20778
22/02/01 15:39:21 DEBUG KafkaDataConsumer: Get versa-sase-grp versa-sase-2 nextOffset 20779 requested 20779
22/02/01 15:39:21 DEBUG KafkaDataConsumer: Get versa-sase-grp versa-sase-2 nextOffset 20780 requested 20780
22/02/01 15:39:21 DEBUG KafkaDataConsumer: Get versa-sase-grp versa-sase-2 nextOffset 20781 requested 20781
22/02/01 15:39:21 DEBUG KafkaDataConsumer: Get versa-sase-grp versa-sase-2 nextOffset 20782 requested 20782
22/02/01 15:39:21 DEBUG KafkaDataConsumer: Get versa-sase-grp versa-sase-2 nextOffset 20783 requested 20783
22/02/01 15:39:21 DEBUG KafkaDataConsumer: Get versa-sase-grp versa-sase-2 nextOffset 20784 requested 20784
22/02/01 15:39:21 DEBUG KafkaDataConsumer: Get versa-sase-grp versa-sase-2 nextOffset 20785 requested 20785
22/02/01 15:39:21 DEBUG KafkaDataConsumer: Get versa-sase-grp versa-sase-2 nextOffset 20786 requested 20786
22/02/01 15:39:21 DEBUG KafkaDataConsumer: Get versa-sase-grp versa-sase-2 nextOffset 20787 requested 20787
22/02/01 15:39:21 DEBUG InternalKafkaConsumer: Seeking to versa-sase-grp versa-sase-2 20787
22/02/01 15:39:21 INFO KafkaConsumer: [Consumer clientId=consumer-versa-sase-grp-2, groupId=versa-sase-grp] Seeking to offset 20787 for partition versa-sase-2
22/02/01 15:39:21 DEBUG InternalKafkaConsumer: Polled versa-sase-grp [versa-sase-2]  500
22/02/01 15:39:21 DEBUG InternalKafkaConsumer: Offset changed from 20787 to 21287 after polling
22/02/01 15:39:21 INFO SubscriptionState: [Consumer clientId=consumer-versa-sase-grp-2, groupId=versa-sase-grp] Seeking to EARLIEST offset of partition versa-sase-2
22/02/01 15:39:21 DEBUG Fetcher: [Consumer clientId=consumer-versa-sase-grp-2, groupId=versa-sase-grp] Sending ListOffsetRequest ListOffsetsRequestData(replicaId=-1, isolationLevel=0, topics=[ListOffsetsTopic(name='versa-sase', partitions=[ListOffsetsPartition(partitionIndex=2, currentLeaderEpoch=15, timestamp=-2, maxNumOffsets=1)])]) to broker 34.138.248.133:9094 (id: 0 rack: null)
22/02/01 15:39:21 DEBUG NetworkClient: [Consumer clientId=consumer-versa-sase-grp-2, groupId=versa-sase-grp] Sending LIST_OFFSETS request with header RequestHeader(apiKey=LIST_OFFSETS, apiVersion=6, clientId=consumer-versa-sase-grp-2, correlationId=109) and timeout 30000 to node 0: ListOffsetsRequestData(replicaId=-1, isolationLevel=0, topics=[ListOffsetsTopic(name='versa-sase', partitions=[ListOffsetsPartition(partitionIndex=2, currentLeaderEpoch=15, timestamp=-2, maxNumOffsets=1)])])
22/02/01 15:39:21 INFO SparkContext: Successfully stopped SparkContext
22/02/01 15:39:21 INFO ShutdownHookManager: Shutdown hook called
22/02/01 15:39:21 INFO ShutdownHookManager: Deleting directory /private/var/folders/yp/p_9783hn1fg_dyzbf4sgnxqh0000gn/T/spark-6b854d12-55ad-48d5-bca1-e1c4a6992797
22/02/01 15:39:21 INFO ShutdownHookManager: Deleting directory /private/var/folders/yp/p_9783hn1fg_dyzbf4sgnxqh0000gn/T/spark-6b854d12-55ad-48d5-bca1-e1c4a6992797/pyspark-df44cb1a-c18c-4357-992c-47eff25f7eb0
22/02/01 15:39:21 INFO ShutdownHookManager: Deleting directory /private/var/folders/yp/p_9783hn1fg_dyzbf4sgnxqh0000gn/T/spark-edac7493-5da7-4a41-88a5-1203aa1b7ac1
22/02/01 15:39:21 DEBUG FileSystem: FileSystem.close() by method: org.apache.hadoop.fs.FilterFileSystem.close(FilterFileSystem.java:529)); Key: (karanalang (auth:SIMPLE))@file://; URI: file:///; Object Identity Hash: 3411b055
22/02/01 15:39:21 DEBUG FileSystem: FileSystem.close() by method: org.apache.hadoop.fs.RawLocalFileSystem.close(RawLocalFileSystem.java:759)); Key: null; URI: file:///; Object Identity Hash: f0683e4
22/02/01 15:39:21 DEBUG ShutdownHookManager: Completed shutdown in 0.238 seconds; Timeouts: 0
22/02/01 15:39:21 DEBUG NetworkClient: [Consumer clientId=consumer-versa-sase-grp-3, groupId=versa-sase-grp] Received LIST_OFFSETS response from node 2 for request with header RequestHeader(apiKey=LIST_OFFSETS, apiVersion=6, clientId=consumer-versa-sase-grp-3, correlationId=106): ListOffsetsResponseData(throttleTimeMs=0, topics=[ListOffsetsTopicResponse(name='versa-sase', partitions=[ListOffsetsPartitionResponse(partitionIndex=0, errorCode=0, oldStyleOffsets=[], timestamp=-1, offset=1570867, leaderEpoch=17)])])
22/02/01 15:39:21 DEBUG Fetcher: [Consumer clientId=consumer-versa-sase-grp-3, groupId=versa-sase-grp] Handling ListOffsetResponse response for versa-sase-0. Fetched offset 1570867, timestamp -1
22/02/01 15:39:21 DEBUG Metadata: [Consumer clientId=consumer-versa-sase-grp-3, groupId=versa-sase-grp] Not replacing existing epoch 17 with new epoch 17 for partition versa-sase-0
22/02/01 15:39:21 INFO SubscriptionState: [Consumer clientId=consumer-versa-sase-grp-3, groupId=versa-sase-grp] Resetting offset for partition versa-sase-0 to position FetchPosition{offset=1570867, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[34.75.249.190:9094 (id: 2 rack: null)], epoch=17}}.
22/02/01 15:39:21 DEBUG KafkaDataConsumer: Get versa-sase-grp versa-sase-0 nextOffset 19800 requested 19800
22/02/01 15:39:21 DEBUG KafkaDataConsumer: Get versa-sase-grp versa-sase-0 nextOffset 19801 requested 19801
22/02/01 15:39:21 DEBUG KafkaDataConsumer: Get versa-sase-grp versa-sase-0 nextOffset 19802 requested 19802
22/02/01 15:39:21 DEBUG KafkaDataConsumer: Get versa-sase-grp versa-sase-0 nextOffset 19803 requested 19803
22/02/01 15:39:21 DEBUG KafkaDataConsumer: Get versa-sase-grp versa-sase-0 nextOffset 19804 requested 19804
22/02/01 15:39:21 DEBUG KafkaDataConsumer: Get versa-sase-grp versa-sase-0 nextOffset 19805 requested 19805
22/02/01 15:39:21 DEBUG KafkaDataConsumer: Get versa-sase-grp versa-sase-0 nextOffset 19806 requested 19806
22/02/01 15:39:21 DEBUG KafkaDataConsumer: Get versa-sase-grp versa-sase-0 nextOffset 19807 requested 19807
22/02/01 15:39:21 DEBUG KafkaDataConsumer: Get versa-sase-grp versa-sase-0 nextOffset 19808 requested 19808
22/02/01 15:39:21 DEBUG KafkaDataConsumer: Get versa-sase-grp versa-sase-0 nextOffset 19809 requested 19809
22/02/01 15:39:21 DEBUG KafkaDataConsumer: Get versa-sase-grp versa-sase-0 nextOffset 19810 requested 19810
22/02/01 15:39:21 DEBUG KafkaDataConsumer: Get versa-sase-grp versa-sase-0 nextOffset 19811 requested 19811
22/02/01 15:39:21 DEBUG KafkaDataConsumer: Get versa-sase-grp versa-sase-0 nextOffset 19812 requested 19812
22/02/01 15:39:21 DEBUG KafkaDataConsumer: Get versa-sase-grp versa-sase-0 nextOffset 19813 requested 19813
22/02/01 15:39:21 DEBUG KafkaDataConsumer: Get versa-sase-grp versa-sase-0 nextOffset 19814 requested 19814
22/02/01 15:39:21 DEBUG KafkaDataConsumer: Get versa-sase-grp versa-sase-0 nextOffset 19815 requested 19815
22/02/01 15:39:21 DEBUG KafkaDataConsumer: Get versa-sase-grp versa-sase-0 nextOffset 19816 requested 19816
22/02/01 15:39:21 DEBUG KafkaDataConsumer: Get versa-sase-grp versa-sase-0 nextOffset 19817 requested 19817
22/02/01 15:39:21 DEBUG KafkaDataConsumer: Get versa-sase-grp versa-sase-0 nextOffset 19818 requested 19818
22/02/01 15:39:21 DEBUG KafkaDataConsumer: Get versa-sase-grp versa-sase-0 nextOffset 19819 requested 19819
22/02/01 15:39:21 DEBUG KafkaDataConsumer: Get versa-sase-grp versa-sase-0 nextOffset 19820 requested 19820
22/02/01 15:39:21 DEBUG KafkaDataConsumer: Get versa-sase-grp versa-sase-0 nextOffset 19821 requested 19821
22/02/01 15:39:21 DEBUG KafkaDataConsumer: Get versa-sase-grp versa-sase-0 nextOffset 19822 requested 19822
22/02/01 15:39:21 DEBUG KafkaDataConsumer: Get versa-sase-grp versa-sase-0 nextOffset 19823 requested 19823

What needs to be done to debug/fix this? As I mentioned, the batch code is able to read the data from Kafka and print the records to the console!
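(Side note: one way to check from the driver whether micro-batches are being planned at all is to poll the StreamingQuery handle instead of blocking on awaitTermination(); a minimal sketch, reusing the query variable and the time module already imported above:)

# Poll the query from the driver; lastProgress stays None until a batch completes.
while query.isActive:
    print("status -> ", query.status)
    print("lastProgress -> ", query.lastProgress)
    time.sleep(30)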

Thanks in advance!

UPDATE: In debug mode (for streaming), I see the following error. I'm not sure if this is an issue, since after it the logs indicate that Spark is able to fetch the offsets from Kafka. Also, surprisingly, I don't see this error in batch mode.

22/02/01 16:07:36 DEBUG UserGroupInformation: PrivilegedAction [as: karanalang (auth:SIMPLE)][action: org.apache.hadoop.fs.FileContext$2@6a01a2c]
java.lang.Exception
   at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1875)
   at org.apache.hadoop.fs.FileContext.getAbstractFileSystem(FileContext.java:339)
   at org.apache.hadoop.fs.FileContext.getFileContext(FileContext.java:465)
   at org.apache.spark.sql.execution.streaming.FileContextBasedCheckpointFileManager.<init>(CheckpointFileManager.scala:312)
   at org.apache.spark.sql.execution.streaming.CheckpointFileManager$.create(CheckpointFileManager.scala:202)
   at org.apache.spark.sql.execution.streaming.HDFSMetadataLog.<init>(HDFSMetadataLog.scala:61)
   at org.apache.spark.sql.kafka010.KafkaSourceInitialOffsetWriter.<init>(KafkaSourceInitialOffsetWriter.scala:32)
   at org.apache.spark.sql.kafka010.KafkaMicroBatchStream.getOrCreateInitialPartitionOffsets(KafkaMicroBatchStream.scala:227)
   at org.apache.spark.sql.kafka010.KafkaMicroBatchStream.initialOffset(KafkaMicroBatchStream.scala:90)
   at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runBatch$12(MicroBatchExecution.scala:512)
   at scala.Option.getOrElse(Option.scala:189)
   at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runBatch$3(MicroBatchExecution.scala:512)
   at scala.collection.TraversableLike.$anonfun$flatMap$1(TraversableLike.scala:293)
   at scala.collection.Iterator.foreach(Iterator.scala:943)
   at scala.collection.Iterator.foreach$(Iterator.scala:943)
   at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
   at scala.collection.IterableLike.foreach(IterableLike.scala:74)
   at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
   at org.apache.spark.sql.execution.streaming.StreamProgress.foreach(StreamProgress.scala:27)
   at scala.collection.TraversableLike.flatMap(TraversableLike.scala:293)
   at scala.collection.TraversableLike.flatMap$(TraversableLike.scala:290)
   at org.apache.spark.sql.execution.streaming.StreamProgress.flatMap(StreamProgress.scala:27)
   at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runBatch$2(MicroBatchExecution.scala:492)
   at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken(ProgressReporter.scala:375)
   at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken$(ProgressReporter.scala:373)
   at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:69)
   at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runBatch(MicroBatchExecution.scala:492)
   at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$2(MicroBatchExecution.scala:228)
   at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
   at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken(ProgressReporter.scala:375)
   at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken$(ProgressReporter.scala:373)
   at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:69)
   at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$1(MicroBatchExecution.scala:193)
   at org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:57)
   at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:187)
   at org.apache.spark.sql.execution.streaming.StreamExecution.$anonfun$runStream$1(StreamExecution.scala:303)
   at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
   at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
   at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:286)
   at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:209)
22/02/01 16:07:36 DEBUG MicroBatchExecution: Retrieving data from KafkaV2[Subscribe[versa-sase]]: None -> {"versa-sase":{"2":1373669,"1":1373507,"0":1372412}}
22/02/01 16:07:36 DEBUG MicroBatchExecution: getBatch took 17 ms
22/02/01 16:07:36 INFO KafkaOffsetReaderConsumer: Partitions added: Map()
22/02/01 16:07:36 DEBUG KafkaOffsetReaderConsumer: TopicPartitions: versa-sase-2, versa-sase-1, versa-sase-0
22/02/01 16:07:36 INFO KafkaOffsetReaderConsumer: Partitions added: Map()


Solution

  • Structured Streaming is trying to write to the configured "checkpoint" directory, but it looks like the job itself does not have permission to do so. If checkpointing does not work, it is entirely expected that the workload stops and makes no progress (see the sketches below).

    Please double-check why the Hadoop filesystem cannot be written to, and then re-run the workload. As a hint, one can use a full path when setting "checkpointLocation" to be explicit about which directory is used.
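    A quick way to confirm the permission theory is to check, as the same user running the driver, whether the resolved checkpoint path is writable. A minimal sketch, assuming the bare relative "checkpoint" path resolves against the driver's local working directory:

    import os

    # A relative "checkpoint" resolves against the driver's current working directory.
    ckpt = os.path.abspath("checkpoint")
    print("resolved checkpoint dir -> ", ckpt)
    # Check that the parent directory is writable by the user running the job.
    print("parent writable -> ", os.access(os.path.dirname(ckpt), os.W_OK))

    And a sketch of the full-path hint, applied to the writeStream call from the question; the file:// URI and directory here are placeholders, not a verified location:

    query = df_stream.select("value", "topic", "partition", "timestamp") \
        .writeStream \
        .format("console") \
        .outputMode("append") \
        .trigger(processingTime='30 seconds') \
        .option("numRows", 10) \
        .option("checkpointLocation", "file:///Users/<user>/checkpoints/versa-sase") \
        .start()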