apache-spark, pyspark, apache-kafka, spark-structured-streaming

Pyspark - Kafka integration works for batches but not for readStream


Hope this finds you well :)

I've been scratching my head for the last couple of days trying to make sense of this... Basically, I am trying to read a stream from a Kafka producer in PySpark (nothing new here). However, something funky is going on: the readStream method simply doesn't want to cooperate, even though I can read the topic just fine in batch mode (which means there is no connectivity or missing-dependency issue).

Here is my code for the streaming setup:

from pyspark.sql import SparkSession
import logging
logging.basicConfig(level=logging.INFO)


spark = SparkSession \
    .builder \
    .config("spark.jars.packages", "org.apache.spark:spark-sql-kafka-0-10_2.12:3.1.2") \
    .appName("KafkaStreams") \
    .getOrCreate()

spark.sparkContext.setLogLevel("INFO")


# Read stream

df = spark \
    .readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9094") \
    .option("subscribe", "TEST_TOPIC") \
    .option("startingOffsets", "earliest") \
    .option("kafka.sasl.mechanism", "PLAIN")\
    .option("kafka.request.timeout.ms", "60000") \
    .option("kafka.session.timeout.ms", "60000") \
    .load()

ds = df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")

ds.writeStream \
    .trigger(processingTime='5 seconds') \
    .outputMode("update") \
    .format("console") \
    .option("truncate", "false") \
    .start()

And here is the batch read that works, which I used to check the logs (note: I am reusing the same Spark session initialization):

from pyspark.sql.functions import to_timestamp

df = spark.read \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9094") \
    .option("subscribe", "TEST_TOPIC") \
    .option("startingOffsets", "earliest") \
    .load()

df.select('timestamp').filter(to_timestamp(df.timestamp, 'HH:mm:ss') > '09:30:00').show()

As I said, using read I get the expected output, but when I launch the readStream/writeStream job, the process simply exits with code 0.

Here are the logs I get:

23/06/18 15:04:25 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
23/06/18 15:04:26 INFO SharedState: Setting hive.metastore.warehouse.dir ('null') to the value of spark.sql.warehouse.dir.
23/06/18 15:04:26 INFO SharedState: Warehouse path is 'file:/home/ettore/Documents/Portfolio/Spark%20Streams/spark-warehouse'.
23/06/18 15:04:27 INFO StateStoreCoordinatorRef: Registered StateStoreCoordinator endpoint
23/06/18 15:04:27 WARN ResolveWriteToStream: Temporary checkpoint location created which is deleted normally when the query didn't fail: /tmp/temporary-f93a4d9e-203e-4612-8bac-2a7d839623fe. If it's required to delete it under any circumstances, please set spark.sql.streaming.forceDeleteTempCheckpointLocation to true. Important to know deleting temp checkpoint folder is best effort.
23/06/18 15:04:27 INFO ResolveWriteToStream: Checkpoint root /tmp/temporary-f93a4d9e-203e-4612-8bac-2a7d839623fe resolved to file:/tmp/temporary-f93a4d9e-203e-4612-8bac-2a7d839623fe.
23/06/18 15:04:27 WARN ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.
23/06/18 15:04:27 INFO CheckpointFileManager: Writing atomically to file:/tmp/temporary-f93a4d9e-203e-4612-8bac-2a7d839623fe/metadata using temp file file:/tmp/temporary-f93a4d9e-203e-4612-8bac-2a7d839623fe/.metadata.16db89ea-8365-4111-aa6d-43debb423f95.tmp
23/06/18 15:04:27 INFO CheckpointFileManager: Renamed temp file file:/tmp/temporary-f93a4d9e-203e-4612-8bac-2a7d839623fe/.metadata.16db89ea-8365-4111-aa6d-43debb423f95.tmp to file:/tmp/temporary-f93a4d9e-203e-4612-8bac-2a7d839623fe/metadata
23/06/18 15:04:28 INFO MicroBatchExecution: Starting [id = daa91f2b-8899-499d-ab1f-ff196563153b, runId = 999e4e36-7c95-4678-ac51-fe4b00a77e78]. Use file:/tmp/temporary-f93a4d9e-203e-4612-8bac-2a7d839623fe to store the query checkpoint.
23/06/18 15:04:28 INFO MicroBatchExecution: Reading table [org.apache.spark.sql.kafka010.KafkaSourceProvider$KafkaTable@10ba27c7] from DataSourceV2 named 'kafka' [org.apache.spark.sql.kafka010.KafkaSourceProvider@7080ff7d]

Process finished with exit code 0

As you can see, I've tried setting a processing-time trigger to delay the micro-batches, but to no avail unfortunately. Also, I don't think the Unable to load native-hadoop library warning is really the cause here, given that the batch read works.

If you have any idea of what might be going on, or how to troubleshoot it, it would be much appreciated.

Thank you in advance and have a nice day :)


Solution

  • You need to wait for the stream to run - when you start it, it runs in the background. Usually you call the .awaitTermination() function on the StreamingQuery returned by .start():

    query = ds.writeStream \
        .trigger(processingTime='5 seconds') \
        .outputMode("update") \
        .format("console") \
        .option("truncate", "false") \
        .start()
    
    # wait until stream finishes
    query.awaitTermination()
    

    The Spark Structured Streaming documentation describes this in its first example section.
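
    If you only want the query to run for a bounded time (for example while testing locally), awaitTermination also accepts a timeout in seconds, and the query handle can be stopped explicitly. A minimal sketch, reusing the query variable from above:

    # Wait up to 60 seconds; returns True if the query terminated within that window
    finished = query.awaitTermination(timeout=60)
    if not finished:
        query.stop()  # stop the background streaming query

    # If several streaming queries run in the same session, you can instead block on:
    # spark.streams.awaitAnyTermination()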