I have a special case where I need to make sure that all messages for a parent key go to a specific partition. Assume the following attributes for the key:

pk: UUID

and for the value:

parent: String
name: String
familyName: String
I've implemented other partitioners before, but I was always able to take the relevant information from the key. The partition method of the interface org.apache.kafka.clients.producer.Partitioner receives both the key and the value. However, its documentation also notes that any of them could be null.
I'm asking this because the content of the stream is later also loaded into a materialized view, and this view should allow searching by pk.
If I also included the parent attribute in the key, I could no longer search by pk unless I also had the parent.
Here is how I set up a materialized view inside a stream processing topology:
```java
KTable<MyObjectKey, MyObjectValue> myObjects = builder
        .table("topicName", Consumed.with(myObjectKeySerde, myObjectValueSerde),
                Materialized.<MyObjectKey, MyObjectValue, KeyValueStore<Bytes, byte[]>>as("viewName")
                        .withKeySerde(myObjectKeySerde)
                        .withValueSerde(myObjectValueSerde));
```
Later I can then access it like this:
```java
final MyObjectKey pk = MyObjectKey.newBuilder().setUUID(... UUID searched for).build();
final ReadOnlyKeyValueStore<MyObjectKey, MyObjectValue> store =
        streamsBuilderFactoryBean.getKafkaStreams()
                .store(fromNameAndType("viewName", keyValueStore()));
MyObjectValue value = store.get(pk);
```
In theory, I could use a range query, which would allow me to include parent in the key. I would then search from pk + 00000000-0000-0000-0000-000000000000 to pk + ffffffff-ffff-ffff-ffff-ffffffffffff.
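A minimal JDK-only sketch of that range idea, assuming String keys of the hypothetical form `pk + ":" + parent` where parent is a lowercase UUID string, and assuming the store orders keys the way `String.compareTo` does (which holds for ASCII keys serialized as UTF-8). A `TreeMap` stands in for the state store; on a real `ReadOnlyKeyValueStore` the corresponding call would be `store.range(from, to)`, whose bounds are both inclusive.

```java
import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;
import java.util.UUID;

// Sketch: composite String keys "pk:parent", looked up by pk alone via a range.
// The TreeMap is a stand-in for the Kafka Streams key-value store (hypothetical setup).
final class PkRangeLookup {
    static final String LOWEST_UUID = "00000000-0000-0000-0000-000000000000";
    static final String HIGHEST_UUID = "ffffffff-ffff-ffff-ffff-ffffffffffff";

    // Hypothetical composite key; UUID.toString() already yields lowercase hex.
    static String compositeKey(UUID pk, String parent) {
        return pk + ":" + parent;
    }

    // Every entry for this pk, whatever its parent; both bounds are inclusive,
    // like ReadOnlyKeyValueStore#range.
    static NavigableMap<String, String> findByPk(NavigableMap<String, String> store, UUID pk) {
        return store.subMap(pk + ":" + LOWEST_UUID, true, pk + ":" + HIGHEST_UUID, true);
    }

    public static void main(String[] args) {
        UUID pk = UUID.fromString("123e4567-e89b-12d3-a456-426614174000");
        UUID other = UUID.fromString("223e4567-e89b-12d3-a456-426614174000");
        String parent = "5f0c6c2e-8f4a-4b1e-9a3d-2c7b8e1f0a11";

        NavigableMap<String, String> store = new TreeMap<>();
        store.put(compositeKey(pk, parent), "value for pk");
        store.put(compositeKey(other, parent), "value for other");

        // Only the entry whose key starts with pk is returned.
        for (Map.Entry<String, String> e : findByPk(store, pk).entrySet()) {
            System.out.println(e.getKey() + " -> " + e.getValue());
        }
    }
}
```

The `:` separator is an added assumption; it keeps the pk part and the parent part of the key unambiguous.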
> make sure that all messages for a parent key go to a specific partition

A Partitioner would be the correct way to do that.
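A minimal sketch of the routing rule such a Partitioner could apply, assuming the value exposes `parent` as a String. It is kept to the JDK so it runs standalone; inside a real `org.apache.kafka.clients.producer.Partitioner` you would call it from `partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster)`, passing `cluster.partitionCountForTopic(topic)` as the partition count. `String.hashCode` is used for illustration only and is not the murmur2 hash of Kafka's default partitioner; what matters is that every producer applies the same function.

```java
// Sketch: route every record to a partition derived only from its parent.
final class ParentRouting {
    // Deterministic mapping parent -> partition in [0, numPartitions).
    static int partitionForParent(String parent, int numPartitions) {
        if (numPartitions <= 0) {
            throw new IllegalArgumentException("numPartitions must be positive");
        }
        // Mask the sign bit rather than using Math.abs, which stays negative for Integer.MIN_VALUE.
        return (parent.hashCode() & 0x7fffffff) % numPartitions;
    }

    public static void main(String[] args) {
        int partitions = 6;
        // Two children of the same parent land on the same partition.
        System.out.println(partitionForParent("parent-1", partitions));
        System.out.println(partitionForParent("parent-1", partitions));
        System.out.println(partitionForParent("parent-2", partitions));
    }
}
```

Because the result depends on the partition count, adding partitions to the topic later changes where a given parent is routed.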
> method partition both possibilities key and value. However it also denotes that all of them could be null
If you have full control over the producers that use the Partitioner, that shouldn't be an issue. I don't think the key and value can both be null, at least not with a state store, where non-null keys are required.
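Since `partition` receives `key` and `value` as nullable `Object`s, here is a sketch of a defensive variant, assuming a hypothetical `ChildValue` class standing in for the question's value type. Failing fast on a missing value is a design choice: a null value is how a delete (tombstone) for a KTable looks, and since the parent cannot be read from it, silently falling back to another partition could send the tombstone somewhere the original record never went.

```java
import java.util.Objects;

// Sketch: extract the parent defensively from the nullable arguments a Partitioner receives.
final class NullSafeParentRouting {
    // Hypothetical stand-in for the question's value type (parent, name, familyName).
    static final class ChildValue {
        final String parent;
        final String name;
        final String familyName;

        ChildValue(String parent, String name, String familyName) {
            this.parent = parent;
            this.name = name;
            this.familyName = familyName;
        }
    }

    // Mirrors the nullable key/value parameters of Partitioner#partition.
    static int partition(Object key, Object value, int numPartitions) {
        if (!(value instanceof ChildValue)) {
            // Covers null values (tombstones) and unexpected types: the parent is unknown,
            // so refuse rather than guess a partition.
            throw new IllegalArgumentException("cannot route record without a ChildValue, key=" + key);
        }
        String parent = Objects.requireNonNull(((ChildValue) value).parent, "parent must not be null");
        return (parent.hashCode() & 0x7fffffff) % numPartitions;
    }

    public static void main(String[] args) {
        ChildValue v = new ChildValue("parent-1", "Ada", "Lovelace");
        // A null key is fine here because only the value is used for routing.
        System.out.println(partition(null, v, 6));
        try {
            partition("some-pk", null, 6);
        } catch (IllegalArgumentException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```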
If you build the keys as simple strings of the form parent:pk, you can then use ReadOnlyKeyValueStore#prefixScan to get all data for a particular parent prefix. After that, you'd need to check the values of the returned iterator for any further filtering or lookups.
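A JDK-only sketch of the parent:pk layout, using a `TreeMap` as a stand-in for the store and a hypothetical `prefixScan` helper that mimics what `ReadOnlyKeyValueStore#prefixScan(prefix, prefixKeySerializer)` returns (that method exists since Kafka 2.8; with String keys the real call would look roughly like `store.prefixScan("parent-1:", new StringSerializer())`). Including the `:` separator in the prefix matters: scanning for `parent-1` alone would also match the children of `parent-10`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;

// Sketch: String keys "parent:pk" so that one prefix scan returns all children of a parent.
final class ParentPrefixScan {
    static String key(String parent, String pk) {
        return parent + ":" + pk;
    }

    // Hypothetical stand-in for ReadOnlyKeyValueStore#prefixScan over a sorted map:
    // start at the first key >= prefix and stop at the first key that no longer matches,
    // since all keys sharing a prefix are contiguous in sorted order.
    static List<String> prefixScan(NavigableMap<String, String> store, String prefix) {
        List<String> values = new ArrayList<>();
        for (Map.Entry<String, String> e : store.tailMap(prefix, true).entrySet()) {
            if (!e.getKey().startsWith(prefix)) {
                break;
            }
            values.add(e.getValue());
        }
        return values;
    }

    public static void main(String[] args) {
        NavigableMap<String, String> store = new TreeMap<>();
        store.put(key("parent-1", "pk-a"), "child a");
        store.put(key("parent-1", "pk-b"), "child b");
        store.put(key("parent-10", "pk-c"), "child of parent-10");

        // Scanning with the separator excludes parent-10.
        System.out.println(prefixScan(store, "parent-1:"));
    }
}
```

The trade-off is the one raised in the question: with parent first in the key, looking up a record by pk alone is no longer a point lookup.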