Tags: python, python-asyncio, event-loop, google-cloud-sdk

Using AsyncClients in the Google Cloud Python SDK raises "got Future attached to a different loop"


I'm quite new to asyncio in Python. Originally I had a synchronous function, speak, which I ran in the default executor:

import asyncio

from google.cloud import texttospeech

client = texttospeech.TextToSpeechClient()
def speak(*args):
  # omitted
  result = client.synthesize_speech(...)

async def my_work(*args):
  # omitted
  loop = asyncio.get_running_loop()
  responses = ["Hi", "Bye"] # actually awaited results
  await asyncio.gather(*(
    loop.run_in_executor(
      None,
      speak,
      ...
    ) for text in responses
  ))

That setup worked without problems. Now I'm trying to switch to the async client:

import asyncio

from google.cloud import texttospeech

client = texttospeech.TextToSpeechAsyncClient()
async def speak(*args):
  # omitted
  result = await client.synthesize_speech(...)

async def my_work(*args):
  # omitted
  loop = asyncio.get_running_loop()
  responses = ["Hi", "Bye"] # actually awaited results
  await asyncio.gather(*[speak(...) for text in responses])

Running it raises the following error:

RuntimeError: Task
<Task pending name='Task-58' coro=<speak() running at /app/google_tts.py:36>
cb=[gather.<locals>._done_callback() at /usr/local/lib/python3.11/asyncio/tasks.py:764]>
got Future <Task pending name='Task-69' coro=<UnaryUnaryCall._invoke() running at /usr/local/lib/python3.11/site-packages/grpc/aio/_call.py:568>>
attached to a different loop

Some questions:

  • Does switching to the async client have any benefit aside from code clarity?
  • How should I choose between running the sync function in an executor and awaiting its async counterpart?

Solution

  • The guidance at https://cloud.google.com/python/docs/reference/texttospeech/latest/multiprocessing is to create the client after multiprocessing.pool.Pool has invoked os.fork(). Applying the same idea here, creating the client later, inside the coroutine instead of at import time, worked:

    import asyncio
    from concurrent.futures import ThreadPoolExecutor

    from google.cloud import texttospeech

    async def speak(client, *args):
      # omitted
      result = await client.synthesize_speech(...)

    async def my_work(*args):
      # omitted
      loop = asyncio.get_running_loop()
      responses = ["Hi", "Bye"] # actually awaited results
      with ThreadPoolExecutor() as pool:
        # Create the client here, while the event loop is already running,
        # rather than at module import time.
        client = texttospeech.TextToSpeechAsyncClient()
        await asyncio.gather(*[speak(client, ...) for text in responses])
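The error itself can be reproduced without the Google SDK. Below is a minimal sketch, assuming the async client behaves like an object that captures an event loop when it is constructed. FakeAsyncClient, speak_all, run_with_early_client and run_with_late_client are hypothetical names made up for this illustration and are not part of any Google or gRPC API. Futures created on one loop and awaited by a task running on another loop trigger the same RuntimeError, and constructing the object inside the running coroutine avoids it.

```python
import asyncio


class FakeAsyncClient:
    """Hypothetical stand-in for a loop-bound async client (not a Google API)."""

    def __init__(self, loop: asyncio.AbstractEventLoop) -> None:
        # Remember the loop the client was created for.
        self._loop = loop

    async def synthesize(self, text: str) -> str:
        # The future belongs to the loop captured at construction time.
        fut = self._loop.create_future()
        self._loop.call_soon_threadsafe(fut.set_result, text.upper())
        return await fut


async def speak_all(client: FakeAsyncClient, texts: list[str]) -> list[str]:
    # Run all requests concurrently on whichever loop is currently running.
    return await asyncio.gather(*(client.synthesize(t) for t in texts))


def run_with_early_client(texts: list[str]):
    """Create the client before the loop that will run the calls exists."""
    stale_loop = asyncio.new_event_loop()
    client = FakeAsyncClient(stale_loop)
    try:
        # asyncio.run() starts a new loop, different from stale_loop.
        return asyncio.run(speak_all(client, texts))
    except RuntimeError as exc:
        return f"RuntimeError: {exc}"
    finally:
        stale_loop.close()


def run_with_late_client(texts: list[str]):
    """Create the client inside the coroutine, on the running loop."""

    async def main() -> list[str]:
        client = FakeAsyncClient(asyncio.get_running_loop())
        return await speak_all(client, texts)

    return asyncio.run(main())


if __name__ == "__main__":
    print(run_with_early_client(["Hi", "Bye"]))
    print(run_with_late_client(["Hi", "Bye"]))
```

If the real client binds to a loop in a similar way, this would explain why a client created at module level fails once asyncio.run() (or a framework) starts its own loop, while a client created inside my_work does not.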