I have a PySpark Structured Streaming program running on GCP Dataproc. It reads data from Kafka, does some data massaging, and then aggregates the results. I'm trying to use withWatermark(), but it raises an error.
Here is the code:
# Kafka source over SSL. Broker list, topic, consumer group and the
# keystore/truststore settings are variables defined elsewhere in the script.
df_stream = (
    spark.readStream.format('kafka')
    .option("kafka.security.protocol", "SSL")
    .option("kafka.ssl.truststore.location", ssl_truststore_location)
    .option("kafka.ssl.truststore.password", ssl_truststore_password)
    .option("kafka.ssl.keystore.location", ssl_keystore_location)
    .option("kafka.ssl.keystore.password", ssl_keystore_password)
    .option("kafka.bootstrap.servers", kafkaBrokers)
    .option("subscribe", topic)
    .option("kafka.group.id", consumerGroupId)
    .option("startingOffsets", "earliest")
    .option("failOnDataLoss", "false")
    .option("maxOffsetsPerTrigger", 10)
    .load()
)

# The writeStream sink hands every micro-batch (Kafka value cast to a string,
# plus the Kafka record timestamp) to convertToDictForEachBatch every 10 s.
# NOTE(review): "numRows" and "truncate" look like console-sink options and are
# presumably ignored by foreachBatch -- confirm and drop if unneeded.
query = (
    df_stream.selectExpr("CAST(value AS STRING)", "timestamp")
    .writeStream
    .outputMode("append")
    .trigger(processingTime='10 seconds')
    .option("numRows", 10)
    .option("truncate", "false")
    .option("checkpointLocation", checkpoint)
    .foreachBatch(convertToDictForEachBatch)
    .start()
)
`convertToDictForEachBatch` contains the code that does the data massaging and aggregation:
def convertToDictForEachBatch(df, batchId):
    """Process one Kafka micro-batch delivered by ``foreachBatch``.

    The first column of each row (the Kafka ``value`` cast to a string by the
    writeStream's ``selectExpr``) is parsed with ``convertToDict``. The
    resulting RDD is handed, together with the global ``spark`` session, to
    ``Alarm``, which performs the data massaging and aggregation.

    Args:
        df: the (static) DataFrame holding this micro-batch's rows.
        batchId: the micro-batch id supplied by Spark.
    """
    print(" IN CONVERT TO DICT ", batchId, " currentTime ", datetime.datetime.now(), " df -> ", df)
    values = df.rdd.map(lambda row: row[0])
    # Each raw value is parsed into a dict-like record by convertToDict.
    tdict = values.map(convertToDict)
    # NOTE(review): dfnew is built but never used. Alarm receives the RDD
    # ``tdict``, not this DataFrame, although the original comment said the DF
    # is passed. toDF() still runs schema inference (which may trigger a
    # Spark job), so it is kept to preserve behaviour. Confirm which one Alarm
    # expects and remove the other.
    dfnew = tdict.toDF()
    Alarm(tdict, spark)
# Aggregation code in the Alarm class, which uses withWatermark
def computeCount(df_processedAlarm, df_totalAlarm):
    """Count processed alarms per 1-minute event-time window.

    Args:
        df_processedAlarm: DataFrame with a ``timestamp`` column.
        df_totalAlarm: not used by this code.

    Returns:
        A DataFrame with ``window`` and ``count`` columns, or ``None`` when
        ``df_processedAlarm`` has no rows.
    """
    # Import the *function* explicitly. ``pyspark.sql.window`` is a module, so
    # a wrong import such as ``from pyspark.sql import window`` makes
    # ``window(...)`` fail with "TypeError: 'module' object is not callable".
    from pyspark.sql.functions import col, window

    processedAlarmCnt = None
    # take(1) stops after the first row instead of counting the whole batch.
    if df_processedAlarm.take(1):
        # NOTE(review): this is reached from the foreachBatch callback with a
        # static (non-streaming) DataFrame. Spark drops the watermark for
        # batch plans, and each call only sees the current micro-batch, so
        # windows are not carried across batches. For cross-batch windows,
        # aggregate the streaming DataFrame before writeStream instead.
        processedAlarmCnt = (
            df_processedAlarm.withWatermark("timestamp", "10 seconds")
            .groupBy(window(col("timestamp"), "1 minutes").alias("window"))
            .count()
        )
    return processedAlarmCnt
The objective of the code above is to count the processed alarms in 1-minute windows, with a 10-second watermark.
Error:
py4j.Py4JException: An exception was raised by the Python Proxy. Return Message: Traceback (most recent call last):
File "/usr/lib/spark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py", line 2442, in _call_proxy
return_value = getattr(self.pool[obj_id], method)(*params)
File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/sql/utils.py", line 196, in call
raise e
File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/sql/utils.py", line 193, in call
self.func(DataFrame(jdf, self.sql_ctx), batch_id)
File "/tmp/178d0ac9c82e42a09942f7f9cdc76bb7/StructuredStreaming_GCP_Versa_Sase_gcloud.py", line 444, in convertToDictForEachBatch
ap = Alarm(tdict, spark)
File "/tmp/178d0ac9c82e42a09942f7f9cdc76bb7/StructuredStreaming_GCP_Versa_Sase_gcloud.py", line 356, in __init__
computeCount(l_alarm_df, l_alarm1_df)
File "/tmp/178d0ac9c82e42a09942f7f9cdc76bb7/StructuredStreaming_GCP_Versa_Sase_gcloud.py", line 262, in computeCount
window(col("timestamp"), "10 minutes").alias("window")
TypeError: 'module' object is not callable
at py4j.Protocol.getReturnValue(Protocol.java:476)
at py4j.reflection.PythonProxyHandler.invoke(PythonProxyHandler.java:108)
at com.sun.proxy.$Proxy33.call(Unknown Source)
at org.apache.spark.sql.execution.streaming.sources.PythonForeachBatchHelper$.$anonfun$callForeachBatch$1(ForeachBatchSink.scala:55)
at org.apache.spark.sql.execution.streaming.sources.PythonForeachBatchHelper$.$anonfun$callForeachBatch$1$adapted(ForeachBatchSink.scala:55)
at org.apache.spark.sql.execution.streaming.sources.ForeachBatchSink.addBatch(ForeachBatchSink.scala:35)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runBatch$16(MicroBatchExecution.scala:586)
at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:103)
at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:163)
at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:90)
at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runBatch$15(MicroBatchExecution.scala:584)
at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken(ProgressReporter.scala:357)
at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken$(ProgressReporter.scala:355)
at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:68)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runBatch(MicroBatchExecution.scala:584)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$2(MicroBatchExecution.scala:226)
at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken(ProgressReporter.scala:357)
at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken$(ProgressReporter.scala:355)
at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:68)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$1(MicroBatchExecution.scala:194)
at org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:57)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:188)
at org.apache.spark.sql.execution.streaming.StreamExecution.$anonfun$runStream$1(StreamExecution.scala:334)
at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:317)
at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:244)
Traceback (most recent call last):
What needs to be done to debug/fix this issue? Thanks in advance!
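As @ewertonvsilva mentioned, this was caused by an incorrect import. `window` must be the function from `pyspark.sql.functions`, not the `pyspark.sql.window` module, which is why calling it raised "'module' object is not callable". The correct import is:
from pyspark.sql.functions import window
After the import was corrected, the issue was fixed.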