2.16. Утилиты

2.16.1. select

Иногда требуется дождаться выполнения хотя-бы одной задачи из многих. select ожидает выполнения одной из переданных сопрограмм (или объектов с реализованным методом __await__) И возвращает список результатов.

import asyncio
import aiomisc


async def main():
    loop = asyncio.get_event_loop()
    event = asyncio.Event()
    future = asyncio.Future()

    loop.call_soon(event.set)

    await aiomisc.select(event.wait(), future)
    print(event.is_set())       # True

    event = asyncio.Event()
    future = asyncio.Future()

    loop.call_soon(future.set_result, True)

    results = await aiomisc.select(future, event.wait())
    future_result, event_result = results

    print(results.result())             # True
    print(results.result_idx)           # 0
    print(event_result, future_result)  # None, True


with aiomisc.entrypoint() as loop:
    loop.run_until_complete(main())

Предупреждение

В случае если вы не желаете отменять запущенные задачии передайте аргумент cancel=False. Но в этом случае вам придется разобраться с завершением или отменой самостоятельон иначе будет предупреждение от интерпретатора.

2.16.2. cancel_tasks

Все переданные задачи будут отменены, при это функция возвращает asyncio.Task:

import asyncio
from aiomisc import cancel_tasks


async def main():
    done, pending = await asyncio.wait([
        asyncio.sleep(i) for i in range(10)
    ], timeout=5)

    print("Done", len(done), "tasks")
    print("Pending", len(pending), "tasks")
    await cancel_tasks(pending)


asyncio.run(main())

2.16.3. awaitable

Оборачивает функции таким образом что они всегда возвращают сопрограмму. Если функция возвращает объект asyncio.Future, будет возвращен оригинальный объект. Если функция итак возвращает сопрограмму, или объект с реализованным методом __await__ будет возвращен оригинальный объект.В противном случае возвращаемый объект будет тобернут в сопрограмму, которая вернет этот объект.Это полезно если не хочется проверять возврат из функции перед тем как использовать ее в await выражении.

import asyncio
import aiomisc


async def do_callback(func, *args):
    awaitable_func = aiomisc.awaitable(func)

    return await awaitable_func(*args)


print(asyncio.run(do_callback(asyncio.sleep, 2)))
print(asyncio.run(do_callback(lambda: 45)))

2.16.4. bind_socket

Создает сокет и устанавливает для него необходимые для работы с asyncio флаги (вроде setblocking(False)). Так-же определяет семейство адресов (IPv6/IPv4) из формата аргумента address автоматически.

from aiomisc import bind_socket

# IPv4 socket
sock = bind_socket(address="127.0.0.1", port=1234)

# IPv6 socket (on Linux IPv4 socket will be bind too)
sock = bind_socket(address="::1", port=1234)

2.16.5. RecurringCallback

Запускает сопрограммы переодически с определяемой пользователем стратегией.

from typing import Union
from aiomisc import new_event_loop, RecurringCallback


async def callback():
    print("Hello")


FIRST_CALL = False


async def strategy(_: RecurringCallback) -> Union[int, float]:
    global FIRST_CALL
    if not FIRST_CALL:
        FIRST_CALL = True
        # Ждем 5 секунд если только-что запустились
        return 5

    # Ждем 10 секунд если это не первый запуск
    return 10


if __name__ == '__main__':
    loop = new_event_loop()

    periodic = RecurringCallback(callback)

    task = periodic.start(strategy)
    loop.run_forever()

Основная цель этого класса - предоставить возможность указывать стратегией асинхронную функцию, которая может быть так как вам нужно.

Кроме того, с помощью специальных исключений вы можете управлять поведением запущенного RecurringCallback.

from aiomisc import (
    new_event_loop, RecurringCallback, StrategySkip, StrategyStop
)


async def strategy(_: RecurringCallback) -> Union[int, float]:
    ...

    # Пропускаем эту попытку и ждем 10 секунд
    raise StrategySkip(10)

    ...

    # Stop execution
    raise StrategyStop()

если функция-стратегия возвращает неверное значение (не число) или не вызывает специальных исключений, повторяющееся выполнение завершается.

2.16.6. PeriodicCallback

Запускает сопрограммы переодически с заданным периодом времени, и необязательной задержкой при первом запуске. Использует RecurringCallback под капотом.

import asyncio
import time
from aiomisc import new_event_loop, PeriodicCallback


async def periodic_function():
    print("Hello")


if __name__ == '__main__':
    loop = new_event_loop()

    periodic = PeriodicCallback(periodic_function)

    # Ждем 10 секунд и вызываем это каждую секунду после этого
    periodic.start(1, delay=10)

    loop.run_forever()

2.16.7. CronCallback

Предупреждение

Придется установить пакет croniter чтобы пользоваться этим:

pip install croniter

Или указать это как extras при установке aiomisc:

pip install aiomisc[cron]

Запускает сопрограммы переодически, как-будто с помощью cron.Использует RecurringCallback под капотом.

import asyncio
import time
from aiomisc import new_event_loop, CronCallback


async def cron_function():
    print("Hello")


if __name__ == '__main__':
    loop = new_event_loop()

    periodic = CronCallback(cron_function)

    # Будем запускать это каждую секунду
    periodic.start(spec="* * * * * *")

    loop.run_forever()