2.12. @aiomisc.aggregate

Параметрический декоратор, который агрегирует несколько (но не больше, чем max_count, по умолчанию None) вызовов с одним параметром (res1 = await func(arg1), res2 = await func(arg2), …) асинхронной функции с переменными количеством позиционных параметров (async def func(*args, pho=1, bo=2) -> Iterable) в единственный вызов с несколькими параметрами (res1, res2, ... = await func(arg1, arg2, ...)), собранными в течение окна leeway_ms. Он позволяет пожертвовать задержкой ради увеличения пропускной спсобности.

Если func бросает исключение, тогда все агрегированные вызовы бросят то же самое исключение. Если один агрегированный вызов будет отменён во время выполнения func, тогда другой попробует выполнить func вместо него.

Этот декоратор может быть полезен, если func выполняет медленные IO-задачи, часто вызывается, а использование кеширования не предпочтительно. В качестве примера, пусть func запрашивает запись из БД по ID пользователя во время каждого запроса к нашему сервису. Если запрос к БД занимает 100 мс, а нагрузка на сервис составляет 1000 RPS, то с 10% увеличением задержки (до 110 ms) количество запросов к БД упадёт в 10 раз (до 100 QPS)

_images/aggregate-flow.svg
import asyncio
import math
from aiomisc import aggregate, entrypoint

@aggregate(leeway_ms=10, max_count=2)
async def pow(*nums: float, power: float = 2.0):
    return [math.pow(num, power) for num in nums]

async def main():
    await asyncio.gather(pow(1.0), pow(2.0))

with entrypoint() as loop:
    loop.run_until_complete(main())

Для более низкоуровнего подхода можно воспользоваться декоратором aggregate_async. В этом случае агрегирующая функция принимает в качестве параметров переменные Arg, содержащие значение параметра value и футуру future. Функция ответственна за выставление результатов работы для всех футур (вместо обычного возврата результатов).

import asyncio
import math
from aiomisc import aggregate_async, entrypoint
from aiomisc.aggregate import Arg


@aggregate_async(leeway_ms=10, max_count=2)
async def pow(*args: Arg, power: float = 2.0):
    for arg in args:
        arg.future.set_result(math.pow(arg.value, power))

async def main():
    await asyncio.gather(pow(1), pow(2))

with entrypoint() as loop:
    loop.run_until_complete(main())