The background of my problem is that I use `Path.exists()` on Windows to check whether a network path is available. If it is not, `Path.exists()` can block for quite some time (20 seconds or more) before it returns `False`.
This is too long, so I want to define a time limit after which I consider the resource to be unavailable.
What I came up with is a wrapper class around `Path` that uses a `ThreadPoolExecutor` to call `Path`'s `exists` method and cancels the call after a timeout:
```python
import time
from concurrent.futures import ThreadPoolExecutor
from pathlib import Path
from typing import Any


class TimeoutPath:
    # One worker thread shared by all instances
    executor = ThreadPoolExecutor(max_workers=1)

    def __init__(self, *args, timeout: float = 1, **kwargs):
        self._path = Path(*args, **kwargs)
        self.timeout = timeout

    def exists(self) -> bool:
        future = TimeoutPath.executor.submit(self._path.exists)
        start_time = time.time()
        # Poll the future until it finishes or the timeout elapses
        while (time.time() - start_time) < self.timeout:
            if future.done():
                return future.result()
        future.cancel()
        return False

    def get_path(self) -> Path:
        return self._path

    def __getattr__(self, name: str) -> Any:
        # Delegate everything else to the wrapped Path
        return getattr(self._path, name)

    def __str__(self) -> str:
        return str(self._path)
```
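One detail about the `future.cancel()` call: `Future.cancel()` only prevents a call that has not started yet. Once the worker thread is executing the call, `cancel()` returns `False` and the call keeps running. The following is a minimal sketch of that behavior. `slow_check` and the two events are hypothetical stand-ins for a hanging `exists()` call, not part of the code above.

```python
import threading
from concurrent.futures import ThreadPoolExecutor

started = threading.Event()   # set by the worker once the call is running
release = threading.Event()   # set by the main thread to let the call finish


def slow_check() -> bool:
    # Hypothetical stand-in for an exists() call that hangs on a network path
    started.set()
    release.wait(timeout=5)
    return True


with ThreadPoolExecutor(max_workers=1) as executor:
    future = executor.submit(slow_check)
    started.wait(timeout=5)              # make sure the call is actually running
    cancelled = future.cancel()          # a running call cannot be cancelled

    # The single worker is still busy, so a second call has to wait in the queue
    queued = executor.submit(lambda: True)
    queued_done_early = queued.done()

    release.set()                        # let the slow call finish
    result = future.result(timeout=5)
    queued_result = queued.result(timeout=5)

print(cancelled, queued_done_early, result, queued_result)
```

With `max_workers=1`, this means a check that hangs on an unreachable network path keeps the only worker occupied until the underlying call returns, and later checks queue up behind it.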
This works, but it sometimes returns `False` for an existing folder on a local drive, even with the timeout set to one second. I assume this is due to the overhead needed to start the thread.
So I guess my question is: is it possible to reduce the overhead introduced by the thread pool by using another threading/multiprocessing implementation? Or is there another possible solution without threading I'm not seeing?
Thanks to @IInspectable I think I figured out what my mistake was.
Polling the `future.done` method in the `while` loop creates way too many unnecessary cycles. `Future.result()` actually already has a `timeout` parameter and blocks until the future has a result or times out:
```python
from concurrent.futures import ThreadPoolExecutor, TimeoutError
from pathlib import Path
from typing import Any


class TimeoutPath:
    # One worker thread shared by all instances
    executor = ThreadPoolExecutor(max_workers=1)

    def __init__(self, *args, timeout: float = 1, **kwargs):
        self._path = Path(*args, **kwargs)
        self.timeout = timeout

    def exists(self) -> bool:
        future = TimeoutPath.executor.submit(self._path.exists)
        try:
            # Block until the result is ready or the timeout expires
            return future.result(self.timeout)
        except TimeoutError:
            return False

    def get_path(self) -> Path:
        return self._path

    def __getattr__(self, name: str) -> Any:
        # Delegate everything else to the wrapped Path
        return getattr(self._path, name)

    def __str__(self) -> str:
        return str(self._path)
```
On my machine this works quite reliably for timeouts of 2 ms.
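To see the `Future.result(timeout=...)` behavior in isolation, without needing an unreachable network share, here is a small sketch. The helper `call_with_timeout` and the functions `fast_check` and `slow_check` are hypothetical names introduced only for this illustration, and `time.sleep` merely simulates a slow `exists()` call. The pool size of 2 is also an assumption for the demo, not a recommendation.

```python
import time
from concurrent.futures import ThreadPoolExecutor, TimeoutError
from typing import Any, Callable


def call_with_timeout(executor: ThreadPoolExecutor,
                      func: Callable[[], Any],
                      timeout: float,
                      default: Any = False) -> Any:
    """Run func in the executor; return default if it takes longer than timeout."""
    future = executor.submit(func)
    try:
        # Blocks without busy-waiting until the result is ready or timeout expires
        return future.result(timeout=timeout)
    except TimeoutError:
        return default


def fast_check() -> bool:
    # Simulates a check that returns immediately
    return True


def slow_check() -> bool:
    # Simulates a check that hangs for half a second
    time.sleep(0.5)
    return True


with ThreadPoolExecutor(max_workers=2) as executor:
    fast_result = call_with_timeout(executor, fast_check, timeout=1.0)
    slow_result = call_with_timeout(executor, slow_check, timeout=0.05)

print(fast_result, slow_result)
```

The fast call returns its real result, while the slow one falls back to the default once the timeout expires. Note that the timed-out call still runs to completion in its worker thread; only the waiting is cut short.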