2.18. Pytest plugin

This package contains a plugin for pytest.

2.18.1. Basic usage

Simple usage example:

import asyncio
import pytest

async def test_sample(loop):
    """Complete a future from the event loop and await its result."""
    # ``create_future`` is the loop method; ``crete_future`` does not exist.
    f = loop.create_future()
    # Let the loop itself set the result on its next iteration.
    loop.call_soon(f.set_result, True)

    assert await f

Asynchronous fixture example:

import asyncio
import pytest

@pytest.fixture
async def my_fixture(loop):
    """Asynchronous fixture: awaits inside the event loop, then yields."""
    await asyncio.sleep(0)

    # Requires python 3.6+ (asynchronous generators)
    yield

If you need to keep an instance of an async fixture between tests, simply changing the fixture scope is the wrong solution. Why wouldn’t it work? Because, in the base scenario, the loop fixture creates a new event loop instance for every test, and that loop is closed after the test teardown. When an async fixture is used, any caller of asyncio.get_event_loop() gets the current event loop instance, which will be closed, and the next test will run in another event loop. So the solution is to redefine the loop fixture with the required scope and to declare your custom fixture with the required scope as well.

import asyncio
import pytest
from aiomisc import entrypoint

# The scope is an example: use whichever scope the shared loop must live for.
@pytest.fixture(scope='module')
def loop():
    """Redefined ``loop`` fixture that yields the loop created by ``entrypoint``."""
    with entrypoint() as loop:
        yield loop

# Must not be wider than the scope of the ``loop`` fixture it depends on.
@pytest.fixture(scope='module')
async def sample_fixture(loop):
    """Async fixture that yields the constant ``1``."""
    yield 1

# Identity of the event loop observed by ``test_using_fixture``.
LOOP_ID = None


async def test_using_fixture(sample_fixture):
    """Remember the id of the current event loop and check the fixture value."""
    global LOOP_ID
    current_loop = asyncio.get_event_loop()
    LOOP_ID = id(current_loop)
    assert sample_fixture == 1

async def test_not_using_fixture(loop):
    """Check that ``loop`` is the same object whose id was stored in ``LOOP_ID``."""
    loop_identity = id(loop)
    assert loop_identity == LOOP_ID

2.18.2. pytest markers

The package contains some useful markers for pytest:

  • catch_loop_exceptions - uncaught event loop exceptions will fail the test.

  • forbid_get_event_loop - forbids calling asyncio.get_event_loop during the test case.

import asyncio

import pytest

# Test will be failed
@pytest.mark.forbid_get_event_loop
async def test_with_get_loop():
    """Call ``asyncio.get_event_loop()`` under the ``forbid_get_event_loop`` marker."""
    def switch_context():
        # Call it through the ``asyncio`` module: this is the function the
        # marker forbids, and a bare ``get_event_loop`` name is not defined.
        loop = asyncio.get_event_loop()
        future = loop.create_future()
        loop.call_soon(future.set_result, True)
        return future

    # ``pytest.fail.Exception`` is the public name of pytest's ``Failed``.
    with pytest.raises(pytest.fail.Exception):
        await switch_context()

# Test will be failed
@pytest.mark.catch_loop_exceptions
async def test_with_errors(loop):
    """Leave an exception uncaught in a background task of the event loop."""
    async def fail():
        # switch context
        await asyncio.sleep(0)
        raise Exception()

    # Schedule ``fail`` without awaiting it, so its exception is not caught
    # by the test body and is left to the event loop.
    loop.create_task(fail())
    await asyncio.sleep(0.1)

2.18.3. Passing default context

import pytest

@pytest.fixture
def default_context():
    """Return the key/value pairs to use as the default context."""
    return {
        'foo': 'bar',
        'bar': 'foo',
    }

2.18.4. Testing services

Redefine the services fixture in your test module:

