
JSON column as Key in kafka producer


As we know, we can send a key with a Kafka producer, which is hashed internally to determine which partition in the topic the data goes to. I have a producer where I am sending data in JSON format.

kafka-console-producer --broker-list 127.0.0.1:9092 --topic USERPROFILE << EOF 
{"user_id" : 100, "firstname":"Punit","lastname":"Gupta", "countrycode":"IN", "rating":4.9 }
{"user_id" : 101, "firstname":"eli","lastname":"eli", "countrycode":"GB", "rating":3.0 }
EOF

Now I want to use "countrycode" as my key while sending the data. For normal delimited data we can specify two properties:

--property "parse.key=true" 
--property "key.separator=:"

But how do I do this when sending JSON data?

I am using Confluent's Python API for Kafka. If there is anything I have to write in terms of classes or functions to achieve this, I would be thankful if you could express it in Python.


Solution

  • JSON is just a string. The console producer doesn't parse JSON; only the Avro console producer does.

    I would avoid key.separator=: since JSON contains :. You could use the | character instead; then you just type out

    countrycode|{"your":"data"}
    

    In Python, the produce function does take a key. You can parse your records like this in order to extract a value for the key.

    import json
    from confluent_kafka import Producer

    # Broker address taken from your console-producer command
    producer = Producer({'bootstrap.servers': '127.0.0.1:9092'})

    # Use the 'countrycode' field of each record as the message key
    key = 'countrycode'
    records = [{"user_id": 100, "firstname": "Punit", "lastname": "Gupta", key: "IN", "rating": 4.9},
               {"user_id": 101, "firstname": "eli", "lastname": "eli", key: "GB", "rating": 3.0}]

    for r in records:
        # The first record sends ('IN', '{... "countrycode": "IN" ...}')
        producer.produce('topic', key=r[key], value=json.dumps(r))

    producer.flush()
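If you prefer to keep the key extraction separate from the produce call, a small helper can return the (key, value) pair in one place. This is just a sketch; the keyed_message name is my own, not part of confluent-kafka, and its output is what you would pass to producer.produce():

```python
import json

def keyed_message(record, key_field="countrycode"):
    """Split a record dict into the (key, value) pair expected by
    producer.produce(): the key is the chosen field's value, and the
    value is the whole record serialized as a JSON string."""
    return record[key_field], json.dumps(record)

record = {"user_id": 100, "firstname": "Punit", "lastname": "Gupta",
          "countrycode": "IN", "rating": 4.9}

key, value = keyed_message(record)
# key is "IN"; the countrycode field still appears inside the value,
# since the entire record is serialized.
```

Note that the field is only copied into the key, not removed from the value, so consumers that read only the message value still see the countrycode.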