How do I write a middleware to forward media groups to aiogram 3? Users in the group send a command to the bot. The bot will send them a message, to which they must respond with a media group, which the bot will then forward to everyone who uses the bot. But since sending media groups for a bot is several messages, I need a middleware to collect them all and then send them to users
I tried to use the code that is already available for aiogram 2, but I don't understand how to redo it for version 3
Simple Media middleware:
import asyncio
from typing import *
from aiogram import BaseMiddleware
from aiogram.types import Message, CallbackQuery
class MediaMiddleware(BaseMiddleware):
def __init__(self, latency: Union[int, float] = 0.01):
self.medias = {}
self.latency = latency
super(MediaMiddleware, self).__init__()
async def __call__(
self,
handler: Callable[[Union[Message, CallbackQuery], Dict[str, Any]], Awaitable[Any]],
event: Union[Message, CallbackQuery],
data: Dict[str, Any]
) -> Any:
if isinstance(event, Message) and event.media_group_id:
try:
self.medias[event.media_group_id].append(event)
return
except KeyError:
self.medias[event.media_group_id] = [event]
await asyncio.sleep(self.latency)
data["media_events"] = self.medias.pop(event.media_group_id)
return await handler(event, data)
Usage:
@router.message(F.media_group_id != None)
def on_media_group_id(message: Message, media_events: List[Message] = []):
# message - last sended message
# media_events - all events with the same media_group_id
...