2.1. entrypoint

In the generic case, the entrypoint helper creates an event loop and, on exit, cancels any coroutines that are still running.

import asyncio
import aiomisc

async def main():
    await asyncio.sleep(1)

with aiomisc.entrypoint() as loop:
    loop.run_until_complete(main())

Complete example:

import asyncio
import aiomisc
import logging
import signal

async def main():
    await asyncio.sleep(1)
    logging.info("Hello there")

with aiomisc.entrypoint(
    pool_size=2,
    log_level='info',
    log_format='color',                            # default when "rich" is not installed
    log_buffer_size=1024,                          # default
    log_flush_interval=0.2,                        # default
    log_config=True,                               # default
    policy=asyncio.DefaultEventLoopPolicy(),       # default
    debug=False,                                   # default
    catch_signals=(signal.SIGINT, signal.SIGTERM), # default
    shutdown_timeout=60,                           # default
) as loop:
    loop.run_until_complete(main())

Running an entrypoint from async code:

import asyncio
import aiomisc
import logging
from aiomisc.service.periodic import PeriodicService

log = logging.getLogger(__name__)

class MyPeriodicService(PeriodicService):
    async def callback(self):
        log.info('Running periodic callback')
        # ...

async def main():
    service = MyPeriodicService(interval=1, delay=0)  # runs the callback every second

    # Returns an entrypoint instance because the event loop is
    # already running and can be obtained via asyncio.get_event_loop()
    async with aiomisc.entrypoint(service) as ep:
        try:
            await asyncio.wait_for(ep.closing(), timeout=1)
        except asyncio.TimeoutError:
            pass


asyncio.run(main())

2.1.1. Dynamic running of services

Sometimes adding services to the entrypoint at startup is not enough, or the service parameters cannot be known before the event loop starts. In this case services can be started after the event loop has started; this feature is available since version 17.

import asyncio
import aiomisc
import logging

from aiomisc.service.periodic import PeriodicService

log = logging.getLogger(__name__)


class MyPeriodicService(PeriodicService):
    async def callback(self):
        log.info('Running periodic callback')


async def add_services():
    entrypoint = aiomisc.entrypoint.get_current()

    services = [
        MyPeriodicService(interval=2, delay=1),
        MyPeriodicService(interval=2, delay=0),
    ]

    await entrypoint.start_services(*services)
    await asyncio.sleep(10)
    await entrypoint.stop_services(*services)


with aiomisc.entrypoint() as loop:
    loop.create_task(add_services())
    loop.run_forever()

2.1.2. Configuration from environment

The module supports configuration through environment variables:

  • AIOMISC_LOG_LEVEL - default logging level

  • AIOMISC_LOG_FORMAT - default log format

  • AIOMISC_LOG_DATE_FORMAT - default logging date format

  • AIOMISC_LOG_CONFIG - whether logging should be configured

  • AIOMISC_LOG_FLUSH - interval between log buffer flushes

  • AIOMISC_LOG_BUFFERING - whether logging should be buffered

  • AIOMISC_LOG_BUFFER_SIZE - maximum log buffer size

  • AIOMISC_POOL_SIZE - thread pool size

  • AIOMISC_USE_UVLOOP - whether to use uvloop when it is available; set to 0 to disable

  • AIOMISC_SHUTDOWN_TIMEOUT - if the program does not terminate within this timeout after receiving a signal, a forced exit occurs
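
These variables only provide defaults for the corresponding entrypoint arguments, so explicitly passed arguments still take precedence. A minimal sketch of using them from Python, assuming the values are read when aiomisc computes its defaults, so they must be set before the import (normally you would simply export them in the shell or a process manager):

import os

os.environ["AIOMISC_LOG_LEVEL"] = "debug"
os.environ["AIOMISC_POOL_SIZE"] = "8"
os.environ["AIOMISC_USE_UVLOOP"] = "0"

import aiomisc  # noqa: E402  (imported after the environment is prepared)


async def main():
    pass


with aiomisc.entrypoint() as loop:
    loop.run_until_complete(main())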

2.2. run() shortcut

aiomisc.run() is a shortcut for creating and destroying an aiomisc.entrypoint. It is very similar to asyncio.run() but also handles Service instances and the other entrypoint keyword arguments.

import asyncio
import aiomisc

async def main():
    loop = asyncio.get_event_loop()
    now = loop.time()
    await asyncio.sleep(0.1)
    assert now < loop.time()


aiomisc.run(main())
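
Since run() forwards services and entrypoint keyword arguments, a service can be started the same way. A sketch assuming the run(coro, *services, **kwargs) calling convention; the service and its parameters are illustrative:

import asyncio
import aiomisc
from aiomisc.service.periodic import PeriodicService


class MyPeriodicService(PeriodicService):
    async def callback(self):
        print("tick")


async def main():
    # Keep the program alive long enough for a few callbacks to fire
    await asyncio.sleep(3)


# The service and entrypoint kwargs are passed alongside the main coroutine
aiomisc.run(
    main(),
    MyPeriodicService(interval=1, delay=0),
    log_level="info",
)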

2.3. Logging configuration

entrypoint accepts a log_format argument that selects one of the following formats; logs are written to stderr:

  • stream - Python’s default logging handler

  • color - logging with colorlog module

  • json - json structure per each line

  • syslog - logging using stdlib logging.handlers.SysLogHandler

  • plain - just log messages, without date or level info

  • journald - available only when logging-journald module has been installed.

  • rich/rich_tb - available only when the rich module has been installed. rich_tb is the same as rich but with fully expanded tracebacks.

Additionally, you can specify the log level with the log_level argument and the date format with the log_date_format parameter.
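
For example, to write JSON logs at debug level with a custom date format (the concrete values here are only illustrative):

import asyncio
import logging

import aiomisc


async def main():
    logging.debug("Hello in JSON")
    await asyncio.sleep(0)


with aiomisc.entrypoint(
    log_level="debug",
    log_format="json",
    log_date_format="%Y-%m-%d %H:%M:%S",
) as loop:
    loop.run_until_complete(main())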

An entrypoint calls the aiomisc.log.basic_config function implicitly, using the passed log_* parameters. Alternatively, you can call aiomisc.log.basic_config manually, passing it an already created event loop.
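
A minimal sketch of the manual call; it assumes basic_config accepts level, log_format, buffered and loop keyword arguments mirroring the entrypoint's log_* parameters:

import asyncio
import logging

from aiomisc.log import basic_config


async def main():
    logging.info("Logging configured manually")
    await asyncio.sleep(0.5)


loop = asyncio.new_event_loop()

# Bind the buffered logging handler to the already created event loop
basic_config(
    level=logging.INFO,
    log_format="color",
    buffered=True,
    loop=loop,
)

loop.run_until_complete(main())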

You can also configure logging earlier using aiomisc_log.basic_config, but then you lose log buffering and flushing in a separate thread. This function is what is actually called during logging configuration; the entrypoint passes it a wrapped handler so that records are flushed in a separate thread.

import logging

from aiomisc_log import basic_config


basic_config(log_format="color")
logging.info("Hello")

If you want to configure logging before the entrypoint starts, for example right after argument parsing, it is safe to configure it twice (or more times).

import logging

import aiomisc
from aiomisc_log import basic_config


basic_config(log_format="color")
logging.info("Hello from usual python")


async def main():
    logging.info("Hello from async python")


with aiomisc.entrypoint(log_format="color") as loop:
    loop.run_until_complete(main())

Sometimes you want to configure logging manually; the following example demonstrates how to do this:

import os
import logging
from logging.handlers import RotatingFileHandler
from gzip import GzipFile

import aiomisc


class GzipLogFile(GzipFile):
    def write(self, data) -> int:
        if isinstance(data, str):
            data = data.encode()
        return super().write(data)


class RotatingGzipFileHandler(RotatingFileHandler):
    """ Really added just for example you have to test it properly """

    def shouldRollover(self, record):
        if not os.path.isfile(self.baseFilename):
            return False
        if self.stream is None:
            self.stream = self._open()
        return 0 < self.maxBytes < os.stat(self.baseFilename).st_size

    def _open(self):
        return GzipLogFile(filename=self.baseFilename, mode=self.mode)


async def main():
    for _ in range(1_000):
        logging.info("Hello world")


with aiomisc.entrypoint(log_config=False) as loop:
    gzip_handler = RotatingGzipFileHandler(
        "app.log.gz",
        # At most 100 files of 10 megabytes each
        maxBytes=10 * 2 ** 20, backupCount=100
    )
    stream_handler = logging.StreamHandler()

    formatter = logging.Formatter(
        "[%(asctime)s] <%(levelname)s> "
        "%(filename)s:%(lineno)d (%(threadName)s): %(message)s"
    )

    gzip_handler.setFormatter(formatter)
    stream_handler.setFormatter(formatter)

    logging.basicConfig(
        level=logging.INFO,
        # Wrapping the handlers makes them flush log records in a
        # separate thread, so the event loop is not blocked even if
        # gzip takes a long time to write the file.
        handlers=map(
            aiomisc.log.wrap_logging_handler,
            (gzip_handler, stream_handler)
        )
    )
    loop.run_until_complete(main())