2.4. Services#

Services is an abstraction to help organize lots of different tasks in one process. Each service must implement start() method and can implement stop() method.

Service instance should be passed to the entrypoint, and will be started after the event loop has been created.

Note

Current event-loop will be set before start() method called. The event loop will be set as current for this thread.

Please avoid using asyncio.get_event_loop() explicitly inside start() method. Use self.loop instead:

import asyncio
from threading import Event
from aiomisc import entrypoint, Service

event = Event()

class MyService(Service):
  async def start(self):
      # Send signal to entrypoint for continue running
      self.start_event.set()

      event.set()
      # Start service task
      await asyncio.sleep(3600)


with entrypoint(MyService()) as loop:
    assert event.is_set()

Method start() creates as a separate task that can run forever. But in this case self.start_event.set() should be called for notifying entrypoint.

During graceful shutdown method stop() will be called first, and after that, all running tasks will be canceled (including start()).

This package contains some useful base classes for simple services writing.

2.4.1. TCPServer#

TCPServer - it’s a base class for writing TCP servers. Just implement handle_client(reader, writer) to use it.

import asyncio
import logging
from aiomisc import entrypoint
from aiomisc.service import TCPServer


log = logging.getLogger(__name__)


class EchoServer(TCPServer):
    async def handle_client(self, reader: asyncio.StreamReader,
                            writer: asyncio.StreamWriter):
        while not reader.at_eof():
            writer.write(await reader.read(255))

        log.info("Client connection closed")


async def echo_client(host, port):
    reader, writer = await asyncio.open_connection(host=host, port=port)
    writer.write(b"hello\n")
    assert await reader.readline() == b"hello\n"

    writer.write(b"world\n")
    assert await reader.readline() == b"world\n"

    writer.close()
    await writer.wait_closed()


with entrypoint(
    EchoServer(address='localhost', port=8901),
) as loop:
    loop.run_until_complete(echo_client("localhost", 8901))

2.4.2. UDPServer#

UDPServer - it’s a base class for writing UDP servers. Just implement handle_datagram(data, addr) to use it.

class UDPPrinter(UDPServer):
    async def handle_datagram(self, data: bytes, addr):
        print(addr, '->', data)


with entrypoint(UDPPrinter(address='localhost', port=3000)) as loop:
    loop.run_forever()

2.4.3. TLSServer#

TLSServer - it’s a base class for writing TCP servers with TLS. Just implement handle_client(reader, writer) to use it.

class SecureEchoServer(TLSServer):
    async def handle_client(self, reader: asyncio.StreamReader,
                            writer: asyncio.StreamWriter):
        while True:
            writer.write(await reader.readline())

service = SecureEchoServer(
    address='localhost',
    port=8900,
    ca='ca.pem',
    cert='cert.pem',
    key='key.pem',
    verify=False,
)

with entrypoint(service) as loop:
    loop.run_forever()

2.4.4. TCPClient#

TCPClient - it’s a base class for writing TCP clients. Just implement handle_connection(reader, writer) to use it.

import asyncio
import logging
from aiomisc import entrypoint
from aiomisc.service import TCPServer, TCPClient

log = logging.getLogger(__name__)


class EchoServer(TCPServer):
    async def handle_client(self, reader: asyncio.StreamReader,
                            writer: asyncio.StreamWriter):
        while not reader.at_eof():
            writer.write(await reader.read(255))

        log.info("Client connection closed")


class EchoClient(TCPClient):

    async def handle_connection(self, reader: asyncio.StreamReader,
                                writer: asyncio.StreamWriter) -> None:
        writer.write(b"hello\n")
        assert await reader.readline() == b"hello\n"

        writer.write(b"world\n")
        assert await reader.readline() == b"world\n"

        writer.write_eof()
        writer.close()
        await writer.wait_closed()


with entrypoint(
    EchoServer(address='localhost', port=8901),
    EchoClient(address='localhost', port=8901),
) as loop:
    loop.run_until_complete(asyncio.sleep(0.1))

