2.1. Точка входа (entrypoint)¶
В общем случае точка входа это сущность помогающая создать event loop и закрыть все еще запущенные корутины при выходе.
import asyncio
import aiomisc
async def main():
await asyncio.sleep(1)
with aiomisc.entrypoint() as loop:
loop.run_until_complete(main())
Пример целиком:
import asyncio
import aiomisc
import logging
async def main():
await asyncio.sleep(1)
logging.info("Hello there")
with aiomisc.entrypoint(
pool_size=2,
log_level='info',
log_format='color', # по умолчанию если "rich" не установлен
log_buffer_size=1024, # по умолчанию
log_flush_interval=0.2, # по умолчанию
log_config=True, # по умолчанию
policy=asyncio.DefaultEventLoopPolicy(), # по умолчанию
debug=False, # по умолчанию
catch_signals=(signal.SIGINT, signal.SIGTERM), # по умолчанию
shutdown_timeout=60, # по умолчанию
) as loop:
loop.run_until_complete(main())
Запуск точки входа (entrypoint) из асинхронного кода
import asyncio
import aiomisc
import logging
from aiomisc.service.periodic import PeriodicService
log = logging.getLogger(__name__)
class MyPeriodicService(PeriodicService):
async def callback(self):
log.info('Running periodic callback')
# ...
async def main():
service = MyPeriodicService(interval=1, delay=0) # once per minute
# вернет экземпляр entrypoint потому, что event-loop
# уже запущен и может быть получен через asyncio.get_event_loop()
async with aiomisc.entrypoint(service) as ep:
try:
await asyncio.wait_for(ep.closing(), timeout=1)
except asyncio.TimeoutError:
pass
asyncio.run(main())
2.1.1. Динамический запуск сервисов¶
Иногда бывает недостаточно добавить сервисы в точку входа на старте или нет возможности получить параметры сервиса до старта event-loop. В этом случае возможен запуск сервисов после запуска событийного цикла, эта функция доступна с версии 17
.
import asyncio
import aiomisc
import logging
from aiomisc.service.periodic import PeriodicService
log = logging.getLogger(__name__)
class MyPeriodicService(PeriodicService):
async def callback(self):
log.info('Running periodic callback')
async def add_services():
entrypoint = aiomisc.entrypoint.get_current()
services = [
MyPeriodicService(interval=2, delay=1),
MyPeriodicService(interval=2, delay=0),
]
await entrypoint.start_services(*services)
await asyncio.sleep(10)
await entrypoint.stop_services(*services)
with aiomisc.entrypoint() as loop:
loop.create_task(add_services())
loop.run_forever()
2.1.2. Конфигурация из переменных окружения¶
Модуль поддерживает конфигурацию из переменных окружения:
AIOMISC_LOG_LEVEL
- уровень логирования по умолчаниюAIOMISC_LOG_FORMAT
- формат логирования по умолчаниюAIOMISC_LOG_DATE_FORMAT
- формат дат в логах по умолчаниюAIOMISC_LOG_CONFIG
- следует ли настраивать логированиеAIOMISC_LOG_FLUSH
- интервал сброса буфера логов logsAIOMISC_LOG_BUFFERING
- следует ли включать буфферизацию логированияAIOMISC_LOG_BUFFER_SIZE
- максимальный размер буфера логовAIOMISC_POOL_SIZE
- размер пула потоковAIOMISC_USE_UVLOOP
- следует ли использовать uvloop,0
чтобы отключитьAIOMISC_SHUTDOWN_TIMEOUT
- Если после получения сигнала программа не завершается в течение этого таймаута, происходит принудительный выход.
2.2. Функция run()
¶
aiomisc.run()
- это простой способ создать и разрушить aiomisc.entrypoint
. Это очень похоже на asyncio.run()
но управляет сервисами aiomisc.Service
и принимает прочие аргументы entrypoint.
import asyncio
import aiomisc
async def main():
loop = asyncio.get_event_loop()
now = loop.time()
await asyncio.sleep(0.1)
assert now < loop.time()
aiomisc.run(main())
2.3. Конфигурация журналов¶
entrypoint
принимает аргумент log_format
с определенным набором форматов, в которых журналы будут записываться в stderr.
stream
- стандартный python логгерcolor
- логирование через модульcolorlog
json
- json структура, одна на строчкуsyslog
-logging.handlers.SysLogHandler
из стандартной библиотекиplain
- просто сообщения, без даты или информации об уровне логированияjournald
- доступно только еслиlogging-journald
модуль установлен.rich
/rich_tb
- доступно только если установлен модульrich
.rich_tb
тоже самое что иrich
только с подробными трейсбэками.
Также вы можете настроить уровень логирования параметром log_level
и формат дат в логах параметром log_date_format
entrypoint
вызовет aiomisc.log.basic_config
неявно используя пеараметры log_*=
. В качестве альтернативы, вы можете вызвать aiomisc.log.basic_config
вручную передав ей экземпляр eventloop
.
Однако вы можете настроить логирование раньше, используя aiomisc_log.basic_config
, но вы потеряете буферизацию и запись в буфер отдельном потоке. Эта функция фактически вызывается во время настройки ведения журнала, entrypoint
передает обертку для logging handler, чтобы он записывал в буфер в отдельном потоке.
import logging
from aiomisc_log import basic_config
basic_config(log_format="color")
logging.info("Hello")
Если вы хотите настроить ведение журнала перед запуском entrypoint
, например, после разбора аргументов, это безопасно настроить его дважды (или больше).
import logging
import aiomisc
from aiomisc_log import basic_config
basic_config(log_format="color")
logging.info("Hello from usual python")
async def main():
logging.info("Hello from async python")
with aiomisc.entrypoint(log_format="color") as loop:
loop.run_until_complete(main())
Иногда вы хотите настроить ведение журнала самостоятельно, пример ниже демонстрирует, как это сделать:
import os
import logging
from logging.handlers import RotatingFileHandler
from gzip import GzipFile
import aiomisc
class GzipLogFile(GzipFile):
def write(self, data) -> int:
if isinstance(data, str):
data = data.encode()
return super().write(data)
class RotatingGzipFileHandler(RotatingFileHandler):
""" Really added just for example you have to test it properly """
def shouldRollover(self, record):
if not os.path.isfile(self.baseFilename):
return False
if self.stream is None:
self.stream = self._open()
return 0 < self.maxBytes < os.stat(self.baseFilename).st_size
def _open(self):
return GzipLogFile(filename=self.baseFilename, mode=self.mode)
async def main():
for _ in range(1_000):
logging.info("Hello world")
with aiomisc.entrypoint(log_config=False) as loop:
gzip_handler = RotatingGzipFileHandler(
"app.log.gz",
# Максимум 100 файлов по 10 мегабайт
maxBytes=10 * 2 ** 20, backupCount=100
)
stream_handler = logging.StreamHandler()
formatter = logging.Formatter(
"[%(asctime)s] <%(levelname)s> "
"%(filename)s:%(lineno)d (%(threadName)s): %(message)s"
)
gzip_handler.setFormatter(formatter)
stream_handler.setFormatter(formatter)
logging.basicConfig(
level=logging.INFO,
# Обертывание всех обработчиков в отдельные потоки не заблокирует
# event-loop даже если gzip занимает много времени, чтобы открыть
# файл.
handlers=map(
aiomisc.log.wrap_logging_handler,
(gzip_handler, stream_handler)
)
)
loop.run_until_complete(main())