Tags: apache-spark, apache-kafka, spark-streaming, offset, spark-streaming-kafka

Spark Streaming Kafka offset management


I have Spark Streaming jobs that consume and produce data through Kafka. I use a direct DStream, so I have to manage offsets myself; we adopted Redis to write and read them. Now there is one problem: when I launch my client, it needs to get the offsets from Redis, not the offsets stored in Kafka itself. How should I write my code? This is what I have so far:

    kafka_stream = KafkaUtils.createDirectStream(
        ssc,
        topics=[config.CONSUME_TOPIC],
        kafkaParams={"bootstrap.servers": config.CONSUME_BROKERS,
                     "auto.offset.reset": "largest"},
        fromOffsets=read_offset_range(config.OFFSET_KEY))

But I think fromOffsets is only read (from Redis) when the Spark Streaming client launches, not while it is running. Thanks for helping.


Solution

  • If I understand you correctly, you need to set your offsets manually. This is how I do it:

    from pyspark.streaming import StreamingContext
    from pyspark.streaming.kafka import KafkaUtils
    from pyspark.streaming.kafka import TopicAndPartition
    
    stream = StreamingContext(sc, 120)  # 120-second batch interval
    
    kafkaParams = {"metadata.broker.list": "1:6667,2:6667,3:6667"}
    kafkaParams["auto.offset.reset"] = "smallest"
    kafkaParams["enable.auto.commit"] = "false"
    
    topic = "xyz"
    topicPartition = TopicAndPartition(topic, 0)
    fromOffset = {topicPartition: long(PUT NUMERIC OFFSET HERE)}  # use int() on Python 3
    
    kafka_stream = KafkaUtils.createDirectStream(stream, [topic], kafkaParams, fromOffsets=fromOffset)
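On the second concern in the question: fromOffsets is indeed only consulted once, at startup. To resume from Redis after a restart you also have to write the offsets of every finished batch back to Redis from foreachRDD, using the offsetRanges() the direct stream exposes on each batch RDD. Below is a minimal sketch of that round trip; the key name OFFSET_KEY and the helper names are assumptions, and a plain dict stands in for the Redis client so the logic is self-contained (with redis-py you would use hset/hgetall on a Redis hash instead).

```python
# Sketch: persist Kafka offsets per micro-batch and rebuild fromOffsets on startup.
# `store` is a stand-in for a Redis connection; swap the dict operations for
# r.hset(key, field, value) / r.hgetall(key) with a real redis-py client.

OFFSET_KEY = "spark:offsets:xyz"  # hypothetical Redis hash key


def save_offsets(store, key, offset_ranges):
    """Write each partition's end offset into a hash keyed "topic:partition".

    offset_ranges: iterable of (topic, partition, until_offset) tuples, as
    collected from rdd.offsetRanges() inside foreachRDD.
    """
    store.setdefault(key, {})
    for topic, partition, until_offset in offset_ranges:
        store[key]["%s:%d" % (topic, partition)] = str(until_offset)


def load_offsets(store, key):
    """Return {(topic, partition): offset}, ready to be turned into
    {TopicAndPartition(t, p): offset} for the fromOffsets argument."""
    result = {}
    for field, value in store.get(key, {}).items():
        topic, partition = field.rsplit(":", 1)
        result[(topic, int(partition))] = int(value)
    return result


# Wiring it into the streaming job (not runnable without a cluster):
#
# def store_batch_offsets(rdd):
#     ranges = [(o.topic, o.partition, o.untilOffset)
#               for o in rdd.offsetRanges()]
#     save_offsets(store, OFFSET_KEY, ranges)
#
# kafka_stream.foreachRDD(store_batch_offsets)
#
# fromOffsets = {TopicAndPartition(t, p): off
#                for (t, p), off in load_offsets(store, OFFSET_KEY).items()}
```

With this in place, the fromOffsets dict built from load_offsets at launch always reflects the last batch that completed before the previous run stopped, which is exactly the restart behavior the question asks for.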