2.4.5. TLSClient#

TLSClient - it’s a base class for writing TLS clients. Just implement handle_connection(reader, writer) to use it.

import asyncio
import logging
from aiomisc import entrypoint
from aiomisc.service import TCPServer, TCPClient

log = logging.getLogger(__name__)


class EchoServer(TLSServer):
    async def handle_client(self, reader: asyncio.StreamReader,
                            writer: asyncio.StreamWriter):
        while not reader.at_eof():
            writer.write(await reader.read(255))

        log.info("Client connection closed")


class EchoClient(TLSClient):

    async def handle_connection(self, reader: asyncio.StreamReader,
                                writer: asyncio.StreamWriter) -> None:
        writer.write(b"hello\n")
        assert await reader.readline() == b"hello\n"

        writer.write(b"world\n")
        assert await reader.readline() == b"world\n"

        writer.write_eof()
        writer.close()
        await writer.wait_closed()


with entrypoint(
    EchoServer(
        address='localhost', port=8901,
        ca='ca.pem',
        cert='server.pem',
        key='server.key',
    ),
    EchoClient(
        address='localhost', port=8901,
        ca='ca.pem',
        cert='client.pem',
        key='client.key',
    ),
) as loop:
    loop.run_until_complete(asyncio.sleep(0.1))

2.4.6. RobustTCPClient#

RobustTCPClient - it’s a base class for writing TCP clients with auto-reconnection when connection lost. Just implement handle_connection(reader, writer) to use it.

import asyncio
import logging
from aiomisc import entrypoint
from aiomisc.service import TCPServer, RobustTCPClient

log = logging.getLogger(__name__)


class EchoServer(TCPServer):
    async def handle_client(self, reader: asyncio.StreamReader,
                            writer: asyncio.StreamWriter):
        while not reader.at_eof():
            writer.write(await reader.read(255))

        log.info("Client connection closed")


class EchoClient(RobustTCPClient):

    async def handle_connection(self, reader: asyncio.StreamReader,
                                writer: asyncio.StreamWriter) -> None:
        writer.write(b"hello\n")
        assert await reader.readline() == b"hello\n"

        writer.write(b"world\n")
        assert await reader.readline() == b"world\n"

        writer.write_eof()
        writer.close()
        await writer.wait_closed()


with entrypoint(
    EchoServer(address='localhost', port=8901),
    EchoClient(address='localhost', port=8901),
) as loop:
    loop.run_until_complete(asyncio.sleep(0.1))

2.4.7. RobustTLSClient#

RobustTLSClient - it’s a base class for writing TLS clients with auto-reconnection when connection lost. Just implement handle_connection(reader, writer) to use it.

import asyncio
import logging
from aiomisc import entrypoint
from aiomisc.service import TCPServer, RobustTCPClient

log = logging.getLogger(__name__)


class EchoServer(TLSServer):
    async def handle_client(self, reader: asyncio.StreamReader,
                            writer: asyncio.StreamWriter):
        while not reader.at_eof():
            writer.write(await reader.read(255))

        log.info("Client connection closed")


class EchoClient(RobustTLSClient):

    async def handle_connection(self, reader: asyncio.StreamReader,
                                writer: asyncio.StreamWriter) -> None:
        writer.write(b"hello\n")
        assert await reader.readline() == b"hello\n"

        writer.write(b"world\n")
        assert await reader.readline() == b"world\n"

        writer.write_eof()
        writer.close()
        await writer.wait_closed()


with entrypoint(
    EchoServer(
        address='localhost', port=8901,
        ca='ca.pem',
        cert='server.pem',
        key='server.key',
    ),
    EchoClient(
        address='localhost', port=8901,
        ca='ca.pem',
        cert='client.pem',
        key='client.key',
    ),
) as loop:
    loop.run_until_complete(asyncio.sleep(0.1))

2.4.8. PeriodicService#

PeriodicService runs PeriodicCallback as a service and waits for the running callback to complete on the stop method. You need to use PeriodicService as a base class and override callback async coroutine method.

