Tags: python, multithreading, flask, threadpool, python-contextvars

Python: Pass ContextVars from a parent thread to a child thread spawned using threading.Thread()


I am setting some context variables using the contextvars module so that they can be accessed across the modules running on the same thread. Initially I created a contextvars.ContextVar() object in each Python file, hoping that a single context would be shared among all the Python files of the module running on the same thread. But each file ended up with its own, separate context variable.
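To illustrate the behaviour described above, here is a minimal sketch. The two variable names are hypothetical and stand in for ContextVar objects created in two different files; giving them the same name string does not link them.

```python
from contextvars import ContextVar

# Two variables created separately (as if in two different files),
# deliberately given the same name string.
var_in_module_a = ContextVar("email", default=None)
var_in_module_b = ContextVar("email", default=None)

# Setting one of them does not touch the other.
var_in_module_a.set("user@example.com")

value_a = var_in_module_a.get()
value_b = var_in_module_b.get()
print(value_a)
print(value_b)
```

Each ContextVar() call creates a distinct key into the current context, which is why a single object defined in one module and imported everywhere else is needed.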

I took inspiration from how the Flask library stores the context of a web request in the request object, so that only the thread that received the request can access it. Resources: (1) Request Context working in Flask (2) Flask Contexts advanced

Basically, the Local class below is copied from the werkzeug library (werkzeug.local module: https://werkzeug.palletsprojects.com/en/2.3.x/local/#werkzeug.local.Local)

customContextObject.py

from contextvars import ContextVar
import typing as t
import warnings

class Local:
    __slots__ = ("_storage",)

    def __init__(self) -> None:
        object.__setattr__(self, "_storage", ContextVar("local_storage"))

    @property
    def __storage__(self) -> t.Dict[str, t.Any]:
        warnings.warn(
            "'__storage__' is deprecated and will be removed in Werkzeug 2.1.",
            DeprecationWarning,
            stacklevel=2,
        )
        return self._storage.get({})  # type: ignore

    def __iter__(self) -> t.Iterator[t.Tuple[int, t.Any]]:
        return iter(self._storage.get({}).items())

    def __getattr__(self, name: str) -> t.Any:
        values = self._storage.get({})
        try:
            print(f"_storage : {self._storage} | values : {values}")
            return values[name]
        except KeyError:
            raise AttributeError(name) from None

    def __setattr__(self, name: str, value: t.Any) -> None:
        values = self._storage.get({}).copy()
        values[name] = value
        self._storage.set(values)

    def __delattr__(self, name: str) -> None:
        values = self._storage.get({}).copy()
        try:
            del values[name]
            self._storage.set(values)
        except KeyError:
            raise AttributeError(name) from None

localContextObject = Local()

The localContextObject can now be imported in any Python file in the project, and they will all have access to the same ContextVar object.

Example: I set the email property on localContextObject in the contextVARSDifferentModulesCUSTOM.py file of the contextVARSexperiments module, then import and call the check_true_false() function from utils.py.

from contextVARSexperiments.utils import check_true_false
from contextVARSexperiments.customContextObject import localContextObject
import threading

localContextObject.email = "[email protected]"
print(f"localContextObject : {localContextObject} | email : {localContextObject.email}")


def callingUtils(a):
    print(f"{threading.current_thread()} | {threading.main_thread()}")
    check_true_false(a)


callingUtils('MAIN CALL')

Now the other file, utils.py, in the same module has access to the same ContextVar through localContextObject. It prints the same email as set in the file above.

utils.py

import threading
import contextvars
from contextVARSexperiments.customContextObject import localContextObject


def decorator(func):
    def wrapper(*args, **kwargs):
        print("\n~~~ENTERING check_true_false~~~~~~ ")
        func(*args, **kwargs)
        print("~~~EXITED check_true_false~~~~~~\n")
    return wrapper


@decorator
def check_true_false(a):
    print(f"check_true_false {threading.current_thread()} | {threading.main_thread()}")
    print(f" a : {a}")
    print(f"localContextObject : {localContextObject}")
    print(f"email : {localContextObject.email}")

Below is the output when we run contextVARSDifferentModulesCUSTOM.py

/Users/<user>/PycharmProjects/Temp/contextVARSexperiments/contextVARSDifferentModulesCUSTOM.py 
localContextObject : <_thread._local object at 0x7fcfb85fdd58> | email : [email protected]
<_MainThread(MainThread, started 8671015616)> | <_MainThread(MainThread, started 8671015616)>
~~~ENTERING check_true_false~~~~~~ 
check_true_false <_MainThread(MainThread, started 8671015616)> | <_MainThread(MainThread, started 8671015616)>
 a : MAIN CALL
localContextObject : <_thread._local object at 0x7fcfb85fdd58>
email : [email protected]
~~~EXITED check_true_false~~~~~~

Now, I updated contextVARSDifferentModulesCUSTOM.py to call callingUtils() function on a new thread.

from contextVARSexperiments.utils import check_true_false
from contextVARSexperiments.customContextObject import localContextObject
import threading

localContextObject.email = "[email protected]"
print(f"localContextObject : {localContextObject} | email : {localContextObject.email}")


def callingUtils(a):
    print(f"{threading.current_thread()} | {threading.main_thread()}")
    check_true_false(a)


t1 = threading.Thread(target=callingUtils, args=('THREAD"S CALL',))
t1.start()
t1.join()

But this raised an error because the child thread didn't have access to the parent thread's ContextVars. Output:
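The same effect can be reproduced without the Local class. The following is a minimal sketch, assuming the default threading behaviour seen in this question; the variable email and the helper record are hypothetical names used only for illustration.

```python
import threading
from contextvars import ContextVar

# Hypothetical variable, standing in for the storage inside Local.
email = ContextVar("email", default="<unset>")
seen = {}

def record(label):
    # Store whatever value the calling thread sees for the variable.
    seen[label] = email.get()

# Set the value in the main thread and read it back there.
email.set("user@example.com")
record("main")

# A plain threading.Thread starts with its own context, so the value
# set in the main thread is not visible inside the worker.
worker = threading.Thread(target=record, args=("child",))
worker.start()
worker.join()

print(seen)
```

The main thread reads back the value it set, while the worker falls back to the default, which corresponds to the empty values dictionary in the output that follows.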

/Users/<user>/PycharmProjects/Temp/contextVARSexperiments/contextVARSDifferentModulesCUSTOM.py 
_storage : <ContextVar name='local_storage' at 7ff1d0435668> | values : {'email': '[email protected]'}
localContextObject : <contextVARSexperiments.customContextObject.Local object at 0x7ff1c02162e8> | email : [email protected]
<Thread(Thread-1, started 12937875456)> | <_MainThread(MainThread, started 8609043136)>

~~~ENTERING check_true_false~~~~~~ 
check_true_false <Thread(Thread-1, started 12937875456)> | <_MainThread(MainThread, started 8609043136)>
 a : THREAD"S CALL
localContextObject : <contextVARSexperiments.customContextObject.Local object at 0x7ff1c02162e8>
_storage : <ContextVar name='local_storage' at 7ff1d0435668> | values : {}
Exception in thread Thread-1:
Traceback (most recent call last):
  File "/Users/<user>/miniconda3/envs/test_env/lib/python3.6/threading.py", line 916, in _bootstrap_inner
    self.run()
  File "/Users/<user>/miniconda3/envs/test_env/lib/python3.6/threading.py", line 864, in run
    self._target(*self._args, **self._kwargs)
  File "/Users/<user>/PycharmProjects/Temp/contextVARSexperiments/contextVARSDifferentModulesCUSTOM.py", line 13, in callingUtils
    check_true_false(a)
  File "/Users/<user>/PycharmProjects/Temp/contextVARSexperiments/utils.py", line 26, in wrapper
    func(*args, **kwargs)
  File "/Users/<user>/PycharmProjects/Temp/contextVARSexperiments/utils.py", line 43, in check_true_false
    print(f"email : {localContextObject.email}")
  File "/Users/<user>/PycharmProjects/Temp/contextVARSexperiments/customContextObject.py", line 31, in __getattr__
    raise AttributeError(name) from None
AttributeError: email

Now I am trying to subclass the Thread class with a custom implementation that passes the context from the parent thread to the child thread.

I tried to replace threading.Thread with a CustomThread class. The following are the implementations of CustomThread that I tried inside customThreading.py:

More about the Context object returned by the copy_context() function of the contextvars library: https://docs.python.org/3/library/contextvars.html#contextvars.Context

  1. Using the Context object returned by copy_context() to run the initialiser of the threading.Thread class:
    import threading
    import contextvars
    
    class CustomThread(threading.Thread):
        def __init__(self, *args, **kwargs):
            self.current_context = contextvars.copy_context()
            self.current_context.run(super().__init__, *args, **kwargs)
    
        def start(self) -> None:
            super().start()
  2. Using the Context object returned by copy_context() when calling start() of the threading.Thread class:
    import threading
    import contextvars
    
    class CustomThread(threading.Thread):
        def __init__(self, *args, **kwargs):
            self.current_context = contextvars.copy_context()
            super().__init__(*args, **kwargs)
    
        def start(self) -> None:
            self.current_context.run(super().start)
  3. Using the contextmanager decorator from contextlib on the start() method of my class:
    import threading
    import contextvars
    from contextlib import contextmanager
    
    class CustomThread(threading.Thread):
        def __init__(self, *args, **kwargs):
            self.current_context = contextvars.copy_context()
            super().__init__(*args, **kwargs)
    
        @contextmanager
        def start(self) -> None:
            super().start()

But none of these worked.

Also, I am looking for a custom implementation of ThreadPoolExecutor from the concurrent.futures module.


Solution

  • Context variables work similarly to threading.local variables in that, in each thread, a context var starts out empty. Within a single thread it can take further independent values through the run method of a contextvars.Context object; asyncio code uses this extensively, so that each call stack in an asyncio task can have an independent context in a transparent way.

    The code you picked from werkzeug falls back to an empty dictionary whenever the context var used as storage has no value in the current context (self._storage.get({})), so you get the AttributeError you listed instead of a LookupError.

    Anyway, I digress. The only thing incorrect in your code is that start is not the method to override in order to change the running context: it is called in the parent thread.

    The run method of the Thread class is the one that is executed in the child thread. If you override that one so that it executes the original run method inside the context you captured, things will work:

    import contextvars
    import threading

    class CTXThread(threading.Thread):
        def __init__(self, *args, **kwargs):
            # __init__ runs in the parent thread, so this captures the parent's context.
            self.ctx = contextvars.copy_context()
            super().__init__(*args, **kwargs)

        def run(self):
            # This code runs in the target, child thread:
            self.ctx.run(super().run)

    Also, as a side note, the contextlib module and its contextmanager decorator are not related to contextvars at all. Python reuses the term "context" for more than one thing; in this case, "contextlib" refers to context managers as used by the with statement.
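For the ThreadPoolExecutor part of the question, the same idea can be applied at submission time. This is only a sketch under stated assumptions, not an API from concurrent.futures: the class name ContextThreadPoolExecutor and the variable email are hypothetical. It relies on submit() being called in the submitting thread, so a context copied there carries that thread's values into the worker.

```python
import contextvars
from concurrent.futures import ThreadPoolExecutor

class ContextThreadPoolExecutor(ThreadPoolExecutor):
    """Hypothetical executor: each task runs in a copy of the submitter's context."""

    def submit(self, fn, *args, **kwargs):
        # submit() executes in the submitting thread, so this copy
        # holds the values that are visible there right now.
        ctx = contextvars.copy_context()
        # The worker thread then calls ctx.run(fn, *args, **kwargs).
        return super().submit(ctx.run, fn, *args, **kwargs)

# Hypothetical variable used only to check the behaviour.
email = contextvars.ContextVar("email", default="<unset>")
email.set("user@example.com")

with ContextThreadPoolExecutor(max_workers=2) as pool:
    # email.get is executed in a worker thread, inside the copied context.
    result = pool.submit(email.get).result()

print(result)
```

A fresh copy is taken per task because a single Context object cannot be entered by two threads at the same time. Values that a task sets inside the worker stay in that copy and do not flow back to the submitting thread.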