Spark Streaming Kafka offset management

I have been writing Spark Streaming jobs that consume and produce data through Kafka. I use a direct DStream, so I have to manage offsets myself, and we adopted Redis to write and read them. Now there is one problem: when I launch my client, it needs to get its starting offsets from Redis, not the offsets that exist in Kafka. How should I write my code? This is what I have written so far:

   kafka_stream = KafkaUtils.createDirectStream(
    topics=[config.CONSUME_TOPIC, ],
    kafkaParams={"bootstrap.servers": config.CONSUME_BROKERS,
                 "auto.offset.reset": "largest"},

But I think fromOffsets is only the value (read from Redis) at the moment the Spark Streaming client is launched, not while it is running. Thank you for helping.


  • If I understand you correctly, you need to set your offset manually. This is how I do it:

    from pyspark.streaming import StreamingContext
    from pyspark.streaming.kafka import KafkaUtils
    from pyspark.streaming.kafka import TopicAndPartition
    stream = StreamingContext(sc, 120) # 120-second batch interval
    kafkaParams = {"":"1:667,2:6667,3:6667"}
    kafkaParams["auto.offset.reset"] = "smallest"
    kafkaParams[""] = "false"
    topic = "xyz"
    topicPartion = TopicAndPartition(topic, 0)
    fromOffset = {topicPartion: long(PUT NUMERIC OFFSET HERE)}
    kafka_stream = KafkaUtils.createDirectStream(stream, [topic], kafkaParams, fromOffsets = fromOffset)
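To connect this to the Redis part of the question, here is a minimal sketch of the two conversions involved: turning stored offsets into a `fromOffsets` dict at launch, and turning a batch's offset ranges into values to write back while the job runs. It assumes the offsets live in one Redis hash per topic, mapping partition number to next offset; that layout, the key name `kafka:offsets:xyz`, and the helper names are hypothetical and not something the question specifies. The helpers are plain Python, so the Spark and Redis wiring appears only as comments.

```python
def load_from_offsets(stored, topic, make_key=lambda t, p: (t, p), cast=int):
    """Build a fromOffsets dict from a {partition: offset} mapping.

    `stored` is what a Redis hash lookup would return. redis-py returns
    bytes unless decode_responses=True, so both bytes and str are accepted.
    `make_key` builds the dict key; pass TopicAndPartition when using Spark.
    """
    from_offsets = {}
    for partition, offset in stored.items():
        if isinstance(partition, bytes):
            partition = partition.decode()
        if isinstance(offset, bytes):
            offset = offset.decode()
        from_offsets[make_key(topic, int(partition))] = cast(int(offset))
    return from_offsets


def offsets_to_store(offset_ranges):
    """Map each partition to the offset the next launch should start from.

    Each item needs `partition` and `untilOffset` attributes, as the
    OffsetRange objects returned by rdd.offsetRanges() have.
    """
    return {str(o.partition): int(o.untilOffset) for o in offset_ranges}


# Hypothetical wiring (requires redis-py and the Spark 1.x/2.x Kafka 0.8 API):
#
#   r = redis.Redis(host="localhost", port=6379)
#   key = "kafka:offsets:xyz"                       # assumed key name
#   fromOffset = load_from_offsets(r.hgetall(key), topic, TopicAndPartition)
#   kafka_stream = KafkaUtils.createDirectStream(
#       stream, [topic], kafkaParams, fromOffsets=fromOffset)
#
#   def save_offsets(rdd):
#       # offsetRanges() must be called on the RDD that comes straight
#       # from the direct stream, before any transformation.
#       for partition, offset in offsets_to_store(rdd.offsetRanges()).items():
#           r.hset(key, partition, offset)
#
#   kafka_stream.foreachRDD(save_offsets)
```

`fromOffsets` is indeed read only once, when the stream is created. During the run the direct stream tracks its own position, so the job's only responsibility is to write each batch's `untilOffset` back to Redis, ideally after the batch's output has been produced. On Python 2, passing `cast=long` matches the `long(...)` wrapper used in the answer.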