3.1. aiomisc module#

3.1.1. aiomisc.aggregate module#

class aiomisc.aggregate.AggregateStatistic(name: str | None = None)[source]#
done: int#
error: int#
leeway_ms: float#
max_count: int#
name: str | None#
success: int#
class aiomisc.aggregate.Aggregator(func: Callable[[Any], Awaitable[Iterable]] | Callable[[Arg], Awaitable], *, leeway_ms: float, max_count: int | None = None, statistic_name: str | None = None)[source]#
async aggregate(arg: Any) Any[source]#
property count: int#
property leeway_ms: float#
property max_count: int | None#
class aiomisc.aggregate.AggregatorAsync(func: Callable[[Any], Awaitable[Iterable]] | Callable[[Arg], Awaitable], *, leeway_ms: float, max_count: int | None = None, statistic_name: str | None = None)[source]#
class aiomisc.aggregate.Arg(value, future)[source]#

Create new instance of Arg(value, future)

future: Future#

Alias for field number 1

value: Any#

Alias for field number 0

exception aiomisc.aggregate.ResultNotSetError[source]#
aiomisc.aggregate.aggregate(leeway_ms: float, max_count: int | None = None) Callable[source]#

Parametric decorator that aggregates multiple single-argument executions (res1 = await func(arg1), res2 = await func(arg2), …) of an asynchronous function with variadic positional arguments (async def func(*args, pho=1, bo=2) -> Iterable) into a single execution with multiple positional arguments (res1, res2, ... = await func(arg1, arg2, ...)). Arguments are collected within a time window of leeway_ms milliseconds, and at most max_count of them are aggregated into one call (no limit when max_count is None, the default).

Note

func must return a sequence of values of length equal to the number of arguments (and in the same order).

Note

If some unexpected error occurs, the exception is propagated to each future; to set an individual error for each aggregated call, refer to aggregate_async.

Parameters:
  • leeway_ms – The maximum approximate delay between the first collected argument and the aggregated execution.

  • max_count – The maximum number of arguments to call the decorated function with. Defaults to None (no limit).

Returns:
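
A minimal usage sketch of this decorator (the pow2 function is illustrative, not part of the API):

import asyncio

from aiomisc.aggregate import aggregate

@aggregate(leeway_ms=10, max_count=2)
async def pow2(*nums: float):
    # Called once with all aggregated arguments; must return one
    # result per argument, in the same order.
    return [num ** 2 for num in nums]

async def main():
    # Both calls fall into the same leeway window and are served
    # by a single execution of pow2(1, 2).
    print(await asyncio.gather(pow2(1), pow2(2)))  # [1, 4]

asyncio.run(main())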

aiomisc.aggregate.aggregate_async(leeway_ms: float, max_count: int | None = None) Callable[source]#

Same as aggregate, but func receives arguments of type Arg, each carrying value and future attributes. In this setting func is responsible for setting an individual result or exception on every future, or for raising an exception (which is propagated to all futures automatically). If func mistakenly does not set the result of some future, a ResultNotSetError exception is set on it.

Returns:
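
A minimal sketch of this lower-level variant, where the decorated function resolves each Arg.future itself (pow2 is illustrative):

import asyncio

from aiomisc.aggregate import Arg, aggregate_async

@aggregate_async(leeway_ms=10, max_count=2)
async def pow2(*args: Arg) -> None:
    # Each Arg carries the caller's value and the future to resolve.
    for arg in args:
        arg.future.set_result(arg.value ** 2)

async def main():
    print(await asyncio.gather(pow2(3), pow2(4)))  # [9, 16]

asyncio.run(main())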

3.1.2. aiomisc.backoff module#

class aiomisc.backoff.BackoffStatistic(name: str | None = None)[source]#
attempts: int#
cancels: int#
done: int#
errors: int#
name: str | None#
sum_time: float#
class aiomisc.backoff.RetryStatistic(name: str | None = None)[source]#
name: str | None#
aiomisc.backoff.asyncbackoff(attempt_timeout: int | float | None, deadline: int | float | None, pause: int | float = 0, *exc: ~typing.Type[Exception], exceptions: ~typing.Tuple[~typing.Type[Exception], ...] = (), max_tries: int | None = None, giveup: ~typing.Callable[[Exception], bool] | None = None, statistic_name: str | None = None, statistic_class: ~typing.Type[~aiomisc.backoff.BackoffStatistic] = <class 'aiomisc.backoff.BackoffStatistic'>) Callable[[...], Callable[[...], Awaitable[T]]][source]#

Parametric decorator that ensures the attempt_timeout and deadline time limits are met by the decorated function.

In case of exception the function will be called again with the same arguments after pause seconds.

Parameters:
  • statistic_name – name field for statistic instances

  • attempt_timeout – maximum execution time for one execution attempt.

  • deadline – maximum execution time for all execution attempts.

  • pause – time gap between execution attempts.

  • exc – retry when these exceptions are raised.

  • exceptions – same as exc but keyword-only.

  • max_tries – maximum number of execution attempts (>= 1).

  • giveup – a predicate which receives the raised exception and returns True to stop retrying.

  • statistic_class – statistic class
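
A hedged usage sketch (the fetch coroutine is hypothetical):

from aiomisc.backoff import asyncbackoff

@asyncbackoff(attempt_timeout=0.5, deadline=5.0, pause=0.1,
              exceptions=(ConnectionError, TimeoutError),
              max_tries=3)
async def fetch() -> bytes:
    # Hypothetical unreliable I/O call: each attempt may take at most
    # 0.5 s, all attempts together at most 5 s, with 0.1 s pauses.
    ...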

aiomisc.backoff.asyncretry(max_tries: int | None, exceptions: ~typing.Tuple[~typing.Type[Exception], ...] = (<class 'Exception'>,), pause: int | float = 0, giveup: ~typing.Callable[[Exception], bool] | None = None, statistic_name: str | None = None) Callable[[...], Callable[[...], Awaitable[T]]][source]#

Shortcut of asyncbackoff(None, None, 0, Exception).

In case of exception the function will be called again with the same arguments after pause seconds.

Parameters:
  • max_tries – maximum number of execution attempts (>= 1, or None for unlimited retries).

  • exceptions – retry when these exceptions are raised; defaults to (Exception,).

  • giveup – a predicate which receives the raised exception and returns True to stop retrying.

  • pause – time gap between execution attempts.

  • statistic_name – name field for statistic instances
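
A short usage sketch (the flaky coroutine is hypothetical):

from aiomisc.backoff import asyncretry

@asyncretry(5, exceptions=(ValueError,), pause=1)
async def flaky() -> None:
    # Retried up to 5 times, pausing one second between attempts,
    # as long as ValueError is raised.
    ...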

3.1.3. aiomisc.circuit_breaker module#

class aiomisc.circuit_breaker.CircuitBreaker(error_ratio: float, response_time: int | float, exceptions: ~typing.Iterable[~typing.Type[Exception]] = (<class 'Exception'>,), recovery_time: int | float | None = None, broken_time: int | float | None = None, passing_time: int | float | None = None, exception_inspector: ~typing.Callable[[Exception], bool] | None = None, statistic_name: str | None = None)[source]#

Circuit Breaker pattern implementation. The class instance collects call statistics through the call or call_async methods.

The state machine has three states:

  • CircuitBreakerStates.PASSING

  • CircuitBreakerStates.BROKEN

  • CircuitBreakerStates.RECOVERING

In passing mode all results and exceptions are returned as is, and statistics are collected for each call.

In broken mode a CircuitBroken exception is raised for each call and statistics are not collected.

In recovering mode a portion of the calls is passed through to the real function while the remaining calls raise CircuitBroken. The share of real calls grows exponentially, but if 20% of them (by default) fail, the state returns to broken.

Parameters:
  • error_ratio – Ratio of failed to successful calls. The state changes when the ratio reaches the given value within the response_time window (in seconds). Value between 0.0 and 1.0.

  • response_time – Time window to collect statistics (seconds)

  • exceptions – Only these exceptions affect the ratio. The base Exception class is used by default.

  • recovery_time – minimal time in recovery state (seconds)

  • broken_time – minimal time in broken state (seconds)

  • passing_time – minimum time in passing state (seconds)

BUCKET_COUNT = 10#
PASSING_BROKEN_THRESHOLD = 1#
RECOVER_BROKEN_THRESHOLD = 0.5#
bucket() int[source]#
call(func: Callable[[...], T], *args: Any, **kwargs: Any) T[source]#
async call_async(func: Callable[[...], Awaitable[T]], *args: Any, **kwargs: Any) T[source]#
context() Generator[Any, Any, Any][source]#
counter() Counter[source]#
get_state_delay() int | float[source]#
property recovery_ratio: int | float#
property response_time: int | float#
property state: CircuitBreakerStates#
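
A minimal sketch of guarding a coroutine with the circuit breaker described above (do_request is hypothetical):

from aiomisc.circuit_breaker import CircuitBreaker

circuit_breaker = CircuitBreaker(
    error_ratio=0.2,           # switch to BROKEN at 20% failures ...
    response_time=10,          # ... measured over a 10 second window
    exceptions=(ConnectionError,),
)

async def do_request() -> bytes:
    ...                        # hypothetical unreliable coroutine

async def guarded_request() -> bytes:
    # While the breaker is BROKEN this raises CircuitBroken instead
    # of calling do_request.
    return await circuit_breaker.call_async(do_request)
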
class aiomisc.circuit_breaker.CircuitBreakerStates(value)[source]#

An enumeration.

BROKEN = 1#
PASSING = 0#
RECOVERING = 2#
class aiomisc.circuit_breaker.CircuitBreakerStatistic(name: str | None = None)[source]#
call_broken: int#
call_count: int#
call_passing: int#
call_recovering: int#
call_recovering_failed: int#
call_recovering_ok: int#
error_ratio: float#
error_ratio_threshold: float#
name: str | None#
exception aiomisc.circuit_breaker.CircuitBroken(last_exception: Exception | None)[source]#
last_exception#
class aiomisc.circuit_breaker.CounterKey(value)[source]#

An enumeration.

FAIL = 0#
OK = 1#
TOTAL = 2#
aiomisc.circuit_breaker.cutout(ratio: float, response_time: int | float, *exceptions: Type[Exception], **kwargs: Any) Callable[[Callable[[...], T] | Callable[[...], Awaitable[T]]], Callable[[...], T | Awaitable[T]]][source]#
aiomisc.circuit_breaker.random() x in the interval [0, 1).#

3.1.4. aiomisc.compat module#

class aiomisc.compat.EventLoopMixin[source]#
property loop: AbstractEventLoop#
class aiomisc.compat.ParamSpec(name, *, bound=None, covariant=False, contravariant=False)[source]#

Parameter specification variable.

Usage:

P = ParamSpec('P')

Parameter specification variables exist primarily for the benefit of static type checkers. They are used to forward the parameter types of one callable to another callable, a pattern commonly found in higher order functions and decorators. They are only valid when used in Concatenate, or as the first argument to Callable, or as parameters for user-defined Generics. See class Generic for more information on generic types. An example for annotating a decorator:

T = TypeVar('T')
P = ParamSpec('P')

def add_logging(f: Callable[P, T]) -> Callable[P, T]:
    '''A type-safe decorator to add logging to a function.'''
    def inner(*args: P.args, **kwargs: P.kwargs) -> T:
        logging.info(f'{f.__name__} was called')
        return f(*args, **kwargs)
    return inner

@add_logging
def add_two(x: float, y: float) -> float:
    '''Add two numbers together.'''
    return x + y

Parameter specification variables defined with covariant=True or contravariant=True can be used to declare covariant or contravariant generic types. These keyword arguments are valid, but their actual semantics are yet to be decided. See PEP 612 for details.

Parameter specification variables can be introspected. e.g.:

P.__name__ == 'P'
P.__bound__ == None
P.__covariant__ == False
P.__contravariant__ == False

Note that only parameter specification variables defined in global scope can be pickled.

property args#
property kwargs#
aiomisc.compat.final(f)[source]#

A decorator to indicate final methods and final classes.

Use this decorator to indicate to type checkers that the decorated method cannot be overridden, and decorated class cannot be subclassed. For example:

class Base:
    @final
    def done(self) -> None:
        ...

class Sub(Base):
    def done(self) -> None:  # Error reported by type checker
        ...

@final
class Leaf:
    ...

class Other(Leaf):  # Error reported by type checker
    ...

There is no runtime checking of these properties.

aiomisc.compat.get_current_loop() CT#
aiomisc.compat.set_current_loop(value: CT) None#
aiomisc.compat.sock_set_nodelay(sock: socket) None[source]#
aiomisc.compat.sock_set_reuseport(sock: socket, reuse_port: bool) None[source]#
aiomisc.compat.time_ns() int#

Return the current time in nanoseconds since the Epoch.

3.1.5. aiomisc.context module#

class aiomisc.context.Context(loop: AbstractEventLoop)[source]#
close() None[source]#
aiomisc.context.get_context(loop: AbstractEventLoop | None = None) Context[source]#

3.1.6. aiomisc.counters module#

class aiomisc.counters.AbstractStatistic[source]#
name: str | None#
class aiomisc.counters.MetaStatistic(name: str, bases: Tuple[type, ...], dct: Dict[str, Any])[source]#
class aiomisc.counters.Metric(name: str, counter: MutableMapping[str, float | int], default: float | int = 0)[source]#
counter#
name: str#
class aiomisc.counters.Statistic(name: str | None = None)[source]#
name: str | None#
class aiomisc.counters.StatisticResult(kind, name, metric, value)[source]#

Create new instance of StatisticResult(kind, name, metric, value)

kind: Type[AbstractStatistic]#

Alias for field number 0

metric: str#

Alias for field number 2

name: str | None#

Alias for field number 1

value: int | float#

Alias for field number 3

aiomisc.counters.get_statistics(*kind: Type[Statistic]) Generator[Any, Tuple[Statistic, str, int], None][source]#

3.1.7. aiomisc.cron module#

class aiomisc.cron.CronCallback(coroutine_func: Callable[[...], Awaitable[Any] | Any], *args: Any, **kwargs: Any)[source]#

Note

When the cron function runs longer than the interval before the next scheduled call, that call will be skipped and a warning will be logged.

static get_next(cron: croniter, _: RecurringCallback) float[source]#
start(spec: str, loop: AbstractEventLoop | None = None, *, shield: bool = False, suppress_exceptions: Tuple[Type[Exception], ...] = ()) None[source]#
stop() Future[source]#
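
A minimal sketch, assuming a standard croniter-compatible spec string:

import asyncio

from aiomisc.cron import CronCallback

async def tick() -> None:
    print("tick")

async def main() -> None:
    cron = CronCallback(tick)
    cron.start("*/5 * * * *")    # every five minutes
    await asyncio.sleep(3600)
    await cron.stop()

asyncio.run(main())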

3.1.8. aiomisc.entrypoint module#

class aiomisc.entrypoint.Entrypoint(*services: ~aiomisc.service.base.Service, loop: ~asyncio.events.AbstractEventLoop | None = None, pool_size: int | None = None, log_level: int | str = 'info', log_format: str | ~aiomisc_log.enum.LogFormat = 'plain', log_buffering: bool = True, log_buffer_size: int = 1024, log_date_format: str | None = None, log_flush_interval: float = 0.2, log_config: bool = True, policy: ~asyncio.events.AbstractEventLoopPolicy = <uvloop.EventLoopPolicy object>, debug: bool = False, catch_signals: ~typing.Tuple[int, ...] | None = None, shutdown_timeout: int | float = 60.0)[source]#

Creates a new Entrypoint

Parameters:
  • debug – enable debug mode for the event loop

  • loop – loop

  • services – Service instances which will be started.

  • pool_size – thread pool size

  • log_level – Logging level which will be configured

  • log_format – Logging format which will be configured

  • log_buffer_size – Buffer size for logging

  • log_flush_interval – interval in seconds for flushing logs

  • log_config – if False do not configure logging

  • catch_signals – Perform shutdown when these signals are received

  • shutdown_timeout – Timeout in seconds for graceful shutdown

AIOMISC_SHUTDOWN_TIMEOUT: float = 60.0#
DEFAULT_AIOMISC_BUFFERING: bool = True#
DEFAULT_AIOMISC_BUFFER_SIZE: int = 1024#
DEFAULT_AIOMISC_DEBUG: bool = False#
DEFAULT_AIOMISC_LOG_CONFIG: bool = True#
DEFAULT_AIOMISC_LOG_FLUSH: float = 0.2#
DEFAULT_AIOMISC_POOL_SIZE: int | None = None#
DEFAULT_LOG_DATE_FORMAT: str | None = None#
DEFAULT_LOG_FORMAT: str = 'plain'#
DEFAULT_LOG_LEVEL: str = 'info'#
POST_START = <aiomisc.signal.Signal object>#
POST_STOP = <aiomisc.signal.Signal object>#
PRE_START = <aiomisc.signal.Signal object>#
PRE_STOP = <aiomisc.signal.Signal object>#
async closing() None[source]#
classmethod get_current() Entrypoint[source]#
async graceful_shutdown(exception: Exception) None[source]#
property loop: AbstractEventLoop#
property services: Tuple[Service, ...]#
async start_services(*svc: Service) None[source]#
async stop_services(*svc: Service, exc: Exception | None = None) None[source]#
aiomisc.entrypoint.entrypoint#

alias of Entrypoint

aiomisc.entrypoint.get_context(loop: AbstractEventLoop | None = None) Context[source]#
aiomisc.entrypoint.run(coro: Coroutine[None, Any, T], *services: Service, **kwargs: Any) T[source]#
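
A brief sketch of the typical pattern (MyService is a hypothetical service):

import aiomisc

class MyService(aiomisc.Service):
    async def start(self) -> None:
        ...  # start background work here

# Configure logging, start the services and run the loop.
with aiomisc.entrypoint(MyService()) as loop:
    loop.run_forever()

Alternatively, aiomisc.run(main(), MyService()) starts the services, runs the given coroutine to completion and then shuts everything down.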

3.1.9. aiomisc.io module#

class aiomisc.io.AsyncBinaryIO(fname: str | Path, mode: str = 'r', executor: Executor | None = None, *args: Any, loop: AbstractEventLoop | None = None, **kwargs: Any)[source]#
executor#
class aiomisc.io.AsyncBz2BinaryIO(fname: str | Path, mode: str = 'r', executor: Executor | None = None, *args: Any, loop: AbstractEventLoop | None = None, **kwargs: Any)[source]#
executor#
static get_opener() Callable[[...], IO][source]#
class aiomisc.io.AsyncBz2TextIO(fname: str | Path, mode: str = 'r', executor: Executor | None = None, *args: Any, loop: AbstractEventLoop | None = None, **kwargs: Any)[source]#
executor#
static get_opener() Callable[[...], IO][source]#
class aiomisc.io.AsyncFileIO(fname: str | Path, mode: str = 'r', executor: Executor | None = None, *args: Any, loop: AbstractEventLoop | None = None, **kwargs: Any)[source]#
async close() None[source]#
closed() bool[source]#
executor#
fileno() int[source]#
async flush() None[source]#
property fp: IO#
static get_opener() Callable[[...], IO][source]#
isatty() bool[source]#
property mode: str#
property name: str#
async open() None[source]#
classmethod open_fp(fp: IO, executor: Executor | None = None, loop: AbstractEventLoop | None = None) AsyncFileIO[source]#
async read(n: int = -1) AnyStr[source]#
async readable() bool[source]#
async readline(limit: int = -1) AnyStr[source]#
async readlines(limit: int = -1) List[source]#
async seek(offset: int, whence: int = 0) int[source]#
async seekable() bool[source]#
async tell() int[source]#
async truncate(size: int | None = None) int[source]#
async writable() bool[source]#
async write(s: AnyStr) int[source]#
async writelines(lines: List) None[source]#
class aiomisc.io.AsyncGzipBinaryIO(fname: str | Path, mode: str = 'r', executor: Executor | None = None, *args: Any, loop: AbstractEventLoop | None = None, **kwargs: Any)[source]#
executor#
static get_opener() Callable[[...], IO][source]#
class aiomisc.io.AsyncGzipTextIO(fname: str | Path, mode: str = 'r', executor: Executor | None = None, *args: Any, loop: AbstractEventLoop | None = None, **kwargs: Any)[source]#
executor#
static get_opener() Callable[[...], IO][source]#
class aiomisc.io.AsyncLzmaBinaryIO(fname: str | Path, mode: str = 'r', executor: Executor | None = None, *args: Any, loop: AbstractEventLoop | None = None, **kwargs: Any)[source]#
executor#
static get_opener() Callable[[...], IO][source]#
class aiomisc.io.AsyncLzmaTextIO(fname: str | Path, mode: str = 'r', executor: Executor | None = None, *args: Any, loop: AbstractEventLoop | None = None, **kwargs: Any)[source]#
executor#
static get_opener() Callable[[...], IO][source]#
class aiomisc.io.AsyncTextIO(fname: str | Path, mode: str = 'r', executor: Executor | None = None, *args: Any, loop: AbstractEventLoop | None = None, **kwargs: Any)[source]#
buffer() AsyncBinaryIO[source]#
property encoding: str#
property errors: str | None#
executor#
property fp: TextIO#
property line_buffering: int#
property newlines: Any#
class aiomisc.io.Compression(value)[source]#

An enumeration.

BZ2 = (<class 'aiomisc.io.AsyncBz2BinaryIO'>, <class 'aiomisc.io.AsyncBz2TextIO'>)#
GZIP = (<class 'aiomisc.io.AsyncGzipBinaryIO'>, <class 'aiomisc.io.AsyncGzipTextIO'>)#
LZMA = (<class 'aiomisc.io.AsyncLzmaBinaryIO'>, <class 'aiomisc.io.AsyncLzmaTextIO'>)#
NONE = (<class 'aiomisc.io.AsyncBinaryIO'>, <class 'aiomisc.io.AsyncTextIO'>)#
aiomisc.io.async_open(fname: str | Path, mode: str = 'r', compression: Compression = Compression.NONE, encoding: str = 'utf-8', *args: Any, **kwargs: Any) AsyncFileIO | AsyncTextIO | AsyncBinaryIO[source]#
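
A short sketch of async_open used as an asynchronous context manager:

import asyncio

from aiomisc.io import async_open

async def main() -> None:
    async with async_open("/tmp/hello.txt", "w+") as afp:
        await afp.write("Hello world")
        await afp.seek(0)
        print(await afp.read())

asyncio.run(main())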

3.1.10. aiomisc.iterator_wrapper module#

exception aiomisc.iterator_wrapper.ChannelClosed[source]#
class aiomisc.iterator_wrapper.DequeWrapper[source]#
get() Any[source]#
put(item: Any) None[source]#
queue: Deque[Any]#
class aiomisc.iterator_wrapper.FromThreadChannel(maxsize: int = 0)[source]#
SLEEP_DIFFERENCE_DIVIDER = 10#
SLEEP_LOW_THRESHOLD = 0.0001#
close() None[source]#
async get() Any[source]#
property is_closed: bool#
put(item: Any) None[source]#
queue: QueueWrapperBase#
class aiomisc.iterator_wrapper.IteratorProxy(iterator: AsyncIterator, finalizer: Callable[[], Any])[source]#
class aiomisc.iterator_wrapper.IteratorWrapper(gen_func: Callable[[], Generator[T, R, None]], loop: AbstractEventLoop | None = None, max_size: int = 0, executor: Executor | None = None, statistic_name: str | None = None)[source]#
close() Awaitable[None][source]#
property closed: bool#
executor#
async wait_closed() None[source]#
class aiomisc.iterator_wrapper.IteratorWrapperStatistic(name: str | None = None)[source]#
enqueued: int#
name: str | None#
queue_length: int#
queue_size: int#
started: int#
yielded: int#
class aiomisc.iterator_wrapper.QueueWrapper(max_size: int)[source]#
get() Any[source]#
put(item: Any) None[source]#
queue: Queue#
class aiomisc.iterator_wrapper.QueueWrapperBase[source]#
get() Any[source]#
abstract put(item: Any) None[source]#
aiomisc.iterator_wrapper.make_queue(max_size: int = 0) QueueWrapperBase[source]#

3.1.11. aiomisc.log module#

class aiomisc.log.LogFormat(value)[source]#

An enumeration.

classmethod choices() Tuple[str, ...][source]#
color = 1#
classmethod default() str[source]#
journald = 5#
json = 2#
plain = 4#
rich = 6#
rich_tb = 7#
stream = 0#
syslog = 3#
class aiomisc.log.LogLevel(value)[source]#

An enumeration.

classmethod choices() Tuple[str, ...][source]#
critical = 50#
debug = 10#
classmethod default() str[source]#
error = 40#
info = 20#
notset = 0#
warning = 30#
aiomisc.log.basic_config(level: int | str = 'info', log_format: str | LogFormat = 'plain', buffered: bool = True, buffer_size: int = 1024, flush_interval: int | float = 0.2, loop: AbstractEventLoop | None = None, **kwargs: Any) None[source]#
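
A minimal sketch of configuring logging by hand (the entrypoint calls basic_config itself when log_config is True):

import logging

from aiomisc.log import basic_config

# Colorized, unbuffered logging; buffering is mainly useful when an
# event loop is running to flush the buffer periodically.
basic_config(level=logging.DEBUG, log_format="color", buffered=False)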

3.1.12. aiomisc.periodic module#

class aiomisc.periodic.PeriodicCallback(coroutine_func: Callable[[...], Awaitable[Any] | Any], *args: Any, **kwargs: Any)[source]#

Note

When the periodic function runs longer than the execution interval, the next call will be skipped and a warning will be logged.

start(interval: int | float, loop: AbstractEventLoop | None = None, *, delay: int | float = 0, shield: bool = False, suppress_exceptions: Tuple[Type[Exception], ...] = ()) None[source]#
stop(return_exceptions: bool = False) Future[source]#
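
A short usage sketch (heartbeat is illustrative):

import asyncio

from aiomisc.periodic import PeriodicCallback

async def heartbeat() -> None:
    print("alive")

async def main() -> None:
    periodic = PeriodicCallback(heartbeat)
    periodic.start(1, delay=0.5)   # every second, first call after 0.5 s
    await asyncio.sleep(5)
    await periodic.stop()

asyncio.run(main())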

3.1.13. aiomisc.plugins module#

3.1.14. aiomisc.pool module#

class aiomisc.pool.ContextManager(aenter: Callable[[...], Awaitable[T]], aexit: Callable[[...], Awaitable[T]])[source]#
sentinel = <object object>#
class aiomisc.pool.PoolBase(maxsize: int = 10, recycle: int | None = None)[source]#
acquire() AsyncContextManager[T][source]#
async close(timeout: int | float | None = None) None[source]#
aiomisc.pool.random() x in the interval [0, 1).#

3.1.15. aiomisc.process_pool module#

class aiomisc.process_pool.ProcessPoolExecutor(max_workers: int = 4, **kwargs: Any)[source]#

Initializes a new ProcessPoolExecutor instance.

Args:
    max_workers: The maximum number of processes that can be used to
        execute the given calls. If None or not given then as many
        worker processes will be created as the machine has processors.
    mp_context: A multiprocessing context to launch the workers. This
        object should provide SimpleQueue, Queue and Process.
    initializer: A callable used to initialize worker processes.
    initargs: A tuple of arguments to pass to the initializer.

DEFAULT_MAX_WORKERS = 4#
submit(*args: Any, **kwargs: Any) Future[source]#

Submit blocking function to the pool
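
A hedged sketch, assuming the pool behaves as a concurrent.futures-compatible executor (as the inherited constructor above suggests):

import asyncio
import math

from aiomisc.process_pool import ProcessPoolExecutor

async def main() -> None:
    executor = ProcessPoolExecutor(max_workers=2)
    loop = asyncio.get_running_loop()
    # Offload a CPU-bound, picklable callable to a worker process.
    print(await loop.run_in_executor(executor, math.factorial, 20))
    executor.shutdown()

if __name__ == "__main__":
    asyncio.run(main())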

class aiomisc.process_pool.ProcessPoolStatistic(name: str | None = None)[source]#
done: int#
error: int#
name: str | None#
processes: int#
submitted: int#
success: int#
sum_time: float#

3.1.16. aiomisc.recurring module#

class aiomisc.recurring.RecurringCallback(coroutine_func: Callable[[...], Awaitable[Any] | Any], *args: Any, **kwargs: Any)[source]#
args: Tuple[Any, ...]#
func: Callable[[...], Awaitable[Any]]#
kwargs: Mapping[str, Any]#
name: str#
start(strategy: Callable[[RecurringCallback], int | float | Awaitable[int] | Awaitable[float]], loop: AbstractEventLoop | None = None, *, shield: bool = False, suppress_exceptions: Tuple[Type[Exception], ...] = ()) Task[source]#
class aiomisc.recurring.RecurringCallbackStatistic(name: str | None = None)[source]#
call_count: int#
done: int#
fail: int#
name: str | None#
sum_time: float#
exception aiomisc.recurring.StrategyException[source]#
exception aiomisc.recurring.StrategySkip(next_attempt_delay: int | float)[source]#

Strategy function might raise this exception as a way to skip the current call

exception aiomisc.recurring.StrategyStop[source]#

Strategy function might raise this exception as a way to stop the recurring callback

3.1.17. aiomisc.signal module#

class aiomisc.signal.Signal[source]#
async call(*args: Any, **kwargs: Any) None[source]#
connect(receiver: Callable[[...], Any]) None[source]#
copy() Signal[source]#
disconnect(receiver: Callable[[...], Any]) None[source]#
freeze() None[source]#
property is_frozen: bool#
aiomisc.signal.receiver(s: Signal) Callable[[...], Callable[[...], T]][source]#

3.1.18. aiomisc.thread_pool module#

class aiomisc.thread_pool.CoroutineWaiter(coroutine: Coroutine[Any, Any, T], loop: AbstractEventLoop | None = None)[source]#
start() None[source]#
wait() Any[source]#
class aiomisc.thread_pool.IteratorWrapperSeparate(gen_func: Callable[[], Generator[T, R, None]], loop: AbstractEventLoop | None = None, max_size: int = 0, executor: Executor | None = None, statistic_name: str | None = None)[source]#
executor#
exception aiomisc.thread_pool.ThreadPoolException[source]#
class aiomisc.thread_pool.ThreadPoolExecutor(max_workers: int = 4, loop: AbstractEventLoop | None = None, statistic_name: str | None = None)[source]#
DEFAULT_POOL_SIZE = 4#
shutdown(wait: bool = True) None[source]#

Clean-up the resources associated with the Executor.

It is safe to call this method several times. Otherwise, no other methods can be called after this one.

Args:
    wait: If True then shutdown will not return until all running
        futures have finished executing and the resources used by the
        executor have been reclaimed.
    cancel_futures: If True then shutdown will cancel all pending
        futures. Futures that are completed or running will not be
        cancelled.

submit(fn: F, *args: Any, **kwargs: Any) Future[source]#

Submit blocking function to the pool

class aiomisc.thread_pool.ThreadPoolStatistic(name: str | None = None)[source]#
done: int#
error: int#
name: str | None#
submitted: int#
success: int#
sum_time: float#
threads: int#
class aiomisc.thread_pool.WorkItem(func: Callable[[...], Any], args: Tuple[Any, ...], kwargs: Dict[str, Any], future: Future, loop: AbstractEventLoop)[source]#

Create new instance of WorkItemBase(func, args, kwargs, future, loop)

static set_result(future: Future, result: Any, exception: Exception) None[source]#
class aiomisc.thread_pool.WorkItemBase(func, args, kwargs, future, loop)[source]#

Create new instance of WorkItemBase(func, args, kwargs, future, loop)

args: Tuple[Any, ...]#

Alias for field number 1

func: Callable[[...], Any]#

Alias for field number 0

future: Future#

Alias for field number 3

kwargs: Dict[str, Any]#

Alias for field number 2

loop: AbstractEventLoop#

Alias for field number 4

aiomisc.thread_pool.context_partial(func: F, *args: Any, **kwargs: Any) Any[source]#
aiomisc.thread_pool.run_in_executor(func: Callable[[...], T], executor: ThreadPoolExecutor | None = None, args: Any = (), kwargs: Any = mappingproxy({})) Awaitable[T][source]#
aiomisc.thread_pool.run_in_new_thread(func: F, args: Any = (), kwargs: Any = mappingproxy({}), detach: bool = True, no_return: bool = False, statistic_name: str | None = None) Future[source]#
aiomisc.thread_pool.sync_await(func: Callable[[...], Awaitable[T]], *args: Any, **kwargs: Any) T[source]#
aiomisc.thread_pool.sync_wait_coroutine(loop: AbstractEventLoop | None, coro_func: Callable[[...], Coroutine[Any, Any, T]], *args: Any, **kwargs: Any) T[source]#
aiomisc.thread_pool.threaded(func: Callable[[P], T]) Callable[[P], Awaitable[T]][source]#
aiomisc.thread_pool.threaded_iterable(func: F | None = None, max_size: int = 0) Any[source]#
aiomisc.thread_pool.threaded_iterable_separate(func: F | None = None, max_size: int = 0) Any[source]#
aiomisc.thread_pool.threaded_separate(func: F, detach: bool = True) Callable[[...], Awaitable[Any]][source]#
aiomisc.thread_pool.wait_coroutine(coro: Coroutine[Any, Any, T], loop: AbstractEventLoop | None = None) T[source]#
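
A brief sketch of the threaded decorator listed above, which offloads a blocking function to a thread pool so it can be awaited (blocking_io is illustrative):

import asyncio
import time

from aiomisc.thread_pool import threaded

@threaded
def blocking_io() -> str:
    time.sleep(1)          # runs in a worker thread, not the event loop
    return "done"

async def main() -> None:
    print(await blocking_io())

asyncio.run(main())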

3.1.19. aiomisc.timeout module#

aiomisc.timeout.timeout(value: int | float) Callable[[Callable[[...], Awaitable[T]]], Callable[[...], Awaitable[T]]][source]#
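
A short sketch: the decorated coroutine is cancelled if it runs longer than the given number of seconds.

import asyncio

from aiomisc.timeout import timeout

@timeout(1)
async def slow() -> None:
    await asyncio.sleep(2)   # raises asyncio.TimeoutError after ~1 second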

3.1.20. aiomisc.utils module#

class aiomisc.utils.SelectAwaitable(*awaitables: Awaitable[T], return_exceptions: bool = False, cancel: bool = True, timeout: int | float | None = None, wait: bool = True, loop: AbstractEventLoop | None = None)[source]#

Select one of the passed awaitables.

Parameters:
  • awaitables – awaitable objects

  • return_exceptions – if True, an exception is not raised but returned as the result

  • cancel – cancel unfinished coroutines (default True)

  • timeout – execution timeout

  • wait – when False and cancel=True, unfinished coroutines will be cancelled in the background.

  • loop – event loop

property loop: AbstractEventLoop#
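
A minimal sketch of waiting for whichever awaitable finishes first:

import asyncio

from aiomisc.utils import select

async def main() -> None:
    event = asyncio.Event()
    loop = asyncio.get_running_loop()
    future = loop.create_future()
    loop.call_soon(event.set)

    # Returns as soon as event.wait() finishes; the unfinished future
    # is cancelled because cancel=True by default.
    await select(event.wait(), future)

asyncio.run(main())
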
class aiomisc.utils.SelectResult(length: int)[source]#
done() bool[source]#
is_exception: bool | None#
length#
result() Any[source]#
result_idx: int | None#
set_result(idx: int, value: Any, is_exception: bool) None[source]#
value: Any#
aiomisc.utils.awaitable(func: Callable[[...], AT | Awaitable[AT]]) Callable[[...], Awaitable[AT]][source]#

Decorator that wraps a function and returns a function whose result is always awaitable. If the wrapped function returns a future, the original future is returned. If it returns a coroutine, the original coroutine is returned. If it returns a non-awaitable object, the object is wrapped into a new coroutine which simply returns it. This is useful when you do not want to check the function result before using it in an await expression.
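
A short sketch illustrating the behaviour described above (answer is illustrative):

import asyncio

from aiomisc.utils import awaitable

@awaitable
def answer() -> int:
    return 42              # a plain value gets wrapped into a coroutine

async def main() -> None:
    assert await answer() == 42

asyncio.run(main())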

aiomisc.utils.bind_socket(*args: Any, address: str, port: int = 0, options: Iterable[Tuple[int, int, int]] = (), reuse_addr: bool = True, reuse_port: bool = True, proto_name: str | None = None) socket[source]#

Bind a socket and set setblocking(False) on the just created socket. The address format is detected and the socket family is selected automatically.

Parameters:
  • args – which will be passed to stdlib’s socket constructor (optional)

  • address – bind address

  • port – bind port

  • options – Iterable of (level, option, value) tuples with socket options to set.

  • reuse_addr – set socket.SO_REUSEADDR

  • reuse_port – set socket.SO_REUSEPORT

  • proto_name – protocol name which will be logged after binding

Returns:

socket.socket
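
A short usage sketch (the address, port and protocol name are illustrative):

from aiomisc.utils import bind_socket

# Non-blocking socket bound to 127.0.0.1:8080; the family is
# selected automatically from the address format.
sock = bind_socket(address="127.0.0.1", port=8080, proto_name="http")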

aiomisc.utils.cancel_tasks(tasks: Iterable[Future]) Future[source]#

All passed tasks will be cancelled and a new task will be returned.

Parameters:

tasks – tasks which will be cancelled

aiomisc.utils.chunk_list(iterable: Iterable[T], size: int) Iterable[List[T]][source]#

Split a list or generator into chunks with a fixed maximum size.
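
For example, a quick sketch of the expected chunking:

from aiomisc.utils import chunk_list

list(chunk_list(range(10), 3))
# -> [[0, 1, 2], [3, 4, 5], [6, 7, 8], [9]]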

aiomisc.utils.create_default_event_loop(pool_size: int | None = None, policy: ~asyncio.events.AbstractEventLoopPolicy = <uvloop.EventLoopPolicy object>, debug: bool = False) Tuple[AbstractEventLoop, ThreadPoolExecutor][source]#

Creates an event loop and thread pool executor

Parameters:
  • pool_size – thread pool maximal size

  • policy – event loop policy

  • debug – set loop.set_debug(True) if True

aiomisc.utils.fast_uuid1() UUID[source]#

Fast UUID1-like identifier

aiomisc.utils.fast_uuid4() UUID[source]#

Fast UUID4-like identifier

aiomisc.utils.getrandbits(k) x.  Generates an int with k random bits.#
aiomisc.utils.new_event_loop(pool_size: int | None = None, policy: ~asyncio.events.AbstractEventLoopPolicy = <uvloop.EventLoopPolicy object>) AbstractEventLoop[source]#
aiomisc.utils.pending_futures(futures: Iterable[Future]) Iterator[Future][source]#
aiomisc.utils.select#

alias of SelectAwaitable

aiomisc.utils.set_exception(futures: Iterable[Future], exc: BaseException = CancelledError()) Set[Task][source]#
aiomisc.utils.shield(func: Callable[[...], Coroutine[Any, Any, T]]) Callable[[...], Coroutine[Any, Any, T]][source]#

Simple and useful decorator that wraps a coroutine function with asyncio.shield.

>>> @shield
... async def non_cancelable_func():
...     await asyncio.sleep(1)

3.1.21. aiomisc.worker_pool module#

class aiomisc.worker_pool.WorkerPool(workers: int, max_overflow: int = 0, *, initializer: Callable[[...], Any] | None = None, initializer_args: Tuple[Any, ...] = (), initializer_kwargs: Mapping[str, Any] = mappingproxy({}), statistic_name: str | None = None)[source]#
address: str | Tuple[str, int]#
close() None[source]#
async create_task(func: Callable[[...], T], *args: Any, **kwargs: Any) T[source]#
initializer: Callable[[], Any] | None#
initializer_args: Tuple[Any, ...]#
initializer_kwargs: Mapping[str, Any]#
property loop: AbstractEventLoop#
pids: Set[int]#
server: AbstractServer#
async start() None[source]#
tasks: Queue#
worker_ids: Tuple[bytes, ...]#
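
A hedged sketch of the pool lifecycle using the methods listed above; the callable and its arguments are pickled, so they must be importable in the worker process (math.factorial is):

import asyncio
import math

from aiomisc.worker_pool import WorkerPool

async def main() -> None:
    pool = WorkerPool(2)
    await pool.start()
    try:
        # Executed in one of the worker processes.
        print(await pool.create_task(math.factorial, 10))
    finally:
        pool.close()

if __name__ == "__main__":
    asyncio.run(main())
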
class aiomisc.worker_pool.WorkerPoolStatistic(name: str | None = None)[source]#
bad_auth: int#
done: int#
error: int#
name: str | None#
processes: int#
queue_size: int#
spawning: int#
submitted: int#
success: int#
sum_time: float#
task_added: int#