Сервисы

Сервисы - это абстракция, помогающая организовать множество различных задач в одном процессе. Каждый сервис должен реализовывать обязательный метод start() и опционально метод stop ().

Экземпляр сервиса должен быть передан в «точку входа» (entrypoint) и будет запущен после создания event loop.

Примечание

Текущий event-loop будет установлен до вызова метода start(). Event loop будет установлен для этого потока.

Пожалуйста, избегайте использования asyncio.get_event_loop() явно внутри метода start(). Вместо этого используйте атрибут сервиса self.loop:

import asyncio
from threading import Event
from aiomisc import entrypoint, Service

event = Event()

class MyService(Service):
  async def start(self):
      # Send signal to entrypoint for continue running
      self.start_event.set()

      event.set()
      # Start service task
      await asyncio.sleep(3600)


with entrypoint(MyService()) as loop:
    assert event.is_set()

Метод start() создается как отдельная задача, которая может выполняться бесконечно. Но в этом случае необходимо утановить событие вызовом self.start_event.set() для уведомления entrypoint об окончании запуска.

Во время завершения работы сначала будет вызван метод stop(), а после этого все запущенные задачи будут отменены (включая start()).

Этот пакет содержит несколько полезных базовых классов для написания простых сервисов.

Класс TCPServer

TCPServer - это базовый класс для реализации TCP сервера. Просто реализуйте handle_client(reader, writer), чтобы принимать TCP соединения.

import asyncio
import logging
from aiomisc import entrypoint
from aiomisc.service import TCPServer


log = logging.getLogger(__name__)


class EchoServer(TCPServer):
    async def handle_client(self, reader: asyncio.StreamReader,
                            writer: asyncio.StreamWriter):
        while not reader.at_eof():
            writer.write(await reader.read(255))

        log.info("Client connection closed")


async def echo_client(host, port):
    reader, writer = await asyncio.open_connection(host=host, port=port)
    writer.write(b"hello\n")
    assert await reader.readline() == b"hello\n"

    writer.write(b"world\n")
    assert await reader.readline() == b"world\n"

    writer.close()
    await writer.wait_closed()


with entrypoint(
    EchoServer(address='::1', port=8901),
) as loop:
    loop.run_until_complete(echo_client("::1", 8901))

Класс UDPServer

UDPServer - это базовый класс для реализации UDP сервера. Просто реализуйте handle_datagram(data, addr), чтобы принимать UDP соединения.

class UDPPrinter(UDPServer):
    async def handle_datagram(self, data: bytes, addr):
        print(addr, '->', data)


with entrypoint(UDPPrinter(address='::1', port=3000)) as loop:
    loop.run_forever()

Класс TLSServer

TLSServer - это базовый класс для написания TCP-серверов с использованием TLS. Просто реализуйте handle_client(reader, writer).

class SecureEchoServer(TLSServer):
    async def handle_client(self, reader: asyncio.StreamReader,
                            writer: asyncio.StreamWriter):
        while True:
            writer.write(await reader.readline())

service = SecureEchoServer(
    address='::1',
    port=8900,
    ca='ca.pem',
    cert='cert.pem',
    key='key.pem',
    verify=False,
)

with entrypoint(service) as loop:
    loop.run_forever()

Класс PeriodicService

PeriodicService запускает PeriodicCallback как сервис и ожидает завершения обратного вызова при остановке. Вам необходимо использовать PeriodicService в качестве базового класса и переопределить асинхронный метод callback.

Сервисный класс принимает обязательный аргумент interval - интервал запуска в секундах и необязательный аргумент delay - задержку первого выполнения в секундах (по умолчанию 0).

import aiomisc
from aiomisc.service.periodic import PeriodicService


class MyPeriodicService(PeriodicService):
    async def callback(self):
        log.info('Running periodic callback')
        # ...

service = MyPeriodicService(interval=3600, delay=0)  # once per hour

with entrypoint(service) as loop:
    loop.run_forever()

Класс CronService

CronService запускает CronCallback в качестве сервиса и ожидает завершения выполнения обратных вызовов при остановке.

Основан на croniter. Вы можете зарегистрировать асинхронный метод с аргументом spec - формат, подобный cron:

Предупреждение

необходимо установить библиотеку croniter:

pip install croniter

или как дополнительную зависимость

pip install aiomisc[cron]
import aiomisc
from aiomisc.service.cron import CronService


