Search code examples
pythonrabbitmqamqppika

How to properly handle with AMQP connections to and api with multiple producers


I'm developing an api that communicates with other services from an event architecture using RabbitMQ Topics. Several routes from my API will publish events and I would like to have a single live connection at all times in my API. That way, at every new request I just create a new channel, and keep only one connection (I decided to do this after reading about how expensive an amqp 0-9-2 connection is).

For now I have something like this:

class Singleton:
    def __init__(self, target):
        self.target = target

    def __call__(self, *args, **kwargs) -> Any:
        try:
            return self._instance
        except AttributeError:
            self._instance = self.target(*args, **kwargs)
            return self._instance


@Singleton
class RabbitConnection(pika.BlockingConnection):
    def __init__(self):
        ssl_options = None

        if settings.RABBIT_SSL:
            context = ssl.create_default_context()
            ssl_options = pika.SSLOptions(context)

        credentials = pika.credentials.PlainCredentials(
            username=settings.RABBIT_USER,
            password=str(settings.RABBIT_PASSWORD),
        )
        parameters = pika.ConnectionParameters(
            host=settings.RABBIT_SERVER,
            port=settings.RABBIT_PORT,
            virtual_host="/",
            credentials=credentials,
            ssl_options=ssl_options,
            heartbeat=0
        )
        super().__init__(parameters=parameters)


class RabbitChannelProvider:
    _channel = None

    def __init__(self):
        self._connection = RabbitConnection()

    def __enter__(self) -> BlockingChannel:
        if not self._channel:
            self._channel = self._connection.channel()
            self._channel.exchange_declare(
                exchange=settings.RABBIT_EXCHANGE,
                exchange_type=ExchangeType.topic,
                passive=False,
                durable=True,
                auto_delete=False,
            )

        return self._channel

    def __exit__(self, exc_type, exc_value, tb) -> None:
        self._channel.close()
        self._channel = None


class MessagePublisher(SingletonCreateMixin, PublisherMessageBackend):
    id = "publisher_rabbitmq"

    def publish(self, routing_key: str, body: Any) -> None:
        try:
            message = build_message(body=body)
            logger.info(
                event="message_broker",
                event_type=LogEventType.SUCCESS,
                location=LogLocation.BACKEND,
                body=body,
                message="Sending message",
            )
            with RabbitChannelProvider() as channel:
                channel.basic_publish(
                    exchange=settings.RABBIT_EXCHANGE,
                    routing_key=routing_key,
                    body=message,
                    properties=pika.BasicProperties(
                        content_type="application/json"
                    ),
                )
        except Exception as err:
            logger.error(
                event="message_broker",
                event_type=LogEventType.ERROR,
                location=LogLocation.BACKEND,
                body=body,
                error=err,
            )
            raise MessageBrokerException(message=err)

This is the right way to maintain only one connection in the api process ? Am I doing this right ?


Solution

  • form the official pika documentation

    Is Pika thread safe?

    Pika does not have any notion of threading in the code. If you want to use Pika with threading, make sure you have a Pika connection per thread, created in that thread. It is not safe to share one Pika connection across threads, with one exception: you may call the connection method add_callback_threadsafe from another thread to schedule a callback within an active pika connection.

    so your solution can work with a single thread