Tags: apache-spark, pyspark, spark-structured-streaming

Spark Structured Streaming Error while sending aggregated result to Kafka Topic


Hi all big data experts,

I encountered a problem while sending aggregated results to a Kafka topic. The same pipeline works fine for a transformation without aggregation. Can anyone help me resolve this? The aggregated result is important because it triggers subsequent events and different logic. Below is a simulated reproduction of the problem; all of the code has been tested and runs as shown.

Spark version 2.4.4

Kafka Plugin org.apache.spark:spark-sql-kafka-0-10_2.11:2.4.4
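
The post does not show how the connector is attached to the session; for reference, one way (a sketch, not from the original code, with a placeholder app name) is to pass the coordinates through spark.jars.packages when building the SparkSession, or equivalently via --packages on spark-submit:

# Sketch (not from the original post): attaching the spark-sql-kafka
# connector via spark.jars.packages. The app name is a placeholder.
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("structured-streaming-kafka-test") \
    .config("spark.jars.packages",
            "org.apache.spark:spark-sql-kafka-0-10_2.11:2.4.4") \
    .getOrCreate()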

Data Source

#dummy publisher
CLUSTER_NAME=$(/usr/share/google/get_metadata_value attributes/dataproc-cluster-name)  

for i in {0..10000}; do echo "{\"name\":\"${i}\", \"dt\":$(date +%s)}";  sleep 1; done | /usr/lib/kafka/bin/kafka-console-producer.sh     --broker-list ${CLUSTER_NAME}-w-1:9092  --topic test_input

> {"name":"3433", "dt":1580282788} 
> {"name":"3434", "dt":1580282789}
> {"name":"3435", "dt":1580282790} 
> {"name":"3436", "dt":1580282791}

Transformation (without group-by aggregation)

import time
from pyspark.sql.types import *
from pyspark.sql.functions import *

table='test_input'
wallet_txn_log = spark \
    .readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "xxx.xxx.xxx.xxx:9092,xxx.xxx.xxx.xxx:9092,xxx.xxx.xxx.xxx:9092") \
    .option("subscribe", table) \
    .load() \
    .selectExpr("CAST(value AS STRING) as string").select( from_json("string", schema=   StructType([StructField("dt",LongType(),True),StructField("name",StringType(),True)])    ).alias("x")).select('x.*')\
    .select(['name',col('dt').cast(TimestampType()).alias("txn_datetime")]) \
    .select([to_json(struct('name','txn_datetime')).alias("value")]) \
    .writeStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "xxx.xxx.xxx.xxx:9092,xxx.xxx.xxx.xxx:9092,xxx.xxx.xxx.xxx:9092") \
    .option("topic", "test_output_non_aggregate") \
    .option("checkpointLocation", "gs://gcp-datawarehouse/streaming/checkpoints/streaming_test1-{}".format(table)).start()

Output (works as expected)

/usr/lib/kafka/bin/kafka-console-consumer.sh     --bootstrap-server ${CLUSTER_NAME}-w-1:9092 --topic test_output_non_aggregate

{"name":"2844","txn_datetime":"2020-01-29T15:16:36.000+08:00"}
{"name":"2845","txn_datetime":"2020-01-29T15:16:37.000+08:00"}

Group by aggregation

I tried it both with and without watermarking; neither works.

table='test_input'
wallet_txn_log = spark \
    .readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "xxx.xxx.xxx.xxx:9092,xxx.xxx.xxx.xxx:9092,xxx.xxx.xxx.xxx:9092") \
    .option("subscribe", table) \
    .load() \
    .selectExpr("CAST(value AS STRING) as string").select( from_json("string", schema= StructType([StructField("dt",LongType(),True),StructField("name",StringType(),True)])  ).alias("x")).select('x.*')\
    .select(['name',col('dt').cast(TimestampType()).alias("txn_datetime")]) \
    .withWatermark("txn_datetime", "5 seconds") \
    .groupBy('name','txn_datetime').agg( 
     count("name").alias("is_txn_count")) \
    .select([to_json(struct('name','is_txn_count')).alias("value")]) \
    .writeStream \
    .format("kafka") \
    .outputMode("update") \
    .option("kafka.bootstrap.servers", "xxx.xxx.xxx.xxx:9092,xxx.xxx.xxx.xxx:9092,xxx.xxx.xxx.xxx:9092") \
    .option("topic", "test_aggregated_output") \
    .option("checkpointLocation", "gs://gcp-datawarehouse/streaming/checkpoints/streaming_test1-aggregated_{}".format(table)).start()

Error

[Stage 1:>                                                        (0 + 3) / 200]20/01/29 16:20:57 WARN org.apache.spark.scheduler.TaskSetManager: Lost task 0.0 in stage 1.0 (TID 2, cep-m.asia-southeast1-c.c.tngd-poc.internal, executor 1): org.apache.spark.util.TaskCompletionListenerException: null
        at org.apache.spark.TaskContextImpl.invokeListeners(TaskContextImpl.scala:138)
        at org.apache.spark.TaskContextImpl.markTaskCompleted(TaskContextImpl.scala:116)
        at org.apache.spark.scheduler.Task.run(Task.scala:139)
        at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
        at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)

yarn logs -applicationId xxx

Link to yarn log
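
Besides the YARN logs, the driver-side query handle returned by .start() can also be asked for the underlying failure. A small sketch (not from the original post), reusing the wallet_txn_log handle from the snippet above:

# Sketch (not from the original post): the StreamingQuery handle returned
# by .start() (wallet_txn_log above) exposes the driver-side view of the failure.
print(wallet_txn_log.status)        # is a trigger active / data available?
print(wallet_txn_log.lastProgress)  # metrics of the most recent micro-batch
print(wallet_txn_log.exception())   # StreamingQueryException if the query died

# Or block on the query and let the failure surface as an exception:
# wallet_txn_log.awaitTermination()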

Query Validation

The group-by aggregation query itself is correct: it works with the Console and Memory sinks. However, with the Kafka sink it keeps throwing the error above.

wallet_txn_log = spark \
...     .readStream \
...     .format("kafka") \
...     .option("kafka.bootstrap.servers", "10.148.15.235:9092,10.148.15.236:9092,10.148.15.233:9092") \
...     .option("subscribe", table) \
...     .load() \
...     .selectExpr("CAST(value AS STRING) as string").select( from_json("string", schema= StructType([StructField("dt",LongType(),True),StructField("name",StringType(),True)])  ).alias("x")).select('x.*')\
...     .select(['name',col('dt').cast(TimestampType()).alias("txn_datetime")]) \
...     .withWatermark("txn_datetime", "5 seconds") \
...     .groupBy('name','txn_datetime').agg( 
...      count("name").alias("is_txn_count")) \
...     .select([to_json(struct('name','is_txn_count')).alias("value")]) 
>>> 
>>> df=wallet_txn_log.writeStream \
...     .outputMode("update") \
...     .option("truncate", False) \
...     .format("console") \
...     .start()
-------------------------------------------                                     
Batch: 0
-------------------------------------------
+-----+
|value|
+-----+
+-----+

-------------------------------------------                                     
Batch: 1
-------------------------------------------
+--------------------------------+
|value                           |
+--------------------------------+
|{"name":"4296","is_txn_count":1}|
|{"name":"4300","is_txn_count":1}|
|{"name":"4297","is_txn_count":1}|
|{"name":"4303","is_txn_count":1}|
|{"name":"4299","is_txn_count":1}|
|{"name":"4305","is_txn_count":1}|
|{"name":"4298","is_txn_count":1}|
|{"name":"4304","is_txn_count":1}|
|{"name":"4307","is_txn_count":1}|
|{"name":"4302","is_txn_count":1}|
|{"name":"4301","is_txn_count":1}|
|{"name":"4306","is_txn_count":1}|
|{"name":"4310","is_txn_count":1}|
|{"name":"4309","is_txn_count":1}|
|{"name":"4308","is_txn_count":1}|
+--------------------------------+
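
The Memory-sink check mentioned above looks along these lines (a sketch, not from the original post; the query name "agg_test" is made up for illustration):

# Sketch of the Memory-sink validation mentioned above (not from the
# original post). The query name "agg_test" is made up for illustration.
df = wallet_txn_log.writeStream \
    .outputMode("update") \
    .format("memory") \
    .queryName("agg_test") \
    .start()

# The aggregated rows land in an in-memory table with that query name:
spark.sql("SELECT * FROM agg_test").show(truncate=False)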

Solution

  • The group-by aggregation code is correct; the Kafka plugin for Spark 2.4.4 appears to have a minor bug in this case. After downgrading Spark from 2.4.4 to 2.4.3, the error above is gone.
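  • If downgrading is not an option, a commonly used alternative (not the fix applied here, just a sketch) is to push each aggregated micro-batch to Kafka through foreachBatch, which routes the output through the batch Kafka writer instead of the streaming Kafka sink:

# Sketch of an alternative workaround (not the fix used above): write each
# aggregated micro-batch with the batch Kafka writer via foreachBatch.
# agg_df stands for the aggregated DataFrame from the Query Validation
# section; brokers, topic and checkpoint path are placeholders.
def write_to_kafka(batch_df, batch_id):
    batch_df.write \
        .format("kafka") \
        .option("kafka.bootstrap.servers", "xxx.xxx.xxx.xxx:9092") \
        .option("topic", "test_aggregated_output") \
        .save()

agg_df.writeStream \
    .outputMode("update") \
    .foreachBatch(write_to_kafka) \
    .option("checkpointLocation", "gs://gcp-datawarehouse/streaming/checkpoints/streaming_test1-foreachbatch") \
    .start()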