I have a Kafka Streams app that uses two state stores. I have a problem when running this app on OpenShift on a Strimzi cluster (kafka:0.29.0-kafka-3.1.0).
When the app receives a bp-addr record, it successfully fetches the entry from the bp-state-store but always gets null from the person-state-store.
It also fails the other way around: when a person-addr record comes into the stream and the app tries to retrieve entries from the bp-state-store, it gets null as well.
I tried both a .get() with an exact key and a .prefixScan(); in both cases I get this weird behaviour.
I have double-checked the keys, entries, and state store status, and they all look fine. What's more, my unit tests pass, and the app also works correctly on a local Docker Kafka cluster (cp-kafka).
I checked the ACLs on the Strimzi cluster and they also look correct (I even added my user as a super-user to verify). I also tried two different OpenShift namespaces built from scratch.
Does anyone have ideas/hints on what I could check?
private StreamsBuilder buildTopology(KafkaStreamsProperty kafkaStreamsProperty,
                                     SpecificAvroSerde<JoinedPersonAddrV2> joinedPASerde,
                                     SpecificAvroSerde<JoinedBpAddrV2> joinedBASerde,
                                     SpecificAvroSerde<ObjectUpdateEvent> updateSerde) {
    StreamsBuilder builder = new StreamsBuilder();
    final Map<String, String> serdeConfig = Collections.singletonMap(AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, kafkaStreamsProperty.getSchemaRegistryUrl());
    joinedBASerde.configure(serdeConfig, false);
    joinedPASerde.configure(serdeConfig, false);
    updateSerde.configure(serdeConfig, false);

    KStream<String, JoinedPersonAddrV2> personAddrKStream = builder.stream(kafkaStreamsProperty.getPersonInputTopic(), Consumed.with(Serdes.String(), joinedPASerde));
    KStream<String, JoinedBpAddrV2> bpAddrKStream = builder.stream(kafkaStreamsProperty.getBpInputTopic(), Consumed.with(Serdes.String(), joinedBASerde));

    StoreBuilder<KeyValueStore<String, JoinedBpAddrV2>> bpStoreBuilder = Stores.keyValueStoreBuilder(
            Stores.persistentKeyValueStore(BP_STORE_NAME),
            Serdes.String(),
            joinedBASerde
    );
    StoreBuilder<KeyValueStore<String, JoinedPersonAddrV2>> personStoreBuilder = Stores.keyValueStoreBuilder(
            Stores.persistentKeyValueStore(PERSON_STORE_NAME),
            Serdes.String(),
            joinedPASerde
    );
    StoreBuilder<KeyValueStore<String, Long>> debounceStoreBuilder = Stores.keyValueStoreBuilder(
            Stores.persistentKeyValueStore(DEBOUNCE_STORE),
            Serdes.String(),
            Serdes.Long()
    );
    builder.addStateStore(bpStoreBuilder);
    builder.addStateStore(personStoreBuilder);
    builder.addStateStore(debounceStoreBuilder);

    KStream<String, BpAddrOrPersonAddrV2> mergedStream = bpAddrKStream
            .mapValues(value -> new BpAddrOrPersonAddrV2(null, value))
            .merge(personAddrKStream.mapValues(value -> new BpAddrOrPersonAddrV2(value, null)));

    mergedStream.process(() -> new ContextualProcessor<String, BpAddrOrPersonAddrV2, String, ObjectUpdateEvent>() {
                @Override
                public void process(Record<String, BpAddrOrPersonAddrV2> record) {
                    // assign value to a variable
                    BpAddrOrPersonAddrV2 eitherValue = record.value();
                    boolean isHashUpdated = false;
                    // get state stores
                    KeyValueStore<String, JoinedBpAddrV2> bpAddrStateStore = context().getStateStore(BP_STORE_NAME);
                    KeyValueStore<String, JoinedPersonAddrV2> personAddrStateStore = context().getStateStore(PERSON_STORE_NAME);
                    // assign single values
                    JoinedBpAddrV2 bpAddr = eitherValue.getBpAddr();
                    JoinedPersonAddrV2 personAddr = eitherValue.getPersonAddr();
                    // process if bp-addr record
                    if (bpAddr != null) {
                        String[] splittedKey = record.key().split(":");
                        String personIdAddrIdKey = String.format("%s:%s", splittedKey[1], splittedKey[2]);
                        boolean isEmittent = isEmittent(bpAddr);
                        if (isRegisteredOwnerId(bpAddr, splittedKey[1]) || isEmittent) {
                            // personId, addrId, bpId
                            String revertedKey = String.format("%s:%s:%s", splittedKey[1], splittedKey[2], splittedKey[0]);
                            // get stored value from bp-addr state store
                            JoinedBpAddrV2 storedValue = bpAddrStateStore.get(revertedKey);
                            log.info("bpaddr entry storedValue: {}", storedValue);
                            if (storedValue == null) {
                                log.debug("{}, New BpAddr Table Entry for: {}, hash: {}", BP_STORE_NAME, record.key(), bpAddr.getHash());
                                bpAddrStateStore.put(revertedKey, bpAddr);
                                isHashUpdated = true;
                            } else if (!Objects.equals(storedValue.getHash(), bpAddr.getHash())) {
                                log.debug("{}, Update BpAddr Entry for key: {}, old hash: {}, new hash: {}", BP_STORE_NAME, record.key(), storedValue.getHash(), bpAddr.getHash());
                                bpAddrStateStore.put(revertedKey, bpAddr);
                                isHashUpdated = true;
                            } else {
                                log.debug("{}, No hash change, do nothing for BpAddr key: {}, hash value: {}", BP_STORE_NAME, record.key(), storedValue.getHash());
                            }
                        }
                        if (isHashUpdated) {
                            log.info("SEARCHING IN PERSON-ADDR STATE STORE WITH KEY: {}", personIdAddrIdKey);
                            JoinedPersonAddrV2 matching = personAddrStateStore.get(personIdAddrIdKey);
                            log.info("MATCHING: {}", matching);
                            // try-with-resources: prefixScan iterators hold RocksDB resources and must be closed
                            try (KeyValueIterator<String, JoinedPersonAddrV2> matchedPersonAddrIterator = personAddrStateStore.prefixScan(personIdAddrIdKey, new StringSerializer())) {
                                while (matchedPersonAddrIterator.hasNext()) {
                                    JoinedPersonAddrV2 matchedPersonAddr = matchedPersonAddrIterator.next().value;
                                    // if the bp-addr is an emittent and no matching personAddr was found, look up just the personId with a prefix scan,
                                    // because addrId is not propagated on bp, therefore bp and person might have a different domiAddr
                                    if (matchedPersonAddr == null && isEmittent) {
                                        String personId = splittedKey[1];
                                        // do a prefix scan for the personId
                                        try (KeyValueIterator<String, JoinedPersonAddrV2> joinedPersonIterator = personAddrStateStore.prefixScan(personId, new StringSerializer())) {
                                            while (joinedPersonIterator.hasNext()) {
                                                JoinedPersonAddrV2 item = joinedPersonIterator.next().value;
                                                context().forward(record.withKey(personIdAddrIdKey).withValue(createObjectUpdateEvent(item, bpAddr)));
                                            }
                                        }
                                    // "standard" match: forward the record
                                    } else if (matchedPersonAddr != null) {
                                        context().forward(record.withKey(personIdAddrIdKey).withValue(createObjectUpdateEvent(matchedPersonAddr, bpAddr)));
                                    }
                                }
                            }
                        }
                    } else if (personAddr != null) { // process if a person-addr record is coming
                        if (record.value() != null) {
                            JoinedPersonAddrV2 storedValue = personAddrStateStore.get(record.key());
                            log.info("personaddr entry storedValue: {}", storedValue);
                            if (storedValue == null) {
                                log.debug("{}, New PersonAddr Table entry for: {}, hash: {}", PERSON_STORE_NAME, record.key(), personAddr.getHash());
                                personAddrStateStore.put(record.key(), personAddr);
                                isHashUpdated = true;
                            } else if (!storedValue.getHash().equals(personAddr.getHash())) {
                                log.debug("{}, Update PersonAddr Table entry for: {}, old hash: {}, new hash: {}", PERSON_STORE_NAME, record.key(), storedValue.getHash(), personAddr.getHash());
                                personAddrStateStore.put(record.key(), personAddr);
                                isHashUpdated = true;
                            } else {
                                log.debug("{}, No hash change, do nothing for PersonAddr key: {}, hash: {}", PERSON_STORE_NAME, record.key(), personAddr.getHash());
                            }
                        } else {
                            log.debug("Got NULL JoinedPersonAddr object - skip! --> key: {}", record.key());
                        }
                        if (isHashUpdated) {
                            try {
                                // look for matches in the bp-addr store
                                log.info("SEARCHING IN BP-ADDR STATE STORE WITH KEY: {}", record.key());
                                try (KeyValueIterator<String, JoinedBpAddrV2> bpAddrIterator = bpAddrStateStore.prefixScan(record.key(), new StringSerializer())) {
                                    if (bpAddrIterator.hasNext()) {
                                        log.trace("Found records for key {} in {}", record.key(), BP_STORE_NAME);
                                        // got matches for the given personId:addrId, update all found records
                                        while (bpAddrIterator.hasNext()) {
                                            JoinedBpAddrV2 bpAddrObject = bpAddrIterator.next().value;
                                            context().forward(record.withKey(record.key()).withValue(createObjectUpdateEvent(personAddr, bpAddrObject)));
                                        }
                                    } else {
                                        log.trace("Found NO records for key {} in {}", record.key(), BP_STORE_NAME);
                                        String[] splittedKey = record.key().split(":");
                                        final String ZERO = "0";
                                        // only when addrId != 0 and the person is an emittent
                                        if (personAddr.getIsEmittent() && !splittedKey[1].equals(ZERO)) {
                                            String personIdNullAddrKey = String.format("%s:%s", splittedKey[0], ZERO);
                                            // look for records in the bp-addr state store with key personId:0
                                            try (KeyValueIterator<String, JoinedBpAddrV2> bpNullAddrIterator = bpAddrStateStore.prefixScan(personIdNullAddrKey, new StringSerializer())) {
                                                if (bpNullAddrIterator.hasNext()) {
                                                    log.trace("Found records for key {} in {}", personIdNullAddrKey, BP_STORE_NAME);
                                                    while (bpNullAddrIterator.hasNext()) {
                                                        JoinedBpAddrV2 bpAddrV2 = bpNullAddrIterator.next().value;
                                                        context().forward(record.withKey(record.key()).withValue(createObjectUpdateEvent(personAddr, bpAddrV2)));
                                                    }
                                                }
                                            }
                                        } else {
                                            log.trace("Found NO match, sending PERSON_ONLY record");
                                            // no match in the bp-addr state store, send a PERSON_ONLY event
                                            context().forward(record.withValue(createObjectUpdateEvent(personAddr, null)));
                                        }
                                    }
                                }
                            } catch (Exception e) {
                                // log the full stack trace, not just the message
                                log.error("Exception while matching PersonAddr record", e);
                            }
                        }
                    }
                }
            }, BP_STORE_NAME, PERSON_STORE_NAME)
            .process(() -> new DebounceTransformer<>(DEBOUNCE_STORE, 5000), DEBOUNCE_STORE)
            .peek((key, value) -> log.debug("Produced Update Event key: {}, hash: {}", key, value.getHash()))
            .to(kafkaStreamsProperty.getOutputTopic(), Produced.with(Serdes.String(), updateSerde));

    return builder;
}
The topology looks as follows:
Topologies:
Sub-topology: 0
Source: KSTREAM-SOURCE-0000000000 (topics: [person.addr.join.topic])
--> KSTREAM-MAPVALUES-0000000003
Source: KSTREAM-SOURCE-0000000001 (topics: [bp.addr.join.topic])
--> KSTREAM-MAPVALUES-0000000002
Processor: KSTREAM-MAPVALUES-0000000002 (stores: [])
--> KSTREAM-MERGE-0000000004
<-- KSTREAM-SOURCE-0000000001
Processor: KSTREAM-MAPVALUES-0000000003 (stores: [])
--> KSTREAM-MERGE-0000000004
<-- KSTREAM-SOURCE-0000000000
Processor: KSTREAM-MERGE-0000000004 (stores: [])
--> KSTREAM-PROCESSOR-0000000005
<-- KSTREAM-MAPVALUES-0000000002, KSTREAM-MAPVALUES-0000000003
Processor: KSTREAM-PROCESSOR-0000000005 (stores: [bp-state-store, person-state-store])
--> KSTREAM-PROCESSOR-0000000006
<-- KSTREAM-MERGE-0000000004
Processor: KSTREAM-PROCESSOR-0000000006 (stores: [debounce-store])
--> KSTREAM-PEEK-0000000007
<-- KSTREAM-PROCESSOR-0000000005
Processor: KSTREAM-PEEK-0000000007 (stores: [])
--> KSTREAM-SINK-0000000008
<-- KSTREAM-PROCESSOR-0000000006
Sink: KSTREAM-SINK-0000000008 (topic: update.event.topic)
<-- KSTREAM-PEEK-0000000007
The "problem" was that the topics on the Strimzi cluster I used had multiple partitions. Situation: if you have topics A and B with 10 partitions each, and the producers that send messages to these topics use the default partitioner, a BpAddr record and the matching PersonAddr record can land in different partitions.
The state stores are partitioned as well, so the corresponding entries then also end up in different store shards. Therefore, when an update record comes in and the app wants to check entries in a state store, we must make sure that the PersonAddr and BpAddr records always land in the same partition.
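To illustrate, a minimal sketch (the IDs are made up; Utils is Kafka's org.apache.kafka.common.utils.Utils, whose murmur2 hash the default partitioner applies to the full serialized key). Because the two topics use different key layouts, the matching records generally map to different partitions:

import java.nio.charset.StandardCharsets;

import org.apache.kafka.common.utils.Utils;

public class PartitionCheck {
    public static void main(String[] args) {
        int numPartitions = 10;
        // hypothetical keys: person-addr is keyed personId:addrId,
        // bp-addr is keyed bpId:personId:addrId (as in the processor above)
        byte[] personAddrKey = "4711:0815".getBytes(StandardCharsets.UTF_8);
        byte[] bpAddrKey = "99:4711:0815".getBytes(StandardCharsets.UTF_8);
        // the default partitioner computes toPositive(murmur2(keyBytes)) % numPartitions
        System.out.println(Utils.toPositive(Utils.murmur2(personAddrKey)) % numPartitions);
        System.out.println(Utils.toPositive(Utils.murmur2(bpAddrKey)) % numPartitions);
        // the two partition numbers will generally differ, so the matching
        // entries end up in different shards of the partitioned state stores
    }
}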
I solved it by writing a custom Partitioner for the producers that write PersonAddr and BpAddr records, making sure related records always end up in the same partition of both topics.
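A minimal sketch of such a partitioner; the class name is illustrative, and it assumes the key formats above (personId is the first segment of a two-part key and the middle segment of a three-part key). Both topics must also have the same partition count:

import java.nio.charset.StandardCharsets;
import java.util.Map;

import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.utils.Utils;

public class PersonIdPartitioner implements Partitioner {

    @Override
    public int partition(String topic, Object key, byte[] keyBytes,
                         Object value, byte[] valueBytes, Cluster cluster) {
        String[] parts = ((String) key).split(":");
        // bp-addr keys (bpId:personId:addrId) carry the personId in the middle,
        // person-addr keys (personId:addrId) start with it
        String personId = parts.length == 3 ? parts[1] : parts[0];
        int numPartitions = cluster.partitionsForTopic(topic).size();
        // hash only the personId, using the same murmur2 scheme as the default partitioner
        return Utils.toPositive(Utils.murmur2(personId.getBytes(StandardCharsets.UTF_8))) % numPartitions;
    }

    @Override
    public void close() {
    }

    @Override
    public void configure(Map<String, ?> configs) {
    }
}

Register it on both producers via the partitioner.class config (ProducerConfig.PARTITIONER_CLASS_CONFIG), so every record with the same personId lands in the same partition, and hence in the same state store shard.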