2.4. Сервисы¶
Сервисы
- это абстракция, помогающая организовать множество различных задач в одном процессе. Каждый сервис должен реализовывать обязательный метод start()
и опционально метод stop ().
Экземпляр сервиса должен быть передан в «точку входа» (entrypoint) и будет запущен после создания event loop.
Примечание
Текущий event-loop будет установлен до вызова метода start(). Event loop будет установлен для этого потока.
Пожалуйста, избегайте использования asyncio.get_event_loop()
явно внутри метода start()
. Вместо этого используйте атрибут сервиса self.loop
:
import asyncio
from threading import Event
from aiomisc import entrypoint, Service
event = Event()
class MyService(Service):
async def start(self):
# Отправляем сигнал в entrypoint что можно продолжать
self.start_event.set()
event.set()
# Запуск чего-то полезного
await asyncio.sleep(3600)
with entrypoint(MyService()) as loop:
assert event.is_set()
Метод start()
создается как отдельная задача, которая может выполняться бесконечно. Но в этом случае необходимо утановить событие вызовом self.start_event.set()
для уведомления entrypoint об окончании запуска.
Во время завершения работы сначала будет вызван метод stop()
, а после этого все запущенные задачи будут отменены (включая start()
).
Этот пакет содержит несколько полезных базовых классов для написания простых сервисов.
2.4.1. Класс TCPServer
¶
TCPServer
- это базовый класс для реализации TCP сервера. Просто реализуйте handle_client(reader, writer)
, чтобы принимать TCP соединения.
import asyncio
import logging
from aiomisc import entrypoint
from aiomisc.service import TCPServer
log = logging.getLogger(__name__)
class EchoServer(TCPServer):
async def handle_client(self, reader: asyncio.StreamReader,
writer: asyncio.StreamWriter):
while not reader.at_eof():
writer.write(await reader.read(255))
log.info("Client connection closed")
async def echo_client(host, port):
reader, writer = await asyncio.open_connection(host=host, port=port)
writer.write(b"hello\n")
assert await reader.readline() == b"hello\n"
writer.write(b"world\n")
assert await reader.readline() == b"world\n"
writer.close()
await writer.wait_closed()
with entrypoint(
EchoServer(address='localhost', port=8901),
) as loop:
loop.run_until_complete(echo_client("localhost", 8901))
2.4.2. Класс UDPServer
¶
UDPServer
- это базовый класс для реализации UDP сервера. Просто реализуйте handle_datagram(data, addr)
, чтобы принимать UDP соединения.
class UDPPrinter(UDPServer):
async def handle_datagram(self, data: bytes, addr):
print(addr, '->', data)
with entrypoint(UDPPrinter(address='localhost', port=3000)) as loop:
loop.run_forever()
2.4.3. Класс TLSServer
¶
TLSServer - это базовый класс для написания TCP-серверов с использованием TLS. Просто реализуйте handle_client(reader, writer)
.
class SecureEchoServer(TLSServer):
async def handle_client(self, reader: asyncio.StreamReader,
writer: asyncio.StreamWriter):
while True:
writer.write(await reader.readline())
service = SecureEchoServer(
address='localhost',
port=8900,
ca='ca.pem',
cert='cert.pem',
key='key.pem',
verify=False,
)
with entrypoint(service) as loop:
loop.run_forever()
2.4.4. TCPClient
¶
TCPServer
- это базовый класс для реализации TCP сервера. Просто реализуйте handle_client(reader, writer)
, чтобы принимать TCP соединения.
import asyncio
import logging
from aiomisc import entrypoint
from aiomisc.service import TCPServer, TCPClient
log = logging.getLogger(__name__)
class EchoServer(TCPServer):
async def handle_client(self, reader: asyncio.StreamReader,
writer: asyncio.StreamWriter):
while not reader.at_eof():
writer.write(await reader.read(255))
log.info("Client connection closed")
class EchoClient(TCPClient):
async def handle_connection(self, reader: asyncio.StreamReader,
writer: asyncio.StreamWriter) -> None:
writer.write(b"hello\n")
assert await reader.readline() == b"hello\n"
writer.write(b"world\n")
assert await reader.readline() == b"world\n"
writer.write_eof()
writer.close()
await writer.wait_closed()
with entrypoint(
EchoServer(address='localhost', port=8901),
EchoClient(address='localhost', port=8901),
) as loop:
loop.run_until_complete(asyncio.sleep(0.1))
2.4.5. TLSClient
¶
TCPServer
- это базовый класс для реализации TCP сервера. Просто реализуйте handle_client(reader, writer)
, чтобы принимать TCP соединения.
import asyncio
import logging
from aiomisc import entrypoint
from aiomisc.service import TCPServer, TCPClient
log = logging.getLogger(__name__)
class EchoServer(TLSServer):
async def handle_client(self, reader: asyncio.StreamReader,
writer: asyncio.StreamWriter):
while not reader.at_eof():
writer.write(await reader.read(255))
log.info("Client connection closed")
class EchoClient(TLSClient):
async def handle_connection(self, reader: asyncio.StreamReader,
writer: asyncio.StreamWriter) -> None:
writer.write(b"hello\n")
assert await reader.readline() == b"hello\n"
writer.write(b"world\n")
assert await reader.readline() == b"world\n"
writer.write_eof()
writer.close()
await writer.wait_closed()
with entrypoint(
EchoServer(
address='localhost', port=8901,
ca='ca.pem',
cert='server.pem',
key='server.key',
),
EchoClient(
address='localhost', port=8901,
ca='ca.pem',
cert='client.pem',
key='client.key',
),
) as loop:
loop.run_until_complete(asyncio.sleep(0.1))
2.4.6. RobustTCPClient
¶
TLSServer - это базовый класс для написания TCP-серверов с использованием TLS. Просто реализуйте handle_client(reader, writer)
.
import asyncio
import logging
from aiomisc import entrypoint
from aiomisc.service import TCPServer, RobustTCPClient
log = logging.getLogger(__name__)
class EchoServer(TCPServer):
async def handle_client(self, reader: asyncio.StreamReader,
writer: asyncio.StreamWriter):
while not reader.at_eof():
writer.write(await reader.read(255))
log.info("Client connection closed")
class EchoClient(RobustTCPClient):
async def handle_connection(self, reader: asyncio.StreamReader,
writer: asyncio.StreamWriter) -> None:
writer.write(b"hello\n")
assert await reader.readline() == b"hello\n"
writer.write(b"world\n")
assert await reader.readline() == b"world\n"
writer.write_eof()
writer.close()
await writer.wait_closed()
with entrypoint(
EchoServer(address='localhost', port=8901),
EchoClient(address='localhost', port=8901),
) as loop:
loop.run_until_complete(asyncio.sleep(0.1))
2.4.7. RobustTLSClient
¶
TLSServer - это базовый класс для написания TCP-серверов с использованием TLS. Просто реализуйте handle_client(reader, writer)
.
import asyncio
import logging
from aiomisc import entrypoint
from aiomisc.service import TCPServer, RobustTCPClient
log = logging.getLogger(__name__)
class EchoServer(TLSServer):
async def handle_client(self, reader: asyncio.StreamReader,
writer: asyncio.StreamWriter):
while not reader.at_eof():
writer.write(await reader.read(255))
log.info("Client connection closed")
class EchoClient(RobustTLSClient):
async def handle_connection(self, reader: asyncio.StreamReader,
writer: asyncio.StreamWriter) -> None:
writer.write(b"hello\n")
assert await reader.readline() == b"hello\n"
writer.write(b"world\n")
assert await reader.readline() == b"world\n"
writer.write_eof()
writer.close()
await writer.wait_closed()
with entrypoint(
EchoServer(
address='localhost', port=8901,
ca='ca.pem',
cert='server.pem',
key='server.key',
),
EchoClient(
address='localhost', port=8901,
ca='ca.pem',
cert='client.pem',
key='client.key',
),
) as loop:
loop.run_until_complete(asyncio.sleep(0.1))
2.4.8. Класс PeriodicService
¶
PeriodicService запускает PeriodicCallback как сервис и ожидает завершения обратного вызова при остановке. Вам необходимо использовать PeriodicService в качестве базового класса и переопределить асинхронный метод callback.
Сервисный класс принимает обязательный аргумент interval
- интервал запуска в секундах и необязательный аргумент delay
- задержку первого выполнения в секундах (по умолчанию 0).
import aiomisc
from aiomisc.service.periodic import PeriodicService
class MyPeriodicService(PeriodicService):
async def callback(self):
log.info('Running periodic callback')
# ...
service = MyPeriodicService(interval=3600, delay=0) # раз в час
with entrypoint(service) as loop:
loop.run_forever()
2.4.9. Класс CronService
¶
CronService
запускает CronCallback
в качестве сервиса и ожидает завершения выполнения обратных вызовов при остановке.
Основан на croniter. Вы можете зарегистрировать асинхронный метод с аргументом spec
- формат, подобный cron:
Предупреждение
необходимо установить библиотеку croniter:
pip install croniter
или как дополнительную зависимость
pip install aiomisc[cron]
import aiomisc
from aiomisc.service.cron import CronService
async def callback():
log.info('Running cron callback')
# ...
service = CronService()
service.register(callback, spec="0 * * * *") # каждый час в 0 минут minutes
with entrypoint(service) as loop:
loop.run_forever()
Вы также можете наследовать от CronService
, но помните, что регистрация обратного вызова должна выполняться до запуска
import aiomisc
from aiomisc.service.cron import CronService
class MyCronService(CronService):
async def callback(self):
log.info('Running cron callback')
# ...
async def start(self):
self.register(self.callback, spec="0 * * * *")
await super().start()
service = MyCronService()
with entrypoint(service) as loop:
loop.run_forever()
2.4.10. Несколько сервисов¶
Передайте несколько экземпляров сервиса в entrypoint
, чтобы запустить их все сразу. После выхода экземпляры сервиса точки входа будут корректно закрыты вызовом метода stop()
или через отмену метода start()
.
import asyncio
from aiomisc import entrypoint
from aiomisc.service import Service, TCPServer, UDPServer
class LoggingService(PeriodicService):
async def callabck(self):
print('Hello from service', self.name)
class EchoServer(TCPServer):
async def handle_client(self, reader: asyncio.StreamReader,
writer: asyncio.StreamWriter):
while True:
writer.write(await reader.readline())
class UDPPrinter(UDPServer):
async def handle_datagram(self, data: bytes, addr):
print(addr, '->', data)
services = (
LoggingService(name='#1', interval=1),
EchoServer(address='localhost', port=8901),
UDPPrinter(address='localhost', port=3000),
)
with entrypoint(*services) as loop:
loop.run_forever()
2.4.11. Конфигурация¶
Метакласс Service
принимает все именованные аргументы в __init__
и устанавливает из как атрибуты в self
.
import asyncio
from aiomisc import entrypoint
from aiomisc.service import Service, TCPServer, UDPServer
class LoggingService(Service):
# обязательные именованные аргументы
__required__ = frozenset({'name'})
# default value
delay: int = 1
async def start(self):
self.start_event.set()
while True:
# аттрибут ``name`` из именованных аргументов
# должен быть передан при создании экземпляра
print('Hello from service', self.name)
# аттрибут ``delay`` из именованных аргументов
await asyncio.sleep(self.delay)
services = (
LoggingService(name='#1'),
LoggingService(name='#2', delay=3),
)
with entrypoint(*services) as loop:
loop.run_forever()
2.4.12. aiohttp сервис¶
Предупреждение
требуется установленная библиотека aiohttp
pip install aiohttp
или как дополнительную зависимость
pip install aiomisc[aiohttp]
Приложение aiohttp может быть запущено как сервис:
import aiohttp.web
import argparse
from aiomisc import entrypoint
from aiomisc.service.aiohttp import AIOHTTPService
parser = argparse.ArgumentParser()
group = parser.add_argument_group('HTTP options')
group.add_argument("-l", "--address", default="::",
help="Listen HTTP address")
group.add_argument("-p", "--port", type=int, default=8080,
help="Listen HTTP port")
async def handle(request):
name = request.match_info.get('name', "Anonymous")
text = "Hello, " + name
return aiohttp.web.Response(text=text)
class REST(AIOHTTPService):
async def create_application(self):
app = aiohttp.web.Application()
app.add_routes([
aiohttp.web.get('/', handle),
aiohttp.web.get('/{name}', handle)
])
return app
arguments = parser.parse_args()
service = REST(address=arguments.address, port=arguments.port)
with entrypoint(service) as loop:
loop.run_forever()
Класс AIOHTTPSSLService
похож на AIOHTTPService
, но создает HTTPS сервер. Вы должны передать требуемые для SSL параметры (см. Класс TLSServer).
2.4.13. asgi сервис¶
Предупреждение
требуется установленная библиотека aiohttp-asgi:
pip install aiohttp-asgi
или как дополнительную зависимость
pip install aiomisc[asgi]
Любое ASGI совместимое приложение может быть запущено как сервис:
import argparse
from fastapi import FastAPI
from aiomisc import entrypoint
from aiomisc.service.asgi import ASGIHTTPService, ASGIApplicationType
parser = argparse.ArgumentParser()
group = parser.add_argument_group('HTTP options')
group.add_argument("-l", "--address", default="::",
help="Listen HTTP address")
group.add_argument("-p", "--port", type=int, default=8080,
help="Listen HTTP port")
app = FastAPI()
@app.get("/")
async def root():
return {"message": "Hello World"}
class REST(ASGIHTTPService):
async def create_asgi_app(self) -> ASGIApplicationType:
return app
arguments = parser.parse_args()
service = REST(address=arguments.address, port=arguments.port)
with entrypoint(service) as loop:
loop.run_forever()
Класс ASGIHTTPSSLService
похож на ASGIHTTPService
, но создает HTTPS сервер. Вы должны передать требуемые для SSL параметры (см. Класс TLSServer
).
2.4.14. uvicorn service¶
Предупреждение
requires installed uvicorn:
pip install uvicorn
или как дополнительную зависимость
pip install aiomisc[uvicorn]
Any ASGI-like application can be started via uvicorn as a service:
import argparse
from fastapi import FastAPI
from aiomisc import entrypoint
from aiomisc.service.uvicorn import UvicornApplication, UvicornService
parser = argparse.ArgumentParser()
group = parser.add_argument_group('HTTP options')
group.add_argument("-l", "--host", default="::",
help="Listen HTTP host")
group.add_argument("-p", "--port", type=int, default=8080,
help="Listen HTTP port")
app = FastAPI()
@app.get("/")
async def root():
return {"message": "Hello World"}
class REST(UvicornService):
async def create_application(self) -> UvicornApplication:
return app
arguments = parser.parse_args()
service = REST(host=arguments.host, port=arguments.port)
with entrypoint(service) as loop:
loop.run_forever()
2.4.15. GRPC service¶
Это пример GRPC-сервиса, который определяется в файле и загружает файл hello.proto без кодогенерации, этот пример является одним из примеров из grpcio, остальные примеры будут работать как ожидается.
Определение proto файла
syntax = "proto3";
package helloworld;
// Определение сервиса приветствия.
service Greeter {
// Сказать привет
rpc SayHello (HelloRequest) returns (HelloReply) {}
}
// Сообщение запроса, содержащее имя пользователя.
message HelloRequest {
string name = 1;
}
// Ответное сообщение, содержащее приветствие
message HelloReply {
string message = 1;
}
Пример инициализации сервиса:
import grpc
import aiomisc
from aiomisc.service.grpc_server import GRPCService
protos, services = grpc.protos_and_services("hello.proto")
class Greeter(services.GreeterServicer):
async def SayHello(self, request, context):
return protos.HelloReply(message='Hello, %s!' % request.name)
def main():
grpc_service = GRPCService(compression=grpc.Compression.Gzip)
services.add_GreeterServicer_to_server(
Greeter(), grpc_service,
)
grpc_service.add_insecure_port('[::]:0')
grpc_service.add_insecure_port('[::1]:0')
grpc_service.add_insecure_port('127.0.0.1:0')
grpc_service.add_insecure_port('localhost:0')
grpc_service.add_secure_port('localhost:0', grpc.local_server_credentials())
grpc_service.add_secure_port('[::]:0', grpc.local_server_credentials())
with aiomisc.entrypoint(grpc_service) as loop:
loop.run_forever()
if __name__ == '__main__':
main()
To enable reflection for the service you use reflection flag:
GRPCService(reflection=True)
2.4.16. Трассировщик памяти¶
Простой и полезный сервис для логирования больших объектов Python, размещенных в памяти.
import asyncio
import os
from aiomisc import entrypoint
from aiomisc.service import MemoryTracer
async def main():
leaking = []
while True:
leaking.append(os.urandom(128))
await asyncio.sleep(0)
with entrypoint(MemoryTracer(interval=1, top_results=5)) as loop:
loop.run_until_complete(main())
Пример вывода:
[T:[1] Thread Pool] INFO:aiomisc.service.tracer: Top memory usage:
Objects | Obj.Diff | Memory | Mem.Diff | Traceback
12 | 12 | 1.9KiB | 1.9KiB | aiomisc/periodic.py:40
12 | 12 | 1.8KiB | 1.8KiB | aiomisc/entrypoint.py:93
6 | 6 | 1.1KiB | 1.1KiB | aiomisc/thread_pool.py:71
2 | 2 | 976.0B | 976.0B | aiomisc/thread_pool.py:44
5 | 5 | 712.0B | 712.0B | aiomisc/thread_pool.py:52
[T:[6] Thread Pool] INFO:aiomisc.service.tracer: Top memory usage:
Objects | Obj.Diff | Memory | Mem.Diff | Traceback
43999 | 43999 | 7.1MiB | 7.1MiB | scratches/scratch_8.py:11
47 | 47 | 4.7KiB | 4.7KiB | env/bin/../lib/python3.7/abc.py:143
33 | 33 | 2.8KiB | 2.8KiB | 3.7/lib/python3.7/tracemalloc.py:113
44 | 44 | 2.4KiB | 2.4KiB | 3.7/lib/python3.7/tracemalloc.py:185
14 | 14 | 2.4KiB | 2.4KiB | aiomisc/periodic.py:40
2.4.17. Profiler
- профилировщик¶
Простой сервис для профилирования. Необязательный аргумент path может быть предоставлен для выгрузки полных данных профилирования, которые позже могут быть использованы, например, snakeviz. Также можно изменить порядок с аргументом order
(по умолчанию «cumulative»).
import asyncio
import os
from aiomisc import entrypoint
from aiomisc.service import Profiler
async def main():
for i in range(100):
time.sleep(0.01)
with entrypoint(Profiler(interval=0.1, top_results=5)) as loop:
loop.run_until_complete(main())
Пример вывода:
108 function calls in 1.117 seconds
Ordered by: cumulative time
ncalls tottime percall cumtime percall filename:lineno(function)
100 1.117 0.011 1.117 0.011 {built-in method time.sleep}
1 0.000 0.000 0.000 0.000 <...>/lib/python3.7/pstats.py:89(__init__)
1 0.000 0.000 0.000 0.000 <...>/lib/python3.7/pstats.py:99(init)
1 0.000 0.000 0.000 0.000 <...>/lib/python3.7/pstats.py:118(load_stats)
1 0.000 0.000 0.000 0.000 <...>/lib/python3.7/cProfile.py:50(create_stats)
2.4.18. Raven сервис¶
Простой сервис для отправки необработанных исключений в сервис sentry.
Простой пример:
import asyncio
import logging
import sys
from aiomisc import entrypoint
from aiomisc.version import __version__
from aiomisc.service.raven import RavenSender
async def main():
while True:
await asyncio.sleep(1)
try:
1 / 0
except ZeroDivisionError:
logging.exception("Exception")
raven_sender = RavenSender(
sentry_dsn=(
"https://583ca3b555054f80873e751e8139e22a@o429974.ingest.sentry.io/"
"5530251"
),
client_options=dict(
# По умолчанию возьмет значение переменной окружения SENTRY_NAME
name="example-from-aiomisc",
# По умолчанию возьмет значение переменной окружения SENTRY_ENVIRONMENT
environment="simple_example",
# По умолчанию возьмет значение переменной окружения SENTRY_RELEASE
release=__version__,
)
)
with entrypoint(raven_sender) as loop:
loop.run_until_complete(main())
Все опции для клиента:
import asyncio
import logging
import sys
from aiomisc import entrypoint
from aiomisc.version import __version__
from aiomisc.service.raven import RavenSender
async def main():
while True:
await asyncio.sleep(1)
try:
1 / 0
except ZeroDivisionError:
logging.exception("Exception")
raven_sender = RavenSender(
sentry_dsn=(
"https://583ca3b555054f80873e751e8139e22a@o429974.ingest.sentry.io/"
"5530251"
),
client_options=dict(
# По умолчанию возьмет значение переменной окружения SENTRY_NAME
name="",
# По умолчанию возьмет значение переменной окружения SENTRY_ENVIRONMENT
environment="full_example",
# По умолчанию возьмет значение переменной окружения SENTRY_RELEASE
release=__version__,
# Умолчания для остальных аргументов
include_paths=set(),
exclude_paths=set(),
auto_log_stacks=True,
capture_locals=True,
string_max_length=400,
list_max_length=50,
site=None,
include_versions=True,
processors=(
'raven.processors.SanitizePasswordsProcessor',
),
sanitize_keys=None,
context={'sys.argv': getattr(sys, 'argv', [])[:]},
tags={},
sample_rate=1,
ignore_exceptions=(),
)
)
with entrypoint(raven_sender) as loop:
loop.run_until_complete(main())
Вы можете найти полное описание параметров в документации Raven.
2.4.19. SDWatchdogService
¶
Готовый к использованию сервис, просто добавьте его в entrypoint и он будет отправлять уведомления сторожевому таймеру SystemD.
Вы можете безопасно добавлять это в любом случае, так как если сервис не найдет переменных окружения, которые устанавливает systemd, Сервис просто не запустится, однако выполнение приложения продолжится.
Пример python файла:
import logging
from time import sleep
from aiomisc import entrypoint
from aiomisc.service.sdwatchdog import SDWatchdogService
if __name__ == '__main__':
with entrypoint(SDWatchdogService()) as loop:
pass
Пример systemd сервис-файла:
[Service]
# Активизируем механизм уведомлений
Type=notify
# Команда которую следует запускать
ExecStart=/home/mosquito/.venv/aiomisc/bin/python /home/mosquito/scratch.py
# Время через которое программа должны посылать нотификации сторожевого таймера
WatchdogSec=5
# Процесс будет убит если он перестал отвечать сторожевому таймеру
WatchdogSignal=SIGKILL
# Сервис будет перезапущен в случае ошибки
Restart=on-failure
# Пробуем убить сам процесс вместо всей cgroup
KillMode=process
# Пробуем остановить сервис "помягче"
KillSignal=SIGINT
# Пробуем остановить сервис "помягче" при перезапуске
RestartKillSignal=SIGINT
# Слать SIGKILL если произошел таймаут остановки
FinalKillSignal=SIGKILL
SendSIGKILL=yes
2.4.20. Класс ProcessService
¶
Базовый класс для запуска функции отдельным системным процессом и завершения при остановке родительского процесса.
from typing import Dict, Any
import aiomisc.service
# Реализация вымышленного майнера
from .my_miner import Miner
class MiningService(aiomisc.service.ProcessService):
bitcoin: bool = False
monero: bool = False
dogiecoin: bool = False
def in_process(self) -> Any:
if self.bitcoin:
miner = Miner(kind="bitcoin")
elif self.monero:
miner = Miner(kind="monero")
elif self.dogiecoin:
miner = Miner(kind="dogiecoin")
else:
# Ничего делать не нужно
return
miner.do_mining()
services = [
MiningService(bitcoin=True),
MiningService(monero=True),
MiningService(dogiecoin=True),
]
if __name__ == '__main__':
with aiomisc.entrypoint(*services) as loop:
loop.run_forever()
2.4.21. Класс RespawningProcessService
¶
Базовый класс для запуска функции отдельным системным процессом и завершения при остановке родительского процесса. Это очень похоже на ProcessService с одним отличием - если дочерний процесс неожиданно завершится, то он будет перезапущен.
import logging
from typing import Any
import aiomisc
from time import sleep
class SuicideService(aiomisc.service.RespawningProcessService):
def in_process(self) -> Any:
sleep(10)
logging.warning("Goodbye mad world")
exit(42)
if __name__ == '__main__':
with aiomisc.entrypoint(SuicideService()) as loop:
loop.run_forever()