Source code for aiomisc.timeout
import asyncio
from functools import wraps
from typing import Any, Awaitable, Callable, TypeVar, Union
T = TypeVar("T")
Number = Union[int, float]
FuncType = Callable[..., Awaitable[T]]
[docs]
def timeout(value: Number) -> Callable[[FuncType], FuncType]:
def decorator(
func: FuncType,
) -> FuncType:
if not asyncio.iscoroutinefunction(func):
raise TypeError("Function is not a coroutine function")
@wraps(func)
async def wrap(*args: Any, **kwargs: Any) -> T:
return await asyncio.wait_for(
func(*args, **kwargs),
timeout=value,
)
return wrap
return decorator