Parametric decorator that aggregates multiple (but no more than max_count, defaulting to None) single-argument executions (res1 = await func(arg1), res2 = await func(arg2), …) of an asynchronous function with variadic positional arguments (async def func(*args, pho=1, bo=2) -> Iterable) into a single execution with multiple positional arguments (res1, res2, ... = await func(arg1, arg2, ...)) collected within a time window of leeway_ms. It offers a trade-off between latency and throughput.

If func raises an exception, all of the aggregated calls propagate the same exception. If one of the aggregated calls is canceled during func execution, another one will try to execute func.
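
The batching and error-propagation behavior described above can be illustrated with a minimal standard-library sketch. The toy_aggregate decorator below is a hypothetical, simplified stand-in written for this illustration, not the library's implementation: it assumes a single fixed window started by the first call, and it leaves out max_count and cancellation handling.

```python
import asyncio
from typing import Any, List, Tuple


def toy_aggregate(leeway_ms: float):
    """Hypothetical, simplified aggregating decorator (illustration only)."""

    def decorator(func):
        pending: List[Tuple[Any, asyncio.Future]] = []
        tasks = set()  # keeps strong references to running flush tasks

        async def flush() -> None:
            # Wait for the leeway window so that more calls can join the batch.
            await asyncio.sleep(leeway_ms / 1000)
            batch = pending[:]
            pending.clear()
            try:
                # One execution with all collected positional arguments.
                results = list(await func(*(value for value, _ in batch)))
            except Exception as exc:
                # Every aggregated call receives the same exception object.
                for _, future in batch:
                    if not future.done():
                        future.set_exception(exc)
            else:
                # Results are matched to callers by position.
                for (_, future), result in zip(batch, results):
                    if not future.done():
                        future.set_result(result)

        async def wrapper(arg):
            future = asyncio.get_running_loop().create_future()
            pending.append((arg, future))
            if len(pending) == 1:
                # The first call of a window schedules the batched execution.
                task = asyncio.create_task(flush())
                tasks.add(task)
                task.add_done_callback(tasks.discard)
            return await future

        return wrapper

    return decorator


calls = []  # records how the wrapped function was actually invoked


@toy_aggregate(leeway_ms=20)
async def square(*nums):
    calls.append(nums)
    return [n * n for n in nums]


@toy_aggregate(leeway_ms=20)
async def broken(*nums):
    raise ValueError("backend unavailable")


async def demo():
    results = await asyncio.gather(square(1), square(2), square(3))
    errors = await asyncio.gather(broken(1), broken(2), return_exceptions=True)
    return results, errors


results, errors = asyncio.run(demo())
```

In this sketch the three square calls end up as one invocation recorded in calls, and both broken calls receive the very same exception instance.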

This decorator may be useful if func executes slow IO tasks, is called frequently, and caching is not a good option. As a toy example, assume that func fetches a record from the database by user ID and is called on each request to our service. If it takes 100 ms to fetch a record and the load is 1000 RPS, then a 10% increase in the delay (to 110 ms, that is, a leeway of 10 ms) may reduce the number of requests to the database tenfold (to 100 QPS), because the roughly 10 calls arriving within each window are served by a single query.
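
The arithmetic behind the toy example can be checked with a short calculation. The helper batched_load is a hypothetical name introduced here; it assumes evenly spaced requests, no max_count limit, and that every call waits for the full leeway in the worst case.

```python
def batched_load(rps: float, fetch_ms: float, leeway_ms: float):
    """Return (worst-case latency in ms, backend queries per second)."""
    # Number of calls that arrive within one leeway window (at least one).
    calls_per_window = max(1.0, rps * leeway_ms / 1000)
    # Each window is served by a single backend query.
    queries_per_second = rps / calls_per_window
    # A call may wait for the whole window before the fetch starts.
    worst_case_latency_ms = fetch_ms + leeway_ms
    return worst_case_latency_ms, queries_per_second


latency_ms, qps = batched_load(rps=1000, fetch_ms=100, leeway_ms=10)
print(latency_ms, qps)
```

Under these assumptions a larger leeway lowers the query rate further at the cost of additional latency, which is the trade-off mentioned at the top of this section.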
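
```python
import asyncio
import math

from aiomisc import aggregate, entrypoint


# Calls made within 10 ms of each other (at most 2) share one execution.
@aggregate(leeway_ms=10, max_count=2)
async def pow(*nums: float, power: float = 2.0):
    # One result per positional argument, in the same order.
    return [math.pow(num, power) for num in nums]


async def main():
    # Both single-argument calls are aggregated into pow(1.0, 2.0).
    await asyncio.gather(pow(1.0), pow(2.0))


with entrypoint() as loop:
    loop.run_until_complete(main())
```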

For a lower-level approach, use aggregate_async instead. In this case, the aggregating function accepts Arg parameters, each with value and future attributes, and is responsible for setting the result on every future (instead of returning values).

```python
import asyncio
import math

from aiomisc import aggregate_async, entrypoint
from aiomisc.aggregate import Arg


@aggregate_async(leeway_ms=10, max_count=2)
async def pow(*args: Arg, power: float = 2.0):
    # Set the result on each caller's future instead of returning a list.
    for arg in args:
        arg.future.set_result(math.pow(arg.value, power))


async def main():
    await asyncio.gather(pow(1), pow(2))


with entrypoint() as loop:
    loop.run_until_complete(main())
```
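
One possible benefit of resolving futures by hand is that each call can be settled independently. The standard-library sketch below illustrates the pattern without the decorator: ToyArg is a hypothetical stand-in with value and future attributes, and the sketch assumes the futures are ordinary asyncio futures, so a failure can be reported to a single caller via set_exception.

```python
import asyncio
import math
from dataclasses import dataclass
from typing import Any


@dataclass
class ToyArg:
    """Hypothetical stand-in for an argument wrapper (illustration only)."""
    value: Any
    future: asyncio.Future


async def sqrt_batch(*args: ToyArg) -> None:
    # Resolve every future individually instead of returning a list.
    for arg in args:
        if arg.value < 0:
            # Only this caller sees the error; the others still get results.
            arg.future.set_exception(ValueError(f"negative input: {arg.value}"))
        else:
            arg.future.set_result(math.sqrt(arg.value))


async def demo():
    loop = asyncio.get_running_loop()
    args = [ToyArg(value, loop.create_future()) for value in (4.0, -1.0, 9.0)]
    await sqrt_batch(*args)
    # Collect each caller's outcome, keeping exceptions as values.
    return await asyncio.gather(
        *(arg.future for arg in args), return_exceptions=True
    )


outcomes = asyncio.run(demo())
```

Here the first and third futures resolve to results while the second carries a ValueError, in contrast to the higher-level form, where an exception raised by func reaches all aggregated calls.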