Tags: python, caching, redis, fastapi, uvicorn

Python functools LRU cache shared between multiple processes


Let's say I have 4 load-balanced Python API processes that calculate the factorial of a number.

Let's say factorial returns a Pydantic object or a deeply nested dict. I do not want to use Redis for caching because serializing nested dicts/lists is expensive, so I use the functools.lru_cache function cache instead.

Problem: each of the 4 processes has its own LRU cache. When I clear the cache, it is cleared in only 1 process (whichever catches the request).

  1. I want to share the LRU cache between all 4 processes. Would a custom decorator using multiprocessing.shared_memory be possible?
  2. If that is not possible, I want to at least clear the caches of all processes. Using a multiprocessing Queue or Listener/Client blocks the API, because I would have to poll it in a while True loop.

Multiple Python API processes running this code:

from functools import lru_cache


# result:
# caches factorial for this process
# wanted:
# caches factorial for all 4 processes
@lru_cache
def factorial(n):
    return n * factorial(n-1) if n else 1


@api.get("/purge/factorial/")  # pseudo API decorator; served at http://localhost:5000
def cache_clear():
    # result:
    # clears cache for this process
    # wanted:
    # clears cache for all processes
    factorial.cache_clear()
    

I use Python with FastAPI and Uvicorn.

Thank you very much!


Solution

  • My problem turned out not to be slow JSON/pickle serialization. It was slow value retrieval from Redis, caused by the large size of the cached values.

    Here's a simple solution to share data between Python processes. This solution converts your Python variables to bytes (pickles them), so if slow serialization is your problem, this might not be for you.

    Server (separate Python process that holds the cache):

    from multiprocessing.managers import SyncManager


    # The actual dict lives in the manager's server process;
    # clients only ever see proxies to it.
    syncdict = {}


    def get_dict():
        return syncdict


    if __name__ == "__main__":
        # Expose the dict under the typeid "syncdict".
        SyncManager.register("syncdict", get_dict)
        manager = SyncManager(("127.0.0.1", 5002), authkey=b"password")
        manager.start()  # spawns the server process
        input()  # keep running until Enter is pressed
        manager.shutdown()
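
    Run the server as its own process before starting the API workers, e.g. with python cache_server.py (the file name is an assumption); input() simply keeps the manager alive until you press Enter.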
    

    Client (module inside my FastAPI application):

    from multiprocessing.managers import SyncManager
    from typing import Any


    class CacheClient:
        def __init__(self, host: str, port: int, authkey: bytes):
            # Register the typeid first so the proxy method exists on the manager.
            SyncManager.register("syncdict")

            self.manager = SyncManager((host, port), authkey=authkey)
            self.manager.connect()

            # Proxy object; reads and writes go over the socket to the server.
            self.syncdict = self.manager.syncdict()

        def set(self, key: str, value: Any):
            self.syncdict.update([(key, value)])

        def get(self, key: str) -> Any:
            return self.syncdict.get(key)


    cache = CacheClient(host="127.0.0.1", port=5002, authkey=b"password")
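
    To tie this back to the question, here is a minimal sketch (an assumption, not part of the original answer) of how the shared dict can replace lru_cache inside the API processes, so that both caching and purging affect all 4 workers. The FastAPI app object and route paths are made up for illustration:

    from fastapi import FastAPI


    api = FastAPI()


    def factorial(n: int) -> int:
        return n * factorial(n - 1) if n else 1


    @api.get("/factorial/{n}")
    def get_factorial(n: int):
        key = f"factorial:{n}"
        result = cache.get(key)  # read from the shared dict in the manager process
        if result is None:
            result = factorial(n)
            cache.set(key, result)  # now visible to all 4 workers
        return {"n": n, "factorial": result}


    @api.get("/purge/factorial/")
    def cache_clear():
        # Clearing the shared dict purges the cache for every worker at once.
        cache.syncdict.clear()
        return {"status": "purged"}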
    

    Tip: if you get import errors when trying to cache your Pydantic classes, pickle your data before calling syncdict.update. Be aware, though, that this pickles your data twice.
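
    If you hit those import errors, the workaround looks roughly like this (FactorialResult is a made-up Pydantic model for illustration):

    import pickle

    from pydantic import BaseModel


    class FactorialResult(BaseModel):  # hypothetical model
        n: int
        value: int


    # Pre-pickling turns the value into plain bytes, so the manager's server
    # process never has to import FactorialResult in order to store it.
    cache.set("factorial:5", pickle.dumps(FactorialResult(n=5, value=120)))

    # The bytes are pickled again in transit, hence "pickles your data twice".
    restored = pickle.loads(cache.get("factorial:5"))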