I have an agent-based model in which several agents are started by a central process and communicate via another central process. Every agent and the communication process talk to each other via zmq. However, when I start more than 100 agents, standard output shows:
Invalid argument (src/stream_engine.cpp:143)
Too many open files (src/ipc_listener.cpp:292)
and Mac OS shows a problem report:
Python quit unexpectedly while using the libzmq.5.dylib plug-in.
It seems to me that too many contexts are being opened. But how can I avoid this with multiprocessing?
Here is part of the code:
```python
import multiprocessing
import time

import zmq


class Agent(Database, Logger, Trade, Messaging, multiprocessing.Process):
    def __init__(self, idn, group, _addresses, trade_logging):
        multiprocessing.Process.__init__(self)
        # ....

    def run(self):
        # One context per process, created in the child after the fork.
        self.context = zmq.Context()
        self.commands = self.context.socket(zmq.SUB)
        self.commands.connect(self._addresses['command_addresse'])
        self.commands.setsockopt(zmq.SUBSCRIBE, "all")
        self.commands.setsockopt(zmq.SUBSCRIBE, self.name)
        self.commands.setsockopt(zmq.SUBSCRIBE, group_address(self.group))
        self.out = self.context.socket(zmq.PUSH)
        self.out.connect(self._addresses['frontend'])
        time.sleep(0.1)
        self.database_connection = self.context.socket(zmq.PUSH)
        self.database_connection.connect(self._addresses['database'])
        time.sleep(0.1)
        self.logger_connection = self.context.socket(zmq.PUSH)
        self.logger_connection.connect(self._addresses['logger'])
        self.messages_in = self.context.socket(zmq.DEALER)
        self.messages_in.setsockopt(zmq.IDENTITY, self.name)
        self.messages_in.connect(self._addresses['backend'])
        self.shout = self.context.socket(zmq.SUB)
        self.shout.connect(self._addresses['group_backend'])
        self.shout.setsockopt(zmq.SUBSCRIBE, "all")
        self.shout.setsockopt(zmq.SUBSCRIBE, self.name)
        self.shout.setsockopt(zmq.SUBSCRIBE, group_address(self.group))
        self.out.send_multipart(['!', '!', 'register_agent', self.name])
        while True:
            try:
                self.commands.recv()  # catches the group address.
            except KeyboardInterrupt:
                print('KeyboardInterrupt: %s, self.commands.recv() to catch own address ~1888' % (self.name))
                break
            command = self.commands.recv()
            if command == "!":
                subcommand = self.commands.recv()
                if subcommand == 'die':
                    self.__signal_finished()
                    break
            try:
                self._methods[command]()
            except KeyError:
                if command not in self._methods:
                    raise SystemExit('The method - ' + command + ' - called in the agent_list is not declared (' + self.name + ')')
                else:
                    raise
            except KeyboardInterrupt:
                print('KeyboardInterrupt: %s, Current command: %s ~1984' % (self.name, command))
                break
            if command[0] != '_':
                self.__reject_polled_but_not_accepted_offers()
            self.__signal_finished()
        #self.context.destroy()
```
The whole code is at http://www.github.com/DavoudTaghawiNejad/abce
Odds are it's not too many contexts, it's too many sockets. Looking through your repo, I see you're (correctly) using IPC as your transport. IPC endpoints are Unix domain sockets, so every connection to one consumes a file descriptor in each of the two processes involved. If I'm reading correctly, you're opening up to 7 sockets per process, so that adds up quickly. I'm betting that if you do some debugging in the middle of your code, you'll see that it doesn't fail when the last context is created, but when the last socket pushes the open-file limit over the edge.
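To do that debugging, one option is a small helper that reports how many descriptors the current process has open, next to its limit. This is a minimal sketch using only the standard library, assuming a POSIX system that exposes `/dev/fd` (macOS and Linux both do); `count_open_fds` and `fd_report` are hypothetical helper names, not part of zmq or of your project.

```python
import os
import resource


def count_open_fds():
    """Return the number of file descriptors open in this process.

    Assumes a POSIX system with /dev/fd (macOS and Linux). Listing the
    directory briefly opens one extra descriptor, which is subtracted.
    """
    return len(os.listdir('/dev/fd')) - 1


def fd_report(label):
    """Print the open-descriptor count next to the soft RLIMIT_NOFILE."""
    soft, _hard = resource.getrlimit(resource.RLIMIT_NOFILE)
    print('%s: %d open fds (soft limit %s)' % (label, count_open_fds(), soft))


if __name__ == '__main__':
    fd_report('start')
    r, w = os.pipe()  # a pipe opens two descriptors
    fd_report('after os.pipe()')
    os.close(r)
    os.close(w)
    fd_report('after closing the pipe')
```

For example, you could call `fd_report(self.name)` after each `connect()` in `Agent.run()`, and do the same in the process that binds the endpoints each time an agent registers, to see which process actually crosses the limit.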
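The limit on open file descriptors applies per process; the default soft limit is typically 1024 on Linux and 256 on macOS (check it with `ulimit -n`). Each agent on its own stays well below that, but a process that binds an IPC endpoint holds one descriptor for every agent connected to it, and the second error is raised in `ipc_listener.cpp`, the part of libzmq that accepts incoming IPC connections. So if one central process binds several of your endpoints (for example 'frontend', 'backend' and 'group_backend'), 100 agents already cost it several hundred descriptors, on top of the ones libzmq and Python use internally. There should be no problem raising your limit to 10,000, or higher depending on your situation. Otherwise you'll have to rewrite your code to use fewer sockets per process to support more agents.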
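From the shell, `ulimit -n 10000` before starting Python raises the soft limit for that session, provided the hard limit allows it. The same can be done from Python with the standard `resource` module. Below is a minimal sketch, assuming you call it once in the central process before starting the agents: child processes inherit resource limits, whether multiprocessing forks or spawns them. The 10,000 target is the figure suggested above, and `raise_fd_limit` is a hypothetical helper name.

```python
import resource


def raise_fd_limit(target=10000):
    """Raise this process's soft RLIMIT_NOFILE to `target` if possible.

    An unprivileged process may raise its soft limit only up to the hard
    limit, so the request is capped there. The limit is never lowered.
    Returns the resulting (soft, hard) pair.
    """
    soft, hard = resource.getrlimit(resource.RLIMIT_NOFILE)
    if soft == resource.RLIM_INFINITY:
        return soft, hard  # already unlimited, nothing to do
    if hard != resource.RLIM_INFINITY:
        target = min(target, hard)
    if target > soft:
        resource.setrlimit(resource.RLIMIT_NOFILE, (target, hard))
    return resource.getrlimit(resource.RLIMIT_NOFILE)


if __name__ == '__main__':
    print('before:', resource.getrlimit(resource.RLIMIT_NOFILE))
    print('after: ', raise_fd_limit())
```

Capping at the hard limit keeps the call from failing for an unprivileged user; raising the hard limit itself needs administrator rights or a system-level configuration change.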