1. Учебник

aiomisc — это библиотека Python, которая предоставляет набор утилит для создание асинхронных сервисов. Это позволяет вам разделить вашу программу на небольшие независимые службы, которые могут работать одновременно, улучшая общую производительность и масштабируемость вашего приложения.

Основным подходом в этой библиотеке является разделение вашей программы на независимые сервисы, которые могут работать конкурентно в асинхронном режиме. Библиотека также предоставляет набор готовых к использованию сервисов с заранее написанной логикой запуска и остановки.

Подавляющее большинство функций и классов написаны таким образом, что их можно использовать в программе, которая изначально не была разработана в соответствии с принципами, изложенными в этом руководстве. Так что если вы не планируете слишком сильно переписывать свой код, а хотите использовать только несколько полезных для вас функций или классов, то все должно работать.

В целом, aiomisc — это мощный инструмент для разработчиков, стремящихся создавать эффективные и масштабируемые асинхронные сервисы на Python.

1.1. Сервисы

Если вы хотите запустить tcp или веб-сервер, вам придется написать что-то вроде этого:

import asyncio

async def example_async_func():
    # Делаем что-то асинхронное и очень важное
    await init_db()
    await init_cache()
    await start_http_server()
    await start_metrics_server()

loop = asyncio.get_event_loop()
loop.run_until_complete(example_async_func())
# Продолжаем выполнение асинхронных задач
loop.run_forever()

Для того, чтобы запустить или остановить асинхронные программы, обычно используется функция asyncio.run(example_async_func()), которая доступна начиная с Python 3.7. Функция принимает экземпляр сопрограммы и отменяет через cancel() всем задачи перед тем как вернуть результат. Чтобы продолжить выполнение кода вечно, вы можете применить следующий трюк:

import asyncio

async def example_async_func():
    # Делаем что-то асинхронное и очень важное
    await init_db()
    await init_cache()
    await start_http_server()
    await start_metrics_server()

    # Создаем future которая никогда не будет завершена
    # используется вместо loop.run_forever()
    await asyncio.Future()

asyncio.run(example_async_func())

Если пользователь нажимает Ctrl+C, программа просто завершается, но если вы хотите явно освободить некоторые ресурсы, например, закрыть соединения с базой данных или откатить незавершенные транзакции, то вам нужно сделать что-то вроде этого:

import asyncio

async def example_async_func():
    try:
        # Делаем что-то асинхронное и очень важное
        await init_db()
        await init_cache()
        await start_http_server()
        await start_metrics_server()

        # Создаем future которая никогда не будет завершена
        # используется вместо loop.run_forever()
        await asyncio.Future()
    except asyncio.CancelledError:
        # Зайдем в этот блок будет когда получим SIGTERM
        pass
    finally:
        # Зайдем сюда перед выходом
        ...

asyncio.run(example_async_func())

Это хорошее решение, потому что оно реализовано без каких-либо сторонних библиотек. Но когда ваша программа начнет расти, вы, вероятно, захотите оптимизировать время запуска простым способом, а именно выполнить всю инициализацию конкурентно. На первый взгляд кажется, что этот код решит проблему:

import asyncio

async def example_async_func():
    try:
        # Делаем что-то асинхронное и очень важное
        await asyncio.gather(
            init_db(),
            init_cache(),
            start_http_server(),
            start_metrics_server(),
        )

        # Создаем future которая никогда не будет завершена
        # используется вместо loop.run_forever()
        await asyncio.Future()
    except asyncio.CancelledError:
        # Зайдем в этот блок будет когда получим SIGTERM
        pass
    finally:
        # Зайдем сюда перед выходом
        ...

asyncio.run(example_async_func())

Но если вдруг какая-то часть инициализации пойдет не по плану, то вам каким-то образом придется выяснить, что именно пошло не так. Поэтому при конкурентном выполнении код уже не будет таким простым, как в этом примере.

И для того, чтобы как-то упорядочить код, вы должны сделать отдельную функцию, которая будет содержать блок try/except/finally и содержать обработку ошибок.

import asyncio

async def init_db():
    try:
        # инициализируем соединение
    finally:
        # закрываем соединение
        ...

