2.10. Предохранитель

Предохранитель - это шаблон проектирования, используемый при разработке программного обеспечения. Он используется для обнаружения сбоев и инкапсулирует логику предотвращения постоянного повторения сбоя во время обслуживания, временного сбоя внешней системы или неожиданных системных проблем.

Следующий пример демонстрирует простое использование текущей реализации aiomisc.CircuitBreaker. Экземпляр CircuitBreaker, собирает статистику вызовов функций. Он содержит счетчики которые содержат успешные и неудачные вызовы функции. Вызовы функций должны быть обернуты методом CircuitBreaker.call, чтобы собирать статистику.

Пример использования:

from aiohttp import web, ClientSession
from aiomisc.service.aiohttp import AIOHTTPService
import aiohttp
import aiomisc


async def public_gists(request):
    async with aiohttp.ClientSession() as session:
        # Используем как контекстный менеджер
        with request.app["github_cb"].context():
            url = 'https://api.github.com/gists/public'
            async with session.get(url) as response:
                data = await response.text()

    return web.Response(
        text=data,
        headers={"Content-Type": "application/json"}
    )


class API(AIOHTTPService):
    async def create_application(self):
        app = web.Application()
        app.add_routes([web.get('/', public_gists)])

        # Если ошибок 30% за 20 секунд
        # Сломаемся на 5 секунд
        app["github_cb"] = aiomisc.CircuitBreaker(
            error_ratio=0.2,
            response_time=20,
            exceptions=[aiohttp.ClientError],
            broken_time=5
        )

        return app


async def main():
    async with ClientSession() as session:
        async with session.get("http://localhost:8080/") as response:
            assert response.headers


if __name__ == '__main__':
    with aiomisc.entrypoint(API(port=8080)) as loop:
        loop.run_until_complete(main())

Объект CircuitBreaker может находится в одном из трех состояний:

  • PASSING - аналогия с «ток протекает» (прим. пер.)

  • BROKEN - аналогия со «сгоревшим предохранителем» (прим. пер.)

  • RECOVERING - восстанавливается

_images/cb-states.svg

PASSING означает, что все вызовы будут переданы как есть, но будет собираться статистика. Следующее состояние будет определено после сбора статистики за passing_time секунд. Если эффективный коэффициент ошибок больше чем error_ratio, тогда следующее состояние будет установлено как BROKEN (предохранитель перегорел, прим. пер.), в противном случае оно останется неизменным.

BROKEN означает, что обернутая функция не будет вызываться и вместо нее будет брошено исключение CircuitBroken. Состояние BROKEN будет сохраняться в течение broken_time секунд.

Примечание

Исключение CircuitBroken является следствием состояния BROKEN или RECOVERY и никогда не учитывается в статистике.

После этого предохранитель переходит в состояние RECOVERING. В этом состоянии фактически будет выполняться небольшая выборка обернутых вызовов функции, но будет собираться статистика. Если эффективный коэффициент ошибок после recovery_time ниже, чем error_ratio, то следующее состояние будет установлено в PASSING, в противном случае снова BROKEN.

Аргумент exception_inspector это функция, которая вызывается всякий раз, когда происходит исключение из списка отслеживаемых исключений. Если она возвращает False - исключение будет проигнорировано.

_images/cb-flow.svg

2.11. Декоратор cutout - «рубильник»

Декоратор оборачивающий функцию таким образом, что все вызовы проходят через предохраннитель, а именно через экземпляр CircuitBreaker для этой функции.

from aiohttp import web, ClientSession
from aiomisc.service.aiohttp import AIOHTTPService
import aiohttp
import aiomisc


# Если 20% ошибок за 30 секунд
# Сломаемся на 30 секунд
@aiomisc.cutout(0.2, 30, aiohttp.ClientError)
async def fetch(session, url):
    async with session.get(url) as response:
        return await response.text()


async def public_gists(request):
    async with aiohttp.ClientSession() as session:
        data = await fetch(
            session,
            'https://api.github.com/gists/public'
        )

    return web.Response(
        text=data,
        headers={"Content-Type": "application/json"}
    )


class API(AIOHTTPService):
    async def create_application(self):
        app = web.Application()
        app.add_routes([web.get('/', public_gists)])
        return app


async def main():
    async with ClientSession() as session:
        async with session.get("http://localhost:8080/") as response:
            assert response.headers


if __name__ == '__main__':
    with aiomisc.entrypoint(API(port=8080)) as loop:
        loop.run_until_complete(main())