Tags: python, multiprocessing, python-unittest, pathos

How to execute python subTests in parallel?


Consider the following unittest.TestCase, which implements two versions of the same test, the only difference being that one executes the subTests in parallel using multiprocessing.

import multiprocessing as mp
from unittest import TestCase


class TestBehaviour(TestCase):
    def _test_equals(self, val):
        for target_val in [1, 2]:
            with self.subTest(target=target_val, source=val):
                self.assertEqual(val, target_val)

    def test_equality_parallel(self):
        with mp.Pool(processes=4) as pool:
            pool.map(self._test_equals, [1, 2])
            pool.close()  # close() must precede join(); the pickling error below occurs either way
            pool.join()

    def test_equality(self):
        for val in [1, 2]:
            self._test_equals(val)

The serial version, test_equality, works as expected and produces the following test failures:

======================================================================
FAIL: test_equality (temp.TestBehaviour) (target=2, source=1)
----------------------------------------------------------------------
Traceback (most recent call last):
  File "temp.py", line 11, in _test_equals
    self.assertEqual(val, target_val)
AssertionError: 1 != 2

======================================================================
FAIL: test_equality (temp.TestBehaviour) (target=1, source=2)
----------------------------------------------------------------------
Traceback (most recent call last):
  File "temp.py", line 11, in _test_equals
    self.assertEqual(val, target_val)
AssertionError: 2 != 1

On the other hand, test_equality_parallel causes an error as the TestCase cannot be pickled:

Traceback (most recent call last):
  File "temp.py", line 15, in test_equality_parallel
    pool.map(self._test_equals, [1, 2])
  File "/Library/Frameworks/Python.framework/Versions/3.8/lib/python3.8/multiprocessing/pool.py", line 364, in map
    return self._map_async(func, iterable, mapstar, chunksize).get()
  File "/Library/Frameworks/Python.framework/Versions/3.8/lib/python3.8/multiprocessing/pool.py", line 768, in get
    raise self._value
  File "/Library/Frameworks/Python.framework/Versions/3.8/lib/python3.8/multiprocessing/pool.py", line 537, in _handle_tasks
    put(task)
  File "/Library/Frameworks/Python.framework/Versions/3.8/lib/python3.8/multiprocessing/connection.py", line 206, in send
    self._send_bytes(_ForkingPickler.dumps(obj))
  File "/Library/Frameworks/Python.framework/Versions/3.8/lib/python3.8/multiprocessing/reduction.py", line 51, in dumps
    cls(buf, protocol).dump(obj)
TypeError: cannot pickle '_io.TextIOWrapper' object
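The unpicklable object in that traceback is an open text stream: pool.map must pickle self in order to ship the bound method self._test_equals to a worker process, and a running TestCase holds such a stream internally (e.g. in its result machinery). The stream alone reproduces the error:

```python
import os
import pickle

# A plain open text file is an _io.TextIOWrapper, the same type named
# in the traceback above.
stream = open(os.devnull)
try:
    pickle.dumps(stream)            # raises: text streams cannot be pickled
except TypeError as e:
    print(e)                        # cannot pickle '_io.TextIOWrapper' object
finally:
    stream.close()
```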

Now I know that I could split out _test_equals as a standalone function outside the class; however, I would like to keep the behaviour of subTest to enable better logging (and subsequent debugging) of test failures.
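A rough sketch of that standalone-function alternative (all names here are hypothetical): the check becomes picklable, but mismatches come back as plain results rather than labelled subTest failures.

```python
# Hypothetical module-level variant of _test_equals: with no bound
# TestCase instance attached, it is picklable, so multiprocessing.Pool
# could map it across worker processes.
def check_equals(val, targets=(1, 2)):
    """Return (source, target, matched) triples instead of asserting."""
    return [(val, t, val == t) for t in targets]

# The parent process would then collect the mapped results and assert
# on them -- losing the per-case labelling that subTest provides:
results = [r for v in (1, 2) for r in check_equals(v)]
failures = [r for r in results if not r[2]]
print(failures)  # [(1, 2, False), (2, 1, False)]
```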

How can I run the tests in parallel, but keep this subTest functionality?

Update

I have also attempted this using pathos.multiprocessing.ProcessingPool to circumvent the issues with TestCase serialization; however, in this case pool.join() raises ValueError: Pool is still running.

from pathos.multiprocessing import ProcessingPool
...
    def test_equality_parallel(self):                                           
        pool = ProcessingPool(processes=4)                                      
        pool.map(self._test_equals, [1, 2])
        pool.join()
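That ValueError is the close-before-join rule this family of pool APIs enforces: join() refuses to run while the pool can still accept work, so close() (or terminate()) has to come first. A quick stdlib demonstration (thread-backed, so no pickling is involved; given the identical message, pathos's ProcessingPool appears to enforce the same rule):

```python
from multiprocessing.dummy import Pool  # thread-backed pool, same close()/join() API

pool = Pool(processes=2)
try:
    pool.join()        # join() while the pool still accepts work is rejected
except ValueError as e:
    print(e)           # Pool is still running
pool.close()           # stop accepting new work first...
pool.join()            # ...then join() succeeds
```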

Update 2

This question is definitely relevant. The first solution proposed there, creating a second class to hold the methods called from the child process, isn't appropriate as it wouldn't enable the use of subTest. The second, removing the unpicklable _Outcome object from the TestCase, seems hacky and, given that the child processes are running subTests, also looks unsuitable.


Solution

  • I'm the pathos (and dill and multiprocess) author. With pathos you are still seeing a serialization error across processes. You could, however, try running across threads instead, where nothing needs to be serialized; thread-based parallelism is probably appropriate for a function at this level.

    import multiprocess.dummy as mp
    from unittest import TestCase
    
    
    class TestBehaviour(TestCase):
        def _test_equals(self, val):
            for target_val in [1, 2]:
                with self.subTest(target=target_val, source=val):
                    self.assertEqual(val, target_val)
    
        def test_equality_parallel(self):
            with mp.Pool(processes=4) as pool:
                pool.map(self._test_equals, [1, 2])
                pool.close()  # close() before join(), matching the pool API's required order
                pool.join()
    
        def test_equality(self):
            for val in [1, 2]:
                self._test_equals(val)
    

    Which yields:

    ======================================================================
    FAIL: test_equality (test_equaltiy.TestBehaviour)
    ----------------------------------------------------------------------
    ...[snip]...
    AssertionError: 1 != 2
    
    ======================================================================
    FAIL: test_equality_parallel (test_equaltiy.TestBehaviour)
    ----------------------------------------------------------------------
    ...[snip]...
    AssertionError: 1 != 2
    
    ----------------------------------------------------------------------
    Ran 2 tests in 0.108s
    
    FAILED (failures=2)
    

    This tells me that you might be able to use a serialization variant from dill (i.e., via dill.settings) to get around the serialization issue. See: https://github.com/uqfoundation/multiprocess/issues/48.
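    As a sketch of that suggestion (assuming dill is installed; 'recurse' is just one of several knobs exposed via dill.settings, and whether it resolves this particular failure is untested):

    ```python
    # Guarded so the snippet degrades gracefully when dill is absent.
    try:
        import dill
        # Ask dill to recurse into the globals an object references when
        # serializing -- one of the variants exposed via dill.settings.
        dill.settings['recurse'] = True
        have_dill = True
    except ImportError:
        have_dill = False

    print(have_dill)
    ```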