Tags: java, apache-kafka, apache-kafka-streams

stateStore.delete(key) in Kafka is not working


I have what I thought would be a simple state store use case. Every 10 seconds we loop through a state store and try to send each entry to a partner. If we receive a 404, we try again at the next interval. If we receive a 200, we delete the entry from the state store.

In my test (one entry in the state store), I first let it run a few loops in which we receive a 404, just to check that the retry works. When I then switch my mock endpoint to return 200, I can see in the logs that both stateStore.delete(key) and stateStore.flush() are called. I even confirmed, right after stateStore.delete(key), that stateStore.get(key) returns null (tombstone).

However, the next time the punctuator runs (10 seconds later), the object is still in the state store and the entire block is executed again. It keeps looping like this and never deletes the entry from the state store.

@Override
public void punctuate(long timestamp) {
    log.info("PeriodicRetryPunctuator started: {}", timestamp);

    try (KeyValueIterator<String, TestEventObject> iter = stateStore.all()) {
        while (iter.hasNext()) {
            KeyValue<String, TestEventObject> keyValue = iter.next();
            String key = keyValue.key;
            TestEventObject event = keyValue.value;

            try {
                log.info("Event: {}", event);
                // Sends the event over HTTP. Throws HttpResponseException if a 404 is received.
                eventService.processEvent(event);

                // Delivered (200): remove the entry so it is not retried.
                stateStore.delete(key);
                stateStore.flush();

                // Check that the state store now returns null for this key
                log.info("Check: {}", stateStore.get(key));
            } catch (HttpResponseException hre) {
                log.info("Periodic retry received 404. Retrying at next interval");
            } catch (Exception e) {
                // Pass the exception itself so the stack trace goes to the log
                log.error("Exception with periodic retry", e);
            }
        }
    }
}
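Stripped of Kafka, the retry-then-delete behavior described above can be sketched in plain Java. This is only an illustration under stated assumptions: a TreeMap stands in for the KeyValueStore, and a hypothetical send function (a ToIntFunction returning an HTTP status code) stands in for eventService.processEvent. It shows the intended outcome. Entries answered with 404 stay for the next pass. An entry answered with 200 is removed and never sent again.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.function.ToIntFunction;

public class RetryPassSketch {

    /**
     * One punctuation pass: try to deliver every pending event and remove
     * the ones the partner accepted (HTTP 200). Any other status keeps the
     * entry for the next pass.
     *
     * @param store plain Map standing in for the Kafka Streams KeyValueStore (assumption)
     * @param send  hypothetical sender returning the HTTP status code (assumption)
     * @return number of events delivered and removed in this pass
     */
    public static <V> int retryPass(Map<String, V> store, ToIntFunction<? super V> send) {
        List<String> delivered = new ArrayList<>();
        for (Map.Entry<String, V> entry : store.entrySet()) {
            int status = send.applyAsInt(entry.getValue());
            if (status == 200) {
                delivered.add(entry.getKey());
            }
            // Any other status (e.g. 404): leave the entry for the next pass.
        }
        // Delete after iterating: a plain TreeMap throws
        // ConcurrentModificationException if entries are removed mid-loop.
        for (String key : delivered) {
            store.remove(key);
        }
        return delivered.size();
    }

    public static void main(String[] args) {
        Map<String, String> store = new TreeMap<>();
        store.put("event-1", "payload-1");

        // Hypothetical mock endpoint: 404 for the first three calls, then 200.
        int[] calls = {0};
        ToIntFunction<String> mockPartner = event -> ++calls[0] <= 3 ? 404 : 200;

        for (int pass = 1; pass <= 5; pass++) {
            int removed = retryPass(store, mockPartner);
            System.out.println("pass " + pass + ": removed=" + removed
                    + ", remaining=" + store.size());
        }
    }
}
```

Running main prints one line per pass. The entry is removed on pass 4, and pass 5 sends nothing, which is the behavior the question expected from the punctuator. The deletion is deferred until after the loop only because of the plain TreeMap. The KeyValueStore.all() Javadoc requires its iterator to be safe from ConcurrentModificationException, so deleting inside the loop, as the punctuator above does, should be allowed by the store contract.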

Solution

  • Update:

    It seems to be Confluent's encryption libraries that cause this issue. I ran a fairly extensive A/B test, and every time the issue occurred, Confluent encryption was enabled. Without it, I never see the issue.