Service class accepts required interval argument - periodic interval in seconds and optional delay argument - periodic execution delay in seconds (0 by default).

import aiomisc
from aiomisc.service.periodic import PeriodicService


class MyPeriodicService(PeriodicService):
    async def callback(self):
        log.info('Running periodic callback')
        # ...

service = MyPeriodicService(interval=3600, delay=0)  # once per hour

with entrypoint(service) as loop:
    loop.run_forever()

2.4.9. CronService#

CronService runs CronCallback's as a service and waits for running callbacks to complete on the stop method.

It’s based on croniter. You can register async coroutine method with spec argument - cron like format:

Warning

requires installed croniter:

pip install croniter

or using extras:

pip install aiomisc[cron]
import aiomisc
from aiomisc.service.cron import CronService


async def callback():
    log.info('Running cron callback')
    # ...

service = CronService()
service.register(callback, spec="0 * * * *") # every hour at zero minutes

with entrypoint(service) as loop:
    loop.run_forever()

You can also inherit from CronService, but remember that callback registration should be proceeded before start

import aiomisc
from aiomisc.service.cron import CronService


class MyCronService(CronService):
    async def callback(self):
        log.info('Running cron callback')
        # ...

    async def start(self):
        self.register(self.callback, spec="0 * * * *")
        await super().start()

service = MyCronService()

with entrypoint(service) as loop:
    loop.run_forever()

2.4.10. Multiple services#

Pass several service instances to the entrypoint to run all of them. After exiting the entrypoint service instances will be gracefully shut down.

import asyncio
from aiomisc import entrypoint
from aiomisc.service import Service, TCPServer, UDPServer


class LoggingService(PeriodicService):
    async def callabck(self):
        print('Hello from service', self.name)


class EchoServer(TCPServer):
    async def handle_client(self, reader: asyncio.StreamReader,
                            writer: asyncio.StreamWriter):
        while True:
            writer.write(await reader.readline())


class UDPPrinter(UDPServer):
    async def handle_datagram(self, data: bytes, addr):
        print(addr, '->', data)


services = (
    LoggingService(name='#1', interval=1),
    EchoServer(address='localhost', port=8901),
    UDPPrinter(address='localhost', port=3000),
)


with entrypoint(*services) as loop:
    loop.run_forever()

2.4.11. Configuration#

Service metaclass accepts all kwargs and will set it to self as attributes.

import asyncio
from aiomisc import entrypoint
from aiomisc.service import Service, TCPServer, UDPServer


class LoggingService(Service):
    # required kwargs
    __required__ = frozenset({'name'})

    # default value
    delay: int = 1

    async def start(self):
        self.start_event.set()
        while True:
            # attribute ``name`` from kwargs
            # must be defined when instance initializes
            print('Hello from service', self.name)

            # attribute ``delay`` from kwargs
            await asyncio.sleep(self.delay)

services = (
    LoggingService(name='#1'),
    LoggingService(name='#2', delay=3),
)


with entrypoint(*services) as loop:
    loop.run_forever()

2.4.12. aiohttp service#

Warning

requires installed aiohttp:

pip install aiohttp

or using extras:

pip install aiomisc[aiohttp]

aiohttp application can be started as a service:

import aiohttp.web
import argparse
from aiomisc import entrypoint
from aiomisc.service.aiohttp import AIOHTTPService

parser = argparse.ArgumentParser()
group = parser.add_argument_group('HTTP options')

group.add_argument("-l", "--address", default="::",
                   help="Listen HTTP address")
group.add_argument("-p", "--port", type=int, default=8080,
                   help="Listen HTTP port")


async def handle(request):
    name = request.match_info.get('name', "Anonymous")
    text = "Hello, " + name
    return aiohttp.web.Response(text=text)


class REST(AIOHTTPService):
    async def create_application(self):
        app = aiohttp.web.Application()

        app.add_routes([
            aiohttp.web.get('/', handle),
            aiohttp.web.get('/{name}', handle)
        ])

        return app

