I have the following code, all pretty simple:
from multiprocessing import Process, Lock
from multiprocessing.managers import BaseManager


class DatabaseConnection:
    def __init__(self, conn_id):
        self.id = conn_id

    def __repr__(self):
        return f'connection(id={self.id})'


class DatabaseConnectionPool:
    def __init__(self):
        self.mutex = Lock()
        self.max_connections = 10
        self.ready_pool = [DatabaseConnection(i) for i in range(self.max_connections)]
        self.leased_pool = []

    def get(self):
        if not self.ready_pool:
            raise Exception('no connections available')
        conn = self.ready_pool[-1]
        self.ready_pool.pop()
        self.leased_pool.append(conn)
        return conn


class Manager(BaseManager):
    pass


Manager.register('DatabaseConnectionPool', DatabaseConnectionPool)


def proc(pool):
    try:
        conn = pool.get()
        print(f'leased {conn}')
    except Exception:
        print('failed to lease connection')


if __name__ == '__main__':
    manager = Manager()
    manager.start()
    pool = manager.DatabaseConnectionPool()
    procs = []
    for _ in range(11):
        p = Process(target=proc, args=(pool,))
        procs.append(p)
        p.start()
    for p in procs:
        p.join()
When I run this, I get:
leased connection(id=9)
leased connection(id=8)
leased connection(id=7)
leased connection(id=6)
leased connection(id=5)
leased connection(id=4)
leased connection(id=3)
leased connection(id=2)
leased connection(id=1)
leased connection(id=0)
failed to lease connection
I am essentially creating a DatabaseConnectionPool class shared across a bunch of processes, then having each process retrieve a DatabaseConnection object from the pool. I expected this to cause a race condition, but I can't seem to provoke one. Specifically, I would expect the line conn = self.ready_pool[-1] to result in a race condition such that some processes return connections with duplicate ids, since I don't have a mutex around the DatabaseConnectionPool.get() method.
Am I misunderstanding something here? Since the DatabaseConnectionPool is shared, each process is pulling from the same list, and as far as I know the following two lines are (purposefully) not atomic:
conn = self.ready_pool[-1]
self.ready_pool.pop()
TL;DR: there is indeed a race condition, but the overhead of Manager relative to the speed of your example function get makes it extremely difficult to trigger.
"Since the DatabaseConnectionPool is shared": it's not. You only share proxies to this object. When you call manager.start(), a new process is started which holds all the actual instances that are "managed" by the manager. Calling your registered constructor, manager.DatabaseConnectionPool(), returns a proxy to the instance that is created in the manager process.
These proxies are picklable, and therefore can easily be sent to other processes. When you call a method of one of these proxies, the method name and arguments are pickled and sent to the manager process where the call is actually performed, and then the results are pickled and sent back (remote procedure call).
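To make the proxy/RPC flow concrete, here is a toy, single-process model of it. It is a simplified sketch, not how multiprocessing.managers actually frames its messages: encode_call and serve_call are hypothetical helpers, and the transport (socket or pipe) is left out entirely. It also illustrates a side effect worth knowing: what comes back from a call like get is an unpickled copy, not the object stored in the server's leased_pool (with a default Manager.register, method results are likewise returned by value).

```python
import pickle


class DatabaseConnection:
    def __init__(self, conn_id):
        self.id = conn_id

    def __repr__(self):
        return f'connection(id={self.id})'


class DatabaseConnectionPool:
    def __init__(self, max_connections=10):
        self.ready_pool = [DatabaseConnection(i) for i in range(max_connections)]
        self.leased_pool = []

    def get(self):
        if not self.ready_pool:
            raise Exception('no connections available')
        conn = self.ready_pool[-1]
        self.ready_pool.pop()
        self.leased_pool.append(conn)
        return conn


def encode_call(method_name, *args, **kwargs):
    # Hypothetical "client" side: only the method name and arguments are pickled.
    return pickle.dumps((method_name, args, kwargs))


def serve_call(obj, request):
    # Hypothetical "server" side: unpickle the request, run the method on the
    # real object, and pickle the result so it can be sent back.
    method_name, args, kwargs = pickle.loads(request)
    return pickle.dumps(getattr(obj, method_name)(*args, **kwargs))


pool = DatabaseConnectionPool()  # the real object, held only by the "server"
conn = pickle.loads(serve_call(pool, encode_call('get')))
print(conn, conn is pool.leased_pool[-1])  # connection(id=9) False
```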
On the server side, a single thread continuously waits for new connections on a multiprocessing.connection.Listener. For each incoming connection, a new thread is created to handle and respond to the procedure calls made over it. This means any method called on your class instance must be thread safe (though not actually process safe), as any other method called remotely may be running at the same time in another thread of the manager process.
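Because the contention happens between threads of one process, a plain threading.Lock held across the check, read, and pop should be enough to make get safe (the question's unused multiprocessing Lock should also work once it is actually acquired). The sketch below reproduces that situation with ordinary threads and no Manager. The delay and use_lock parameters and the lease_concurrently helper are hypothetical additions for illustration; threading.Barrier releases all callers at once to maximize contention.

```python
import contextlib
import threading
import time


class DatabaseConnection:
    def __init__(self, conn_id):
        self.id = conn_id

    def __repr__(self):
        return f'connection(id={self.id})'


class DatabaseConnectionPool:
    def __init__(self, max_connections=10, delay=0.0, use_lock=True):
        # A threading.Lock suffices: the pool lives in a single process and
        # concurrent calls arrive on different threads of that process.
        self.mutex = threading.Lock() if use_lock else contextlib.nullcontext()
        self.delay = delay  # hypothetical knob that widens the read/pop window
        self.ready_pool = [DatabaseConnection(i) for i in range(max_connections)]
        self.leased_pool = []

    def get(self):
        # Check, read, and pop as one critical section.
        with self.mutex:
            if not self.ready_pool:
                raise Exception('no connections available')
            conn = self.ready_pool[-1]
            time.sleep(self.delay)
            self.ready_pool.pop()
            self.leased_pool.append(conn)
            return conn


def lease_concurrently(pool, n_threads):
    """Call pool.get() from n_threads threads released at the same moment.
    Returns the leased ids, with None for each failed lease."""
    results = []
    results_lock = threading.Lock()
    barrier = threading.Barrier(n_threads)

    def worker():
        barrier.wait()  # line all threads up before calling get()
        try:
            conn_id = pool.get().id
        except Exception:  # pool exhausted (or IndexError when unlocked)
            conn_id = None
        with results_lock:
            results.append(conn_id)

    threads = [threading.Thread(target=worker) for _ in range(n_threads)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return results


ids = lease_concurrently(DatabaseConnectionPool(delay=0.01), 11)
print(sorted(i for i in ids if i is not None), ids.count(None))
# [0, 1, 2, 3, 4, 5, 6, 7, 8, 9] 1
```

With use_lock=False and a nonzero delay, the same call will typically return duplicate ids (and one thread's pop fails on the emptied list, counted as a failed lease), much like the Manager output shown further down.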
Now consider everything that would have to happen in order to trigger a race in your example:

1. A new process has to be created (with the "spawn" start method this also means starting a fresh interpreter, running import __main__, and pickling/unpickling the Process object).
2. In the child, the pool proxy has to be unpickled and has to open a new multiprocessing.connection.Connection to the manager in order to perform the RPC (this also takes quite a while due to the required OS calls, as well as the authkey handshake).
3. The manager has to accept that connection, start a new thread to handle it, and finally execute the get call (the actual get call will be very fast compared to everything else here).

Steps 1, 2, and 3 of two separate processes would all have to complete at very nearly exactly the same time, and each is a fairly long operation with a good amount of variability depending on how busy the OS is (the OS must allocate things like pipes, processes, sockets, etc.). Even if the average time to complete is similar, if the variability is significantly greater than the time it takes to perform the get call, it is very unlikely that two calls will line up. If instead get takes a relatively long time, the likelihood that any two get calls run concurrently rises. For example, adding a bit of random delay in the get method easily breaks the example:
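from random import random
from time import sleep
...
        conn = self.ready_pool[-1]
        sleep(random() * .1)  # widen the window between reading and popping
        self.ready_pool.pop()

which outputs something like: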
leased connection(id=9)
leased connection(id=8)
leased connection(id=8)
leased connection(id=7)
leased connection(id=7)
leased connection(id=8)
leased connection(id=9)
leased connection(id=8)
leased connection(id=1)
leased connection(id=1)
failed to lease connection
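To put rough numbers on the timing argument, here is a deliberately simplified model: assume each process reaches its get call at an independent, uniformly random moment within a start-up jitter window J, and the call lasts d. Two calls then overlap when |t1 - t2| < d, which for d < J happens with probability 1 - (1 - d/J)^2. The uniform distribution and the specific numbers below are assumptions for illustration, not measurements of Manager.

```python
import random


def overlap_probability(call_time, jitter):
    """Exact probability that two calls, each starting at an independent,
    uniformly random time in [0, jitter] and lasting call_time, overlap."""
    if call_time >= jitter:
        return 1.0
    return 1.0 - (1.0 - call_time / jitter) ** 2


def simulate_overlap(call_time, jitter, trials=100_000, seed=0):
    # Monte Carlo check of the closed form above.
    rng = random.Random(seed)
    hits = 0
    for _ in range(trials):
        t1 = rng.uniform(0, jitter)
        t2 = rng.uniform(0, jitter)
        hits += abs(t1 - t2) < call_time
    return hits / trials


# Hypothetical numbers: a 1 microsecond get() vs. 10 ms of start-up jitter,
# then a get() that sleeps 0.05 s on average, as in the modified example.
print(overlap_probability(1e-6, 10e-3))  # ≈ 0.0002
print(overlap_probability(0.05, 10e-3))  # 1.0
```

Shrinking d relative to J drives the probability toward zero, which is consistent with the unmodified get practically never racing, while a delay comparable to the jitter makes overlaps routine.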