Tags: scala, apache-kafka, spark-streaming, mapr

Attach kafka offset to each record in foreachRDD


I would like to retrieve the Kafka offset for each record of my RDD, inside foreachRDD. I have one partition in my topic, so my RDD gets one partition too. I basically try something like this:

dStream.foreachRDD { rdd =>
  if (!rdd.isEmpty) {
    // get the offset of the first record in this batch (single partition assumed)
    val firstOffset = rdd.asInstanceOf[HasOffsetRanges].offsetRanges(0).fromOffset
    val rddWithOffset = rdd.map(_.value)
      .zipWithIndex()
      .map{ case (v,i) => (v,i + firstOffset)}
  }
}
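
(As a side sketch, assuming the same `dStream` as above, i.e. a 0-10 direct stream whose records expose `.value` such as `ConsumerRecord`: pairing every partition with its own `OffsetRange` keeps the offset arithmetic valid even with more than one Kafka partition, as long as no shuffle happens before the mapping.)

import org.apache.spark.streaming.kafka010.{HasOffsetRanges, OffsetRange}

dStream.foreachRDD { rdd =>
  if (!rdd.isEmpty) {
    // one OffsetRange per RDD partition, in the same order as the RDD partitions
    val ranges: Array[OffsetRange] = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
    val rddWithOffset = rdd.mapPartitionsWithIndex { (partitionIndex, records) =>
      // within one Kafka partition the records arrive in offset order,
      // so fromOffset + position in the iterator is the record's real offset
      val fromOffset = ranges(partitionIndex).fromOffset
      records.zipWithIndex.map { case (record, i) => (record.value, fromOffset + i) }
    }
    rddWithOffset.foreach(println)
  }
}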

For example, in my producer I send the messages in a loop and put the loop index in a column named position, like this:

+------+-----+--------+
|  name|  age|position|
+------+-----+--------+
|johnny|   26|       1|
| chloe|   42|       2|
| brian|   19|       3|
| eliot|   35|       4|
+------+-----+--------+

Unfortunately, I noticed that the order is not maintained when I add the offset column in my consumer:

+------+-----+--------+------+
|  name|  age|position|offset|
+------+-----+--------+------+
|johnny|   26|       1|     1|
| chloe|   42|       2|     3|
| brian|   19|       3|     4|
| eliot|   35|       4|     2|
+------+-----+--------+------+

It seems like I lose the ordering with this process. Do you have any idea? Thanks.

By the way, my Java producer looks like this:

KafkaRestProducer<String, Object> producer = new KafkaRestProducer<>(props);

ArrayList<String> names = new ArrayList<String>();
names.add("johnny");
names.add("chloe");
names.add("brian");
names.add("eliot");

ArrayList<Integer> ages = new ArrayList<Integer>();
ages.add(26);
ages.add(42);
ages.add(19);
ages.add(35);

for (int i = 0; i < names.size(); ++i) {

    String name = names.get(i);
    int age = ages.get(i);
    // position is 1-based, matching the table above
    Person person = Person
        .newBuilder()
        .setName(name)
        .setAge(age)
        .setPosition(i + 1)
        .build();

    ProducerRecord<String, Object> record = new ProducerRecord<>("/apps/PERSON/streams:myTopic", name, person);

    producer.send(record, null);
    System.out.println(i);
}

Solution

  • My English is very poor. I use this code:

        import kafka.common.TopicAndPartition
        import kafka.message.MessageAndMetadata
        import kafka.serializer.StringDecoder
        import org.apache.spark.streaming.kafka.KafkaUtils

        val Array(brokers, topic, groupId) = args
        val kafkaParams = Map[String, String]("metadata.broker.list" -> brokers, "group.id" -> groupId)
        // start reading partition 0 of the topic from offset 1
        val topicPartition = Map[TopicAndPartition, Long](TopicAndPartition(topic, 0) -> 1L)
        // messageHandler exposes the offset of every message alongside its payload
        val messageHandler = (mmd: MessageAndMetadata[String, String]) => (mmd.offset, mmd.message)
        val kafkaStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder, (Long, String)](
            ssc, kafkaParams, topicPartition, messageHandler)

        kafkaStream.foreachRDD(rdd => rdd.foreach(println))
    

    Output: (offset, lineOfMessage) ...
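
    For what it's worth, the messageHandler hook above only exists in the spark-streaming-kafka 0-8 direct API. A rough sketch of the same idea on the 0-10 integration (where every ConsumerRecord already carries its offset), assuming a Subscribe-based stream and the same ssc, brokers, topic and groupId:

        import org.apache.kafka.common.serialization.StringDeserializer
        import org.apache.spark.streaming.kafka010.KafkaUtils
        import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
        import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe

        val kafkaParams = Map[String, Object](
          "bootstrap.servers"  -> brokers,
          "key.deserializer"   -> classOf[StringDeserializer],
          "value.deserializer" -> classOf[StringDeserializer],
          "group.id"           -> groupId,
          "auto.offset.reset"  -> "earliest")

        val stream = KafkaUtils.createDirectStream[String, String](
          ssc, PreferConsistent, Subscribe[String, String](Seq(topic), kafkaParams))

        // same output shape as above: (offset, lineOfMessage)
        stream.foreachRDD(rdd => rdd.map(r => (r.offset, r.value)).foreach(println))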