apache-kafka, kafka-consumer-api, kafka-producer-api

Json column as key in kafka producer and push in different partitions on the basis of key


As we know, we can send a key with a Kafka producer; the key is hashed internally to determine which partition of the topic the data goes to. I have a producer in which I am sending data in JSON format.

[
  {
    "DATE": 20200723,
    "SOURCETYPE": "WIFI",
    "DEVICEID": "24:6f:28:99:9e:dc",
    "EVENTTIME": "2020-07-23 07:50:42",
    "TIME": 75042
  },
  {
    "DATE": 20200723,
    "SOURCETYPE": "WIFI",
    "DEVICEID": "24:6f:28:99:9e:dc",
    "EVENTTIME": "2020-07-23 08:02:26",
    "TIME": 80226
  },
  {
    "DATE": 20200723,
    "SOURCETYPE": "WIFI",
    "DEVICEID": "24:6f:28:99:9e:dc",
    "EVENTTIME": "2020-07-23 08:39:55",
    "TIME": 83955
  },
  {
    "DATE": 20200723,
    "SOURCETYPE": "WIFI",
    "DEVICEID": "24:6f:28:99:9e:dc",
    "EVENTTIME": "2020-07-23 08:43:26",
    "TIME": 84326
  },
  {
    "DATE": 20200723,
    "SOURCETYPE": "WIFI",
    "DEVICEID": "24:6f:28:99:9e:dc",
    "EVENTTIME": "2020-07-23 08:44:22",
    "TIME": 84422
  },
  {
    "DATE": 20200723,
    "SOURCETYPE": "WIFI",
    "DEVICEID": "24:6f:28:99:9e:dc",
    "EVENTTIME": "2020-07-23 08:45:09",
    "TIME": 84509
  },
  {
    "DATE": 20200723,
    "SOURCETYPE": "WIFI",
    "DEVICEID": "24:6f:28:99:9e:dc",
    "EVENTTIME": "2020-07-23 08:45:58",
    "TIME": 84558
  },
  {
    "DATE": 20200723,
    "SOURCETYPE": "WIFI",
    "DEVICEID": "24:6f:28:99",
    "EVENTTIME": "2020-07-23 08:45:58",
    "TIME": 84558
  },
  {
    "DATE": 20200723,
    "SOURCETYPE": "WIFI",
    "DEVICEID": "24:6f:28:99",
    "EVENTTIME": "2020-07-23 08:45:58",
    "TIME": 84558
  }
]
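The key hashing mentioned above can be sketched as follows. This is an illustration only: it uses `String.hashCode` so it runs without any Kafka dependency, whereas Kafka's actual default partitioner applies murmur2 to the serialized key bytes.

```scala
// Illustration of key-based partitioning. Kafka's real default partitioner
// hashes the serialized key bytes with murmur2; String.hashCode stands in
// here so the sketch is self-contained.
object PartitionSketch {
  def partitionFor(key: String, numPartitions: Int): Int =
    (key.hashCode & Int.MaxValue) % numPartitions // mask keeps the value non-negative

  def main(args: Array[String]): Unit = {
    // With only two partitions, distinct keys can easily hash to the same one.
    println(partitionFor("24:6f:28:99:9e:dc", 2))
    println(partitionFor("24:6f:28:99", 2))
  }
}
```

With only two partitions, it is entirely possible for several distinct DEVICEID keys to map to the same partition.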

I want to push data into the topic, but into different partitions based on the key (DEVICEID). I have created a topic with two partitions, 0 and 1, but it stores all the data in partition 0. I want each unique key (DEVICEID) to be stored in a different partition. Code:

import java.io.File
import java.util.Properties

import com.fasterxml.jackson.databind.{DeserializationFeature, JsonNode, ObjectMapper}
import com.fasterxml.jackson.module.scala.{DefaultScalaModule, ScalaObjectMapper}
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}

object Producer extends App {
  val props = new Properties()
  props.put("bootstrap.servers", "localhost:9092")
  props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
  props.put("value.serializer", "org.apache.kafka.connect.json.JsonSerializer")
  val producer = new KafkaProducer[String, JsonNode](props)
  println("inside producer")

  val mapper = (new ObjectMapper() with ScalaObjectMapper).
    registerModule(DefaultScalaModule).
    configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false).
    findAndRegisterModules(). // register joda and java-time modules automatically
    asInstanceOf[ObjectMapper with ScalaObjectMapper]

  val filename = "/Users/rishunigam/Documents/devicd.json"
  val jsonNode: JsonNode = mapper.readTree(new File(filename))

  for (i <- 0 until jsonNode.size()) {
    val js = jsonNode.get(i)
    val key = js.findValue("DEVICEID").toString
    println(key)
    println(js)
    val record = new ProducerRecord[String, JsonNode]("tpch.devices_logs", key, js)
    println(record)
    producer.send(record)
  }

  println("producer complete")
  producer.close()
}

Solution

  • it stored all the data in partition-0

    That doesn't mean it's not working. It just means that the hashes of the keys ended up in the same partition.

    If you want to override the default partitioner, you need to define your own Partitioner class that parses the message and assigns the appropriate partition, then set partitioner.class in the producer properties.

    I want all unique key(deviceID) will store in different partition

    Then you would have to know your complete dataset ahead of time to create N partitions for N devices. And what happens when you add a completely new device?
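If each device really must map to its own partition, the routing logic such a custom partitioner could use can be sketched as below. This is a simplified stand-in written as a pure class so it runs without a broker; in a real producer the same logic would live in a class implementing org.apache.kafka.clients.producer.Partitioner, registered via the partitioner.class property. Note it inherits the problem above: once devices outnumber partitions, partitions must be shared.

```scala
import scala.collection.mutable

// Sketch of round-robin routing for a custom partitioner: each previously
// unseen DEVICEID is assigned the next partition in order, so every unique
// device maps to a stable partition until devices outnumber partitions.
class DeviceRouter(numPartitions: Int) {
  private val assigned = mutable.LinkedHashMap.empty[String, Int]

  def partitionFor(deviceId: String): Int =
    assigned.getOrElseUpdate(deviceId, assigned.size % numPartitions)
}

object DeviceRouterDemo {
  def main(args: Array[String]): Unit = {
    val router = new DeviceRouter(2)
    println(router.partitionFor("24:6f:28:99:9e:dc")) // first unseen device -> partition 0
    println(router.partitionFor("24:6f:28:99"))       // second unseen device -> partition 1
    println(router.partitionFor("24:6f:28:99:9e:dc")) // already seen -> same partition 0
  }
}
```

With two partitions, the two DEVICEIDs in the sample data would land in partitions 0 and 1 respectively; a third device would have to share partition 0.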