I have a Java Kafka Streams application and a Python application. The Java application produces data and the Python consumer consumes it. When processing.guarantee is set to exactly_once, the Python consumer is not able to deserialize the data; deserialization fails.
I tried a Java consumer, and the Java consumer reads the data successfully. Then I turned processing.guarantee back to at_least_once in the Java application, and now the Python application is able to read without any issue.
I checked the payload from a console consumer, and in both the exactly_once and at_least_once cases the payload looks the same. Even the binary payload read by the Python consumer before deserialization looks the same in both cases. What could be the problem in this scenario?
Note: in my case Kafka doesn't have the at least 3 brokers that the documentation suggests for exactly_once to work; there is only one broker in my setup.
Can anyone throw some light on why the Java consumer was working but not the Python consumer?
Update: looking at the Python logs more closely, it appears two records are being processed in the Python consumer: b'\x00\x00\x00\x01' and value = b'\x00\x00\x00\x00\x00\x00'. Now I am wondering how this additional record is sent when exactly_once is set. Below is the Python code used.
from kafka import KafkaConsumer

params = {
    "bootstrap_servers": "localhost:29092",
    "auto_offset_reset": "latest",
    "group_id": "test",
}

def set_consumer(self):
    try:
        consumer = KafkaConsumer(*self.topics, **self.consumer_params)
        return consumer
    except Exception as e:
        print(e)

for msg in self.consumer:
    try:
        event = self.decode_msg(msg)
        self.logger.info("Json result : %s", str(event))
    except Exception as e:
        self.logger.error("Failed to decode message: %s", e)
"wondering how this additional record is sent when exactly_once is set"
It is a transaction marker (a control record). The Java consumer is able to detect these and filter them out, but in Python your deserializer will need to handle them separately. There is a GitHub issue thread suggesting the consumer should already be able to filter out transaction records; maybe check the librdkafka docs in case you are missing a configuration for this.
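For illustration, here is a minimal Python sketch (standard library only) that decodes the two byte strings from your update against Kafka's control-record schema (key: int16 version + int16 type; value: int16 version + int32 coordinator epoch), plus a heuristic filter for your consumer loop. The is_transaction_marker helper is hypothetical: real consumers identify control records via a batch attribute bit that kafka-python may not expose to you, so key-sniffing like this is a workaround, not the official mechanism.

import struct

def is_transaction_marker(msg):
    # Control-record key layout: int16 version, int16 type (0 = ABORT, 1 = COMMIT).
    # Your observed key b'\x00\x00\x00\x01' decodes to version=0, type=1 (COMMIT).
    if msg.key is None or len(msg.key) != 4:
        return False
    version, marker_type = struct.unpack(">hh", msg.key)
    return version == 0 and marker_type in (0, 1)

# The observed value b'\x00\x00\x00\x00\x00\x00' matches the marker value
# schema: int16 version, int32 coordinator epoch -> (0, 0).
version, coordinator_epoch = struct.unpack(">hi", b"\x00\x00\x00\x00\x00\x00")

for msg in consumer:
    if is_transaction_marker(msg):
        continue  # skip the marker instead of handing it to the deserializer
    event = decode_msg(msg)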
I see there is an EOS example in the confluent-kafka-python repo, but it doesn't consume after the producer sends the transaction records.
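For comparison, here is a minimal consumer sketch with confluent-kafka-python, where librdkafka does the filtering for you. The relevant librdkafka setting is isolation.level (read_committed is the default in recent versions), and the topic name below is just a placeholder.

from confluent_kafka import Consumer

consumer = Consumer({
    "bootstrap.servers": "localhost:29092",
    "group.id": "test",
    "auto.offset.reset": "latest",
    "isolation.level": "read_committed",  # only return records from committed transactions
})
consumer.subscribe(["my-topic"])  # placeholder topic name

while True:
    msg = consumer.poll(1.0)
    if msg is None:
        continue
    if msg.error():
        print(msg.error())
        continue
    # librdkafka filters control records internally, so messages surfaced
    # here should always be application records.
    print(msg.key(), msg.value())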