2.12. @aiomisc.aggregate
¶
Параметрический декоратор, который агрегирует несколько (но не больше, чем max_count
, по умолчанию None
) вызовов с одним параметром (res1 = await func(arg1)
, res2 = await func(arg2)
, …) асинхронной функции с переменными количеством позиционных параметров (async def func(*args, pho=1, bo=2) -> Iterable
) в единственный вызов с несколькими параметрами (res1, res2, ... = await func(arg1, arg2, ...)
), собранными в течение окна leeway_ms
. Он позволяет пожертвовать задержкой ради увеличения пропускной спсобности.
Если func
бросает исключение, тогда все агрегированные вызовы бросят то же самое исключение. Если один агрегированный вызов будет отменён во время выполнения func
, тогда другой попробует выполнить func
вместо него.
Этот декоратор может быть полезен, если func
выполняет медленные IO-задачи, часто вызывается, а использование кеширования не предпочтительно. В качестве примера, пусть func
запрашивает запись из БД по ID пользователя во время каждого запроса к нашему сервису. Если запрос к БД занимает 100 мс, а нагрузка на сервис составляет 1000 RPS, то с 10% увеличением задержки (до 110 ms) количество запросов к БД упадёт в 10 раз (до 100 QPS)
import asyncio
import math
from aiomisc import aggregate, entrypoint
@aggregate(leeway_ms=10, max_count=2)
async def pow(*nums: float, power: float = 2.0):
return [math.pow(num, power) for num in nums]
async def main():
await asyncio.gather(pow(1.0), pow(2.0))
with entrypoint() as loop:
loop.run_until_complete(main())
Для более низкоуровнего подхода можно воспользоваться декоратором aggregate_async. В этом случае агрегирующая функция принимает в качестве параметров переменные Arg, содержащие значение параметра value и футуру future. Функция ответственна за выставление результатов работы для всех футур (вместо обычного возврата результатов).
import asyncio
import math
from aiomisc import aggregate_async, entrypoint
from aiomisc.aggregate import Arg
@aggregate_async(leeway_ms=10, max_count=2)
async def pow(*args: Arg, power: float = 2.0):
for arg in args:
arg.future.set_result(math.pow(arg.value, power))
async def main():
await asyncio.gather(pow(1), pow(2))
with entrypoint() as loop:
loop.run_until_complete(main())