2.4. Сервисы#
Сервисы - это абстракция, помогающая организовать множество различных задач в одном процессе. Каждый сервис должен реализовывать обязательный метод start() и опционально метод stop ().
Экземпляр сервиса должен быть передан в «точку входа» (entrypoint) и будет запущен после создания event loop.
Примечание
Текущий event-loop будет установлен до вызова метода start(). Event loop будет установлен для этого потока.
Пожалуйста, избегайте использования asyncio.get_event_loop() явно внутри метода start(). Вместо этого используйте атрибут сервиса self.loop:
import asyncio
from threading import Event
from aiomisc import entrypoint, Service
event = Event()
class MyService(Service):
async def start(self):
# Отправляем сигнал в entrypoint что можно продолжать
self.start_event.set()
event.set()
# Запуск чего-то полезного
await asyncio.sleep(3600)
with entrypoint(MyService()) as loop:
assert event.is_set()
Метод start() создается как отдельная задача, которая может выполняться бесконечно. Но в этом случае необходимо утановить событие вызовом self.start_event.set() для уведомления entrypoint об окончании запуска.
Во время завершения работы сначала будет вызван метод stop(), а после этого все запущенные задачи будут отменены (включая start()).
Этот пакет содержит несколько полезных базовых классов для написания простых сервисов.
2.4.1. Класс TCPServer#
TCPServer - это базовый класс для реализации TCP сервера. Просто реализуйте handle_client(reader, writer), чтобы принимать TCP соединения.
import asyncio
import logging
from aiomisc import entrypoint
from aiomisc.service import TCPServer
log = logging.getLogger(__name__)
class EchoServer(TCPServer):
async def handle_client(self, reader: asyncio.StreamReader,
writer: asyncio.StreamWriter):
while not reader.at_eof():
writer.write(await reader.read(255))
log.info("Client connection closed")
async def echo_client(host, port):
reader, writer = await asyncio.open_connection(host=host, port=port)
writer.write(b"hello\n")
assert await reader.readline() == b"hello\n"
writer.write(b"world\n")
assert await reader.readline() == b"world\n"
writer.close()
await writer.wait_closed()
with entrypoint(
EchoServer(address='localhost', port=8901),
) as loop:
loop.run_until_complete(echo_client("localhost", 8901))
2.4.2. Класс UDPServer#
UDPServer - это базовый класс для реализации UDP сервера. Просто реализуйте handle_datagram(data, addr), чтобы принимать UDP соединения.
class UDPPrinter(UDPServer):
async def handle_datagram(self, data: bytes, addr):
print(addr, '->', data)
with entrypoint(UDPPrinter(address='localhost', port=3000)) as loop:
loop.run_forever()
2.4.3. Класс TLSServer#
TLSServer - это базовый класс для написания TCP-серверов с использованием TLS. Просто реализуйте handle_client(reader, writer).
class SecureEchoServer(TLSServer):
async def handle_client(self, reader: asyncio.StreamReader,
writer: asyncio.StreamWriter):
while True:
writer.write(await reader.readline())
service = SecureEchoServer(
address='localhost',
port=8900,
ca='ca.pem',
cert='cert.pem',
key='key.pem',
verify=False,
)
with entrypoint(service) as loop:
loop.run_forever()
2.4.4. TCPClient#
TCPServer - это базовый класс для реализации TCP сервера. Просто реализуйте handle_client(reader, writer), чтобы принимать TCP соединения.
import asyncio
import logging
from aiomisc import entrypoint
from aiomisc.service import TCPServer, TCPClient
log = logging.getLogger(__name__)
class EchoServer(TCPServer):
async def handle_client(self, reader: asyncio.StreamReader,
writer: asyncio.StreamWriter):
while not reader.at_eof():
writer.write(await reader.read(255))
log.info("Client connection closed")
class EchoClient(TCPClient):
async def handle_connection(self, reader: asyncio.StreamReader,
writer: asyncio.StreamWriter) -> None:
writer.write(b"hello\n")
assert await reader.readline() == b"hello\n"
writer.write(b"world\n")
assert await reader.readline() == b"world\n"
writer.write_eof()
writer.close()
await writer.wait_closed()
with entrypoint(
EchoServer(address='localhost', port=8901),
EchoClient(address='localhost', port=8901),
) as loop:
loop.run_until_complete(asyncio.sleep(0.1))
2.4.5. TLSClient#
TCPServer - это базовый класс для реализации TCP сервера. Просто реализуйте handle_client(reader, writer), чтобы принимать TCP соединения.
import asyncio
import logging
from aiomisc import entrypoint
from aiomisc.service import TCPServer, TCPClient
log = logging.getLogger(__name__)
class EchoServer(TLSServer):
async def handle_client(self, reader: asyncio.StreamReader,
writer: asyncio.StreamWriter):
while not reader.at_eof():
writer.write(await reader.read(255))
log.info("Client connection closed")
class EchoClient(TLSClient):
async def handle_connection(self, reader: asyncio.StreamReader,
writer: asyncio.StreamWriter) -> None:
writer.write(b"hello\n")
assert await reader.readline() == b"hello\n"
writer.write(b"world\n")
assert await reader.readline() == b"world\n"
writer.write_eof()
writer.close()
await writer.wait_closed()
with entrypoint(
EchoServer(
address='localhost', port=8901,
ca='ca.pem',
cert='server.pem',
key='server.key',
),
EchoClient(
address='localhost', port=8901,
ca='ca.pem',
cert='client.pem',
key='client.key',
),
) as loop:
loop.run_until_complete(asyncio.sleep(0.1))
2.4.6. RobustTCPClient#
TLSServer - это базовый класс для написания TCP-серверов с использованием TLS. Просто реализуйте handle_client(reader, writer).
import asyncio
import logging
from aiomisc import entrypoint
from aiomisc.service import TCPServer, RobustTCPClient
log = logging.getLogger(__name__)
class EchoServer(TCPServer):
async def handle_client(self, reader: asyncio.StreamReader,
writer: asyncio.StreamWriter):
while not reader.at_eof():
writer.write(await reader.read(255))
log.info("Client connection closed")
class EchoClient(RobustTCPClient):
async def handle_connection(self, reader: asyncio.StreamReader,
writer: asyncio.StreamWriter) -> None:
writer.write(b"hello\n")
assert await reader.readline() == b"hello\n"
writer.write(b"world\n")
assert await reader.readline() == b"world\n"
writer.write_eof()
writer.close()
await writer.wait_closed()
with entrypoint(
EchoServer(address='localhost', port=8901),
EchoClient(address='localhost', port=8901),
) as loop:
loop.run_until_complete(asyncio.sleep(0.1))
2.4.7. RobustTLSClient#
TLSServer - это базовый класс для написания TCP-серверов с использованием TLS. Просто реализуйте handle_client(reader, writer).
import asyncio
import logging
from aiomisc import entrypoint
from aiomisc.service import TCPServer, RobustTCPClient
log = logging.getLogger(__name__)
class EchoServer(TLSServer):
async def handle_client(self, reader: asyncio.StreamReader,
writer: asyncio.StreamWriter):
while not reader.at_eof():
writer.write(await reader.read(255))
log.info("Client connection closed")
class EchoClient(RobustTLSClient):
async def handle_connection(self, reader: asyncio.StreamReader,
writer: asyncio.StreamWriter) -> None:
writer.write(b"hello\n")
assert await reader.readline() == b"hello\n"
writer.write(b"world\n")
assert await reader.readline() == b"world\n"
writer.write_eof()
writer.close()
await writer.wait_closed()
with entrypoint(
EchoServer(
address='localhost', port=8901,
ca='ca.pem',
cert='server.pem',
key='server.key',
),
EchoClient(
address='localhost', port=8901,
ca='ca.pem',
cert='client.pem',
key='client.key',
),
) as loop:
loop.run_until_complete(asyncio.sleep(0.1))
2.4.8. Класс PeriodicService#
PeriodicService запускает PeriodicCallback как сервис и ожидает завершения обратного вызова при остановке. Вам необходимо использовать PeriodicService в качестве базового класса и переопределить асинхронный метод callback.
Сервисный класс принимает обязательный аргумент interval - интервал запуска в секундах и необязательный аргумент delay - задержку первого выполнения в секундах (по умолчанию 0).
import aiomisc
from aiomisc.service.periodic import PeriodicService
class MyPeriodicService(PeriodicService):
async def callback(self):
log.info('Running periodic callback')
# ...
service = MyPeriodicService(interval=3600, delay=0) # раз в час
with entrypoint(service) as loop:
loop.run_forever()
2.4.9. DNS Server#
Описанный здесь DNS-сервер использует библиотеку «aiomisc», которая предоставляет утилиты для асинхронных операций ввода-вывода, и библиотеку «dnslib» для обработки DNS-записей и пакетов. Эта настройка идеально подходит для высокопроизводительной неблокирующей обработки DNS-запросов.
2.4.9.1. Ключевые Характеристики#
Асинхронный ввод-вывод: использует асинхронные операции для одновременной обработки нескольких DNS-запросов, обеспечивая высокую производительность и масштабируемость.
Поддержка UDP и TCP: сервер поддерживает DNS-запросы по протоколам UDP и TCP, что делает его универсальным для различных сетевых конфигураций.
Поддержка ``EDNS0``: реализует механизмы расширения DNS (
EDNS0) для обработки более крупных сообщений DNS и расширенных функций.Настраиваемые записи DNS: позволяет легко настраивать зоны и записи DNS, позволяя эффективно разрешать записи DNS и управлять ими.
2.4.9.2. Пререквизиты#
Установите необходимые библиотеки с помощью pip:
pip install aiomisc[dns]
2.4.9.3. Настройка сервера#
from aiomisc import entrypoint
from aiomisc.service.dns import (
DNSStore, DNSZone, TCPDNSServer, UDPDNSServer, records,
)
zone = DNSZone("test.")
zone.add_record(records.A.create("test.", "10.10.10.10"))
zone.add_record(records.AAAA.create("test.", "fd10::10"))
store = DNSStore()
store.add_zone(zone)
services = [
UDPDNSServer(
store=store, address="::1", port=5053,
),
TCPDNSServer(
store=store, address="::1", port=5053,
),
]
if __name__ == "__main__":
with entrypoint(*services, log_level="debug") as loop:
loop.run_forever()
2.4.9.4. Тестирование сервера#
Вы можете протестировать DNS-сервер, используя такие инструменты, как «dig». Например, чтобы запросить записи A и AAAA для test., используйте следующие команды:
dig @::1 -p 5053 test. A
dig @::1 -p 5053 test. AAAA
dig @::1 -p 5053 +tcp test. A
dig @::1 -p 5053 +tcp test. AAAA
Эти команды должны вернуть IP-адреса 10.10.10.10 и fd10::10 соответственно, значит DNS-сервер работает правильно.
2.4.9.5. Динамическое управление хранилищем записей#
Одной из мощных функций этой реализации DNS-сервера является возможность динамического управления хранилищем записей DNS. Это позволяет добавлять или удалять зоны и записи во время выполнения без необходимости перезапуска сервера.
Динамическое управление DNS-зонами и записями необходимо для гибкой настройки DNS-сервера. В этом руководстве основное внимание уделяется управлению зонами и записями DNS с помощью классов DNSStore и DNSZone, а также приведены практические примеры для каждой операции.
2.4.9.6. Добавление зоны#
Вы можете добавить новую зону в DNSStore для управления ее записями. Эта операция гарантирует, что зона доступна для DNS-запросов.
from aiomisc.service.dns import DNSStore, DNSZone
# Create a DNSStore instance
dns_store = DNSStore()
# Create a DNSZone instance
zone = DNSZone("example.com.")
# Add the zone to the store
dns_store.add_zone(zone)
# Verify the zone is added
assert dns_store.get_zone("example.com.") is zone
2.4.9.7. Удаление зоны#
Удаление зоны из DNSStore гарантирует, что она больше не будет доступна для запросов DNS.
# Add the zone to the store
dns_store.add_zone(zone)
# Remove the zone from the store
dns_store.remove_zone("example.com.")
# Verify the zone is removed
assert dns_store.get_zone("example.com.") is None
2.4.9.8. Добавление записей в зону#
Для управления записями DNS вы можете добавлять записи в определенную зону. Эта операция делает запись доступной для разрешения DNS в этой зоне.
from aiomisc.service.dns.records import A, RecordType
# Create a DNSZone instance
zone = DNSZone("example.com.")
# Create an A record
record = A.create(name="www.example.com.", ip="192.0.2.1")
# Add the record to the zone
zone.add_record(record)
# Add the zone to the store
dns_store.add_zone(zone)
# Query the store for the record
records = dns_store.query("www.example.com.", RecordType.A)
# Verify the record is added
assert record in records
2.4.9.9. Запрос DNS записей#
Вы можете запросить DNSStore для получения записей для определенного доменного имени. Это полезно для проверки существования записи или для обработки DNS-запросов.
# Query the store for a nonexistent record
records = dns_store.query("nonexistent.example.com.", RecordType.A)
# Verify no records are found
assert len(records) == 0
2.4.9.10. Обработка дубликатов зон#
Попытка добавить уже существующую зону должна вызвать ошибку, гарантируя уникальность каждой зоны в DNSStore.
# Add the zone to the store
dns_store.add_zone(zone)
# Attempt to add the same zone again
try:
dns_store.add_zone(zone)
except ValueError as e:
print(f"Error: {e}")
2.4.9.11. Удаление несуществующей зоны#
Удаление несуществующей зоны должно вызвать ошибку, указывающую на недопустимость операции.
# Attempt to remove a nonexistent zone
try:
dns_store.remove_zone("nonexistent.com.")
except ValueError as e:
print(f"Error: {e}")
2.4.9.12. Запросы поддоменов#
DNSStore поддерживает запросы к субдоменам, что позволяет вам разрешать записи внутри субдоменов существующей зоны.
# Create a DNSZone instance
zone = DNSZone("example.com.")
# Create an A record for a subdomain
record = A.create(name="sub.example.com.", ip="192.0.2.2")
# Add the record to the zone
zone.add_record(record)
# Add the zone to the store
dns_store.add_zone(zone)
# Query the store for the subdomain record
records = dns_store.query("sub.example.com.", RecordType.A)
# Verify the subdomain record is found
assert record in records
2.4.9.13. Получение зоны#
Вы можете получить зону из DNSStore, чтобы проверить ее или управлять ею.
# Add the zone to the store
dns_store.add_zone(zone)
# Retrieve the zone from the store
retrieved_zone = dns_store.get_zone("example.com.")
# Verify the zone is retrieved
assert retrieved_zone is zone
2.4.9.14. Обработка несуществующих зон#
При получении несуществующей зоны должно возвращаться значение None.
# Attempt to retrieve a nonexistent zone
nonexistent_zone = dns_store.get_zone("nonexistent.com.")
# Verify no zone is retrieved
assert nonexistent_zone is None
2.4.9.15. Удаление записи из зоны#
Чтобы удалить запись DNS из зоны, вы можете использовать метод remove_record. Эта операция гарантирует, что запись больше не будет доступна для разрешения DNS.
# Create a DNSZone instance
zone = DNSZone("example.com.")
# Create an A record
record = A.create(name="www.example.com.", ip="192.0.2.1")
# Add the record to the zone
zone.add_record(record)
# Remove the record from the zone
zone.remove_record(record)
# Add the zone to the store
dns_store.add_zone(zone)
# Query the store for the record
records = dns_store.query("www.example.com.", RecordType.A)
# Verify the record is removed
assert len(records) == 0
2.4.9.16. Поиск зоны по префиксу#
DNSStore может найти подходящую зону для данного доменного имени, что полезно для обработки запросов с поддоменами.
# Create a DNSZone instance
zone = DNSZone("example.com.")
# Add the zone to the store
dns_store.add_zone(zone)
# Find the zone for a subdomain
zone_prefix = dns_store.get_zone_for_name("sub.example.com.")
# Verify the correct zone is found
assert zone_prefix == ("com", "example")
2.4.10. Класс CronService#
CronService запускает CronCallback в качестве сервиса и ожидает завершения выполнения обратных вызовов при остановке.
Основан на croniter. Вы можете зарегистрировать асинхронный метод с аргументом spec - формат, подобный cron:
Предупреждение
необходимо установить библиотеку croniter:
pip install croniter
или как дополнительную зависимость
pip install aiomisc[cron]
import aiomisc
from aiomisc.service.cron import CronService
async def callback():
log.info('Running cron callback')
# ...
service = CronService()
service.register(callback, spec="0 * * * *") # каждый час в 0 минут minutes
with entrypoint(service) as loop:
loop.run_forever()
Вы также можете наследовать от CronService, но помните, что регистрация обратного вызова должна выполняться до запуска
import aiomisc
from aiomisc.service.cron import CronService
class MyCronService(CronService):
async def callback(self):
log.info('Running cron callback')
# ...
async def start(self):
self.register(self.callback, spec="0 * * * *")
await super().start()
service = MyCronService()
with entrypoint(service) as loop:
loop.run_forever()
2.4.11. Несколько сервисов#
Передайте несколько экземпляров сервиса в entrypoint, чтобы запустить их все сразу. После выхода экземпляры сервиса точки входа будут корректно закрыты вызовом метода stop() или через отмену метода start().
import asyncio
from aiomisc import entrypoint
from aiomisc.service import Service, TCPServer, UDPServer
class LoggingService(PeriodicService):
async def callabck(self):
print('Hello from service', self.name)
class EchoServer(TCPServer):
async def handle_client(self, reader: asyncio.StreamReader,
writer: asyncio.StreamWriter):
while True:
writer.write(await reader.readline())
class UDPPrinter(UDPServer):
async def handle_datagram(self, data: bytes, addr):
print(addr, '->', data)
services = (
LoggingService(name='#1', interval=1),
EchoServer(address='localhost', port=8901),
UDPPrinter(address='localhost', port=3000),
)
with entrypoint(*services) as loop:
loop.run_forever()
2.4.12. Конфигурация#
Метакласс Service принимает все именованные аргументы в __init__ и устанавливает из как атрибуты в self.
import asyncio
from aiomisc import entrypoint
from aiomisc.service import Service, TCPServer, UDPServer
class LoggingService(Service):
# обязательные именованные аргументы
__required__ = frozenset({'name'})
# default value
delay: int = 1
async def start(self):
self.start_event.set()
while True:
# аттрибут ``name`` из именованных аргументов
# должен быть передан при создании экземпляра
print('Hello from service', self.name)
# аттрибут ``delay`` из именованных аргументов
await asyncio.sleep(self.delay)
services = (
LoggingService(name='#1'),
LoggingService(name='#2', delay=3),
)
with entrypoint(*services) as loop:
loop.run_forever()
2.4.13. aiohttp сервис#
Предупреждение
требуется установленная библиотека aiohttp
pip install aiohttp
или как дополнительную зависимость
pip install aiomisc[aiohttp]
Приложение aiohttp может быть запущено как сервис:
import aiohttp.web
import argparse
from aiomisc import entrypoint
from aiomisc.service.aiohttp import AIOHTTPService
parser = argparse.ArgumentParser()
group = parser.add_argument_group('HTTP options')
group.add_argument("-l", "--address", default="::",
help="Listen HTTP address")
group.add_argument("-p", "--port", type=int, default=8080,
help="Listen HTTP port")
async def handle(request):
name = request.match_info.get('name', "Anonymous")
text = "Hello, " + name
return aiohttp.web.Response(text=text)
class REST(AIOHTTPService):
async def create_application(self):
app = aiohttp.web.Application()
app.add_routes([
aiohttp.web.get('/', handle),
aiohttp.web.get('/{name}', handle)
])
return app
arguments = parser.parse_args()
service = REST(address=arguments.address, port=arguments.port)
with entrypoint(service) as loop:
loop.run_forever()
Класс AIOHTTPSSLService похож на AIOHTTPService, но создает HTTPS сервер. Вы должны передать требуемые для SSL параметры (см. Класс TLSServer).
2.4.14. asgi сервис#
Предупреждение
требуется установленная библиотека aiohttp-asgi:
pip install aiohttp-asgi
или как дополнительную зависимость
pip install aiomisc[asgi]
Любое ASGI совместимое приложение может быть запущено как сервис:
import argparse
from fastapi import FastAPI
from aiomisc import entrypoint
from aiomisc.service.asgi import ASGIHTTPService, ASGIApplicationType
parser = argparse.ArgumentParser()
group = parser.add_argument_group('HTTP options')
group.add_argument("-l", "--address", default="::",
help="Listen HTTP address")
group.add_argument("-p", "--port", type=int, default=8080,
help="Listen HTTP port")
app = FastAPI()
@app.get("/")
async def root():
return {"message": "Hello World"}
class REST(ASGIHTTPService):
async def create_asgi_app(self) -> ASGIApplicationType:
return app
arguments = parser.parse_args()
service = REST(address=arguments.address, port=arguments.port)
with entrypoint(service) as loop:
loop.run_forever()
Класс ASGIHTTPSSLService похож на ASGIHTTPService, но создает HTTPS сервер. Вы должны передать требуемые для SSL параметры (см. Класс TLSServer).
2.4.15. uvicorn service#
Предупреждение
requires installed uvicorn:
pip install uvicorn
или как дополнительную зависимость
pip install aiomisc[uvicorn]
Any ASGI-like application can be started via uvicorn as a service:
import argparse
from fastapi import FastAPI
from aiomisc import entrypoint
from aiomisc.service.uvicorn import UvicornApplication, UvicornService
parser = argparse.ArgumentParser()
group = parser.add_argument_group('HTTP options')
group.add_argument("-l", "--host", default="::",
help="Listen HTTP host")
group.add_argument("-p", "--port", type=int, default=8080,
help="Listen HTTP port")
app = FastAPI()
@app.get("/")
async def root():
return {"message": "Hello World"}
class REST(UvicornService):
async def create_application(self) -> UvicornApplication:
return app
arguments = parser.parse_args()
service = REST(host=arguments.host, port=arguments.port)
with entrypoint(service) as loop:
loop.run_forever()
2.4.16. GRPC service#
Это пример GRPC-сервиса, который определяется в файле и загружает файл hello.proto без кодогенерации, этот пример является одним из примеров из grpcio, остальные примеры будут работать как ожидается.
Определение proto файла
syntax = "proto3";
package helloworld;
// Определение сервиса приветствия.
service Greeter {
// Сказать привет
rpc SayHello (HelloRequest) returns (HelloReply) {}
}
// Сообщение запроса, содержащее имя пользователя.
message HelloRequest {
string name = 1;
}
// Ответное сообщение, содержащее приветствие
message HelloReply {
string message = 1;
}
Пример инициализации сервиса:
import grpc
import aiomisc
from aiomisc.service.grpc_server import GRPCService
protos, services = grpc.protos_and_services("hello.proto")
class Greeter(services.GreeterServicer):
async def SayHello(self, request, context):
return protos.HelloReply(message='Hello, %s!' % request.name)
def main():
grpc_service = GRPCService(compression=grpc.Compression.Gzip)
services.add_GreeterServicer_to_server(
Greeter(), grpc_service,
)
grpc_service.add_insecure_port('[::]:0')
grpc_service.add_insecure_port('[::1]:0')
grpc_service.add_insecure_port('127.0.0.1:0')
grpc_service.add_insecure_port('localhost:0')
grpc_service.add_secure_port('localhost:0', grpc.local_server_credentials())
grpc_service.add_secure_port('[::]:0', grpc.local_server_credentials())
with aiomisc.entrypoint(grpc_service) as loop:
loop.run_forever()
if __name__ == '__main__':
main()
To enable reflection for the service you use reflection flag:
GRPCService(reflection=True)
2.4.17. Трассировщик памяти#
Простой и полезный сервис для логирования больших объектов Python, размещенных в памяти.
import asyncio
import os
from aiomisc import entrypoint
from aiomisc.service import MemoryTracer
async def main():
leaking = []
while True:
leaking.append(os.urandom(128))
await asyncio.sleep(0)
with entrypoint(MemoryTracer(interval=1, top_results=5)) as loop:
loop.run_until_complete(main())
Пример вывода:
[T:[1] Thread Pool] INFO:aiomisc.service.tracer: Top memory usage:
Objects | Obj.Diff | Memory | Mem.Diff | Traceback
12 | 12 | 1.9KiB | 1.9KiB | aiomisc/periodic.py:40
12 | 12 | 1.8KiB | 1.8KiB | aiomisc/entrypoint.py:93
6 | 6 | 1.1KiB | 1.1KiB | aiomisc/thread_pool.py:71
2 | 2 | 976.0B | 976.0B | aiomisc/thread_pool.py:44
5 | 5 | 712.0B | 712.0B | aiomisc/thread_pool.py:52
[T:[6] Thread Pool] INFO:aiomisc.service.tracer: Top memory usage:
Objects | Obj.Diff | Memory | Mem.Diff | Traceback
43999 | 43999 | 7.1MiB | 7.1MiB | scratches/scratch_8.py:11
47 | 47 | 4.7KiB | 4.7KiB | env/bin/../lib/python3.7/abc.py:143
33 | 33 | 2.8KiB | 2.8KiB | 3.7/lib/python3.7/tracemalloc.py:113
44 | 44 | 2.4KiB | 2.4KiB | 3.7/lib/python3.7/tracemalloc.py:185
14 | 14 | 2.4KiB | 2.4KiB | aiomisc/periodic.py:40
2.4.18. Profiler - профилировщик#
Простой сервис для профилирования. Необязательный аргумент path может быть предоставлен для выгрузки полных данных профилирования, которые позже могут быть использованы, например, snakeviz. Также можно изменить порядок с аргументом order (по умолчанию «cumulative»).
import asyncio
import os
from aiomisc import entrypoint
from aiomisc.service import Profiler
async def main():
for i in range(100):
time.sleep(0.01)
with entrypoint(Profiler(interval=0.1, top_results=5)) as loop:
loop.run_until_complete(main())
Пример вывода:
108 function calls in 1.117 seconds
Ordered by: cumulative time
ncalls tottime percall cumtime percall filename:lineno(function)
100 1.117 0.011 1.117 0.011 {built-in method time.sleep}
1 0.000 0.000 0.000 0.000 <...>/lib/python3.7/pstats.py:89(__init__)
1 0.000 0.000 0.000 0.000 <...>/lib/python3.7/pstats.py:99(init)
1 0.000 0.000 0.000 0.000 <...>/lib/python3.7/pstats.py:118(load_stats)
1 0.000 0.000 0.000 0.000 <...>/lib/python3.7/cProfile.py:50(create_stats)
2.4.19. Raven сервис#
Простой сервис для отправки необработанных исключений в сервис sentry.
Простой пример:
import asyncio
import logging
import sys
from aiomisc import entrypoint
from aiomisc.version import __version__
from aiomisc.service.raven import RavenSender
async def main():
while True:
await asyncio.sleep(1)
try:
1 / 0
except ZeroDivisionError:
logging.exception("Exception")
raven_sender = RavenSender(
sentry_dsn=(
"https://583ca3b555054f80873e751e8139e22a@o429974.ingest.sentry.io/"
"5530251"
),
client_options=dict(
# По умолчанию возьмет значение переменной окружения SENTRY_NAME
name="example-from-aiomisc",
# По умолчанию возьмет значение переменной окружения SENTRY_ENVIRONMENT
environment="simple_example",
# По умолчанию возьмет значение переменной окружения SENTRY_RELEASE
release=__version__,
)
)
with entrypoint(raven_sender) as loop:
loop.run_until_complete(main())
Все опции для клиента:
import asyncio
import logging
import sys
from aiomisc import entrypoint
from aiomisc.version import __version__
from aiomisc.service.raven import RavenSender
async def main():
while True:
await asyncio.sleep(1)
try:
1 / 0
except ZeroDivisionError:
logging.exception("Exception")
raven_sender = RavenSender(
sentry_dsn=(
"https://583ca3b555054f80873e751e8139e22a@o429974.ingest.sentry.io/"
"5530251"
),
client_options=dict(
# По умолчанию возьмет значение переменной окружения SENTRY_NAME
name="",
# По умолчанию возьмет значение переменной окружения SENTRY_ENVIRONMENT
environment="full_example",
# По умолчанию возьмет значение переменной окружения SENTRY_RELEASE
release=__version__,
# Умолчания для остальных аргументов
include_paths=set(),
exclude_paths=set(),
auto_log_stacks=True,
capture_locals=True,
string_max_length=400,
list_max_length=50,
site=None,
include_versions=True,
processors=(
'raven.processors.SanitizePasswordsProcessor',
),
sanitize_keys=None,
context={'sys.argv': getattr(sys, 'argv', [])[:]},
tags={},
sample_rate=1,
ignore_exceptions=(),
)
)
with entrypoint(raven_sender) as loop:
loop.run_until_complete(main())
Вы можете найти полное описание параметров в документации Raven.
2.4.20. SDWatchdogService#
Готовый к использованию сервис, просто добавьте его в entrypoint и он будет отправлять уведомления сторожевому таймеру SystemD.
Вы можете безопасно добавлять это в любом случае, так как если сервис не найдет переменных окружения, которые устанавливает systemd, Сервис просто не запустится, однако выполнение приложения продолжится.
Пример python файла:
import logging
from time import sleep
from aiomisc import entrypoint
from aiomisc.service.sdwatchdog import SDWatchdogService
if __name__ == '__main__':
with entrypoint(SDWatchdogService()) as loop:
pass
Пример systemd сервис-файла:
[Service]
# Активизируем механизм уведомлений
Type=notify
# Команда которую следует запускать
ExecStart=/home/mosquito/.venv/aiomisc/bin/python /home/mosquito/scratch.py
# Время через которое программа должны посылать нотификации сторожевого таймера
WatchdogSec=5
# Процесс будет убит если он перестал отвечать сторожевому таймеру
WatchdogSignal=SIGKILL
# Сервис будет перезапущен в случае ошибки
Restart=on-failure
# Пробуем убить сам процесс вместо всей cgroup
KillMode=process
# Пробуем остановить сервис "помягче"
KillSignal=SIGINT
# Пробуем остановить сервис "помягче" при перезапуске
RestartKillSignal=SIGINT
# Слать SIGKILL если произошел таймаут остановки
FinalKillSignal=SIGKILL
SendSIGKILL=yes
2.4.21. Класс ProcessService#
Базовый класс для запуска функции отдельным системным процессом и завершения при остановке родительского процесса.
from typing import Dict, Any
import aiomisc.service
# Реализация вымышленного майнера
from .my_miner import Miner
class MiningService(aiomisc.service.ProcessService):
bitcoin: bool = False
monero: bool = False
dogiecoin: bool = False
def in_process(self) -> Any:
if self.bitcoin:
miner = Miner(kind="bitcoin")
elif self.monero:
miner = Miner(kind="monero")
elif self.dogiecoin:
miner = Miner(kind="dogiecoin")
else:
# Ничего делать не нужно
return
miner.do_mining()
services = [
MiningService(bitcoin=True),
MiningService(monero=True),
MiningService(dogiecoin=True),
]
if __name__ == '__main__':
with aiomisc.entrypoint(*services) as loop:
loop.run_forever()
2.4.22. Класс RespawningProcessService#
Базовый класс для запуска функции отдельным системным процессом и завершения при остановке родительского процесса. Это очень похоже на ProcessService с одним отличием - если дочерний процесс неожиданно завершится, то он будет перезапущен.
import logging
from typing import Any
import aiomisc
from time import sleep
class SuicideService(aiomisc.service.RespawningProcessService):
def in_process(self) -> Any:
sleep(10)
logging.warning("Goodbye mad world")
exit(42)
if __name__ == '__main__':
with aiomisc.entrypoint(SuicideService()) as loop:
loop.run_forever()