async def example_async_func():
    try:
        # Делаем что-то асинхронное и очень важное
        await asyncio.gather(
            init_db(),
            init_cache(),
            start_http_server(),
            start_metrics_server(),
        )

        # Создаем future которая никогда не будет завершена
        # используется вместо loop.run_forever()
        await asyncio.Future()
    except asyncio.CancelledError:
        # Зайдем в этот блок будет когда получим SIGTERM
        # TODO: нужно как-то корректно все завершить
        pass
    finally:
        # Зайдем сюда перед выходом
        ...

asyncio.run(example_async_func())

И теперь, если пользователь нажимает Ctrl+C, вам нужно снова описать логику завершения работы, но уже в блоке except.

Для того, чтобы описать логику запуска и остановки в одном месте, а также тестирования одним-единственным способом, и существует абстракция Service.

Сервис представляет из себя абстрактный базовый класс, в котором нужно реализовать метод start() и не обязательно метод stop()

Сервис может работать в двух режимах. Первый это когда метод старт выполняется вечно, тогда не нужно реализовывать стоп, но нужно сообщить что инициализация успешно закончена с помощью self.start_event.set().

import asyncio
import aiomisc


class InfinityService(aiomisc.Service):
    async def start(self):
        # Сервис готов работать
        self.start_event.set()

        while True:
            # Делаем что-то полезное
            await asyncio.sleep(1)

В этом случае остановка сервиса будет заключаться в завершении сопрограммы которую породила start()

Второй способ это явное описание способа запуска и остановки реализовав методы start() и stop()

import asyncio
import aiomisc
from typing import Any


class OrdinaryService(aiomisc.Service):
    async def start(self):
        # Делаем что-то полезное
        ...

    async def stop(self, exception: Exception = None) -> Any:
        # Делаем что-то полезное
        ...

В этом случае запуск и остановка сервиса будут выполнены однократно.

1.2. Конфигурация сервисов

Так как Service это метакласс, он может обрабатывать специальные аттрибуты классов наследуемых от него на этапе объявления класса.

Вот простой императивный пример как инициализация сервиса может быть расширена через наследование.

from typing import Any
import aiomisc

class HelloService(aiomisc.Service):
    def __init__(self, name: str = "мир", **kwargs: Any):
        super().__init__(**kwargs)
        self.name = name

    async def start(self) -> Any:
        print(f"Привет {self.name}")

with aiomisc.entrypoint(
    HelloService(),
    HelloService(name="Гвидо")
) as loop:
    pass

# python hello.py
# <<< Привет мир
# <<< Привет Гвидо

На самом деле, можно было ничего и не делать, так как метакласс установит все переданные именованные параметры в self по умолчанию.

import aiomisc

class HelloService(aiomisc.Service):
    name: str = "мир"

    async def start(self):
        print(f"Привет {self.name}")

with aiomisc.entrypoint(
    HelloService(),
    HelloService(name="Гвидо")
) as loop:
    pass

# python hello.py
# <<< Привет мир
# <<< Привет Гвидо

Если-же объявлен специальный аттрибут __required__, сервис будет требовать чтобы он был передан явно при инициализации как именованный параметр.

import aiomisc

class HelloService(aiomisc.Service):
    __required__ = ("name", "title")

    name: str
    title: str

    async def start(self):
        await asyncio.sleep(0.1)
        print(f"Привет {self.title} {self.name}")

with aiomisc.entrypoint(
    HelloService(name="Гвидо", title="мистер")
) as loop:
    pass

Также очень полезный специальным атрибут класса это __async_required__. В общем, это полезно для написания базовых классов. Он содержит кортеж имен методов, которые должны быть явно объявлены асинхронными (через async def).

import aiomisc

class HelloService(aiomisc.Service):
    __required__ = ("name", "title")
    __async_required__ = ("greeting",)

    name: str
    title: str

    async def greeting(self) -> str:
        await asyncio.sleep(0.1)
        return f"Привет {self.title} {self.name}"

    async def start(self):
        print(await self.greeting())

class HelloEmojiService(HelloService):
    async def greeting(self) -> str:
        await asyncio.sleep(0.1)
        return f"🙋 {self.title} {self.name}"

