2.1. Точка входа (entrypoint)

В общем случае точка входа это сущность помогающая создать event loop и закрыть все еще запущенные корутины при выходе.

import asyncio
import aiomisc

async def main():
    await asyncio.sleep(1)

with aiomisc.entrypoint() as loop:
    loop.run_until_complete(main())

Пример целиком:

import asyncio
import aiomisc
import logging

async def main():
    await asyncio.sleep(1)
    logging.info("Hello there")

with aiomisc.entrypoint(
    pool_size=2,
    log_level='info',
    log_format='color',                            # по умолчанию если "rich" не установлен
    log_buffer_size=1024,                          # по умолчанию
    log_flush_interval=0.2,                        # по умолчанию
    log_config=True,                               # по умолчанию
    policy=asyncio.DefaultEventLoopPolicy(),       # по умолчанию
    debug=False,                                   # по умолчанию
    catch_signals=(signal.SIGINT, signal.SIGTERM), # по умолчанию
    shutdown_timeout=60,                           # по умолчанию
) as loop:
    loop.run_until_complete(main())

Запуск точки входа (entrypoint) из асинхронного кода

import asyncio
import aiomisc
import logging
from aiomisc.service.periodic import PeriodicService

log = logging.getLogger(__name__)

class MyPeriodicService(PeriodicService):
    async def callback(self):
        log.info('Running periodic callback')
        # ...

async def main():
    service = MyPeriodicService(interval=1, delay=0)  # once per minute

    # вернет экземпляр entrypoint потому, что event-loop
    # уже запущен и может быть получен через asyncio.get_event_loop()
    async with aiomisc.entrypoint(service) as ep:
        try:
            await asyncio.wait_for(ep.closing(), timeout=1)
        except asyncio.TimeoutError:
            pass


asyncio.run(main())

2.1.1. Динамический запуск сервисов

Иногда бывает недостаточно добавить сервисы в точку входа на старте или нет возможности получить параметры сервиса до старта event-loop. В этом случае возможен запуск сервисов после запуска событийного цикла, эта функция доступна с версии 17.

import asyncio
import aiomisc
import logging

from aiomisc.service.periodic import PeriodicService

log = logging.getLogger(__name__)


class MyPeriodicService(PeriodicService):
    async def callback(self):
        log.info('Running periodic callback')


async def add_services():
    entrypoint = aiomisc.entrypoint.get_current()

    services = [
        MyPeriodicService(interval=2, delay=1),
        MyPeriodicService(interval=2, delay=0),
    ]

    await entrypoint.start_services(*services)
    await asyncio.sleep(10)
    await entrypoint.stop_services(*services)


with aiomisc.entrypoint() as loop:
    loop.create_task(add_services())
    loop.run_forever()

2.1.2. Конфигурация из переменных окружения

Модуль поддерживает конфигурацию из переменных окружения:

  • AIOMISC_LOG_LEVEL - уровень логирования по умолчанию

  • AIOMISC_LOG_FORMAT - формат логирования по умолчанию

  • AIOMISC_LOG_DATE_FORMAT - формат дат в логах по умолчанию

  • AIOMISC_LOG_CONFIG - следует ли настраивать логирование

  • AIOMISC_LOG_FLUSH - интервал сброса буфера логов logs

  • AIOMISC_LOG_BUFFERING - следует ли включать буфферизацию логирования

  • AIOMISC_LOG_BUFFER_SIZE - максимальный размер буфера логов

  • AIOMISC_POOL_SIZE - размер пула потоков

  • AIOMISC_USE_UVLOOP - следует ли использовать uvloop, 0 чтобы отключить

  • AIOMISC_SHUTDOWN_TIMEOUT - Если после получения сигнала программа не завершается в течение этого таймаута, происходит принудительный выход.

2.2. Функция run()