arguments = parser.parse_args()
service = REST(address=arguments.address, port=arguments.port)

with entrypoint(service) as loop:
    loop.run_forever()

Class AIOHTTPSSLService is similar to AIOHTTPService but creates an HTTPS server. You must pass SSL-required options (see TLSServer class).

2.4.13. asgi service#

Warning

requires installed aiohttp-asgi:

pip install aiohttp-asgi

or using extras:

pip install aiomisc[asgi]

Any ASGI-like application can be started as a service:

import argparse

from fastapi import FastAPI

from aiomisc import entrypoint
from aiomisc.service.asgi import ASGIHTTPService, ASGIApplicationType

parser = argparse.ArgumentParser()
group = parser.add_argument_group('HTTP options')

group.add_argument("-l", "--address", default="::",
                   help="Listen HTTP address")
group.add_argument("-p", "--port", type=int, default=8080,
                   help="Listen HTTP port")


app = FastAPI()


@app.get("/")
async def root():
    return {"message": "Hello World"}


class REST(ASGIHTTPService):
    async def create_asgi_app(self) -> ASGIApplicationType:
        return app


arguments = parser.parse_args()
service = REST(address=arguments.address, port=arguments.port)

with entrypoint(service) as loop:
    loop.run_forever()

Class ASGIHTTPSSLService is similar to ASGIHTTPService but creates HTTPS server. You must pass SSL-required options (see TLSServer class).

2.4.14. uvicorn service#

Warning

requires installed uvicorn:

pip install uvicorn

or using extras:

pip install aiomisc[uvicorn]

Any ASGI-like application can be started via uvicorn as a service:

import argparse

from fastapi import FastAPI

from aiomisc import entrypoint
from aiomisc.service.uvicorn import UvicornApplication, UvicornService

parser = argparse.ArgumentParser()
group = parser.add_argument_group('HTTP options')

group.add_argument("-l", "--host", default="::",
                   help="Listen HTTP host")
group.add_argument("-p", "--port", type=int, default=8080,
                   help="Listen HTTP port")


app = FastAPI()


@app.get("/")
async def root():
    return {"message": "Hello World"}


class REST(UvicornService):
    async def create_application(self) -> UvicornApplication:
        return app


arguments = parser.parse_args()
service = REST(host=arguments.host, port=arguments.port)

with entrypoint(service) as loop:
    loop.run_forever()

2.4.15. GRPC service#

This is an example of a GRPC service which is defined in a file and loads a hello.proto file without code generation, this example is one of the examples from grpcio, the other examples will work as expected.

Proto definition:

syntax = "proto3";

package helloworld;

// The greeting service definition.
service Greeter {
  // Sends a greeting
  rpc SayHello (HelloRequest) returns (HelloReply) {}
}

// The request message containing the user's name.
message HelloRequest {
  string name = 1;
}

// The response message containing the greetings
message HelloReply {
  string message = 1;
}

Service initialization example:

import grpc

import aiomisc
from aiomisc.service.grpc_server import GRPCService


protos, services = grpc.protos_and_services("hello.proto")


class Greeter(services.GreeterServicer):
    async def SayHello(self, request, context):
        return protos.HelloReply(message='Hello, %s!' % request.name)


def main():
    grpc_service = GRPCService(compression=grpc.Compression.Gzip)
    services.add_GreeterServicer_to_server(
        Greeter(), grpc_service,
    )
    grpc_service.add_insecure_port('[::]:0')
    grpc_service.add_insecure_port('[::1]:0')
    grpc_service.add_insecure_port('127.0.0.1:0')
    grpc_service.add_insecure_port('localhost:0')
    grpc_service.add_secure_port('localhost:0', grpc.local_server_credentials())
    grpc_service.add_secure_port('[::]:0', grpc.local_server_credentials())

    with aiomisc.entrypoint(grpc_service) as loop:
        loop.run_forever()


if __name__ == '__main__':
    main()

To enable reflection for the service you use reflection flag:

