Tags: python-3.x, windows, python-multithreading, pathlib

How to efficiently implement a version of Path.exists() with a timeout on Windows


I use `Path.exists()` on Windows to check whether a network path is available. If it is not, `Path.exists()` can block for 20 seconds or more before it returns False. That is too long, so I want to set a time limit after which I consider the resource unavailable. What I came up with is a wrapper class around `Path` that uses a ThreadPoolExecutor to call Path's exists method and cancels the call after a timeout:

import time
from concurrent.futures import ThreadPoolExecutor
from pathlib import Path
from typing import Any

class TimeoutPath:
    executor = ThreadPoolExecutor(max_workers=1)

    def __init__(self, *args, timeout: float = 1, **kwargs):
        self._path = Path(*args, **kwargs)
        self.timeout = timeout

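    def exists(self) -> bool:
        future = TimeoutPath.executor.submit(self._path.exists)
        start_time = time.time()
        # Busy-wait until the call finishes or the timeout elapses.
        while (time.time() - start_time) < self.timeout:
            if future.done():
                return future.result()
        # cancel() only stops a call that has not started yet; a running call continues.
        future.cancel()
        return False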

    def get_path(self) -> Path:
        return self._path

    def __getattr__(self, name: str) -> Any:
        return getattr(self._path, name)

    def __str__(self) -> str:
        return str(self._path)

This works, but it sometimes returns False for an existing folder on a local drive, even with the timeout set to one second. I assume this is due to the overhead of starting the thread.

So I guess my question is: is it possible to reduce the overhead introduced by the thread pool by using another threading/multiprocessing implementation? Or is there another possible solution without threading I'm not seeing?


Solution

  • Thanks to @IInspectable I think I figured out what my mistake was.

    Polling future.done() in the while loop burns far too many unnecessary cycles. Future.result() already takes a timeout parameter and blocks until the future has a result or the timeout expires, in which case it raises TimeoutError:

    from concurrent.futures import ThreadPoolExecutor, TimeoutError
    from pathlib import Path
    from typing import Any
    
    class TimeoutPath:
        executor = ThreadPoolExecutor(max_workers=1)
    
        def __init__(self, *args, timeout: float = 1, **kwargs):
            self._path = Path(*args, **kwargs)
            self.timeout = timeout
    
        def exists(self) -> bool:
            future = TimeoutPath.executor.submit(self._path.exists)
            try:
                return future.result(self.timeout)
            except TimeoutError:
                return False
    
        def get_path(self) -> Path:
            return self._path
    
        def __getattr__(self, name: str) -> Any:
            return getattr(self._path, name)
    
        def __str__(self) -> str:
            return str(self._path)
    

    On my machine this works quite reliably, even with a timeout of 2 ms.
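
One caveat with a shared executor of max_workers=1: Future.result(timeout) only stops waiting, it does not stop the call itself. While a blocked exists() call occupies the single worker, later checks queue behind it and may time out too, even for paths that exist. The following is a minimal sketch of that behaviour, not a drop-in replacement. The names call_with_timeout, slow_check and fast_check are hypothetical, and time.sleep() stands in for Path.exists() on an unreachable network share.

```python
import time
from concurrent.futures import ThreadPoolExecutor, TimeoutError


def call_with_timeout(executor, func, timeout, default=False):
    """Run func in the executor; return default if it does not finish in time."""
    future = executor.submit(func)
    try:
        return future.result(timeout=timeout)
    except TimeoutError:
        # We only stop waiting here; the worker thread keeps running func.
        return default


def slow_check():
    # Hypothetical stand-in for Path.exists() on an unreachable network share.
    time.sleep(0.5)
    return True


def fast_check():
    # Hypothetical stand-in for Path.exists() on a local path.
    return True


executor = ThreadPoolExecutor(max_workers=1)

# Gives up after 0.1 s, but slow_check keeps the only worker busy.
first = call_with_timeout(executor, slow_check, timeout=0.1)

# Queued behind slow_check, so it times out although it would return at once.
second = call_with_timeout(executor, fast_check, timeout=0.1)

# Once the worker has drained its queue, the same check succeeds.
time.sleep(0.6)
third = call_with_timeout(executor, fast_check, timeout=0.1)

executor.shutdown()
print(first, second, third)
```

If this matters for your use case, one option is to raise max_workers so that a single hanging share does not block checks of other paths. The hanging calls still occupy worker threads until they return.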