async def callback():
    log.info('Running cron callback')
    # ...

service = CronService()
service.register(callback, spec="0 * * * *") # every hour at zero minutes

with entrypoint(service) as loop:
    loop.run_forever()

Вы также можете наследовать от CronService, но помните, что регистрация обратного вызова должна выполняться до запуска

import aiomisc
from aiomisc.service.cron import CronService


class MyCronService(CronService):
    async def callback(self):
        log.info('Running cron callback')
        # ...

    async def start(self):
        self.register(self.callback, spec="0 * * * *")
        await super().start()

service = MyCronService()

with entrypoint(service) as loop:
    loop.run_forever()

Несколько сервисов

Передайте несколько экземпляров сервиса в entrypoint, чтобы запустить их все сразу. После выхода экземпляры сервиса точки входа будут корректно закрыты вызовом метода stop() или через отмену метода start().

import asyncio
from aiomisc import entrypoint
from aiomisc.service import Service, TCPServer, UDPServer


class LoggingService(PeriodicService):
    async def callabck(self):
        print('Hello from service', self.name)


class EchoServer(TCPServer):
    async def handle_client(self, reader: asyncio.StreamReader,
                            writer: asyncio.StreamWriter):
        while True:
            writer.write(await reader.readline())


class UDPPrinter(UDPServer):
    async def handle_datagram(self, data: bytes, addr):
        print(addr, '->', data)


services = (
    LoggingService(name='#1', interval=1),
    EchoServer(address='::1', port=8901),
    UDPPrinter(address='::1', port=3000),
)


with entrypoint(*services) as loop:
    loop.run_forever()

Конфигурация

Метакласс Service принимает все именованные аргументы в __init__ и устанавливает из как атрибуты в self.

import asyncio
from aiomisc import entrypoint
from aiomisc.service import Service, TCPServer, UDPServer


class LoggingService(Service):
    # required kwargs
    __required__ = frozenset({'name'})

    # default value
    delay: int = 1

    async def start(self):
        self.start_event.set()
        while True:
            # attribute ``name`` from kwargs
            # must be defined when instance initializes
            print('Hello from service', self.name)

            # attribute ``delay`` from kwargs
            await asyncio.sleep(self.delay)

services = (
    LoggingService(name='#1'),
    LoggingService(name='#2', delay=3),
)


with entrypoint(*services) as loop:
    loop.run_forever()

aiohttp сервис

Предупреждение

требуется установленная библиоткеа aiohttp

pip install aiohttp

или как дополнительную зависимость

pip install aiomisc[aiohttp]

Приложение aiohttp может быть запущено как сервис:

import aiohttp.web
import argparse
from aiomisc import entrypoint
from aiomisc.service.aiohttp import AIOHTTPService

parser = argparse.ArgumentParser()
group = parser.add_argument_group('HTTP options')

group.add_argument("-l", "--address", default="::",
                   help="Listen HTTP address")
group.add_argument("-p", "--port", type=int, default=8080,
                   help="Listen HTTP port")


async def handle(request):
    name = request.match_info.get('name', "Anonymous")
    text = "Hello, " + name
    return aiohttp.web.Response(text=text)


class REST(AIOHTTPService):
    async def create_application(self):
        app = aiohttp.web.Application()

        app.add_routes([
            aiohttp.web.get('/', handle),
            aiohttp.web.get('/{name}', handle)
        ])

        return app

arguments = parser.parse_args()
service = REST(address=arguments.address, port=arguments.port)

with entrypoint(service) as loop:
    loop.run_forever()

Класс AIOHTTPSSLService похож на AIOHTTPService, но создает HTTPS сервер. Вы должны передать требуемые для SSL параметры (см. Класс TLSServer).

asgi сервис

Предупреждение

требуется установленная библиоткеа aiohttp-asgi:

pip install aiohttp-asgi

или как дополнительную зависимость

pip install aiomisc[asgi]

Любое ASGI совместимое приложение может быть запущено как сервис:

import argparse

from fastapi import FastAPI

from aiomisc import entrypoint
from aiomisc.service.asgi import ASGIHTTPService, ASGIApplicationType

parser = argparse.ArgumentParser()
group = parser.add_argument_group('HTTP options')

group.add_argument("-l", "--address", default="::",
                   help="Listen HTTP address")
group.add_argument("-p", "--port", type=int, default=8080,
                   help="Listen HTTP port")


