apache-kafka apache-kafka-streams strimzi

Kafka Streams - cannot search the state-store


I have a Kafka Streams app that uses two state stores. It misbehaves when running on OpenShift against a Strimzi cluster (kafka:0.29.0-kafka-3.1.0).

Specifically, when the app receives a bp-addr record, it successfully fetches the entry from bp-state-store, but the lookup in person-state-store always returns null.

It also returns null when a person-addr record arrives and the app tries to retrieve entries from bp-state-store.

I tried both .get() with an exact key and .prefixScan(). I get this weird behaviour in both cases.

I have double-checked the keys, the entries, and the state store status, and they look fine. What's more, my unit tests pass, and the app works correctly on a local Docker Kafka cluster (cp-kafka).

I checked the ACLs on the Strimzi cluster and they also look correct (I even added my user as a super-user to rule this out). I tried two different OpenShift namespaces, each built from scratch.

Does anyone have ideas or hints about what I could check?

    private StreamsBuilder buildTopology(KafkaStreamsProperty kafkaStreamsProperty,
                                         SpecificAvroSerde<JoinedPersonAddrV2> joinedPASerde,
                                         SpecificAvroSerde<JoinedBpAddrV2> joinedBASerde,
                                         SpecificAvroSerde<ObjectUpdateEvent> updateSerde
    ) {
        StreamsBuilder builder = new StreamsBuilder();

        final Map<String, String> serdeConfig = Collections.singletonMap(AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, kafkaStreamsProperty.getSchemaRegistryUrl());

        joinedBASerde.configure(serdeConfig, false);
        joinedPASerde.configure(serdeConfig, false);
        updateSerde.configure(serdeConfig, false);

        KStream<String, JoinedPersonAddrV2> personAddrKStream = builder.stream(kafkaStreamsProperty.getPersonInputTopic(), Consumed.with(Serdes.String(), joinedPASerde));
        KStream<String, JoinedBpAddrV2> bpAddrKStream = builder.stream(kafkaStreamsProperty.getBpInputTopic(), Consumed.with(Serdes.String(), joinedBASerde));

        StoreBuilder<KeyValueStore<String, JoinedBpAddrV2>> bpStoreBuilder = Stores.keyValueStoreBuilder(
                Stores.persistentKeyValueStore(BP_STORE_NAME),
                Serdes.String(),
                joinedBASerde
        );

        StoreBuilder<KeyValueStore<String, JoinedPersonAddrV2>> personStoreBuilder = Stores.keyValueStoreBuilder(
                Stores.persistentKeyValueStore(PERSON_STORE_NAME),
                Serdes.String(),
                joinedPASerde
        );

        StoreBuilder<KeyValueStore<String, Long>> debounceStoreBuilder = Stores.keyValueStoreBuilder(
                Stores.persistentKeyValueStore(DEBOUNCE_STORE),
                Serdes.String(),
                Serdes.Long()
        );

        builder.addStateStore(bpStoreBuilder);
        builder.addStateStore(personStoreBuilder);
        builder.addStateStore(debounceStoreBuilder);

        KStream<String, BpAddrOrPersonAddrV2> mergedStream = bpAddrKStream
                .mapValues(value -> new BpAddrOrPersonAddrV2(null, value))
                .merge(personAddrKStream.mapValues(value -> new BpAddrOrPersonAddrV2(value, null)));

        mergedStream.process(() -> new ContextualProcessor<String, BpAddrOrPersonAddrV2, String, ObjectUpdateEvent>() {
            @Override
            public void process(Record<String, BpAddrOrPersonAddrV2> record) {
                // assign value to a variable
                BpAddrOrPersonAddrV2 eitherValue = record.value();
                boolean isHashUpdated = false;

                // get state stores
                KeyValueStore<String, JoinedBpAddrV2> bpAddrStateStore = context().getStateStore(BP_STORE_NAME);
                KeyValueStore<String, JoinedPersonAddrV2> personAddrStateStore = context().getStateStore(PERSON_STORE_NAME);

                // assign single Values
                JoinedBpAddrV2 bpAddr = eitherValue.getBpAddr();
                JoinedPersonAddrV2 personAddr = eitherValue.getPersonAddr();

                // process if bp-addr record
                if (bpAddr != null) {
                    String[] splittedKey = record.key().split(":");
                    String personIdAddrIdKey = String.format("%s:%s", splittedKey[1], splittedKey[2]);

                    boolean isEmittent = isEmittent(bpAddr);

                    if (isRegisteredOwnerId(bpAddr, splittedKey[1]) || isEmittent) {
                        // personId, addrId, bpId
                        String revertedKey = String.format("%s:%s:%s", splittedKey[1], splittedKey[2], splittedKey[0]);

                        // get stored value from bp-addr state store
                        JoinedBpAddrV2 storedValue = bpAddrStateStore.get(revertedKey);
                        log.info("bpaddr entry storedValue: {}", storedValue);
                        if (storedValue == null) {
                            log.debug("{}, New BpAddr Table Entry for: {}, hash: {}", BP_STORE_NAME, record.key(), bpAddr.getHash());
                            bpAddrStateStore.put(revertedKey, bpAddr);
                            isHashUpdated = true;
                        } else if (!Objects.equals(storedValue.getHash(), bpAddr.getHash())) {
                            log.debug("{}, Update BpAddr Entry for key: {}, old hash: {}, new hash: {}", BP_STORE_NAME, record.key(), storedValue.getHash(), bpAddr.getHash());
                            bpAddrStateStore.put(revertedKey, bpAddr);
                            isHashUpdated = true;
                        } else {
                            log.debug("{}, No hash change do nothing for BpAddr key:{} hash value:{}", BP_STORE_NAME, record.key(), storedValue.getHash());
                        }
                    }

                    if (isHashUpdated) {
                        log.info("SEARCHING IN PERSON - ADDR STATE STORE WITH A KEY: {}", personIdAddrIdKey);

                        JoinedPersonAddrV2 matching = personAddrStateStore.get(personIdAddrIdKey);
                        log.info("MATCHING: {}", matching);

                        KeyValueIterator<String, JoinedPersonAddrV2> matchedPersonAddrIterator = personAddrStateStore.prefixScan(personIdAddrIdKey, new StringSerializer());

                        if (matchedPersonAddrIterator.hasNext()) {
                            while (matchedPersonAddrIterator.hasNext()) {
                                JoinedPersonAddrV2 matchedPersonAddr = matchedPersonAddrIterator.next().value;
                                // if the bp-addr is an emittent and no matching personAddr was found, look it up by personId only using a prefix scan,
                                // because addrId is not propagated on bp, so bp and person might have a different domiAddr
                                if (matchedPersonAddr == null && isEmittent) {
                                    String personId = splittedKey[1];
                                    // do a prefix scan for a personId
                                    KeyValueIterator<String, JoinedPersonAddrV2> joinedPersonIterator = personAddrStateStore.prefixScan(personId, new StringSerializer());
                                    while (joinedPersonIterator.hasNext()) {
                                        JoinedPersonAddrV2 item = joinedPersonIterator.next().value;
                                        context().forward(record.withKey(personIdAddrIdKey).withValue(createObjectUpdateEvent(item, bpAddr)));
                                    }
                                    // "standard" match forward the record
                                } else if (matchedPersonAddr != null) {
                                    context().forward(record.withKey(personIdAddrIdKey).withValue(createObjectUpdateEvent(matchedPersonAddr, bpAddr)));
                                }
                            }
                        }
                    }

                } else if (personAddr != null) { // process if person-addr record is coming

                    if (record.value() != null) {
                        JoinedPersonAddrV2 storedValue = personAddrStateStore.get(record.key());
                        log.info("personaddr entry storedValue: {}", storedValue);

                        if (storedValue == null) {
                            log.debug("{}, New PersonAddr Table entry for: {}, hash: {}", PERSON_STORE_NAME, record.key(), personAddr.getHash());
                            personAddrStateStore.put(record.key(), personAddr);
                            isHashUpdated = true;
                        } else if (!storedValue.getHash().equals(personAddr.getHash())) {
                            log.debug("{}, Update PersonAddr Table entry for: {}, old hash: {}, new hash: {}", PERSON_STORE_NAME, record.key(), storedValue.getHash(), personAddr.getHash());
                            personAddrStateStore.put(record.key(), personAddr);
                            isHashUpdated = true;
                        } else {
                            log.debug("{}, No hash change do nothing for PersonAddr key: {}, hash: {}", PERSON_STORE_NAME, record.key(), personAddr.getHash());
                        }
                    } else {
                        log.debug("Got NULL JoinedPersonAddr Object - skip! --> key: {}", record.key());
                    }

                    if (isHashUpdated) {
                        try {

                            // look for matches in bpAddr store
                            log.info("SEARCHING IN BP - ADDR STATE STORE WITH A KEY: {}", record.key());
                            KeyValueIterator<String, JoinedBpAddrV2> bpAddrIterator = bpAddrStateStore.prefixScan(record.key(), new StringSerializer());

                            if (bpAddrIterator.hasNext()) {
                                log.trace("Found records for key {} record in {}", record.key(), BP_STORE_NAME);
                                // got matches for given personId:addrId update all found records
                                while (bpAddrIterator.hasNext()) {
                                    JoinedBpAddrV2 bpAddrObject = bpAddrIterator.next().value;
                                    context().forward(record.withKey(record.key()).withValue(createObjectUpdateEvent(personAddr, bpAddrObject)));
                                }
                            } else {
                                log.trace("Found NO records for key {} record in {}", record.key(), BP_STORE_NAME);
                                String[] splittedKey = record.key().split(":");
                                final String ZERO = "0";

                                // only when addr != 0 and is emittent
                                if (personAddr.getIsEmittent() && !splittedKey[1].equals(ZERO)) {
                                    String personIdNullAddrKey = String.format("%s:%s", splittedKey[0], ZERO);
                                    // look for records in the bp-addr state store with key personId:0
                                    KeyValueIterator<String, JoinedBpAddrV2> bpNullAddrIterator = bpAddrStateStore.prefixScan(personIdNullAddrKey, new StringSerializer());
                                    if (bpNullAddrIterator.hasNext()) {
                                        log.trace("Found records for key {} record in {}", personIdNullAddrKey, BP_STORE_NAME);
                                        while (bpNullAddrIterator.hasNext()) {
                                            JoinedBpAddrV2 bpAddrV2 = bpNullAddrIterator.next().value;
                                            context().forward(record.withKey(record.key()).withValue(createObjectUpdateEvent(personAddr, bpAddrV2)));
                                        }
                                    }
                                } else {
                                    log.trace("Found NO matched sending PERSON_ONLY records");
                                    // no match in bpAddr state store send PERSON_ONLY event
                                    context().forward(record.withValue(createObjectUpdateEvent(personAddr, null)));
                                }
                            }
                        } catch (Exception e) {
                            log.error("EXCEPTION : {}", e.getMessage());
                        }
                    }
                }
            }
        }, BP_STORE_NAME, PERSON_STORE_NAME)
                .process(() -> new DebounceTransformer<>(DEBOUNCE_STORE, 5000), DEBOUNCE_STORE)
                .peek((key, value) -> log.debug("Produced Update Event key: {}, hash: {}", key, value.getHash()))
                .to(kafkaStreamsProperty.getOutputTopic(), Produced.with(Serdes.String(), updateSerde));

        return builder;
    }

The topology looks as follows:

Topology: Topologies:
   Sub-topology: 0
    Source: KSTREAM-SOURCE-0000000000 (topics: [person.addr.join.topic])
      --> KSTREAM-MAPVALUES-0000000003
    Source: KSTREAM-SOURCE-0000000001 (topics: [bp.addr.join.topic])
      --> KSTREAM-MAPVALUES-0000000002
    Processor: KSTREAM-MAPVALUES-0000000002 (stores: [])
      --> KSTREAM-MERGE-0000000004
      <-- KSTREAM-SOURCE-0000000001
    Processor: KSTREAM-MAPVALUES-0000000003 (stores: [])
      --> KSTREAM-MERGE-0000000004
      <-- KSTREAM-SOURCE-0000000000
    Processor: KSTREAM-MERGE-0000000004 (stores: [])
      --> KSTREAM-PROCESSOR-0000000005
      <-- KSTREAM-MAPVALUES-0000000002, KSTREAM-MAPVALUES-0000000003
    Processor: KSTREAM-PROCESSOR-0000000005 (stores: [bp-state-store, person-state-store])
      --> KSTREAM-PROCESSOR-0000000006
      <-- KSTREAM-MERGE-0000000004
    Processor: KSTREAM-PROCESSOR-0000000006 (stores: [debounce-store])
      --> KSTREAM-PEEK-0000000007
      <-- KSTREAM-PROCESSOR-0000000005
    Processor: KSTREAM-PEEK-0000000007 (stores: [])
      --> KSTREAM-SINK-0000000008
      <-- KSTREAM-PROCESSOR-0000000006
    Sink: KSTREAM-SINK-0000000008 (topic: update.event.topic)
      <-- KSTREAM-PEEK-0000000007

Solution

  • The "problem" was that the topics on the Strimzi cluster I used had multiple partitions. Situation: if topics A and B each have 10 partitions, and producers send messages to these topics with the default partitioner, a BpAddr record and the matching PersonAddr record may land in different partitions. Their keys differ (bpId:personId:addrId for BpAddr, personId:addrId for PersonAddr), so they hash differently.

    The state store is partitioned as well, so each entry is written to the store partition that belongs to the input partition of its record. When an update record arrives and the app looks up entries in a state store, it only sees the store partition for that record's own input partition. Therefore we must make sure that matching PersonAddr and BpAddr records always land in the same partition.

    I solved it by writing a custom partitioner for the producer that writes the PersonAddr and BpAddr records, so that matching records end up in the same partition of both topics.
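
A minimal sketch of the partitioning logic such a custom partitioner could use. This is not the original author's code. It assumes the key layouts seen in the processor above (bpId:personId:addrId for BpAddr, personId:addrId for PersonAddr) and partitions by personId only, so that the personId-only prefix scans also hit the right partition. The names PersonIdPartitioning, extractPersonId and partitionFor are hypothetical. In a real producer this logic would live in the partition(...) method of a class implementing org.apache.kafka.clients.producer.Partitioner, registered through the partitioner.class producer setting. The sketch uses String.hashCode() for brevity, which is not the hash Kafka's default partitioner uses. Both topics also need the same number of partitions for this to work.

```java
/**
 * Illustrative helper: maps BpAddr and PersonAddr keys that share a personId
 * to the same partition number.
 */
final class PersonIdPartitioning {

    private PersonIdPartitioning() {
    }

    /**
     * Extracts the personId from a record key.
     * Assumed layouts: "bpId:personId:addrId" (BpAddr) or "personId:addrId" (PersonAddr).
     */
    static String extractPersonId(String key) {
        String[] parts = key.split(":");
        if (parts.length == 3) {
            return parts[1]; // BpAddr key: personId is the second segment
        }
        if (parts.length == 2) {
            return parts[0]; // PersonAddr key: personId is the first segment
        }
        throw new IllegalArgumentException("Unexpected key layout: " + key);
    }

    /**
     * Computes a partition in [0, numPartitions) from the personId alone.
     * floorMod keeps the result non-negative even for negative hash codes.
     */
    static int partitionFor(String key, int numPartitions) {
        if (numPartitions <= 0) {
            throw new IllegalArgumentException("numPartitions must be positive");
        }
        return Math.floorMod(extractPersonId(key).hashCode(), numPartitions);
    }
}
```

With this helper, partitionFor("7:42:3", 10) and partitionFor("42:3", 10) return the same partition, because both keys carry personId 42.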