Tags: multithreading, python-2.7, amqp, gevent, kombu

ConcurrentObjectUseError in gevent when using kombu


I'm using kombu in my code, and sometimes I get the following exception from the publish method of kombu's producer. It seems to reproduce more often under stress conditions, so it is probably a multithreading issue, and the exception message points that way as well.

Using Python 2.7.18, kombu 4.6.11, amqp 2.6.1, gevent 20.6.2.

I would appreciate any help. Thanks!!

I also saw these possibly-related pages, though I couldn't infer from them what I should do in my case:

The exception:

    Traceback (most recent call last):
      File "C:\Code\A\home_common\rabbitmq_common.py", line 168, in send_data_message
        self.producer.publish(data_message, routing_key=destination, exchange=self.dataDirectExchange, headers={'source': source}, content_encoding='binary', content_type='application/octet-stream', retry=True)
      File "C:\Python27\lib\site-packages\kombu\messaging.py", line 181, in publish
        exchange_name, declare,
      File "C:\Python27\lib\site-packages\kombu\connection.py", line 533, in _ensured
        return fun(*args, **kwargs)
      File "C:\Python27\lib\site-packages\kombu\messaging.py", line 203, in _publish
        mandatory=mandatory, immediate=immediate,
      File "C:\Python27\lib\site-packages\amqp\channel.py", line 1766, in _basic_publish
        (0, exchange, routing_key, mandatory, immediate), msg
      File "C:\Python27\lib\site-packages\amqp\abstract_channel.py", line 59, in send_method
        conn.frame_writer(1, self.channel_id, sig, args, content)
      File "C:\Python27\lib\site-packages\amqp\method_framing.py", line 189, in write_frame
        write(view[:offset])
      File "C:\Python27\lib\site-packages\amqp\transport.py", line 305, in write
        self._write(s)
      File "C:\Python27\lib\site-packages\gevent\_socket2.py", line 383, in sendall
        return _socketcommon._sendall(self, data_memory, flags)
      File "C:\Python27\lib\site-packages\gevent\_socketcommon.py", line 392, in _sendall
        timeleft = __send_chunk(socket, chunk, flags, timeleft, end)
      File "C:\Python27\lib\site-packages\gevent\_socketcommon.py", line 321, in __send_chunk
        data_sent += socket.send(chunk, flags)
      File "C:\Python27\lib\site-packages\gevent\_socket2.py", line 369, in send
        self._wait(self._write_event)
      File "src\\gevent\\_hub_primitives.py", line 317, in gevent._gevent_c_hub_primitives.wait_on_socket
      File "src\\gevent\\_hub_primitives.py", line 322, in gevent._gevent_c_hub_primitives.wait_on_socket
      File "src\\gevent\\_hub_primitives.py", line 297, in gevent._gevent_c_hub_primitives._primitive_wait
    ConcurrentObjectUseError: This socket is already used by another greenlet: <bound method Waiter.switch of <gevent._gevent_c_waiter.Waiter object at 0x0598E810>>

Solution

  • For future reference: the issue appears to have been fixed by taking a lock around every kombu call that may contact the RabbitMQ server, so that only one greenlet at a time writes to the connection's socket, e.g.:

    with self._kombu_lock:
        self.producer.publish(data_message, routing_key=destination, exchange=self.dataDirectExchange, headers={'source': source}, content_encoding='binary', content_type='application/octet-stream', retry=True)
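
To make the locking pattern above concrete, here is a minimal sketch of where such a lock could live. The LockedPublisher class and its constructor arguments are hypothetical names used only for illustration; they are not part of kombu. The sketch assumes gevent monkey-patching is active, in which case threading.Lock is expected to be a cooperative, greenlet-aware lock; without monkey-patching, a gevent.lock.RLock() instance could be passed in as the lock instead.

```python
import threading


class LockedPublisher(object):
    """Hypothetical wrapper that serializes calls to a shared producer."""

    def __init__(self, producer, lock=None):
        # `producer` is assumed to be a kombu Producer (or any object with
        # a compatible publish() method) shared by several greenlets.
        self._producer = producer
        # Assumption: under gevent monkey-patching, threading.Lock yields
        # to other greenlets while waiting. Otherwise pass a gevent lock.
        self._kombu_lock = lock if lock is not None else threading.Lock()

    def publish(self, body, **kwargs):
        # Hold the lock for the whole call so that two greenlets never
        # wait on the same underlying socket at the same time.
        with self._kombu_lock:
            return self._producer.publish(body, **kwargs)
```

With a wrapper like this, send_data_message would call the wrapper's publish with the same keyword arguments as before. Any other call on the same connection that talks to the broker would need to take the same lock for the fix to hold.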