Aggregate decorator

Parametric decorator that aggregates multiple (but no more than max_count defaulting to None) single-argument executions (res1 = await func(arg1), res2 = await func(arg2), …) of an asynchronous function with variadic positional arguments (async def func(*args, pho=1, bo=2) -> Iterable) into its single execution with multiple positional arguments (res1, res2, ... = await func(arg1, arg2, ...)) collected within a time window leeway_ms. It offers a trade-off between latency and throughput.

If func raises an exception, then, all of the aggregated calls will propagate the same exception. If one of the aggregated calls gets canceled during the func execution, then, another will try to execute the func.

This decorator may be useful if the func executes slow IO-tasks, is frequently called, and using cache is not a good option. As a toy example, assume that func fetches a record from the database by user ID and it is called during each request to our service. If it takes 100 ms to fetch a record and the load is 1000 RPS, then, with a 10% increase of the delay (to 110 ms), it may decrease the number of requests to the database by 10 times (to 100 QPS).

_images/aggregate-flow.svg
import asyncio
import math
from aiomisc import aggregate, entrypoint

@aggregate(leeway_ms=10, max_count=2)
async def pow(*nums: float, power: float = 2.0):
    return [math.pow(num, power) for num in nums]

async def main():
    await asyncio.gather(pow(1.0), pow(2.0))

with entrypoint() as loop:
    loop.run_until_complete(main())

To employ a more low-level approach one can use aggregate_async instead. In this case, the aggregating function accepts Arg parameters, each containing value and future attributes. It is responsible for setting the results of execution for all the futures (instead of returning values).

import asyncio
import math
from aiomisc import aggregate_async, entrypoint
from aiomisc.aggregate import Arg


@aggregate_async(leeway_ms=10, max_count=2)
async def pow(*args: Arg, power: float = 2.0):
    for arg in args:
        arg.future.set_result(math.pow(arg.value, power))

async def main():
    await asyncio.gather(pow(1), pow(2))

with entrypoint() as loop:
    loop.run_until_complete(main())