I'm developing an api that communicates with other services from an event architecture using RabbitMQ Topics. Several routes from my API will publish events and I would like to have a single live connection at all times in my API. That way, at every new request I just create a new channel, and keep only one connection (I decided to do this after reading about how expensive an amqp 0-9-2 connection is).
For now I have something like this:
class Singleton:
def __init__(self, target):
self.target = target
def __call__(self, *args, **kwargs) -> Any:
try:
return self._instance
except AttributeError:
self._instance = self.target(*args, **kwargs)
return self._instance
@Singleton
class RabbitConnection(pika.BlockingConnection):
def __init__(self):
ssl_options = None
if settings.RABBIT_SSL:
context = ssl.create_default_context()
ssl_options = pika.SSLOptions(context)
credentials = pika.credentials.PlainCredentials(
username=settings.RABBIT_USER,
password=str(settings.RABBIT_PASSWORD),
)
parameters = pika.ConnectionParameters(
host=settings.RABBIT_SERVER,
port=settings.RABBIT_PORT,
virtual_host="/",
credentials=credentials,
ssl_options=ssl_options,
heartbeat=0
)
super().__init__(parameters=parameters)
class RabbitChannelProvider:
_channel = None
def __init__(self):
self._connection = RabbitConnection()
def __enter__(self) -> BlockingChannel:
if not self._channel:
self._channel = self._connection.channel()
self._channel.exchange_declare(
exchange=settings.RABBIT_EXCHANGE,
exchange_type=ExchangeType.topic,
passive=False,
durable=True,
auto_delete=False,
)
return self._channel
def __exit__(self, exc_type, exc_value, tb) -> None:
self._channel.close()
self._channel = None
class MessagePublisher(SingletonCreateMixin, PublisherMessageBackend):
id = "publisher_rabbitmq"
def publish(self, routing_key: str, body: Any) -> None:
try:
message = build_message(body=body)
logger.info(
event="message_broker",
event_type=LogEventType.SUCCESS,
location=LogLocation.BACKEND,
body=body,
message="Sending message",
)
with RabbitChannelProvider() as channel:
channel.basic_publish(
exchange=settings.RABBIT_EXCHANGE,
routing_key=routing_key,
body=message,
properties=pika.BasicProperties(
content_type="application/json"
),
)
except Exception as err:
logger.error(
event="message_broker",
event_type=LogEventType.ERROR,
location=LogLocation.BACKEND,
body=body,
error=err,
)
raise MessageBrokerException(message=err)
This is the right way to maintain only one connection in the api process ? Am I doing this right ?
form the official pika documentation
Is Pika thread safe?
Pika does not have any notion of threading in the code. If you want to use Pika with threading, make sure you have a Pika connection per thread, created in that thread. It is not safe to share one Pika connection across threads, with one exception: you may call the connection method add_callback_threadsafe from another thread to schedule a callback within an active pika connection.
so your solution can work with a single thread