with aiomisc.entrypoint(
    HelloService(name="Гвидо", title="мистер"),
    HelloEmojiService(name="👨", title="🎩")
) as loop:
    pass

# Привет мистер Гвидо
# 🙋 🎩 👨

Если наследник объявит эти методы по-другому, будет ошибка на этапе объявления класса.

class BadHello(HelloService):
    def greeting(self) -> str:
        return f"{self.title} {self.name}"

#Traceback (most recent call last):
#...
#TypeError: ('Following methods must be coroutine functions', ('BadHello.greeting',))

1.3. dependency injection

В некоторых случаях перед запуском службы необходимо выполнить некоторый асинхронный код, например, чтобы передать соединение с базой данных экземпляру сервиса. Или если вы хотите использовать один экземпляр какой-то сущности для нескольких сервисов.

Для таких сложных конфигураций существует плагин aiomisc-dependency, который распространяется как независимый отдельный пакет.

Посмотрите на примеры в документации, aiomisc-dependency прозрачно интегрируется с entrypoint.

1.4. entrypoint

Итак сервисы описаны, что дальше? asyncio.run не умеет с ними работать, вызывать их вручную не стало проще, что тут можно предложить?

Наверное самый магический, сложный, и в то-же время достаточно хорошо протестированный код в библиотеке - это entrypoint. Изначально идеей entrypoint было избавление от рутины: настройка логов, настройка пула потоков, ну и запуск и корректная остановка сервисов.

Давайте посмотрим на пример:

import asyncio
import aiomisc

...

with aiomisc.entrypoint(
    OrdinaryService(),
    InfinityService()
) as loop:
    loop.run_forever()

В этом примере мы запускаем два сервиса, описанных выше, и продолжаем выполнение до тех пор пока пользователь его не прервёт. Далее, благодаря контекстному менеджеру, мы корректно завершаем все экземпляры сервисов.

Примечание

Entrypoint вызывает все методы start() во всех сервисах конкурентно, и если хотя-бы один из них упадет, все остальные сервисы будут остановлены.

Я упоминал, что я хотел убрать много рутины, давайте посмотрим на тот же пример, только передадим явно все параметры по умолчанию в entrypoint.

import asyncio
import aiomisc


...

with aiomisc.entrypoint(
    OrdinaryService(),
    InfinityService(),
    pool_size=4,
    log_level="info",
    log_format="color",
    log_buffering=True,
    log_buffer_size=1024,
    log_flush_interval=0.2,
    log_config=True,
    policy=asyncio.DefaultEventLoopPolicy(),
    debug=False
) as loop:
    loop.run_forever()

Давайте не будем останавливаться на том, что делает каждый параметр. Но в целом, entrypoint создала цикл событий, пул из четырех потоков, настроила его для текущего цикла событий, настроила логгер с «цветным» буферизованным выводом и запустила два сервиса.

Также вы можете запустить entrypoint без сервисов, просто сконфигурировать логирование и прочее:

import asyncio
import logging
import aiomisc


async def sleep_and_exit():
    logging.info("Started")
    await asyncio.sleep(1)
    logging.info("Exiting")


with aiomisc.entrypoint(log_level="info") as loop:
    loop.run_until_complete(sleep_and_exit())

Еще стоит обратить внимание на aiomisc.run который похож по своему назначению на asyncio.run при этом поддерживает запуск и остановку сервисов и прочее.

import asyncio
import logging
import aiomisc


async def sleep_and_exit():
    logging.info("Started")
    await asyncio.sleep(1)
    logging.info("Exiting")


aiomisc.run(
    # первый аргумент
    # это основная короутина
    sleep_and_exit(),
    # Другие позиционные аргументы
    # это экземпляры сервисов
    OrdinaryService(),
    InfinityService(),
    # другие именованные аргументы
    # будут просто переданы в entrypoint
    log_level="info"
)

Примечание

Как я и упоминал ранее библиотека содержит большое количество готовых абстрактных сервисов, которые вы можете использовать в своем проекте, просто реализовав несколько методов.

Полный список сервисов и примеры их использования можно найти на странице Сервисы.

