Say I have an unbounded set of keys for messages published to a Pulsar topic.
{k0, k1, ..., kn}
And a finite set of expected message categories, where the category information is part of the message payload.
{c0, c1, c2}
Whenever all message categories for a given key are consumed I want to invoke an action in my application. For example, if I see the following key/category pairs, I would expect to see an action invoked.
{(k0, c0), (k0, c1), (k0, c2)} => action invoked for key k0
{(k1, c0), (k1, c1), (k1, c2)} => action invoked for key k1
In order to ensure application resiliency I only ack messages once all categories have been consumed. If a message pertaining to the same category is consumed twice I can ack the older message, holding on to one message per category.
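To make the bookkeeping concrete, here is a minimal sketch of the grouping logic in plain Python, with no Pulsar client involved. The class name `CategoryTracker`, the method `observe`, and the use of integers as message ids are all hypothetical names chosen for illustration. In a real consumer, the returned ids would be passed to the client's acknowledge call.

```python
from typing import Dict, Hashable, List, Tuple

# The finite set of expected categories from the example above.
EXPECTED_CATEGORIES = frozenset({"c0", "c1", "c2"})


class CategoryTracker:
    """Holds one un-acked message id per (key, category) until a key is complete."""

    def __init__(self, expected=EXPECTED_CATEGORIES):
        self.expected = frozenset(expected)
        # key -> {category -> id of the newest held message}
        self.pending: Dict[Hashable, Dict[str, object]] = {}

    def observe(self, key, category, msg_id) -> Tuple[List[object], bool]:
        """Record a message; return (ids that can be acked now, whether the key completed)."""
        held = self.pending.setdefault(key, {})
        to_ack: List[object] = []

        # A duplicate category supersedes the older message, which can be acked.
        older = held.get(category)
        if older is not None:
            to_ack.append(older)
        held[category] = msg_id

        # Once every expected category is present, release the whole group.
        if self.expected.issubset(held):
            to_ack.extend(held.values())
            del self.pending[key]
            return to_ack, True
        return to_ack, False


tracker = CategoryTracker()
tracker.observe("k0", "c0", 1)   # nothing to ack yet
tracker.observe("k0", "c1", 2)   # nothing to ack yet
tracker.observe("k0", "c1", 3)   # supersedes message 2, which can be acked
ids, complete = tracker.observe("k0", "c2", 5)
if complete:
    # This is where the application action for k0 would be invoked,
    # followed by acking every id in `ids`.
    pass
```

Until the last category arrives, every held message stays un-acked, and that is what the rest of the question is about.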
Now, let's say I have a single consumer attached to the subscription and configured with the key_shared subscription type. We consume the following key/category pairs.
{(k0, c0), (k0, c1)}
And while waiting for (k0, c2), a second consumer is added to the subscription. According to this issue, the new consumer will not receive messages until the existing consumer acks or nacks the pending messages. This seems to be expected behaviour, and is indeed the behaviour I am seeing.
Is there a more idiomatic way to implement this feature? Does it make sense to delay acking messages in order to achieve this grouping behaviour?
Using a partitioned topic with the failover subscription type achieves our design goal. Below is a description of the approaches we explored and the behaviour we observed.
Non-partitioned topic with key_shared subscription
When the application is scaled out (more consumers are added to the subscription), any pending messages (messages delivered to a consumer but not yet acked) prevent the new consumer from receiving any messages until they are acked/nacked or the pre-existing consumer unsubscribes.
Partitioned topic with failover subscription
When the application is scaled out, topic/partition pairs are re-assigned evenly across consumers and pending messages (if any) are re-delivered. Consumers need to be informed when ownership of a topic partition changes so that they can clear their internal state. The consumer event listener can be used for this.
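The state clearing can be sketched as follows, in plain Python with no Pulsar client involved. The class `PartitionedState` and its methods are hypothetical names for illustration. The assumption is that the held messages are tracked per partition, and that `on_became_inactive` is called from whatever ownership-change callback the client provides (in the Java client, the `ConsumerEventListener` interface has `becameActive` and `becameInactive` callbacks that receive the partition id).

```python
from collections import defaultdict


class PartitionedState:
    """Per-partition bookkeeping of held (un-acked) messages."""

    def __init__(self):
        # partition -> key -> category -> message id
        self._by_partition = defaultdict(dict)

    def hold(self, partition, key, category, msg_id):
        """Remember the newest message for (key, category) on this partition."""
        self._by_partition[partition].setdefault(key, {})[category] = msg_id

    def held_keys(self, partition):
        """Keys that currently have held messages on this partition."""
        return set(self._by_partition.get(partition, {}))

    def on_became_inactive(self, partition):
        """Drop all state for a partition this consumer no longer owns.

        The un-acked messages are re-delivered to the new owner, so keeping
        them here would only leave stale entries. Returns the number of keys
        that were discarded.
        """
        return len(self._by_partition.pop(partition, {}))


state = PartitionedState()
state.hold(0, "k0", "c0", 1)
state.hold(0, "k0", "c1", 2)
state.hold(1, "k1", "c0", 3)

# Ownership of partition 0 moves to another consumer after scaling out.
state.on_became_inactive(0)
```

Keying the state by partition makes the cleanup a single dictionary removal, and state for partitions the consumer still owns is untouched.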