I am setting some context variables using the contextvars module so that they can be accessed across the modules running on the same thread.
Initially I was creating a contextvars.ContextVar() object in each Python file, hoping that a single context would be shared amongst all the Python files of the module running on the same thread. But each file created its own, separate context variable.
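A minimal sketch of why the per-file approach behaves this way: two ContextVar objects are independent even if they are given the same name, because the name is only a label. The variable names and the email value below are hypothetical and stand in for variables created at the top of two different files.

```python
from contextvars import ContextVar

# Two ContextVar objects that merely share a name, as if each had been
# created at the top of a different Python file (hypothetical names).
var_in_module_a = ContextVar("email")
var_in_module_b = ContextVar("email")

# Setting one of them does not affect the other.
var_in_module_a.set("user@example.com")  # placeholder value

value_a = var_in_module_a.get("<unset>")
value_b = var_in_module_b.get("<unset>")

print(value_a)
print(value_b)
print(var_in_module_a is var_in_module_b)
```

Sharing state therefore requires importing one ContextVar (or one wrapper object around it) from a single module, rather than re-creating it per file.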
I took inspiration from how the Flask library stores the context of a web request in the request object, so that only the thread on which the web request arrived can access it. Resources: (1) Request Context working in Flask (2) Flask Contexts advance
Basically, the Local class below is copied from the werkzeug library (werkzeug.local module: https://werkzeug.palletsprojects.com/en/2.3.x/local/#werkzeug.local.Local)
customContextObject.py
from contextvars import ContextVar
import typing as t
import warnings

class Local:
    __slots__ = ("_storage",)

    def __init__(self) -> None:
        object.__setattr__(self, "_storage", ContextVar("local_storage"))

    @property
    def __storage__(self) -> t.Dict[str, t.Any]:
        warnings.warn(
            "'__storage__' is deprecated and will be removed in Werkzeug 2.1.",
            DeprecationWarning,
            stacklevel=2,
        )
        return self._storage.get({})  # type: ignore

    def __iter__(self) -> t.Iterator[t.Tuple[int, t.Any]]:
        return iter(self._storage.get({}).items())

    def __getattr__(self, name: str) -> t.Any:
        values = self._storage.get({})
        try:
            print(f"_storage : {self._storage} | values : {values}")
            return values[name]
        except KeyError:
            raise AttributeError(name) from None

    def __setattr__(self, name: str, value: t.Any) -> None:
        values = self._storage.get({}).copy()
        values[name] = value
        self._storage.set(values)

    def __delattr__(self, name: str) -> None:
        values = self._storage.get({}).copy()
        try:
            del values[name]
            self._storage.set(values)
        except KeyError:
            raise AttributeError(name) from None

localContextObject = Local()
The localContextObject can now be imported in any Python file in the project, and they will all have access to the same ContextVar object.
Example: I am setting the email property on localContextObject in the contextVARSDifferentModulesCUSTOM.py file of the contextVARSexperiments module. We import and call the check_true_false() function from utils.py
from contextVARSexperiments.utils import check_true_false
from contextVARSexperiments.customContextObject import localContextObject
import threading

localContextObject.email = "[email protected]"
print(f"localContextObject : {localContextObject} | email : {localContextObject.email}")

def callingUtils(a):
    print(f"{threading.current_thread()} | {threading.main_thread()}")
    check_true_false(a)

callingUtils('MAIN CALL')
Now the other file, utils.py, in the same module has access to the same context variables through localContextObject. It prints the same email as set in the file above.
utils.py
import threading
import contextvars
from contextVARSexperiments.customContextObject import localContextObject

def decorator(func):
    def wrapper(*args, **kwargs):
        print("\n~~~ENTERING check_true_false~~~~~~ ")
        func(*args, **kwargs)
        print("~~~EXITED check_true_false~~~~~~\n")
    return wrapper

@decorator
def check_true_false(a):
    print(f"check_true_false2 {threading.current_thread()} | {threading.main_thread()}")
    print(f" a : {a}")
    print(f"localContextObject : {localContextObject}")
    print(f"email : {localContextObject.email}")
Below is the output when we run contextVARSDifferentModulesCUSTOM.py
/Users/<user>/PycharmProjects/Temp/contextVARSexperiments/contextVARSDifferentModulesCUSTOM.py
localContextObject : <_thread._local object at 0x7fcfb85fdd58> | email : [email protected]
<_MainThread(MainThread, started 8671015616)> | <_MainThread(MainThread, started 8671015616)>
~~~ENTERING check_true_false~~~~~~
check_true_false <_MainThread(MainThread, started 8671015616)> | <_MainThread(MainThread, started 8671015616)>
a : MAIN CALL
localContextObject : <_thread._local object at 0x7fcfb85fdd58>
email : [email protected]
~~~EXITED check_true_false~~~~~~
Now, I updated contextVARSDifferentModulesCUSTOM.py to call callingUtils() function on a new thread.
from contextVARSexperiments.utils import check_true_false
from contextVARSexperiments.customContextObject import localContextObject
import threading

localContextObject.email = "[email protected]"
print(f"localContextObject : {localContextObject} | email : {localContextObject.email}")

def callingUtils(a):
    print(f"{threading.current_thread()} | {threading.main_thread()}")
    check_true_false(a)

t1 = threading.Thread(target=callingUtils, args=('THREAD"S CALL',))
t1.start()
t1.join()
But this threw an error, because the child thread didn't have access to the parent thread's context variables. Output:
/Users/<user>/PycharmProjects/Temp/contextVARSexperiments/contextVARSDifferentModulesCUSTOM.py
_storage : <ContextVar name='local_storage' at 7ff1d0435668> | values : {'email': '[email protected]'}
localContextObject : <contextVARSexperiments.customContextObject.Local object at 0x7ff1c02162e8> | email : [email protected]
<Thread(Thread-1, started 12937875456)> | <_MainThread(MainThread, started 8609043136)>
~~~ENTERING check_true_false~~~~~~
check_true_false <Thread(Thread-1, started 12937875456)> | <_MainThread(MainThread, started 8609043136)>
a : THREAD"S CALL
localContextObject : <contextVARSexperiments.customContextObject.Local object at 0x7ff1c02162e8>
_storage : <ContextVar name='local_storage' at 7ff1d0435668> | values : {}
Exception in thread Thread-1:
Traceback (most recent call last):
File "/Users/<user>/miniconda3/envs/test_env/lib/python3.6/threading.py", line 916, in _bootstrap_inner
self.run()
File "/Users/<user>/miniconda3/envs/test_env/lib/python3.6/threading.py", line 864, in run
self._target(*self._args, **self._kwargs)
File "/Users/<user>/PycharmProjects/Temp/contextVARSexperiments/contextVARSDifferentModulesCUSTOM.py", line 13, in callingUtils
check_true_false(a)
File "/Users/<user>/PycharmProjects/Temp/contextVARSexperiments/utils.py", line 26, in wrapper
func(*args, **kwargs)
File "/Users/<user>/PycharmProjects/Temp/contextVARSexperiments/utils.py", line 43, in check_true_false
print(f"email : {localContextObject.email}")
File "/Users/<user>/PycharmProjects/Temp/contextVARSexperiments/customContextObject.py", line 31, in __getattr__
raise AttributeError(name) from None
AttributeError: email
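The failure can be reproduced without the Local wrapper. The following is a minimal sketch, assuming a conventional CPython build where a plain threading.Thread starts with an empty context (newer releases, from 3.14 onward, add options that can change this default). The variable names and the email value are hypothetical.

```python
import threading
from contextvars import ContextVar

email = ContextVar("email")
email.set("user@example.com")  # set in the main thread's context

seen_in_child = []

def read_email():
    # Runs in the child thread, whose context does not contain the value
    # set by the main thread, so the default passed to get() is returned.
    seen_in_child.append(email.get("<unset>"))

worker = threading.Thread(target=read_email)
worker.start()
worker.join()

seen_in_main = email.get("<unset>")
print(seen_in_main)
print(seen_in_child[0])
```

The value is still present in the main thread; only the child thread sees the variable as unset, which matches the empty `values : {}` in the output above.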
Now I am trying to subclass Thread and create my own custom implementation that passes the context from the parent thread to the child thread.
I tried to replace the threading.Thread class with a CustomThread class. Following are the implementations of the CustomThread class inside customThreading.py.
More about the Context object returned by the copy_context() function of the contextvars module: https://docs.python.org/3/library/contextvars.html#contextvars.Context
Using the Context object returned by copy_context() to run the initialiser of the Thread class:
import threading
import contextvars

class CustomThread(threading.Thread):
    def __init__(self, *args, **kwargs):
        self.current_context = contextvars.copy_context()
        self.current_context.run(super().__init__, *args, **kwargs)

    def start(self) -> None:
        super().start()
Using the Context object returned by copy_context() while calling start() of the Thread class:
import threading
import contextvars

class CustomThread(threading.Thread):
    def __init__(self, *args, **kwargs):
        self.current_context = contextvars.copy_context()
        super().__init__(*args, **kwargs)

    def start(self) -> None:
        self.current_context.run(super().start)
Using the contextmanager decorator from contextlib on start() of my class:
import threading
import contextvars
from contextlib import contextmanager

class CustomThread(threading.Thread):
    def __init__(self, *args, **kwargs):
        self.current_context = contextvars.copy_context()
        super().__init__(*args, **kwargs)

    @contextmanager
    def start(self) -> None:
        super().start()
But none of these worked.
Also, I am looking for a custom implementation of ThreadPoolExecutor from the concurrent.futures module.
Context variables work similarly to threading.local variables in that, in each thread, a context variable is initially empty. It can take further independent values within the same thread by using the run method of a contextvars.Context object. asyncio uses this extensively, so that each call stack in an asyncio task can have an independent context in a transparent way.
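As a small illustration of independent values within a single thread, the sketch below runs the same function inside two copied Context objects. The names request_id and handle are hypothetical and chosen only for this example.

```python
import contextvars

request_id = contextvars.ContextVar("request_id", default="none")

def handle(new_id):
    # The set() below only affects the Context this function runs in.
    request_id.set(new_id)
    return request_id.get()

# Two independent copies of the current context, used in the same thread.
ctx_a = contextvars.copy_context()
ctx_b = contextvars.copy_context()

result_a = ctx_a.run(handle, "A")
result_b = ctx_b.run(handle, "B")
outer_value = request_id.get()  # the surrounding context is untouched

print(result_a, result_b)
print(outer_value)
print(ctx_a[request_id], ctx_b[request_id])
```

Each Context keeps its own value after run() returns, and the code outside still sees the default.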
The code you picked from werkzeug automatically supplies an empty dictionary whenever the context variable used as storage is read while unset, so you get the AttributeError you listed instead of a LookupError.
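The difference between the two error paths can be sketched as follows. The variable name mirrors the "local_storage" name used in the Local class, and the "email" key is only a placeholder.

```python
from contextvars import ContextVar

storage = ContextVar("local_storage")  # no default, never set

# Without a default argument, reading an unset ContextVar raises LookupError.
try:
    storage.get()
    outcome = "value found"
except LookupError:
    outcome = "LookupError"

# With a default, as in the Local class, an empty dict comes back instead,
# and the failure only shows up later as a missing key.
values = storage.get({})
try:
    values["email"]
    lookup = "found"
except KeyError:
    lookup = "KeyError"

print(outcome, values, lookup)
```

In the Local class, that KeyError is then converted into the AttributeError seen in the traceback.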
Anyway, I digress. The only thing incorrect in your code is that start is not the method to override in order to change the running context: it is called in the parent thread.
The run method of the Thread class is the one that is executed in the child thread. If you override that one so that it executes the original run method inside the context you captured, you will get things working:
import contextvars
import threading

class CTXThread(threading.Thread):
    def __init__(self, *args, **kwargs):
        # __init__ runs in the parent thread: capture a copy of its context.
        self.ctx = contextvars.copy_context()
        super().__init__(*args, **kwargs)

    def run(self):
        # This code runs in the target, child thread:
        self.ctx.run(super().run)
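For comparison, here is a self-contained sketch that runs the same function on a plain threading.Thread and on the CTXThread class (repeated so the block runs on its own). The email variable, the read_email helper and the results dictionary are hypothetical names used only for this demonstration.

```python
import contextvars
import threading

email = contextvars.ContextVar("email")

class CTXThread(threading.Thread):
    def __init__(self, *args, **kwargs):
        # Runs in the parent thread: capture a copy of its context.
        self.ctx = contextvars.copy_context()
        super().__init__(*args, **kwargs)

    def run(self):
        # Runs in the child thread: execute the target inside the copy.
        self.ctx.run(super().run)

results = {}

def read_email(key):
    results[key] = email.get("<unset>")

# Set the value before the threads are created, so the copy contains it.
email.set("user@example.com")

plain = threading.Thread(target=read_email, args=("plain",))
ctx_thread = CTXThread(target=read_email, args=("ctx",))
for thread in (plain, ctx_thread):
    thread.start()
    thread.join()

print(results)
```

Note that the child works on a copy taken when the thread object is created: values set in the parent after that point, or set inside the child, do not cross over.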
Also, as a side note, the contextlib module and its contextmanager decorator are not related to contextvars at all. Python re-uses the term "context" for more than one thing: in this case, "contextlib" refers to context managers as used by the with statement.
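Returning to the ThreadPoolExecutor part of the question, the same idea can probably be applied there. The following is only a sketch under assumptions, not an established recipe: ContextThreadPoolExecutor is a hypothetical subclass that copies the submitting thread's context on every submit() call and runs the task inside that copy. A fresh copy per task is used because a single Context object cannot be entered by two threads at the same time.

```python
import contextvars
from concurrent.futures import ThreadPoolExecutor

class ContextThreadPoolExecutor(ThreadPoolExecutor):
    """Hypothetical executor that runs each task in a copy of the
    context of the thread that submitted it."""

    def submit(self, fn, *args, **kwargs):
        # submit() is called in the submitting thread, so this captures
        # that thread's context at submission time.
        ctx = contextvars.copy_context()
        # The worker thread ends up calling ctx.run(fn, *args, **kwargs).
        return super().submit(ctx.run, fn, *args, **kwargs)

email = contextvars.ContextVar("email")
email.set("user@example.com")  # placeholder value

def read_email():
    return email.get("<unset>")

with ContextThreadPoolExecutor(max_workers=2) as pool:
    with_ctx = pool.submit(read_email).result()

with ThreadPoolExecutor(max_workers=2) as pool:
    without_ctx = pool.submit(read_email).result()

print(with_ctx, without_ctx)
```

As with CTXThread, each task sees a snapshot: changes a task makes to context variables are not propagated back to the submitting thread.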