app = FastAPI()


@app.get("/")
async def root():
    return {"message": "Hello World"}


class REST(ASGIHTTPService):
    async def create_asgi_app(self) -> ASGIApplicationType:
        return app


arguments = parser.parse_args()
service = REST(address=arguments.address, port=arguments.port)

with entrypoint(service) as loop:
    loop.run_forever()

Класс ASGIHTTPSSLService похож на ASGIHTTPService, но создает HTTPS сервер. Вы должны передать требуемые для SSL параметры (см. Класс TLSServer).

Трассировщик памяти

Простой и полезный сервис для логирования больших объектов Python, размещенных в памяти.

import asyncio
import os
from aiomisc import entrypoint
from aiomisc.service import MemoryTracer


async def main():
    leaking = []

    while True:
        leaking.append(os.urandom(128))
        await asyncio.sleep(0)


with entrypoint(MemoryTracer(interval=1, top_results=5)) as loop:
    loop.run_until_complete(main())

Пример вывода:

[T:[1] Thread Pool] INFO:aiomisc.service.tracer: Top memory usage:
 Objects | Obj.Diff |   Memory | Mem.Diff | Traceback
      12 |       12 |   1.9KiB |   1.9KiB | aiomisc/periodic.py:40
      12 |       12 |   1.8KiB |   1.8KiB | aiomisc/entrypoint.py:93
       6 |        6 |   1.1KiB |   1.1KiB | aiomisc/thread_pool.py:71
       2 |        2 |   976.0B |   976.0B | aiomisc/thread_pool.py:44
       5 |        5 |   712.0B |   712.0B | aiomisc/thread_pool.py:52

[T:[6] Thread Pool] INFO:aiomisc.service.tracer: Top memory usage:
 Objects | Obj.Diff |   Memory | Mem.Diff | Traceback
   43999 |    43999 |   7.1MiB |   7.1MiB | scratches/scratch_8.py:11
      47 |       47 |   4.7KiB |   4.7KiB | env/bin/../lib/python3.7/abc.py:143
      33 |       33 |   2.8KiB |   2.8KiB | 3.7/lib/python3.7/tracemalloc.py:113
      44 |       44 |   2.4KiB |   2.4KiB | 3.7/lib/python3.7/tracemalloc.py:185
      14 |       14 |   2.4KiB |   2.4KiB | aiomisc/periodic.py:40

Профилировщик

Простой сервис для профилирования. Необязательный аргумент path может быть предоставлен для выгрузки полных данных профилирования, которые позже могут быть использованы, например, snakeviz. Также можно изменить порядок с аргументом order (по умолчанию «cumulative»).

import asyncio
import os
from aiomisc import entrypoint
from aiomisc.service import Profiler


async def main():
    for i in range(100):
        time.sleep(0.01)


with entrypoint(Profiler(interval=0.1, top_results=5)) as loop:
    loop.run_until_complete(main())

Пример вывода:

108 function calls in 1.117 seconds

Ordered by: cumulative time

ncalls  tottime  percall  cumtime  percall filename:lineno(function)
   100    1.117    0.011    1.117    0.011 {built-in method time.sleep}
     1    0.000    0.000    0.000    0.000 <...>/lib/python3.7/pstats.py:89(__init__)
     1    0.000    0.000    0.000    0.000 <...>/lib/python3.7/pstats.py:99(init)
     1    0.000    0.000    0.000    0.000 <...>/lib/python3.7/pstats.py:118(load_stats)
     1    0.000    0.000    0.000    0.000 <...>/lib/python3.7/cProfile.py:50(create_stats)

Raven сервис

Простой сервис для отправки необработанных исключений в сервис sentry.

Простой пример:

import asyncio
import logging
import sys

from aiomisc import entrypoint
from aiomisc.version import __version__
from aiomisc.service.raven import RavenSender


async def main():
    while True:
        await asyncio.sleep(1)

        try:
            1 / 0
        except ZeroDivisionError:
            logging.exception("Exception")


raven_sender = RavenSender(
    sentry_dsn=(
        "https://583ca3b555054f80873e751e8139e22a@o429974.ingest.sentry.io/"
        "5530251"
    ),
    client_options=dict(
        # Got environment variable SENTRY_NAME by default
        name="example-from-aiomisc",
        # Got environment variable SENTRY_ENVIRONMENT by default
        environment="simple_example",
        # Got environment variable SENTRY_RELEASE by default
        release=__version__,
    )
)


