So I asked this question and tried the ProcessPoolExecutor approach. I used the decorator suggested the following way:
Running Image Manipulation in run_in_executor. Adapting to multiprocessing
import asyncio
import functools
from concurrent import futures
from app.exceptions.errors import ManipulationError
_pool = futures.ProcessPoolExecutor()
def executor(function):
@functools.wraps(function)
def decorator(*args, **kwargs):
try:
partial = functools.partial(function, *args, **kwargs)
loop = asyncio.get_event_loop()
return loop.run_in_executor(_pool, partial)
except Exception as e:
raise ManipulationError(str(e))
return decorator
I then used it on a function like:
@executor
@pil
def blur(image):
frame = image.convert("RGBA")
return frame.filter(ImageFilter.BLUR)
Note the @pil
is another decorator I made.
def pil(function):
@functools.wraps(function)
def wrapper(image, *args, **kwargs) -> BytesIO:
img = PILManip.pil_image(image)
if img.format == "GIF":
frames = []
for frame in ImageSequence.Iterator(img):
res_frame = function(frame, *args, **kwargs)
frames.append(res_frame)
return PILManip.pil_gif_save(frames), "gif"
elif img.format in ["PNG", "JPEG"]:
img = function(img, *args, **kwargs)
return PILManip.pil_image_save(img), "png"
else:
raise BadImage("Bad Format")
return wrapper
I called it in a FastApi route like so:
@router.get("/blur/", responses=normal_response)
async def blur_image(url: str):
byt = await Client.image_bytes(url)
img, image_format = await blur(byt)
return Response(img.read(), media_type=f"image/{image_format}")
I get some error about pickling.
500 Internal Server Error
ERROR: Exception in ASGI application
Traceback (most recent call last):
File "/home/codespace/workspace/dagpi-image/.venv/lib/python3.8/site-packages/uvicorn/protocols/http/httptools_impl.py", line 391, in run_asgi
result = await app(self.scope, self.receive, self.send)
File "/home/codespace/workspace/dagpi-image/.venv/lib/python3.8/site-packages/uvicorn/middleware/proxy_headers.py", line 45, in __call__
return await self.app(scope, receive, send)
File "/home/codespace/workspace/dagpi-image/.venv/lib/python3.8/site-packages/fastapi/applications.py", line 199, in __call__
await super().__call__(scope, receive, send)
File "/home/codespace/workspace/dagpi-image/.venv/lib/python3.8/site-packages/starlette/applications.py", line 111, in __call__
await self.middleware_stack(scope, receive, send)
File "/home/codespace/workspace/dagpi-image/.venv/lib/python3.8/site-packages/starlette/middleware/errors.py", line 181, in __call__
raise exc from None
File "/home/codespace/workspace/dagpi-image/.venv/lib/python3.8/site-packages/starlette/middleware/errors.py", line 159, in __call__
await self.app(scope, receive, _send)
File "/home/codespace/workspace/dagpi-image/.venv/lib/python3.8/site-packages/starlette/middleware/base.py", line 25, in __call__
response = await self.dispatch_func(request, self.call_next)
File "/home/codespace/workspace/dagpi-image/app/middleware/timer.py", line 8, in add_process_time_header
response = await call_next(request)
File "/home/codespace/workspace/dagpi-image/.venv/lib/python3.8/site-packages/starlette/middleware/base.py", line 45, in call_next
task.result()
File "/home/codespace/workspace/dagpi-image/.venv/lib/python3.8/site-packages/starlette/middleware/base.py", line 38, in coro
await self.app(scope, receive, send)
File "/home/codespace/workspace/dagpi-image/.venv/lib/python3.8/site-packages/starlette/middleware/base.py", line 25, in __call__
response = await self.dispatch_func(request, self.call_next)
File "/home/codespace/workspace/dagpi-image/.venv/lib/python3.8/site-packages/starlette_prometheus/middleware.py", line 56, in dispatch
raise e from None
File "/home/codespace/workspace/dagpi-image/.venv/lib/python3.8/site-packages/starlette_prometheus/middleware.py", line 52, in dispatch
response = await call_next(request)
File "/home/codespace/workspace/dagpi-image/.venv/lib/python3.8/site-packages/starlette/middleware/base.py", line 45, in call_next
task.result()
File "/home/codespace/workspace/dagpi-image/.venv/lib/python3.8/site-packages/starlette/middleware/base.py", line 38, in coro
await self.app(scope, receive, send)
File "/home/codespace/workspace/dagpi-image/.venv/lib/python3.8/site-packages/starlette/exceptions.py", line 82, in __call__
raise exc from None
File "/home/codespace/workspace/dagpi-image/.venv/lib/python3.8/site-packages/starlette/exceptions.py", line 71, in __call__
await self.app(scope, receive, sender)
File "/home/codespace/workspace/dagpi-image/.venv/lib/python3.8/site-packages/starlette/routing.py", line 566, in __call__
await route.handle(scope, receive, send)
File "/home/codespace/workspace/dagpi-image/.venv/lib/python3.8/site-packages/starlette/routing.py", line 227, in handle
await self.app(scope, receive, send)
File "/home/codespace/workspace/dagpi-image/.venv/lib/python3.8/site-packages/starlette/routing.py", line 41, in app
response = await func(request)
File "/home/codespace/workspace/dagpi-image/.venv/lib/python3.8/site-packages/fastapi/routing.py", line 201, in app
raw_response = await run_endpoint_function(
File "/home/codespace/workspace/dagpi-image/.venv/lib/python3.8/site-packages/fastapi/routing.py", line 148, in run_endpoint_function
return await dependant.call(**values)
File "/home/codespace/workspace/dagpi-image/app/routes/image_routes.py", line 107, in blur_image
img, image_format = await blur(byt)
File "/opt/python/3.8.6/lib/python3.8/multiprocessing/queues.py", line 239, in _feed
obj = _ForkingPickler.dumps(obj)
File "/opt/python/3.8.6/lib/python3.8/multiprocessing/reduction.py", line 51, in dumps
cls(buf, protocol).dump(obj)
_pickle.PicklingError: Can't pickle <function blur at 0x7f87524091f0>: it's not the same object as app.image.pil_manipulation.blur
Does someone know why this keeps happening? I was told the objects have to be serializable, I believe BytesIO is the only in/out of the image. That should be serializable.
Decorators typically produce wrapped functions that aren't easy to pickle (serialize) because they contain hidden state. When dealing with multiprocessing, you should avoid decorators and send ordinary global functions to run_in_executor
. For example, you could re-write your executor
decorator into a utility function:
_pool = concurrent.futures.ProcessPoolExecutor()
async def exec_async(fn, *args):
loop = asyncio.get_event_loop()
return await loop.run_in_executor(_pool, fn, *args)
Instead of decorating a function with executor
, you can just await it using await exec_async(some_function, arg1, arg2, ...)
. Likewise, you can rewrite the pil
decorator into another utility:
def pil(image, transform):
img = PILManip.pil_image(image)
if img.format == "GIF":
frames = []
for frame in ImageSequence.Iterator(img):
res_frame = transform(frame)
frames.append(res_frame)
return PILManip.pil_gif_save(frames), "gif"
elif img.format in ["PNG", "JPEG"]:
img = transform(img)
return PILManip.pil_image_save(img), "png"
else:
raise BadImage("Bad Format")
The implementation of blur
now becomes an ordinary function which calls pil
, and which can be safely passed to exec_async
:
def blur(image):
def transform(frame):
frame = frame.convert("RGBA")
return frame.filter(ImageFilter.BLUR)
return pil(image, transform)
@router.get("/blur/", responses=normal_response)
async def blur_image(url: str):
byt = await Client.image_bytes(url)
img, image_format = await exec_async(blur, byt)
return Response(img.read(), media_type=f"image/{image_format}")
Note: the above code is untested.