GRPCService(reflection=True)

2.4.16. Memory Tracer#

Simple and useful service for logging large python objects allocated in memory.

import asyncio
import os
from aiomisc import entrypoint
from aiomisc.service import MemoryTracer


async def main():
    leaking = []

    while True:
        leaking.append(os.urandom(128))
        await asyncio.sleep(0)


with entrypoint(MemoryTracer(interval=1, top_results=5)) as loop:
    loop.run_until_complete(main())

Output example:

[T:[1] Thread Pool] INFO:aiomisc.service.tracer: Top memory usage:
 Objects | Obj.Diff |   Memory | Mem.Diff | Traceback
      12 |       12 |   1.9KiB |   1.9KiB | aiomisc/periodic.py:40
      12 |       12 |   1.8KiB |   1.8KiB | aiomisc/entrypoint.py:93
       6 |        6 |   1.1KiB |   1.1KiB | aiomisc/thread_pool.py:71
       2 |        2 |   976.0B |   976.0B | aiomisc/thread_pool.py:44
       5 |        5 |   712.0B |   712.0B | aiomisc/thread_pool.py:52

[T:[6] Thread Pool] INFO:aiomisc.service.tracer: Top memory usage:
 Objects | Obj.Diff |   Memory | Mem.Diff | Traceback
   43999 |    43999 |   7.1MiB |   7.1MiB | scratches/scratch_8.py:11
      47 |       47 |   4.7KiB |   4.7KiB | env/bin/../lib/python3.7/abc.py:143
      33 |       33 |   2.8KiB |   2.8KiB | 3.7/lib/python3.7/tracemalloc.py:113
      44 |       44 |   2.4KiB |   2.4KiB | 3.7/lib/python3.7/tracemalloc.py:185
      14 |       14 |   2.4KiB |   2.4KiB | aiomisc/periodic.py:40

2.4.17. Profiler#

Simple service for profiling. Optional path argument can be provided to dump complete profiling data, which can be later used by, for example, snakeviz. Also can change ordering with the order argument (“cumulative” by default).

import asyncio
import os
from aiomisc import entrypoint
from aiomisc.service import Profiler


async def main():
    for i in range(100):
        time.sleep(0.01)


with entrypoint(Profiler(interval=0.1, top_results=5)) as loop:
    loop.run_until_complete(main())

Output example:

108 function calls in 1.117 seconds

Ordered by: cumulative time

ncalls  tottime  percall  cumtime  percall filename:lineno(function)
   100    1.117    0.011    1.117    0.011 {built-in method time.sleep}
     1    0.000    0.000    0.000    0.000 <...>/lib/python3.7/pstats.py:89(__init__)
     1    0.000    0.000    0.000    0.000 <...>/lib/python3.7/pstats.py:99(init)
     1    0.000    0.000    0.000    0.000 <...>/lib/python3.7/pstats.py:118(load_stats)
     1    0.000    0.000    0.000    0.000 <...>/lib/python3.7/cProfile.py:50(create_stats)

2.4.18. Raven service#

Simple service for sending unhandled exceptions to the sentry service instance.

Simple example:

import asyncio
import logging
import sys

from aiomisc import entrypoint
from aiomisc.version import __version__
from aiomisc.service.raven import RavenSender


async def main():
    while True:
        await asyncio.sleep(1)

        try:
            1 / 0
        except ZeroDivisionError:
            logging.exception("Exception")


raven_sender = RavenSender(
    sentry_dsn=(
        "https://583ca3b555054f80873e751e8139e22a@o429974.ingest.sentry.io/"
        "5530251"
    ),
    client_options=dict(
        # Got environment variable SENTRY_NAME by default
        name="example-from-aiomisc",
        # Got environment variable SENTRY_ENVIRONMENT by default
        environment="simple_example",
        # Got environment variable SENTRY_RELEASE by default
        release=__version__,
    )
)


with entrypoint(raven_sender) as loop:
    loop.run_until_complete(main())

Full configuration:

import asyncio
import logging
import sys

