Is there any way to process high-priority messages first?
I created three topics, 'high', 'medium', and 'low', and subscribed to all three with one consumer. If there is an unprocessed message in the 'high' topic, the consumer pauses the other two. Is there a better way to implement message priority?
This is the logic I used:
```python
from kafka import KafkaConsumer, TopicPartition

# Connection settings are placeholders; adjust for your cluster
consumer = KafkaConsumer(bootstrap_servers='localhost:9092', group_id='priority-consumer')

# Only partition 0 of each priority topic is used
high, medium, low = (TopicPartition(t, 0) for t in ('high', 'medium', 'low'))
# Assign explicitly so position() is valid before the first poll()
consumer.assign([high, medium, low])

def backlog(tp):
    # Messages not yet fetched: log-end offset minus this consumer's position
    return consumer.end_offsets([tp])[tp] - consumer.position(tp)

while True:
    # Decide what to fetch *before* polling, highest priority first
    if backlog(high) > 0:
        consumer.pause(medium, low)       # fetch only 'high'
    elif backlog(medium) > 0:
        consumer.resume(medium)           # fetch 'high' and 'medium'
        consumer.pause(low)
    else:
        consumer.resume(medium, low)      # fetch everything

    messages = consumer.poll(timeout_ms=1000)  # {TopicPartition: [ConsumerRecord, ...]}
    if messages:
        process(messages)
```
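The pause/resume rule in the loop above can be pulled out into a pure function, which generalizes to any number of priority levels and can be tested without a broker. This is a sketch; the name `active_topics` is hypothetical. Every topic down to and including the first one with a backlog stays active, and everything below it is paused.

```python
from typing import Dict, List


def active_topics(backlogs: Dict[str, int], priority_order: List[str]) -> List[str]:
    """Return the topics that should be consumed, highest priority first.

    Topics below the first one that has a backlog are left out (paused).
    With no backlog anywhere, every topic stays active so new messages
    at any level are picked up.
    """
    active: List[str] = []
    for topic in priority_order:
        active.append(topic)
        if backlogs.get(topic, 0) > 0:
            break  # lower-priority topics wait until this one drains
    return active


if __name__ == "__main__":
    order = ["high", "medium", "low"]
    print(active_topics({"high": 0, "medium": 2, "low": 7}, order))
```

Inside the consumer loop you would resume the partitions of the returned topics and pause all others, computing each backlog as end offset minus position.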
One option you could evaluate is simply giving higher-priority messages more parallelism.
For example:
Topic1 (priority low): 1 partition
Topic2 (priority medium): 5 partitions
Topic3 (priority high): 20 partitions
And then run one consumer per partition: 1 for Topic1, 5 for Topic2, and 20 for Topic3.
The easiest way to do this, I would suggest, is to write the code once, externalize the configuration of the topic name, and then scale it up (using containers, of course). Please refer to this reading:
For example, the code could be as simple as:
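```python
# SuperAwesomeAppBinaryCode
import os
from kafka import KafkaConsumer

# Topic name injected by the deployment through an environment variable
topic = os.environ['MY_TOPIC_NAME_INJECTED_BY_ENV_VAR']
# All replicas share one group_id so Kafka splits the topic's partitions among them
consumer = KafkaConsumer(bootstrap_servers='localhost:9092', group_id=f'{topic}-processor')
consumer.subscribe([topic])
while True:
    messages = consumer.poll(timeout_ms=1000)
    if messages:
        process(messages)
```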
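The reason partition count matters: within one consumer group, each partition is consumed by at most one member, so replicas beyond the partition count sit idle. The sketch below is a simplified round-robin model of that rule, not Kafka's actual assignor code, and the function name `round_robin_assign` is hypothetical.

```python
from typing import Dict, List


def round_robin_assign(num_partitions: int, consumers: List[str]) -> Dict[str, List[int]]:
    """Simplified model of how a consumer group shares one topic.

    Each partition goes to exactly one consumer. Consumers beyond the
    partition count get nothing, so a topic's partition count caps how
    many replicas can work on it at once.
    """
    assignment: Dict[str, List[int]] = {c: [] for c in consumers}
    for partition in range(num_partitions):
        assignment[consumers[partition % len(consumers)]].append(partition)
    return assignment


if __name__ == "__main__":
    print(round_robin_assign(5, [f"medium-{i}" for i in range(5)]))
    print(round_robin_assign(1, ["low-0", "low-1"]))
```

With 5 partitions and 5 replicas every replica gets one partition; with 1 partition and 2 replicas the second replica receives nothing. This is why the replica counts match the partition counts.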
Now, if that code is deployed on, say, Kubernetes, you could have three different deployments running the same code, each injecting the right topic, for example:
```yaml
# Kubernetes object, container, and image names must be lowercase
apiVersion: apps/v1
kind: Deployment
metadata:
  name: low-priority-processor
  labels:
    app: low-priority-processor
spec:
  replicas: 1
  selector:
    matchLabels:
      app: low-priority-processor
  template:
    metadata:
      labels:
        app: low-priority-processor
    spec:
      containers:
      - name: low-priority-processor
        image: superawesomeappbinarycode:1.0.0
        env:
        - name: MY_TOPIC_NAME_INJECTED_BY_ENV_VAR
          value: topic1
        ports:
        - containerPort: 80
---
apiVersion: apps/v1
kind: Deployment
metadata:
  name: medium-priority-processor
  labels:
    app: medium-priority-processor
spec:
  replicas: 5
  selector:
    matchLabels:
      app: medium-priority-processor
  template:
    metadata:
      labels:
        app: medium-priority-processor
    spec:
      containers:
      - name: medium-priority-processor
        image: superawesomeappbinarycode:1.0.0
        env:
        - name: MY_TOPIC_NAME_INJECTED_BY_ENV_VAR
          value: topic2
        ports:
        - containerPort: 80
---
apiVersion: apps/v1
kind: Deployment
metadata:
  name: high-priority-processor
  labels:
    app: high-priority-processor
spec:
  replicas: 20
  selector:
    matchLabels:
      app: high-priority-processor
  template:
    metadata:
      labels:
        app: high-priority-processor
    spec:
      containers:
      - name: high-priority-processor
        image: superawesomeappbinarycode:1.0.0
        env:
        - name: MY_TOPIC_NAME_INJECTED_BY_ENV_VAR
          value: topic3
        ports:
        - containerPort: 80
```
And then just let the parallelism do its magic 😉 If you look carefully, apart from the names, the only things that change from one Kubernetes deployment to another are the topic and the number of replicas.
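Because only the topic and replica count differ, the three manifests could also be generated from one small table instead of being maintained by hand. This is a minimal sketch under that assumption. The `PRIORITIES` table and the function names are hypothetical, and the output is JSON, which `kubectl apply -f` also accepts, wrapped in a `v1` `List`.

```python
import json
from typing import Dict, List

# Hypothetical table: (priority, topic, replicas == partition count)
PRIORITIES = [("low", "topic1", 1), ("medium", "topic2", 5), ("high", "topic3", 20)]


def deployment(priority: str, topic: str, replicas: int) -> Dict:
    """Build one Deployment; only the name, topic, and replicas vary."""
    name = f"{priority}-priority-processor"
    return {
        "apiVersion": "apps/v1",
        "kind": "Deployment",
        "metadata": {"name": name, "labels": {"app": name}},
        "spec": {
            "replicas": replicas,
            "selector": {"matchLabels": {"app": name}},
            "template": {
                "metadata": {"labels": {"app": name}},
                "spec": {
                    "containers": [{
                        "name": name,
                        "image": "superawesomeappbinarycode:1.0.0",
                        "env": [{"name": "MY_TOPIC_NAME_INJECTED_BY_ENV_VAR", "value": topic}],
                        "ports": [{"containerPort": 80}],
                    }]
                },
            },
        },
    }


def manifests() -> List[Dict]:
    return [deployment(*row) for row in PRIORITIES]


if __name__ == "__main__":
    print(json.dumps({"apiVersion": "v1", "kind": "List", "items": manifests()}, indent=2))
```

Adding a priority level then means adding one row to the table and creating a topic with the matching partition count.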
Notes: