Python - threading assert group is None when creating a custom Thread Class


I wanted to create a custom Thread class that is able to propagate an exception it comes across to the main thread. My implementation is as follows:

class VerseThread(threading.Thread):

    def __init__(self, args):
        super().__init__(self, args=args)
        # self.scraper = scraper

    def run(self):
        self.exc = None
        try:
            book, abbrev, template, chapter = self.args
            self.parser.parse(book, abbrev, template, chapter)
        except ChapterNotFoundError as e:
            self.exc = e

    def join(self):
        threading.Thread.join(self)
        if self.exc:
            raise self.exc

This is supposed to run in the following method, inside a Scraper class (it's all inside a while True):

for book, abbrev, testament in self.books[init:end]:
    base_chapter = 1
    while True:
        threads = []
        if testament == 'ot':
            for i in range(3):
                threads.append(VerseThread(args=(book, abbrev, OT_TEMPLATE, base_chapter+i)))
        else:
            for i in range(3):
                threads.append(VerseThread(args=(book, abbrev, NT_TEMPLATE, base_chapter+i)))
                            
        try:
            for thread in threads:
                if not thread.is_alive():
                    thread.start()
            for thread in threads:
                thread.join()
            base_chapter += 3
        except ChapterNotFoundError as e:
            LOGGER.info(f"{{PROCESS {multiprocessing.current_process().pid}}} - Chapter {e.chapter} not found in {book}, exiting book...")
            break

The issue is, if I run it like presented here, I get the error assert group is None, "group argument must be None for now". However, when I run it using Thread(target=self.parse, args=(book, abbrev, OT_TEMPLATE, base_chapter+1)) instead of VerseThread(args=(book, abbrev, OT_TEMPLATE, base_chapter+i)), it works just fine, but the exception is of course still there. What's wrong with my code? How can I get rid of this error?

EDIT: Upon further testing, it seems that what I'm trying to do works fine when I use thread.run() instead of thread.start(), but then only one thread is being used, which is a problem. This, however, means that the error must be in the start() method, but I've no idea what to do.


Solution

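  • You have several errors. First, when you call a method through super(), self is passed implicitly, so you must not pass it again as you do in super().__init__(self, args=args). The extra self is bound to the first positional parameter of Thread.__init__, which is group, and that is what triggers the "group argument must be None for now" assertion. Second, to handle any possible thread-initializer arguments, your signature for this method should just be as follows: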

    class VerseThread(threading.Thread):
    
        def __init__(self, *args, **kwargs):
            super().__init__(*args, **kwargs)
    
        ... # rest of the code omitted
    

    But since your __init__ method does not do anything but call the parent's __init__ method with any passed arguments, there is now no need to even override this method.
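
To see why the stray self trips the assertion, it can help to look at the parameter order of Thread.__init__. The following is only a minimal sketch: the class names BrokenThread and FixedThread are made up for illustration, and the AssertionError branch assumes Python is not running with -O (which strips assert statements).

```python
import inspect
import threading

# Parameter names of Thread.__init__, in order: 'group' comes right after 'self'.
params = list(inspect.signature(threading.Thread.__init__).parameters)
print(params[:3])

class BrokenThread(threading.Thread):  # hypothetical name, mirrors the question's bug
    def __init__(self, args):
        # Bug: the explicit self lands in the 'group' slot.
        super().__init__(self, args=args)

class FixedThread(threading.Thread):  # hypothetical name
    def __init__(self, args):
        # super() already supplies self, so only the real arguments are passed.
        super().__init__(args=args)

try:
    BrokenThread(args=(1,))
    broken_error = None  # only reachable if asserts are disabled (python -O)
except AssertionError as e:
    broken_error = str(e)

fixed = FixedThread(args=(1,))
print(broken_error)
print(isinstance(fixed, threading.Thread))
```

The fixed variant constructs without error because nothing is bound to group any more.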

    Finally, the attributes that you are interested in are not args but rather _args and _kwargs (in case keyword arguments are specified). Also, you have specified self.parser, but I do not see where that attribute has been set.

    import threading
    
    class ChapterNotFoundError(Exception):
        pass
    
    class VerseThread(threading.Thread):
    
        def run(self):
            self.exc = None
            try:
                book, abbrev, template, chapter = self._args
                self.parser.parse(book, abbrev, template, chapter)
            except ChapterNotFoundError as e:
                self.exc = e
    
        def join(self):
            threading.Thread.join(self)  # Or: super().join()
            if self.exc:
                raise self.exc
    
    for book, abbrev, testament in self.books[init:end]:
        base_chapter = 1
        while True:
            threads = []
            if testament == 'ot':
                for i in range(3):
                    threads.append(VerseThread(args=(book, abbrev, OT_TEMPLATE, base_chapter+i)))
            else:
                for i in range(3):
                    threads.append(VerseThread(args=(book, abbrev, NT_TEMPLATE, base_chapter+i)))
                                
            try:
                for thread in threads:
                    if not thread.is_alive():
                        thread.start()
                for thread in threads:
                    thread.join()
                base_chapter += 3
            except ChapterNotFoundError as e:
                LOGGER.info(f"{{PROCESS {multiprocessing.current_process().pid}}} - Chapter {e.chapter} not found in {book}, exiting book...")
                break
    

    Improvement

    Accessing quasi-private attributes such as self._args is risky, because they are implementation details of the Thread class rather than part of its documented interface, and should be avoided.
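
One way to stay away from self._args, if you do want to keep an application-specific subclass, is to store the work items in attributes of your own. This is a sketch under assumptions rather than the original poster's code: FakeParser, the job attribute and the constructor signature are all hypothetical, and the parser is passed in explicitly because the question never shows where self.parser comes from.

```python
import threading

class ChapterNotFoundError(Exception):
    pass

class FakeParser:
    """Hypothetical stand-in for the real parser; not from the original code."""
    def parse(self, book, abbrev, template, chapter):
        if chapter > 2:
            raise ChapterNotFoundError(f"{book} has no chapter {chapter}")

class VerseThread(threading.Thread):
    def __init__(self, parser, book, abbrev, template, chapter):
        super().__init__()          # no explicit self here
        self.parser = parser        # set explicitly instead of assumed
        self.job = (book, abbrev, template, chapter)  # our own attribute, not _args
        self.exc = None             # initialised before the thread can run

    def run(self):
        try:
            self.parser.parse(*self.job)
        except ChapterNotFoundError as e:
            self.exc = e

    def join(self, timeout=None):
        super().join(timeout)
        if self.exc:
            raise self.exc

t = VerseThread(FakeParser(), "Genesis", "gen", "template", 3)
t.start()
try:
    t.join()
    result = "ok"
except ChapterNotFoundError as e:
    result = str(e)
print(result)
```

Setting self.exc in __init__ rather than in run() means join() can read it safely even when it is called with a short timeout.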

    I can see the value of creating a subclass of Thread that will catch exceptions in the "worker" function it executes and then "propagate" them back to the main thread when it joins the thread. But I believe such a class should be general purpose and work with any type of worker function. In general, I don't like to have application-specific code (business logic) in a threading.Thread or multiprocessing.Pool subclass. I instead prefer having my business logic coded within a function or class method(s) that can then be used in multithreading, multiprocessing or serial processing as you see fit. The following is how I would code the Thread subclass (I have named it PropogateExceptionThread, but choose whatever name you wish) and how I might use it:

    import threading
    
    class PropogateExceptionThread(threading.Thread):
        def run(self):
            self.exc = None
            try:
                super().run()
            except Exception as e:
                self.exc = e
    
        def join(self):
            super().join()
            if self.exc:
                raise self.exc
    
    def worker(x):
        if x < 10 or x > 20:
            raise ValueError(f'Bad value for argument x = {x}')
    
    t = PropogateExceptionThread(target=worker, args=(1,))
    t.start()
    try:
        t.join()
    except Exception as e:
        print('The thread raised an exception:', e)
    

    Prints:

    The thread raised an exception: Bad value for argument x = 1
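
As a rough illustration of how the general-purpose class might slot into the batch-of-three loop from the question, here is a sketch under assumptions: the parse function, LAST_CHAPTER, scrape_book and the chapter attribute on the exception are stand-ins invented for the example, not the real scraper. It joins every thread in the batch before deciding to stop, so that no thread is left unjoined when an earlier one raises.

```python
import threading

class ChapterNotFoundError(Exception):
    def __init__(self, chapter):
        super().__init__(f"chapter {chapter} not found")
        self.chapter = chapter

class PropogateExceptionThread(threading.Thread):
    def run(self):
        self.exc = None
        try:
            super().run()
        except Exception as e:
            self.exc = e

    def join(self):
        super().join()
        if self.exc:
            raise self.exc

LAST_CHAPTER = 7          # hypothetical: the book has 7 chapters
parsed = []               # chapters parsed successfully
lock = threading.Lock()   # protects the shared list

def parse(book, chapter):
    """Hypothetical worker standing in for the real parse method."""
    if chapter > LAST_CHAPTER:
        raise ChapterNotFoundError(chapter)
    with lock:
        parsed.append(chapter)

def scrape_book(book, batch_size=3):
    base_chapter = 1
    while True:
        threads = [
            PropogateExceptionThread(target=parse, args=(book, base_chapter + i))
            for i in range(batch_size)
        ]
        for t in threads:
            t.start()
        missing = []
        for t in threads:
            try:
                t.join()  # re-raises whatever the worker raised
            except ChapterNotFoundError as e:
                missing.append(e.chapter)
        if missing:
            return min(missing)  # first chapter that does not exist
        base_chapter += batch_size

first_missing = scrape_book("Genesis")
print(first_missing, sorted(parsed))
```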