I want to create an abstract class for thread-related class. Class that want to enable threading must inherit threading.Thread
class, however at the same time, class that want to enable the @abstractmethod
must inherit abc.ABC
class. Because multiple inheritance is a bad practice, can i achieve what i want to do without multiple inheritance ?
For context, i was following the example code from here https://vmois.dev/python-flask-background-thread/. Here are example of abstract class for thread-related class
import threading
from abc import abstractmethod, ABC
class BackgroundThread(threading.Thread, ABC):
def __init__(self):
super().__init__()
self._stop_event = threading.Event()
def stop(self) -> None:
self._stop_event.set()
def _stopped(self) -> bool:
return self._stop_event.is_set()
@abstractmethod
def startup(self) -> None:
"""
Method that is called before the thread starts.
Initialize all necessary resources here.
:return: None
"""
raise NotImplementedError()
@abstractmethod
def shutdown(self) -> None:
"""
Method that is called shortly after stop() method was called.
Use it to clean up all resources before thread stops.
:return: None
"""
raise NotImplementedError()
@abstractmethod
def handle(self) -> None:
"""
Method that should contain business logic of the thread.
Will be executed in the loop until stop() method is called.
Must not block for a long time.
:return: None
"""
raise NotImplementedError()
def run(self) -> None:
"""
This method will be executed in a separate thread
when start() method is called.
:return: None
"""
self.startup()
while not self._stopped():
self.handle()
self.shutdown()
It can be seen that the BackgroundThread inherit threading.Thread
to enable threading and inherit abc.ABC
to enable @abstractmethod. Can i refactor the code above so it does not use multiple inheritance ?
Would composition work? Here's the same code with the abstract portion split out from BackgroundThread
into an abstract ThreadHandler
class.
import logging
import queue
import threading
import time
from abc import ABC, abstractmethod
class ThreadHandler(ABC):
@abstractmethod
def startup(self) -> None:
"""
Method that is called before the thread starts.
Initialize all necessary resources here.
:return: None
"""
raise NotImplementedError()
@abstractmethod
def shutdown(self) -> None:
"""
Method that is called shortly after stop() method was called.
Use it to clean up all resources before thread stops.
:return: None
"""
raise NotImplementedError()
@abstractmethod
def handle(self) -> None:
"""
Method that should contain business logic of the thread.
Will be executed in the loop until stop() method is called.
Must not block for a long time.
:return: None
"""
raise NotImplementedError()
class BackgroundThread(threading.Thread):
def __init__(self, handler: ThreadHandler):
super().__init__()
self._handler = handler
self._stop_event = threading.Event()
def stop(self) -> None:
self._stop_event.set()
def _stopped(self) -> bool:
return self._stop_event.is_set()
def run(self) -> None:
"""
This method will be executed in a separate thread
when start() method is called.
:return: None
"""
self._handler.startup()
while not self._stopped():
self._handler.handle()
self._handler.shutdown()
You would then provide a concrete instance of a handler to the constructor of the BackgroundThread
. Here is an example inspired by the resource you linked to.
TASKS_QUEUE = queue.Queue()
class NotificationHandler(ThreadHandler):
def startup(self) -> None:
logging.info("NotificationThread started")
def shutdown(self) -> None:
logging.info("NotificationThread stopped")
def handle(self) -> None:
try:
task = TASKS_QUEUE.get(block=False)
# send_notification(task)
logging.info(f"Notification for {task} was sent.")
except queue.Empty:
time.sleep(1)
notification_thread = BackgroundThread(NotificationHandler())