aiomisc.run() - это простой способ создать и разрушить aiomisc.entrypoint. Это очень похоже на asyncio.run() но управляет сервисами aiomisc.Service и принимает прочие аргументы entrypoint.

import asyncio
import aiomisc

async def main():
    loop = asyncio.get_event_loop()
    now = loop.time()
    await asyncio.sleep(0.1)
    assert now < loop.time()


aiomisc.run(main())

2.3. Конфигурация журналов

entrypoint принимает аргумент log_format с определенным набором форматов, в которых журналы будут записываться в stderr.

  • stream - стандартный python логгер

  • color - логирование через модуль colorlog

  • json - json структура, одна на строчку

  • syslog - logging.handlers.SysLogHandler из стандартной библиотеки

  • plain - просто сообщения, без даты или информации об уровне логирования

  • journald - доступно только если logging-journald модуль установлен.

  • rich/rich_tb - доступно только если установлен модуль rich. rich_tb тоже самое что и rich только с подробными трейсбэками.

Также вы можете настроить уровень логирования параметром log_level и формат дат в логах параметром log_date_format

entrypoint вызовет aiomisc.log.basic_config неявно используя пеараметры log_*=. В качестве альтернативы, вы можете вызвать aiomisc.log.basic_config вручную передав ей экземпляр eventloop.

Однако вы можете настроить логирование раньше, используя aiomisc_log.basic_config, но вы потеряете буферизацию и запись в буфер отдельном потоке. Эта функция фактически вызывается во время настройки ведения журнала, entrypoint передает обертку для logging handler, чтобы он записывал в буфер в отдельном потоке.

import logging

from aiomisc_log import basic_config


basic_config(log_format="color")
logging.info("Hello")

Если вы хотите настроить ведение журнала перед запуском entrypoint, например, после разбора аргументов, это безопасно настроить его дважды (или больше).

import logging

import aiomisc
from aiomisc_log import basic_config


basic_config(log_format="color")
logging.info("Hello from usual python")


async def main():
    logging.info("Hello from async python")


with aiomisc.entrypoint(log_format="color") as loop:
    loop.run_until_complete(main())

Иногда вы хотите настроить ведение журнала самостоятельно, пример ниже демонстрирует, как это сделать:

import os
import logging
from logging.handlers import RotatingFileHandler
from gzip import GzipFile

import aiomisc


class GzipLogFile(GzipFile):
    def write(self, data) -> int:
        if isinstance(data, str):
            data = data.encode()
        return super().write(data)


class RotatingGzipFileHandler(RotatingFileHandler):
    """ Really added just for example you have to test it properly """

    def shouldRollover(self, record):
        if not os.path.isfile(self.baseFilename):
            return False
        if self.stream is None:
            self.stream = self._open()
        return 0 < self.maxBytes < os.stat(self.baseFilename).st_size

    def _open(self):
        return GzipLogFile(filename=self.baseFilename, mode=self.mode)


async def main():
    for _ in range(1_000):
        logging.info("Hello world")


with aiomisc.entrypoint(log_config=False) as loop:
    gzip_handler = RotatingGzipFileHandler(
        "app.log.gz",
        # Максимум 100 файлов по 10 мегабайт
        maxBytes=10 * 2 ** 20, backupCount=100
    )
    stream_handler = logging.StreamHandler()

    formatter = logging.Formatter(
        "[%(asctime)s] <%(levelname)s> "
        "%(filename)s:%(lineno)d (%(threadName)s): %(message)s"
    )

    gzip_handler.setFormatter(formatter)
    stream_handler.setFormatter(formatter)

    logging.basicConfig(
        level=logging.INFO,
        # Обертывание всех обработчиков в отдельные потоки не заблокирует
        # event-loop даже если gzip занимает много времени, чтобы открыть
        # файл.
        handlers=map(
            aiomisc.log.wrap_logging_handler,
            (gzip_handler, stream_handler)
        )
    )
    loop.run_until_complete(main())