with entrypoint(raven_sender) as loop:
    loop.run_until_complete(main())

Все опции для клиента:

import asyncio
import logging
import sys

from aiomisc import entrypoint
from aiomisc.version import __version__
from aiomisc.service.raven import RavenSender


async def main():
    while True:
        await asyncio.sleep(1)

        try:
            1 / 0
        except ZeroDivisionError:
            logging.exception("Exception")


raven_sender = RavenSender(
    sentry_dsn=(
        "https://583ca3b555054f80873e751e8139e22a@o429974.ingest.sentry.io/"
        "5530251"
    ),
    client_options=dict(
        # Got environment variable SENTRY_NAME by default
        name="",
        # Got environment variable SENTRY_ENVIRONMENT by default
        environment="full_example",
        # Got environment variable SENTRY_RELEASE by default
        release=__version__,

        # Default options values
        include_paths=set(),
        exclude_paths=set(),
        auto_log_stacks=True,
        capture_locals=True,
        string_max_length=400,
        list_max_length=50,
        site=None,
        include_versions=True,
        processors=(
            'raven.processors.SanitizePasswordsProcessor',
        ),
        sanitize_keys=None,
        context={'sys.argv': getattr(sys, 'argv', [])[:]},
        tags={},
        sample_rate=1,
        ignore_exceptions=(),
    )
)


with entrypoint(raven_sender) as loop:
    loop.run_until_complete(main())

Вы можете найти полное описание параметров в документации Raven.

SDWatchdogService

Просто добавьте этот сервис в entrypoint и он будет отправлять уведомления сторожевому таймеру SystemD.

Вы можете безопасно добавлять это в любом случае, так как если сервис не найдет переменных окружения, которые устанавливает systemd, Сервис просто не запустится, однако выполнение приложения продолжится.

Пример python файла:

import logging
from time import sleep

from aiomisc import entrypoint
from aiomisc.service.sdwatchdog import SDWatchdogService


if __name__ == '__main__':
    with entrypoint(SDWatchdogService()) as loop:
        pass

Пример systemd сервис-файла:

[Service]
# Activating the notification mechanism
Type=notify

# Command which should be started
ExecStart=/home/mosquito/.venv/aiomisc/bin/python /home/mosquito/scratch.py

# The time for which the program must send a watchdog notification
WatchdogSec=5

# Kill the process if it has stopped responding to the watchdog timer
WatchdogSignal=SIGKILL

# The service should be restarted on failure
Restart=on-failure

# Try to kill the process instead of cgroup
KillMode=process

# Trying to stop service properly
KillSignal=SIGINT

# Trying to restart service properly
RestartKillSignal=SIGINT

# Send SIGKILL when timeouts are exceeded
FinalKillSignal=SIGKILL
SendSIGKILL=yes

Класс ProcessService

Базовый класс для запуска функции отдельным системным процессом и завершения при остановке родительского процесса.

from typing import Dict, Any

import aiomisc.service

# Fictional miner implementation
from .my_miner import Miner


class MiningService(aiomisc.service.ProcessService):
    bitcoin: bool = False
    monero: bool = False
    dogiecoin: bool = False

    def in_process(self) -> Any:
        if self.bitcoin:
            miner = Miner(kind="bitcoin")
        elif self.monero:
            miner = Miner(kind="monero")
        elif self.dogiecoin:
            miner = Miner(kind="dogiecoin")
        else:
            # Nothing to do
            return

        miner.do_mining()


services = [
    MiningService(bitcoin=True),
    MiningService(monero=True),
    MiningService(dogiecoin=True),
]

if __name__ == '__main__':
    with aiomisc.entrypoint(*services) as loop:
        loop.run_forever()

Класс RespawningProcessService

Базовый класс для запуска функции отдельным системным процессом и завершения при остановке родительского процесса. Это очень похоже на ProcessService с одним отличием - если дочерний процесс неожиданно завершится, то он будет перезапущен.

import logging
from typing import Any

import aiomisc

from time import sleep


class SuicideService(aiomisc.service.RespawningProcessService):
    def in_process(self) -> Any:
        sleep(10)
        logging.warning("Goodbye mad world")
        exit(42)


if __name__ == '__main__':
    with aiomisc.entrypoint(SuicideService()) as loop:
        loop.run_forever()