I have a Spark Streaming application that runs at end of day and consumes Kafka events sent by the upstream application. Currently the upstream application keeps pushing new data the whole day and my consumer ends up consuming all of it. I want to limit the events consumed based on a cutoff, say 6 PM daily. Is there a way to specify such a cutoff, for example based on the Kafka event timestamp? Below is the consumer code:
KafkaUtils.createDirectStream[String, String](ssc, PreferConsistent, Subscribe[String, String](topicSet, kafkaParams))
You can filter out the events during processing based on a timestamp, time, or any other field. For example, suppose your event is JSON and has a field called hour that holds the hour the event was created. You can then keep only the events created before 6 PM, as below.
import scala.util.parsing.json.JSON

directStream.foreachRDD { rdd =>
  // keep only events whose "hour" field is before 18 (6 PM)
  val filteredRDD = rdd.filter { record =>
    JSON.parseFull(record.value())
      .map(_.asInstanceOf[Map[String, Any]])
      .flatMap(_.get("hour"))
      .exists(_.toString.toDouble < 18)
  }
  // process filteredRDD further here
}
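If your events do not carry a usable time field, a similar filter could use the Kafka record timestamp instead, which is available on ConsumerRecord.timestamp() (epoch milliseconds) since Kafka 0.10. A minimal sketch, assuming you compute cutoffMillis (today's 6 PM) once before the batch runs:

import java.time.{LocalDate, LocalTime, ZoneId}

// epoch millis for today's 6 PM cutoff (assumed, computed once per run)
val cutoffMillis = LocalDate.now()
  .atTime(LocalTime.of(18, 0))
  .atZone(ZoneId.systemDefault())
  .toInstant
  .toEpochMilli

directStream.foreachRDD { rdd =>
  // ConsumerRecord.timestamp() is the producer/broker timestamp in epoch millis
  val beforeCutoff = rdd.filter(_.timestamp() < cutoffMillis)
  // process beforeCutoff further here
}

Note that this still consumes the records from Kafka and discards them during processing; it does not stop the consumer from reading past the cutoff offset.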