def services(aiomisc_unused_port, handlers):
    # NOTE(review): this example is truncated. The service constructor that
    # receives ``handlers=`` and the closing brackets of the returned list are
    # missing, so the snippet is not valid Python as shown.
    return [
            handlers={'foo': lambda: 'bar'},

2.18.5. Event loop policy overriding

import uvloop
import tokio

# Human-readable ids, one per entry of ``policies``.
policy_ids = ('uvloop', 'asyncio', 'tokio')
# NOTE(review): the tuple below is truncated. Only the first of the three
# policies named in ``policy_ids`` is present and the closing parenthesis is
# missing.
policies = (uvloop.EventLoopPolicy(),

# Parametrized fixture: each run receives one entry of ``policies``.
@pytest.fixture(params=policies, ids=policy_ids)
def event_loop_policy(request):
    return request.param

2.18.6. Thread pool overriding

# Human-readable ids, one per entry of ``thread_pool_implementation``.
thread_pool_ids = ('aiomisc pool', 'default pool')
# NOTE(review): the tuple below is truncated. Only the first of the two
# implementations named in ``thread_pool_ids`` is present and the closing
# parenthesis is missing.
thread_pool_implementation = (ThreadPoolExecutor,

# Parametrized fixture: each run receives one entry of
# ``thread_pool_implementation``.
@pytest.fixture(params=thread_pool_implementation, ids=thread_pool_ids)
def thread_pool_executor(request):
    return request.param

2.18.7. entrypoint arguments

import pytest

@pytest.fixture
def entrypoint_kwargs() -> dict:
    """Return the keyword arguments for the entrypoint (here ``log_config=False``)."""
    return dict(log_config=False)

2.18.8. aiohttp test client

import pytest
from myapp.services.rest import REST

@pytest.fixture
def rest_port(aiomisc_unused_port_factory):
    """Return a port obtained from ``aiomisc_unused_port_factory``."""
    return aiomisc_unused_port_factory()

@pytest.fixture
def rest_service(rest_port):
    """Return a ``REST`` service instance configured with ``rest_port``."""
    return REST(port=rest_port)

@pytest.fixture
def services(rest_service):
    """Redefine the ``services`` fixture with the REST service as its only item."""
    return [rest_service]

def api_client(api_service):
    # NOTE(review): this example is truncated. The arguments and closing
    # parenthesis of the ``TestServer(`` call are missing, and no
    # ``api_service`` fixture is defined in this snippet; confirm the name.
    test_srv = TestServer(

    return TestClient(test_srv)


2.18.9. TCPProxy

A simple TCP proxy for emulating network problems. Available as the tcp_proxy fixture.


import asyncio
import time

import pytest

import aiomisc

class EchoServer(aiomisc.service.TCPServer):
    """TCP service that sends every received chunk back to the client."""

    async def handle_client(
            self, reader: asyncio.StreamReader,
            writer: asyncio.StreamWriter,
    ) -> None:
        """Echo data from ``reader`` to ``writer`` until the peer sends EOF."""
        chunk = await reader.read(65534)
        # ``read`` returns ``b""`` at EOF, which ends the loop.
        while chunk:
            writer.write(chunk)
            chunk = await reader.read(65534)

        # ``wait_closed`` is meant to be awaited after ``close``.
        writer.close()
        await writer.wait_closed()

@pytest.fixture
def server_port(aiomisc_unused_port_factory) -> int:
    """Return a port obtained from ``aiomisc_unused_port_factory``."""
    return aiomisc_unused_port_factory()

@pytest.fixture
def services(server_port, localhost):
    """Redefine ``services`` with one ``EchoServer`` on ``localhost:server_port``."""
    return [EchoServer(port=server_port, address=localhost)]

@pytest.fixture
async def proxy(tcp_proxy, localhost, server_port):
    """Open a ``tcp_proxy`` targeting ``localhost:server_port`` and yield it."""
    async with tcp_proxy(localhost, server_port) as proxy:
        yield proxy

async def test_proxy_client_close(proxy):
    """Echo a payload through the proxy, then check EOF after a disconnect."""
    reader, writer = await proxy.create_client()
    payload = b"Hello world"

    # Send the payload first; without it there is nothing to read back and
    # the read below can only time out.
    writer.write(payload)
    response = await asyncio.wait_for(reader.read(1024), timeout=1)

    assert response == payload

    assert not reader.at_eof()
    await proxy.disconnect_all()

    # After the proxy drops its connections the client must see EOF.
    assert await asyncio.wait_for(reader.read(), timeout=1) == b""
    assert reader.at_eof()

async def test_proxy_client_slow(proxy):
    """Check that a round trip takes at least the configured proxy delays."""
    read_delay = 0.1
    write_delay = 0.2

    # Emulation of asymmetric and slow ISP
    with proxy.slowdown(read_delay, write_delay):
        reader, writer = await proxy.create_client()
        payload = b"Hello world"

        # Start at minus "now"; adding "now" afterwards gives the elapsed time.
        delta = -time.monotonic()

        # Send the payload inside the timed section so that both directions
        # of the round trip are measured.
        writer.write(payload)
        await asyncio.wait_for(reader.read(1024), timeout=2)

        delta += time.monotonic()

        assert delta >= read_delay + write_delay

async def test_proxy_client_with_processor(proxy):
    # Bytes that replace whatever the client sends (see the first lambda).
    processed_request = b"Never say hello"

    # Patching protocol functions
    # NOTE(review): this example is truncated. The call that receives the two
    # processors below (and its closing parenthesis) is missing, and nothing
    # is written to ``writer`` before the read; restore both from the
    # original documentation.
        # Process data from client to server
        lambda _: processed_request,

        # Process data from server to client
        lambda chunk: chunk[::-1],

    reader, writer = await proxy.create_client()

    response = await reader.read(16)

    # The expected response is the replaced request, reversed by the second
    # lambda.
    assert response == processed_request[::-1]