from aiomisc import entrypoint
from aiomisc.version import __version__
from aiomisc.service.raven import RavenSender


async def main():
    while True:
        await asyncio.sleep(1)

        try:
            1 / 0
        except ZeroDivisionError:
            logging.exception("Exception")


raven_sender = RavenSender(
    sentry_dsn=(
        "https://583ca3b555054f80873e751e8139e22a@o429974.ingest.sentry.io/"
        "5530251"
    ),
    client_options=dict(
        # Got environment variable SENTRY_NAME by default
        name="",
        # Got environment variable SENTRY_ENVIRONMENT by default
        environment="full_example",
        # Got environment variable SENTRY_RELEASE by default
        release=__version__,

        # Default options values
        include_paths=set(),
        exclude_paths=set(),
        auto_log_stacks=True,
        capture_locals=True,
        string_max_length=400,
        list_max_length=50,
        site=None,
        include_versions=True,
        processors=(
            'raven.processors.SanitizePasswordsProcessor',
        ),
        sanitize_keys=None,
        context={'sys.argv': getattr(sys, 'argv', [])[:]},
        tags={},
        sample_rate=1,
        ignore_exceptions=(),
    )
)


with entrypoint(raven_sender) as loop:
    loop.run_until_complete(main())

You will find the full specification of options in the Raven documentation.

2.4.19. SDWatchdogService#

Ready to use service just adding to your entrypoint and notifying SystemD service watchdog timer.

This can be safely added at any time, since if the service does not detect systemd-related environment variables, then its initialization is skipped.

Example of python file:

import logging
from time import sleep

from aiomisc import entrypoint
from aiomisc.service.sdwatchdog import SDWatchdogService


if __name__ == '__main__':
    with entrypoint(SDWatchdogService()) as loop:
        pass

Example of systemd service file:

[Service]
# Activating the notification mechanism
Type=notify

# Command which should be started
ExecStart=/home/mosquito/.venv/aiomisc/bin/python /home/mosquito/scratch.py

# The time for which the program must send a watchdog notification
WatchdogSec=5

# Kill the process if it has stopped responding to the watchdog timer
WatchdogSignal=SIGKILL

# The service should be restarted on failure
Restart=on-failure

# Try to kill the process instead of cgroup
KillMode=process

# Trying to stop service properly
KillSignal=SIGINT

# Trying to restart service properly
RestartKillSignal=SIGINT

# Send SIGKILL when timeouts are exceeded
FinalKillSignal=SIGKILL
SendSIGKILL=yes

2.4.20. ProcessService#

A base class for launching a function by a separate system process, and by termination when the parent process is stopped.

from typing import Dict, Any

import aiomisc.service

# Fictional miner implementation
from .my_miner import Miner


class MiningService(aiomisc.service.ProcessService):
    bitcoin: bool = False
    monero: bool = False
    dogiecoin: bool = False

    def in_process(self) -> Any:
        if self.bitcoin:
            miner = Miner(kind="bitcoin")
        elif self.monero:
            miner = Miner(kind="monero")
        elif self.dogiecoin:
            miner = Miner(kind="dogiecoin")
        else:
            # Nothing to do
            return

        miner.do_mining()


services = [
    MiningService(bitcoin=True),
    MiningService(monero=True),
    MiningService(dogiecoin=True),
]

if __name__ == '__main__':
    with aiomisc.entrypoint(*services) as loop:
        loop.run_forever()

2.4.21. RespawningProcessService#

A base class for launching a function by a separate system process, and by termination when the parent process is stopped, It’s pretty like ProcessService but have one difference when the process unexpectedly exited this will be respawned.

import logging
from typing import Any

import aiomisc

from time import sleep


class SuicideService(aiomisc.service.RespawningProcessService):
    def in_process(self) -> Any:
        sleep(10)
        logging.warning("Goodbye mad world")
        exit(42)


if __name__ == '__main__':
    with aiomisc.entrypoint(SuicideService()) as loop:
        loop.run_forever()