Search code examples
pythonmultithreadingsalt-project

When I use threads to execute the salt agent api, I always get an error RuntimeError: There is no current event loop in thread 'Thread-*'


When I use threads to execute salt.client.get_local_client() and call the cmd() function concurrently, I receive the error about the absence of an event loop in the thread. I'm unsure if the issue lies with my approach or the design of Salt itself.


python test.py 
Exception in thread Thread-2:
Traceback (most recent call last):
Exception in thread Thread-1:
  File "/home/test/.pyenv/versions/test/lib/python3.9/site-packages/salt/client/__init__.py", line 387, in run_job
Traceback (most recent call last):
  File "/home/test/.pyenv/versions/test/lib/python3.9/site-packages/salt/client/__init__.py", line 387, in run_job
    pub_data = self.pub(
  File "/home/test/.pyenv/versions/test/lib/python3.9/site-packages/salt/client/__init__.py", line 1898, in pub
    pub_data = self.pub(
  File "/home/test/.pyenv/versions/test/lib/python3.9/site-packages/salt/client/__init__.py", line 1898, in pub
    with salt.channel.client.ReqChannel.factory(
  File "/home/test/.pyenv/versions/test/lib/python3.9/site-packages/salt/channel/client.py", line 56, in factory
    with salt.channel.client.ReqChannel.factory(
    return SyncWrapper(
  File "/home/test/.pyenv/versions/test/lib/python3.9/site-packages/salt/channel/client.py", line 56, in factory
  File "/home/test/.pyenv/versions/test/lib/python3.9/site-packages/salt/utils/asynchronous.py", line 76, in __init__
    return SyncWrapper(
  File "/home/test/.pyenv/versions/test/lib/python3.9/site-packages/salt/utils/asynchronous.py", line 76, in __init__
    self.obj = cls(*args, **kwargs)
  File "/home/test/.pyenv/versions/test/lib/python3.9/site-packages/salt/channel/client.py", line 138, in factory
    self.obj = cls(*args, **kwargs)
  File "/home/test/.pyenv/versions/test/lib/python3.9/site-packages/salt/channel/client.py", line 138, in factory
    transport = salt.transport.request_client(opts, io_loop=io_loop)
  File "/home/test/.pyenv/versions/test/lib/python3.9/site-packages/salt/transport/base.py", line 59, in request_client
    transport = salt.transport.request_client(opts, io_loop=io_loop)
  File "/home/test/.pyenv/versions/test/lib/python3.9/site-packages/salt/transport/base.py", line 59, in request_client
    return salt.transport.zeromq.RequestClient(opts, io_loop=io_loop)
  File "/home/test/.pyenv/versions/test/lib/python3.9/site-packages/salt/transport/zeromq.py", line 1084, in __init__
    return salt.transport.zeromq.RequestClient(opts, io_loop=io_loop)
  File "/home/test/.pyenv/versions/test/lib/python3.9/site-packages/salt/transport/zeromq.py", line 1084, in __init__
    self.sending = asyncio.Lock()
  File "/home/test/.pyenv/versions/3.9.18/lib/python3.9/asyncio/locks.py", line 81, in __init__
    self.sending = asyncio.Lock()
  File "/home/test/.pyenv/versions/3.9.18/lib/python3.9/asyncio/locks.py", line 81, in __init__
    self._loop = events.get_event_loop()
  File "/home/test/.pyenv/versions/3.9.18/lib/python3.9/asyncio/events.py", line 642, in get_event_loop
    self._loop = events.get_event_loop()
  File "/home/test/.pyenv/versions/3.9.18/lib/python3.9/asyncio/events.py", line 642, in get_event_loop
    raise RuntimeError('There is no current event loop in thread %r.'
RuntimeError: There is no current event loop in thread 'Thread-2'.

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/home/test/.pyenv/versions/3.9.18/lib/python3.9/threading.py", line 980, in _bootstrap_inner
    raise RuntimeError('There is no current event loop in thread %r.'
RuntimeError: There is no current event loop in thread 'Thread-1'.

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/home/test/.pyenv/versions/3.9.18/lib/python3.9/threading.py", line 980, in _bootstrap_inner
    self.run()
  File "/home/test/.pyenv/versions/3.9.18/lib/python3.9/threading.py", line 917, in run
    self.run()
  File "/home/test/.pyenv/versions/3.9.18/lib/python3.9/threading.py", line 917, in run
    self._target(*self._args, **self._kwargs)
  File "/home/test/test.py", line 9, in test_sleep
    self._target(*self._args, **self._kwargs)
    cl.cmd('master-local', 'test.sleep', [2])
  File "/home/test/test.py", line 9, in test_sleep
  File "/home/test/.pyenv/versions/test/lib/python3.9/site-packages/salt/client/__init__.py", line 752, in cmd
    cl.cmd('master-local', 'test.sleep', [2])
  File "/home/test/.pyenv/versions/test/lib/python3.9/site-packages/salt/client/__init__.py", line 752, in cmd
    pub_data = self.run_job(
  File "/home/test/.pyenv/versions/test/lib/python3.9/site-packages/salt/client/__init__.py", line 409, in run_job
    pub_data = self.run_job(
  File "/home/test/.pyenv/versions/test/lib/python3.9/site-packages/salt/client/__init__.py", line 409, in run_job
    raise SaltClientError(general_exception)
salt.exceptions.SaltClientError: There is no current event loop in thread 'Thread-2'.
    raise SaltClientError(general_exception)
salt.exceptions.SaltClientError: There is no current event loop in thread 'Thread-1'.

The following is my test code

from threading import Thread

import salt.client
import time

cl = salt.client.get_local_client()

def test_sleep():
    cl.cmd('master-local', 'test.sleep', [2])

t1 = Thread(target=test_sleep)
t2 = Thread(target=test_sleep)

t1.start()
# time.sleep(1)  # <-- Changes the error
t2.start()

t1.join()
t2.join()

The following is the salt version information

Salt Version:
          Salt: 3007.1
 
Python Version:
        Python: 3.9.18 (main, Feb  3 2024, 15:02:34)
 
Dependency Versions:
          cffi: 1.16.0
      cherrypy: 18.10.0
      dateutil: 2.8.2
     docker-py: Not Installed
         gitdb: Not Installed
     gitpython: Not Installed
        Jinja2: 3.1.3
       libgit2: Not Installed
  looseversion: 1.3.0
      M2Crypto: Not Installed
          Mako: Not Installed
       msgpack: 1.0.7
  msgpack-pure: Not Installed
  mysql-python: Not Installed
     packaging: 23.2
     pycparser: 2.21
      pycrypto: 2.6.1
  pycryptodome: 3.20.0
        pygit2: Not Installed
  python-gnupg: 0.5.3
        PyYAML: 6.0.1
         PyZMQ: 25.1.2
        relenv: 0.16.0
         smmap: Not Installed
       timelib: 0.3.0
       Tornado: 6.4
           ZMQ: 4.3.4
 
Salt Package Information:
  Package Type: Not Installed
 
System Versions:
          dist: centos 8.3.2011 
        locale: utf-8
       machine: x86_64
       release: 4.18.0-240.el8.x86_64
        system: Linux
       version: CentOS Linux 8.3.2011 

How can I properly achieve concurrent calls using threads with the Salt client API? Is this a limitation in Salt’s design, or is there something wrong with my implementation?


Solution

  • The following solution solved my problem

    import threading
    import tornado.ioloop
    import salt.client
    
    def test_sleep(): 
        tornado.ioloop.IOLoop.current()
        current_thread = threading.current_thread()
        
        local = salt.client.LocalClient()
        
        result = local.cmd('test', 'test.ping')  
    
    
    t1 = threading.Thread(target=test_sleep, name='Thread-1')
    t2 = threading.Thread(target=test_sleep, name='Thread-2')
    
    t1.start()
    t2.start()
    
    t1.join()
    

    The official documentation of the salt client api explains

    The LocalClient uses a Tornado IOLoop, this can create issues when using the LocalClient inside an existing IOLoop. If creating the LocalClient in partnership with another IOLoop either create the IOLoop before creating the LocalClient, or when creating the IOLoop use ioloop.current() which will return the ioloop created by LocalClient.