Tags: python, apache-kafka, confluent-kafka-python

Setting processing.guarantee to exactly_once causes a deserialization error in the Python Kafka consumer


I have a Java Kafka Streams processing application and a Python application. The Java application produces data and the Python consumer consumes it. When processing.guarantee is set to exactly_once, the Python consumer is unable to deserialize the data; deserialization fails.

I tried a Java consumer, and it reads the data successfully. I then set processing.guarantee back to at_least_once in the Java application, and now the Python application can read without any issue.

I checked the payload from a console consumer, and in both the exactly_once and at_least_once cases the payload looks the same. Even the binary payload read by the Python consumer before deserialization looks the same in both cases. What could be the problem in this scenario?

Note: In my case Kafka does not have the at least 3 brokers suggested in the documentation for exactly_once to work; there is only one broker in my setup.

Can anyone shed some light on why the Java consumer works but the Python consumer does not?

Update: Looking deeper into the Python logs, it appears two records are being processed by the Python consumer:

  1. The original record, which is processed perfectly fine.
  2. An empty record; the log shows key = b'\x00\x00\x00\x01' and value = b'\x00\x00\x00\x00\x00\x00'. Now I am wondering how this additional record is sent when exactly_once is set.

Below is the Python code used.

from kafka import KafkaConsumer

# Stored on the instance as self.consumer_params.
params = {
    "bootstrap_servers": "localhost:29092",
    "auto_offset_reset": "latest",
    "group_id": "test",
}

def set_consumer(self):
    try:
        consumer = KafkaConsumer(*self.topics, **self.consumer_params)
        return consumer
    except Exception as e:
        print(e)

for msg in self.consumer:
    try:
        event = self.decode_msg(msg)
        self.logger.info("Json result : %s", str(event))
    except Exception:
        self.logger.exception("Failed to decode message")


Solution

  • wondering how this additional record is sent when exactly_once is set

    It is a transaction marker. The Java consumer detects these control records and filters them out, but in Python your consumer will need to handle them separately. There is a GitHub issue thread suggesting the consumer should already be able to filter the transaction records; maybe check the librdkafka docs in case you are missing a configuration for this. A filtering sketch follows.
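
    As a stopgap, you can skip suspected marker records on the consumer side before decoding. This is only a sketch under an assumption: that the markers always look like the ones in your logs, i.e. a 4-byte key whose first two bytes are zero (control-record keys encode a 2-byte version followed by a 2-byte type, and b'\x00\x00\x00\x01' is a commit marker). is_control_record is a hypothetical helper, not part of any client API.

    def is_control_record(msg):
        # Heuristic: control-record keys are exactly 4 bytes -- a 2-byte
        # version (currently 0) followed by a 2-byte type (0 = abort, 1 = commit).
        return msg.key is not None and len(msg.key) == 4 and msg.key[:2] == b"\x00\x00"

    for msg in self.consumer:
        if is_control_record(msg):
            continue  # drop the transaction marker instead of deserializing it
        event = self.decode_msg(msg)
        self.logger.info("Json result : %s", str(event))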

    I see there is an EOS example in the confluent-kafka-python repo, but it doesn't consume after the producer sends the transaction records.
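
    If switching clients is an option, confluent-kafka-python (built on librdkafka) is supposed to filter transaction control records internally so they never reach your application. A minimal consumer sketch under that assumption; the topic name below is a placeholder:

    from confluent_kafka import Consumer

    consumer = Consumer({
        "bootstrap.servers": "localhost:29092",
        "group.id": "test",
        "auto.offset.reset": "latest",
        # read_committed additionally hides records from aborted transactions.
        "isolation.level": "read_committed",
    })
    consumer.subscribe(["my-topic"])

    try:
        while True:
            msg = consumer.poll(1.0)
            if msg is None:
                continue
            if msg.error():
                print(msg.error())
                continue
            # Only committed application records should arrive here.
            print(msg.key(), msg.value())
    finally:
        consumer.close()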