Tags: python, apache-kafka, apache-kafka-streams, kafka-python, confluent-kafka-python

How to read and process high priority messages in kafka consumer?


Is there any method to process messages with high priority first?

I tried creating three topics, 'high', 'medium', and 'low', and subscribing to all three with a single consumer; whenever there is an unprocessed message in the 'high' topic, the consumer pauses the other two. Is there a better way to implement message priority?

I tried using the logic given below.

from kafka import TopicPartition

# consumer is an already-configured kafka-python KafkaConsumer
topics = ['high', 'medium', 'low']
consumer.subscribe(topics)

high_topic_partition = TopicPartition('high', 0)
medium_topic_partition = TopicPartition('medium', 0)
low_topic_partition = TopicPartition('low', 0)

while True:
    messages = consumer.poll()

    # Lag per topic = latest offset on the broker minus current consumer position
    high_priority_unprocessed_msg = consumer.end_offsets([high_topic_partition])[high_topic_partition] - consumer.position(high_topic_partition)
    medium_priority_unprocessed_msg = consumer.end_offsets([medium_topic_partition])[medium_topic_partition] - consumer.position(medium_topic_partition)

    if high_priority_unprocessed_msg > 0:
        consumer.pause(medium_topic_partition, low_topic_partition)
    else:
        consumer.resume(medium_topic_partition)
        if medium_priority_unprocessed_msg > 0:
            consumer.pause(low_topic_partition)
        else:
            consumer.resume(low_topic_partition)

    if messages:
        process(messages)
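The pause/resume decision logic can be exercised without a live broker by stubbing the consumer. This is a minimal sketch; `StubConsumer` and `apply_priority` are names I made up for illustration, not part of kafka-python:

```python
class StubConsumer:
    """Minimal stand-in for a Kafka consumer: tracks paused topics and lag."""
    def __init__(self, lag):
        self.lag = dict(lag)      # topic -> unprocessed message count
        self.paused = set()

    def pause(self, *topics):
        self.paused.update(topics)

    def resume(self, *topics):
        self.paused.difference_update(topics)


def apply_priority(consumer):
    """Pause lower-priority topics while a higher-priority topic has lag."""
    if consumer.lag["high"] > 0:
        consumer.pause("medium", "low")
    else:
        consumer.resume("medium")
        if consumer.lag["medium"] > 0:
            consumer.pause("low")
        else:
            consumer.resume("low")


c = StubConsumer({"high": 3, "medium": 0, "low": 5})
apply_priority(c)   # high lag -> both medium and low get paused
```

Driving the stub through a few lag states makes it easy to check that lower tiers resume as soon as the higher tiers drain.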

Solution

  • One option that you may evaluate is simply having more parallelism for higher-priority messages...

    For example:

    Topic1 (Priority Low):     1 partition
    Topic2 (Priority Medium):  5 partitions
    Topic3 (Priority High):   20 partitions
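Assuming a standard Kafka installation, topics with these partition counts could be provisioned with the `kafka-topics` CLI (the broker address and replication factor here are illustrative):

```shell
# One topic per priority tier; partition count controls maximum parallelism
kafka-topics.sh --bootstrap-server localhost:9092 --create --topic topic1 --partitions 1  --replication-factor 1
kafka-topics.sh --bootstrap-server localhost:9092 --create --topic topic2 --partitions 5  --replication-factor 1
kafka-topics.sh --bootstrap-server localhost:9092 --create --topic topic3 --partitions 20 --replication-factor 1
```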
    

    And then basically have:

    • 1 consumer to get the data from topic1
    • 5 consumers from topic2
    • 20 consumers from topic3

    👆 Now, I would suggest that the easiest way to do this is to write the code once... but externalize the configuration of the "topic name"... and then just scale it up (of course using containers)...

    For example, the code could be as simple as:

    SuperAwesomeAppBinaryCode:

    import os

    topic = os.environ['MY_TOPIC_NAME_INJECTED_BY_ENV_VAR']
    consumer.subscribe([topic])   # kafka-python's subscribe() takes a list of topics
    
    while True:
    
        messages = consumer.poll() 
        if messages:
            process(messages)
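The "externalize the topic name" idea can be made a little more robust by failing fast when the variable is missing and deriving a per-priority consumer group, so Kafka spreads the topic's partitions across all replicas of one deployment. This is a sketch; `resolve_worker_config` and the group-naming convention are my own, not something the answer specifies:

```python
import os


def resolve_worker_config(environ):
    """Derive this replica's topic and consumer group from the environment."""
    try:
        topic = environ["MY_TOPIC_NAME_INJECTED_BY_ENV_VAR"]
    except KeyError:
        raise RuntimeError("MY_TOPIC_NAME_INJECTED_BY_ENV_VAR must be set")
    # All replicas for one priority share a group, so the topic's
    # partitions get distributed across them.
    return topic, f"{topic}-processors"


# With kafka-python this would feed straight into the consumer, e.g.:
#   topic, group = resolve_worker_config(os.environ)
#   consumer = KafkaConsumer(topic, group_id=group, bootstrap_servers=...)
```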
    

    Now, if we have that code deployed on, let's say K8s, you could have 3 different deployments, running the same code, but injecting the right topic for each case, for example:

    Low Priority Messages

    apiVersion: apps/v1
    kind: Deployment
    metadata:
      name: low-priority-processor   # K8s resource names must be lowercase (DNS-1123)
      labels:
        app: low-priority-processor
    spec:
      replicas: 1
      selector:
        matchLabels:
          app: low-priority-processor
      template:
        metadata:
          labels:
            app: low-priority-processor
        spec:
          containers:
          - name: low-priority-processor
            image: super-awesome-app-binary-code:1.0.0   # image names must be lowercase too
            env:
            - name: MY_TOPIC_NAME_INJECTED_BY_ENV_VAR
              value: topic1
            ports:
            - containerPort: 80
    

    Medium Priority Messages

    apiVersion: apps/v1
    kind: Deployment
    metadata:
      name: medium-priority-processor   # K8s resource names must be lowercase (DNS-1123)
      labels:
        app: medium-priority-processor
    spec:
      replicas: 5
      selector:
        matchLabels:
          app: medium-priority-processor
      template:
        metadata:
          labels:
            app: medium-priority-processor
        spec:
          containers:
          - name: medium-priority-processor
            image: super-awesome-app-binary-code:1.0.0
            env:
            - name: MY_TOPIC_NAME_INJECTED_BY_ENV_VAR
              value: topic2
            ports:
            - containerPort: 80
    

    High Priority Messages

    apiVersion: apps/v1
    kind: Deployment
    metadata:
      name: high-priority-processor   # K8s resource names must be lowercase (DNS-1123)
      labels:
        app: high-priority-processor
    spec:
      replicas: 20
      selector:
        matchLabels:
          app: high-priority-processor
      template:
        metadata:
          labels:
            app: high-priority-processor
        spec:
          containers:
          - name: high-priority-processor
            image: super-awesome-app-binary-code:1.0.0
            env:
            - name: MY_TOPIC_NAME_INJECTED_BY_ENV_VAR
              value: topic3
            ports:
            - containerPort: 80
    

    And then just let the parallelism do its magic 😉 If you look carefully, the only thing that changes from one "k8s deployment" to another is the topic and the number of replicas.
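The parallelism works because Kafka assigns each partition of a topic to exactly one consumer in a group, so with 20 partitions and 20 replicas every high-priority consumer gets its own partition. A rough round-robin sketch of that distribution (greatly simplified; real Kafka assignors such as range, round-robin, and sticky are more involved):

```python
def assign_partitions(num_partitions, consumers):
    """Simplified round-robin assignment of partitions to one consumer group."""
    assignment = {c: [] for c in consumers}
    for p in range(num_partitions):
        # Each partition goes to exactly one group member
        assignment[consumers[p % len(consumers)]].append(p)
    return assignment


# topic3 (high priority): 20 partitions over 20 replicas -> one partition each
high = assign_partitions(20, [f"high-{i}" for i in range(20)])
# topic1 (low priority): 1 partition over 1 replica
low = assign_partitions(1, ["low-0"])
```

This also shows why running more replicas than partitions buys nothing: the extra consumers would simply sit idle.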

    Notes:

    • You can achieve this without K8s, using Docker Swarm, plain docker-compose, or even running the instances manually 🤷‍♂️, but why reinvent the wheel? That said, in some edge cases there is not much choice...
    • A nice reading about this topic can be found here