1.5. Выполнение кода в пуле потоков или процессов

Как объясняется в разделе working with threads в официальной документации по Python, eventloop в asyncio запускает пул потоков.

Этот пул нужен для того, чтобы запустить, например, разрешение имен и не блокировать eventloop, пока работает низкоуровневый вызов gethostbyname.

Размер этого пула потоков должен быть настроен при запуске приложения, иначе вы можете столкнуться со всевозможными проблемами, когда этот пул слишком велик или слишком мал.

По умолчанию entrypoint создает пул потоков с размером, равным количеству ядер процессора (минимум — 4, и максимум — 32). Конечно, вы можете указать столько, сколько вам нужно.

1.5.1. Декоратор @aiomisc.threaded

В разделе working with threads официальной документации по python даются следующие рекомендации по вызову блокирующих функций в потоках:

import asyncio

def blocking_io():
    # Файловые операции (такие как логирование) могут заблокировать event loop.
    with open('/dev/urandom', 'rb') as f:
        return f.read(100)

async def main():
    loop = asyncio.get_running_loop()
    result = await loop.run_in_executor(None, blocking_io)

asyncio.run(main())

Эта библиотека содержит очень простой способ сделать тоже самое:

import aiomisc

@aiomisc.threaded
def blocking_io():
    with open('/dev/urandom', 'rb') as f:
        return f.read(100)

async def main():
    result = await blocking_io()

aiomisc.run(main())

Как видно из этого примера, достаточно обернуть функцию декоратором aiomisc.threaded, и она начнет возвращать awaitable объект, но код внутри функции будет отправлен в пул потоков по умолчанию.

1.5.2. Декоратор @aiomisc.threaded_separate

Если блокирующая функция работает долго, а то и бесконечно долго, иными словами, если стоимость создания потока незначительна по сравнению с сколько функция работает, то можно попробовать использовать декоратор aiomisc.threaded_separate.

Декоратор запускает новый поток, не связанный с каким-либо пулом потоков. Поток завершится после выхода из функции.

import hashlib
import aiomisc

@aiomisc.threaded_separate
def another_one_useless_coin_miner():
    with open('/dev/urandom', 'rb') as f:
        hasher = hashlib.sha256()
        while True:
            hasher.update(f.read(1024))
            if hasher.hexdigest().startswith("0000"):
                return hasher.hexdigest()

async def main():
    print(
        "Хэш получился вот такой:",
        await another_one_useless_coin_miner()
    )

aiomisc.run(main())

Примечание

Такой подход позволяет не занимать потоки в пуле надолго, но при этом никак не ограничивает количество создаваемых потоков.

Больше примеров можно найти на странице Работа с потоками

1.5.3. Декоратор @aiomisc.threaded_iterable

Если генератор нужно выполнить в потоке, возникают проблемы с синхронизацией потока и eventloop. Эта библиотека предоставляет пользовательский декоратор, предназначенный для превращения синхронного генератора в асинхронный.

Это очень полезно, если, например, драйвер очереди или базы данных синхронный, но вы хотите эффективно использовать его в асинхронном коде.

import aiomisc

@aiomisc.threaded_iterable(max_size=8)
def urandom_reader():
    with open('/dev/urandom', "rb") as fp:
        while True:
            yield fp.read(8)

async def main():
    counter = 0
    async for chunk in urandom_reader():
        print(chunk)
        counter += 1
        if counter > 16:
            break

aiomisc.run(main())

Под капотом этот декоратор возвращает специальный объект с очередью, а интерфейс асинхронного итератора обеспечивает доступ к этой очереди.

Всегда следует указывать параметр max_size, который ограничивает размер этой очереди и предотвращает отправку кода, который работает в потоке, слишком большого количества элементов в асинхронный код, в случае асинхронной итерации в случае, если асинхронный итератор забирает элементы из этой очереди реже чем они поступают.

1.5.4. Заключение

На этом нам нужно закончить этот учебник, надеюсь тут все было понятно, и вы почерпнули для себя много полезного. Полное описание остальных сервисов представлено в разделе Модули, или в исходном коде. Авторы постарались сделать исходный код максимально понятным и простым, поэтому не стесняйтесь исследовать его.