Hi all big data experts,
I've run into a problem when sending aggregated results to a Kafka topic. Writing to Kafka works fine for transformations without aggregation. Can anyone help me resolve this? The aggregated result is important because it triggers subsequent events and other logic. Below is a simulation of the problem; all of the code has been tested and runs.
Spark version: 2.4.4
Kafka plugin: org.apache.spark:spark-sql-kafka-0-10_2.11:2.4.4
# Dummy publisher
CLUSTER_NAME=$(/usr/share/google/get_metadata_value attributes/dataproc-cluster-name)
for i in {0..10000}; do echo "{\"name\":\"${i}\", \"dt\":$(date +%s)}"; sleep 1; done | /usr/lib/kafka/bin/kafka-console-producer.sh --broker-list ${CLUSTER_NAME}-w-1:9092 --topic test_input
> {"name":"3433", "dt":1580282788}
> {"name":"3434", "dt":1580282789}
> {"name":"3435", "dt":1580282790}
> {"name":"3436", "dt":1580282791}
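For reference, the payload the shell loop sends can be reproduced in Python. This is a minimal sketch: `make_message` is a hypothetical helper that only builds the JSON string in the same shape as the samples above (`name` as a string, `dt` as integer epoch seconds); it does not talk to Kafka.

```python
import json
import time


def make_message(i, now=None):
    """Build one test record shaped like the shell loop's output:
    {"name":"<i>", "dt":<epoch seconds>}."""
    dt = int(time.time()) if now is None else int(now)
    # separators=(", ", ":") reproduces the spacing of the echo'd JSON.
    return json.dumps({"name": str(i), "dt": dt}, separators=(", ", ":"))


if __name__ == "__main__":
    # Print a few sample records, one per line, like the producer input.
    for i in range(3):
        print(make_message(i, now=1580282788 + i))
```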
from pyspark.sql.types import *
from pyspark.sql.functions import *
# `spark` is the SparkSession provided by the pyspark shell
table='test_input'
wallet_txn_log = spark \
.readStream \
.format("kafka") \
.option("kafka.bootstrap.servers", "xxx.xxx.xxx.xxx:9092,xxx.xxx.xxx.xxx:9092,xxx.xxx.xxx.xxx:9092") \
.option("subscribe", table) \
.load() \
.selectExpr("CAST(value AS STRING) as string").select( from_json("string", schema= StructType([StructField("dt",LongType(),True),StructField("name",StringType(),True)]) ).alias("x")).select('x.*')\
.select(['name',col('dt').cast(TimestampType()).alias("txn_datetime")]) \
.select([to_json(struct('name','txn_datetime')).alias("value")]) \
.writeStream \
.format("kafka") \
.option("kafka.bootstrap.servers", "xxx.xxx.xxx.xxx:9092,xxx.xxx.xxx.xxx:9092,xxx.xxx.xxx.xxx:9092") \
.option("topic", "test_output_non_aggregate") \
.option("checkpointLocation", "gs://gcp-datawarehouse/streaming/checkpoints/streaming_test1-{}".format(table)).start()
Output (works as expected):
/usr/lib/kafka/bin/kafka-console-consumer.sh --bootstrap-server ${CLUSTER_NAME}-w-1:9092 --topic test_output_non_aggregate
{"name":"2844","txn_datetime":"2020-01-29T15:16:36.000+08:00"}
{"name":"2845","txn_datetime":"2020-01-29T15:16:37.000+08:00"}
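The non-aggregated query is a pure per-record mapping. As a sketch of what it does to each Kafka value, here is a plain-Python equivalent. `transform_record` is a hypothetical name, and the fixed UTC+08:00 zone is an assumption read off the `+08:00` offsets above (Spark renders timestamps in the session time zone).

```python
import json
from datetime import datetime, timedelta, timezone

# Assumption: session time zone is UTC+08:00, as the output above suggests.
SESSION_TZ = timezone(timedelta(hours=8))


def transform_record(raw):
    """Mirror the non-aggregated query for one Kafka value:
    parse the JSON, cast epoch seconds `dt` to a timestamp, and
    re-serialize only `name` and `txn_datetime`."""
    record = json.loads(raw)
    txn_datetime = datetime.fromtimestamp(record["dt"], tz=SESSION_TZ)
    return json.dumps(
        {
            "name": record["name"],
            # Millisecond precision plus UTC offset, like the output above.
            "txn_datetime": txn_datetime.isoformat(timespec="milliseconds"),
        },
        separators=(",", ":"),
    )
```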
I tried it both with and without a watermark; neither works.
table='test_input'
wallet_txn_log = spark \
.readStream \
.format("kafka") \
.option("kafka.bootstrap.servers", "xxx.xxx.xxx.xxx:9092,xxx.xxx.xxx.xxx:9092,xxx.xxx.xxx.xxx:9092") \
.option("subscribe", table) \
.load() \
.selectExpr("CAST(value AS STRING) as string").select( from_json("string", schema= StructType([StructField("dt",LongType(),True),StructField("name",StringType(),True)]) ).alias("x")).select('x.*')\
.select(['name',col('dt').cast(TimestampType()).alias("txn_datetime")]) \
.withWatermark("txn_datetime", "5 seconds") \
.groupBy('name','txn_datetime').agg(
count("name").alias("is_txn_count")) \
.select([to_json(struct('name','is_txn_count')).alias("value")]) \
.writeStream \
.format("kafka") \
.outputMode("update") \
.option("kafka.bootstrap.servers", "xxx.xxx.xxx.xxx:9092,xxx.xxx.xxx.xxx:9092,xxx.xxx.xxx.xxx:9092") \
.option("topic", "test_aggregated_output") \
.option("checkpointLocation", "gs://gcp-datawarehouse/streaming/checkpoints/streaming_test1-aggregated_{}".format(table)).start()
Error:
[Stage 1:> (0 + 3) / 200]20/01/29 16:20:57 WARN org.apache.spark.scheduler.TaskSetManager: Lost task 0.0 in stage 1.0 (TID 2, cep-m.asia-southeast1-c.c.tngd-poc.internal, executor 1): org.apache.spark.util.TaskCompletionListenerException: null
at org.apache.spark.TaskContextImpl.invokeListeners(TaskContextImpl.scala:138)
at org.apache.spark.TaskContextImpl.markTaskCompleted(TaskContextImpl.scala:116)
at org.apache.spark.scheduler.Task.run(Task.scala:139)
at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
yarn logs -applicationId xxx
Query Validation
The groupBy aggregation query itself is correct: it works with the console and memory sinks. With the Kafka sink, however, it keeps throwing the error above.
wallet_txn_log = spark \
... .readStream \
... .format("kafka") \
... .option("kafka.bootstrap.servers", "10.148.15.235:9092,10.148.15.236:9092,10.148.15.233:9092") \
... .option("subscribe", table) \
... .load() \
... .selectExpr("CAST(value AS STRING) as string").select( from_json("string", schema= StructType([StructField("dt",LongType(),True),StructField("name",StringType(),True)]) ).alias("x")).select('x.*')\
... .select(['name',col('dt').cast(TimestampType()).alias("txn_datetime")]) \
... .withWatermark("txn_datetime", "5 seconds") \
... .groupBy('name','txn_datetime').agg(
... count("name").alias("is_txn_count")) \
... .select([to_json(struct('name','is_txn_count')).alias("value")])
>>>
>>> df=wallet_txn_log.writeStream \
... .outputMode("update") \
... .option("truncate", False) \
... .format("console") \
... .start()
-------------------------------------------
Batch: 0
-------------------------------------------
+-----+
|value|
+-----+
+-----+
-------------------------------------------
Batch: 1
-------------------------------------------
+--------------------------------+
|value |
+--------------------------------+
|{"name":"4296","is_txn_count":1}|
|{"name":"4300","is_txn_count":1}|
|{"name":"4297","is_txn_count":1}|
|{"name":"4303","is_txn_count":1}|
|{"name":"4299","is_txn_count":1}|
|{"name":"4305","is_txn_count":1}|
|{"name":"4298","is_txn_count":1}|
|{"name":"4304","is_txn_count":1}|
|{"name":"4307","is_txn_count":1}|
|{"name":"4302","is_txn_count":1}|
|{"name":"4301","is_txn_count":1}|
|{"name":"4306","is_txn_count":1}|
|{"name":"4310","is_txn_count":1}|
|{"name":"4309","is_txn_count":1}|
|{"name":"4308","is_txn_count":1}|
+--------------------------------+
The groupBy aggregation code is correct; the Kafka plugin for Spark 2.4.4 simply has a minor bug that shows up in this case. After downgrading Spark from 2.4.4 to 2.4.3, the error above is gone.
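When downgrading, the Kafka connector passed to `--packages` should be switched to the matching version too, since the connector artifact is versioned together with Spark and suffixed with the Scala binary version (2.11 here). A small hypothetical helper, `kafka_connector_package`, that builds the coordinate:

```python
def kafka_connector_package(spark_version, scala_version="2.11"):
    """Return the spark-sql-kafka-0-10 Maven coordinate for a Spark version.

    The connector version should equal the Spark version, and the suffix
    should match the Scala binary version Spark was built with."""
    return "org.apache.spark:spark-sql-kafka-0-10_{}:{}".format(scala_version, spark_version)


if __name__ == "__main__":
    # Coordinate for the downgraded setup, e.g. for `pyspark --packages ...`.
    print(kafka_connector_package("2.4.3"))
```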