I have a class that imports the following module:
import pika
import pickle
from apscheduler.schedulers.background import BackgroundScheduler
import time
import logging

class RabbitMQ():
    def __init__(self):
        self.connection = pika.BlockingConnection(pika.ConnectionParameters(host="localhost"))
        self.channel = self.connection.channel()
        self.sched = BackgroundScheduler()
        self.sched.add_job(self.keep_connection_alive, id='clean_old_data', trigger='cron', hour='*', minute='*', second='*/50')
        self.sched.start()

    def publish_message(self, message, path="path"):
        message["path"] = path
        logging.info(message)
        message = pickle.dumps(message)
        self.channel.basic_publish(exchange="", routing_key="server", body=message)

    def keep_connection_alive(self):
        self.connection.process_data_events()

rabbitMQ = RabbitMQ()
def publish_message(message, path="path"):
    rabbitMQ.publish_message(message, path=path)
My class.py:
import RabbitMQ as rq

class MyClass():
    ...
When writing unit tests for MyClass, I can't mock the connection in this part of the code. The tests keep throwing exceptions and don't run at all:
pika.exceptions.ConnectionClosed: Connection to 127.0.0.1:5672 failed: [Errno 111] Connection refused
I tried a couple of approaches to mock this connection, but none of them seem to work. What can I do to support this sort of test? Mock the entire RabbitMQ module? Or mock only the connection?
Like the commenter above mentions, the issue is the global (module-level) creation of your RabbitMQ instance.
My knee-jerk reaction is to say "just get rid of that, and your module-level publish_message". If you can do that, go for that solution. You have a publish_message on your RabbitMQ class that accepts the same args; any caller would then be expected to create an instance of your RabbitMQ class.
If you don't want to or can't do that for whatever reason, you should just move that object instantiation into your module-level publish_message like this:
def publish_message(message, path="path"):
    rabbitMQ = RabbitMQ()
    rabbitMQ.publish_message(message, path=path)
This will create a new connection every time you call it, though (and, since __init__ also starts the scheduler, a new BackgroundScheduler each time). Maybe that's OK, but maybe it's not. So to avoid creating duplicate connections, you'd want to introduce something like a singleton pattern:
class RabbitMQ():
    __instance = None
    ...
    @classmethod
    def get_instance(cls):
        if cls.__instance is None:
            cls.__instance = RabbitMQ()
        return cls.__instance

def publish_message(message, path="path"):
    RabbitMQ.get_instance().publish_message(message, path=path)
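One wrinkle with the singleton in tests is that the cached instance outlives a single test, so it has to be reset. Below is a minimal sketch of that, not the real class: `_connect` is a hypothetical helper standing in for the `pika.BlockingConnection(...)` call so the example runs without a broker, the scheduler is left out, and `_instance` uses a single underscore (instead of `__instance`) so a test can reset it without dealing with name mangling.

```python
import pickle
from unittest import mock


class RabbitMQ:
    """Trimmed stand-in for the class above (no pika, no scheduler)."""

    _instance = None  # single underscore so tests can reset it easily

    def __init__(self):
        # Hypothetical helper; the real class would call
        # pika.BlockingConnection(...) here.
        self.connection = self._connect()
        self.channel = self.connection.channel()

    @staticmethod
    def _connect():
        raise ConnectionError("no broker in this sketch")

    @classmethod
    def get_instance(cls):
        if cls._instance is None:
            cls._instance = cls()
        return cls._instance

    def publish_message(self, message, path="path"):
        message["path"] = path
        body = pickle.dumps(message)
        self.channel.basic_publish(exchange="", routing_key="server", body=body)


def publish_message(message, path="path"):
    RabbitMQ.get_instance().publish_message(message, path=path)


def run_with_fake_connection():
    """Patch the connection, publish once, then reset the cached instance."""
    fake_connection = mock.MagicMock()
    with mock.patch.object(RabbitMQ, "_connect", return_value=fake_connection):
        try:
            publish_message({"id": 1}, path="/demo")
            same = RabbitMQ.get_instance() is RabbitMQ.get_instance()
        finally:
            RabbitMQ._instance = None  # do not leak the fake into other tests
    return fake_connection, same
```

Nothing connects at import time here; the connection is only attempted on the first `get_instance()` call, which is why the patch can be applied in time.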
Ideally though, you'd want to avoid the singleton pattern entirely. Whichever caller needs it should store a single instance of your RabbitMQ object and call publish_message on it directly.
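As a rough sketch of what that could look like from the caller's side: the class below receives its publisher as a constructor argument, so a test can hand it a mock and no connection is ever opened. The `publisher` parameter, the `process` method, and the `"/processed"` path are all made up for illustration; the question doesn't show what MyClass actually does.

```python
from unittest import mock


class MyClass:
    """Hypothetical caller: it is given a publisher instead of importing a global."""

    def __init__(self, publisher):
        # `publisher` is anything with publish_message(message, path=...),
        # e.g. a RabbitMQ instance in production or a mock in tests.
        self.publisher = publisher

    def process(self, payload):
        # Hypothetical method, just to have something to test.
        message = {"payload": payload}
        self.publisher.publish_message(message, path="/processed")
        return message


def test_process_publishes_message():
    # spec restricts the mock to the one method MyClass is allowed to use.
    fake_publisher = mock.Mock(spec=["publish_message"])
    result = MyClass(fake_publisher).process("hello")
    fake_publisher.publish_message.assert_called_once_with(
        {"payload": "hello"}, path="/processed"
    )
    return result
```

In production code the caller would build one `RabbitMQ()` and pass it in, e.g. `MyClass(RabbitMQ())`.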
So the TL;DR/ideal solution IMO: just get rid of those last 3 lines. The caller should create a RabbitMQ object.
EDIT: Oh, and why it's happening: when you import that module, rabbitMQ = RabbitMQ() is evaluated right away. Your attempt to mock it happens after that line has already run and failed to connect.
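If you really can't touch the module, the ordering problem can be worked around by putting the fake in place before the import runs. The sketch below only illustrates that ordering and isn't a recommendation over removing the global: it writes a trimmed stand-in module (hypothetically named `rabbit_demo`, without the scheduler) to a temp directory and imports it while `sys.modules["pika"]` is a mock. With the real module you would presumably need to fake the scheduler in the same way.

```python
import importlib
import sys
import tempfile
import textwrap
from pathlib import Path
from unittest import mock

# Trimmed stand-in for the module in the question: it connects at import time.
MODULE_SOURCE = textwrap.dedent(
    """
    import pika

    class RabbitMQ:
        def __init__(self):
            params = pika.ConnectionParameters(host="localhost")
            self.connection = pika.BlockingConnection(params)
            self.channel = self.connection.channel()

    rabbitMQ = RabbitMQ()  # evaluated as soon as the module is imported
    """
)


def import_with_fake_pika(module_name="rabbit_demo"):
    """Import the stand-in module while `pika` is replaced by a mock."""
    fake_pika = mock.MagicMock(name="pika")
    with tempfile.TemporaryDirectory() as tmp:
        Path(tmp, f"{module_name}.py").write_text(MODULE_SOURCE)
        sys.path.insert(0, tmp)
        try:
            # The fake must be in place BEFORE the import statement runs.
            with mock.patch.dict(sys.modules, {"pika": fake_pika}):
                module = importlib.import_module(module_name)
        finally:
            sys.path.remove(tmp)
    return module, fake_pika
```

Patching after the import (the order in the question) is too late, because by then the module-level `RabbitMQ()` call has already tried to reach the broker.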