Tags: pyspark, apache-kafka, spark-structured-streaming

How to load all records that were already published from Kafka?


I have a PySpark Structured Streaming app set up like this:

from pyspark.sql import SparkSession

spark = SparkSession\
    .builder\
    .appName("data streaming app")\
    .getOrCreate()


data_raw = spark.readStream\
    .format("kafka")\
    .option("kafka.bootstrap.servers", "kafkahost:9092")\
    .option("subscribe", "my_topic")\
    .load()

query = data_raw.writeStream\
    .outputMode("append")\
    .format("console")\
    .option("truncate", "false")\
    .trigger(processingTime="5 seconds")\
    .start()\
    .awaitTermination()

All that shows up is this:

+---+-----+-----+---------+------+---------+-------------+
|key|value|topic|partition|offset|timestamp|timestampType|
+---+-----+-----+---------+------+---------+-------------+
+---+-----+-----+---------+------+---------+-------------+

19/03/04 22:00:50 INFO streaming.StreamExecution: Streaming query made progress: {
  "id" : "ab24bd30-6e2d-4c2a-92a2-ddad66906a5b",
  "runId" : "29592d76-892c-4b29-bcda-f4ef02aa1390",
  "name" : null,
  "timestamp" : "2019-03-04T22:00:49.389Z",
  "numInputRows" : 0,
  "processedRowsPerSecond" : 0.0,
  "durationMs" : {
    "addBatch" : 852,
    "getBatch" : 180,
    "getOffset" : 135,
    "queryPlanning" : 107,
    "triggerExecution" : 1321,
    "walCommit" : 27
  },
  "stateOperators" : [ ],
  "sources" : [ {
    "description" : "KafkaSource[Subscribe[my_topic]]",
    "startOffset" : null,
    "endOffset" : {
      "my_topic" : {
        "0" : 303
      }
    },
    "numInputRows" : 0,
    "processedRowsPerSecond" : 0.0
  } ],
  "sink" : {
    "description" : "org.apache.spark.sql.execution.streaming.ConsoleSink@74fad4a5"
  }
}

As you can see, my_topic has 303 messages in it, but I can't get them to show. For additional context, I am using the Confluent Kafka JDBC connector to query an Oracle database and store the rows in the Kafka topic, and I have an Avro schema registry set up with it. If required, I will share those property files as well.

Does anyone have any idea what's going on?


Solution

  • As a streaming application, Spark Structured Streaming only reads messages published after the query starts (by default, startingOffsets is "latest" for streaming queries). What I wanted to do, for testing purposes, was read everything already in the topic. To do that, all you have to do is add an extra option to readStream, i.e. option("startingOffsets", "earliest").

    data_raw = spark.readStream\
        .format("kafka")\
        .option("kafka.bootstrap.servers", "kafkahost:9092")\
        .option("subscribe", "my_topic")\
        .option("startingOffsets", "earliest")
        .load()
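
    Note that startingOffsets only applies when a query starts fresh; once a checkpoint exists, a resumed query always picks up from where it left off.

    If you only want part of the history, startingOffsets also accepts a JSON string with explicit per-partition offsets. A minimal sketch, assuming the same broker and topic as above (the offset 100 is a made-up example; in the JSON, -2 means "earliest" and -1 means "latest"):

    from pyspark.sql import SparkSession

    spark = SparkSession\
        .builder\
        .appName("data streaming app")\
        .getOrCreate()

    # Start partition 0 of my_topic at offset 100 (hypothetical value).
    data_raw = spark.readStream\
        .format("kafka")\
        .option("kafka.bootstrap.servers", "kafkahost:9092")\
        .option("subscribe", "my_topic")\
        .option("startingOffsets", """{"my_topic": {"0": 100}}""")\
        .load()

    For one-off testing you can also read the topic as a batch query with spark.read instead of spark.readStream; a batch query additionally supports endingOffsets, so it reads everything currently in the topic and finishes. A sketch under the same assumptions:

    # Batch read: consumes my_topic from earliest to latest, then stops.
    data_batch = spark.read\
        .format("kafka")\
        .option("kafka.bootstrap.servers", "kafkahost:9092")\
        .option("subscribe", "my_topic")\
        .option("startingOffsets", "earliest")\
        .option("endingOffsets", "latest")\
        .load()

    data_batch.show(truncate=False)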