As we know, a Kafka producer can send a key with each record, and that key is hashed internally to decide which partition of the topic the record goes to. I have a producer that sends data in JSON format:
[
{
"DATE": 20200723,
"SOURCETYPE": "WIFI",
"DEVICEID": "24:6f:28:99:9e:dc",
"EVENTTIME": "2020-07-23 07:50:42",
"TIME": 75042
},
{
"DATE": 20200723,
"SOURCETYPE": "WIFI",
"DEVICEID": "24:6f:28:99:9e:dc",
"EVENTTIME": "2020-07-23 08:02:26",
"TIME": 80226
},
{
"DATE": 20200723,
"SOURCETYPE": "WIFI",
"DEVICEID": "24:6f:28:99:9e:dc",
"EVENTTIME": "2020-07-23 08:39:55",
"TIME": 83955
},
{
"DATE": 20200723,
"SOURCETYPE": "WIFI",
"DEVICEID": "24:6f:28:99:9e:dc",
"EVENTTIME": "2020-07-23 08:43:26",
"TIME": 84326
},
{
"DATE": 20200723,
"SOURCETYPE": "WIFI",
"DEVICEID": "24:6f:28:99:9e:dc",
"EVENTTIME": "2020-07-23 08:44:22",
"TIME": 84422
},
{
"DATE": 20200723,
"SOURCETYPE": "WIFI",
"DEVICEID": "24:6f:28:99:9e:dc",
"EVENTTIME": "2020-07-23 08:45:09",
"TIME": 84509
},
{
"DATE": 20200723,
"SOURCETYPE": "WIFI",
"DEVICEID": "24:6f:28:99:9e:dc",
"EVENTTIME": "2020-07-23 08:45:58",
"TIME": 84558
},
{
"DATE": 20200723,
"SOURCETYPE": "WIFI",
"DEVICEID": "24:6f:28:99",
"EVENTTIME": "2020-07-23 08:45:58",
"TIME": 84558
},
{
"DATE": 20200723,
"SOURCETYPE": "WIFI",
"DEVICEID": "24:6f:28:99",
"EVENTTIME": "2020-07-23 08:45:58",
"TIME": 84558
}
]
I want to push this data to a topic, with records going to different partitions based on the key (DEVICEID). I created the topic with two partitions, 0 and 1, but it stored all the data in partition 0. I want each unique key (DEVICEID) to be stored in a different partition. Code:
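import java.io.File
import java.util.Properties

import com.fasterxml.jackson.databind.{DeserializationFeature, JsonNode, ObjectMapper}
import com.fasterxml.jackson.module.scala.DefaultScalaModule
// In jackson-module-scala 2.12+ this trait lives in com.fasterxml.jackson.module.scala
import com.fasterxml.jackson.module.scala.experimental.ScalaObjectMapper
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}

object Producer extends App {
  val props = new Properties()
  props.put("bootstrap.servers", "localhost:9092")
  props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
  props.put("value.serializer", "org.apache.kafka.connect.json.JsonSerializer")
  val producer = new KafkaProducer[String, JsonNode](props)
  println("inside producer")

  val mapper = (new ObjectMapper() with ScalaObjectMapper).
    registerModule(DefaultScalaModule).
    configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false).
    findAndRegisterModules(). // register joda and java-time modules automatically
    asInstanceOf[ObjectMapper with ScalaObjectMapper]

  val filename = "/Users/rishunigam/Documents/devicd.json"
  // The file holds a JSON array; each element becomes one record
  val jsonNode: JsonNode = mapper.readTree(new File(filename))

  for (i <- 0 until jsonNode.size()) {
    val js = jsonNode.get(i)
    // asText() gives the raw string; toString would keep the JSON quotes in the key
    val key = js.findValue("DEVICEID").asText()
    println(key)
    println(js)
    val record = new ProducerRecord[String, JsonNode]("tpch.devices_logs", key, js)
    println(record)
    producer.send(record)
  }
  println("producer complete")
  // close() flushes any buffered records before shutting down
  producer.close()
}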
"it stored all the data in partition-0"
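That doesn't mean it's not working. It just means that the hashes of the keys mapped to the same partition. Your data contains only two distinct keys and the topic has two partitions, so both keys landing on the same one is a fairly likely outcome.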
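To check where a given key lands, the sketch below reproduces what the default partitioner does for keyed records, as far as I know: a murmur2 hash of the serialized key bytes, made non-negative, modulo the partition count. The object name `PartitionCheck` and the helper `partitionFor` are made up for illustration, and the key must be the exact string the producer serializes (including any surrounding quotes, if they are part of the key).

```scala
import java.nio.charset.StandardCharsets
import org.apache.kafka.common.utils.Utils

object PartitionCheck {
  // Hypothetical helper: mirrors the default keyed partitioning,
  // murmur2(keyBytes) made positive, then modulo the partition count.
  def partitionFor(key: String, numPartitions: Int): Int = {
    // StringSerializer encodes keys as UTF-8 by default
    val keyBytes = key.getBytes(StandardCharsets.UTF_8)
    Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions
  }

  def main(args: Array[String]): Unit = {
    // The two distinct DEVICEID values from the question, two partitions
    Seq("24:6f:28:99:9e:dc", "24:6f:28:99").foreach { k =>
      println(s"$k -> partition ${partitionFor(k, 2)}")
    }
  }
}
```

If both lines print the same partition number, the producer is behaving as designed for those keys.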
If you want to override the default partitioner, you need to define your own Partitioner class that inspects the message and assigns the appropriate partition, then set partitioner.class in the producer properties.
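As a rough sketch of such a class, the partitioner below hands out partitions in the order keys are first seen. The class name `FirstSeenPartitioner` and the helper `assign` are invented for this example. Note the assumptions: the mapping lives only in this producer's memory, so it is not stable across restarts or across several producers, and once there are more keys than partitions it wraps around and keys share partitions again.

```scala
import java.util
import scala.collection.mutable
import org.apache.kafka.clients.producer.Partitioner
import org.apache.kafka.common.Cluster

// Hypothetical partitioner: the n-th distinct key seen by this producer
// goes to partition n % numPartitions. State is in-memory only.
class FirstSeenPartitioner extends Partitioner {
  private val assigned = mutable.Map.empty[String, Int]

  // Pure helper so the assignment logic can be exercised without a cluster
  def assign(key: String, numPartitions: Int): Int = synchronized {
    assigned.getOrElseUpdate(key, assigned.size % numPartitions)
  }

  override def configure(configs: util.Map[String, _]): Unit = ()

  override def partition(topic: String, key: Any, keyBytes: Array[Byte],
                         value: Any, valueBytes: Array[Byte], cluster: Cluster): Int = {
    val numPartitions = cluster.partitionsForTopic(topic).size()
    // Records without a key are all treated as the same (empty) key
    assign(Option(key).fold("")(_.toString), numPartitions)
  }

  override def close(): Unit = ()
}

// Registering it on the producer:
//   props.put("partitioner.class", classOf[FirstSeenPartitioner].getName)
```

With the two device IDs from the question and two partitions, the first ID seen would go to partition 0 and the second to partition 1.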
"I want all unique key(deviceID) will store in different partition"
Then you would have to know your complete dataset ahead of time in order to create N partitions for N devices. And what happens when you add a completely new device?