Search code examples
rabbitmqlocustkombu

I can’t attach a Rabbit MQ client to Locust user


I am trying to run a load test that injects messages to rabbit mq, but I can get locust user class to get the custom client I created loaded. Has someone managed to do this?

import os
from locust import HttpUser, task, TaskSet, run_single_user
from kombu import Connection, Exchange, Queue


class KombuClient:
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.connection = Connection(
            hostname=os.getenv(
                "LOCUST_AMQP_CONFIG", "amqp://guest:guest@localhost:5672//"
            ),
        )
        self.channel = self.connection.channel()

    def send_message(self, message):
        exchange = Exchange(
            name=os.getenv("LOCUST_AMQP_EXCHANGE", "test_exchange"),
            type="direct",
        )
        queue = Queue(
            name=os.getenv("LOCUST_AMQP_QUEUE", "test_queue"),
            exchange=exchange,
            routing_key=os.getenv("LOCUST_AMQP_KEY", "test_key"),
        )
        queue.maybe_bind(self.connection)
        queue.declare()

        producer = self.connection.Producer(
            exchange=exchange,
            routing_key=os.getenv("LOCUST_AMQP_KEY", "test_key"),
        )
        producer.publish(message)
        print("INFO: Sent message: '{}'".format(message))

    def close_connection(self):
        self.connection.release()

class UserBehavior(TaskSet):
    @task
    def send_hello_message(self):
        I make it work by, instead of the line below but that is not 100% correct--> KombuClient().send_message("Hello World!")
        self.client.send_message("Hello World!")


class RabbitMQUser(HttpUser):
    host = "http://localhost"
    tasks = [UserBehavior]
    min_wait = 5000
    max_wait = 9000
    client = KombuClient
  

if __name__ == "__main__":
    run_single_user(RabbitMQUser)

In general I can inject the messages but Locust does not recognize the transaction and does not shows any user running.

I have tried to inspect the Locust code to see what the client expected class is, but I haven’t found any useful information.

I guess the API changed as I am using Locust 2.15.1 and there is no HttpClient class no more


Solution

  • Here is the updated code, I am still missing adding the changes @Solowalker suggested, but if you put all together you can have the whole answer. I will try to update the code once all is working.

    import os
    from locust import task, TaskSet, User, run_single_user
    from locust.clients import HttpSession
    from kombu import Connection, Exchange, Queue
    
    
    class KombuClient(HttpSession):
        def __init__(self, *args, **kwargs):
            super().__init__(*args, **kwargs)
            self.connection = Connection(
                hostname=os.getenv(
                    "LOCUST_AMQP_CONFIG", "amqp://guest:guest@localhost:5672//"
                ),
            )
            self.channel = self.connection.channel()
    
        def send_message(self, message):
            exchange = Exchange(
                name=os.getenv("LOCUST_AMQP_EXCHANGE", "test_exchange"),
                type="direct",
            )
            queue = Queue(
                name=os.getenv("LOCUST_AMQP_QUEUE", "test_queue"),
                exchange=exchange,
                routing_key=os.getenv("LOCUST_AMQP_KEY", "test_key"),
            )
            queue.maybe_bind(self.connection)
            queue.declare()
    
            producer = self.connection.Producer(
                exchange=exchange,
                routing_key=os.getenv("LOCUST_AMQP_KEY", "test_key"),
            )
            producer.publish(message)
            print("INFO: Sent message: '{}'".format(message))
    
        def close_connection(self):
            self.connection.release()
    
    
    class UserBehavior(TaskSet):
        @task
        def send_hello_message(self):
            self.client.send_message(message="Hello World!")
    
    
    class RabbitMQUser(User):
        host = "http://localhost"
        tasks = [UserBehavior]
        min_wait = 5000
        max_wait = 9000
        abstract = True
    
        def __init__(self, environment):
            super().__init__(environment)
            self.client = KombuClient(self.host, user=self, request_event=environment.events.request)
    
    
    if __name__ == "__main__":
        run_single_user(RabbitMQUser)