2.14. Работа с потоками#

Можно обернуть блокирующую функцию и запустить ее в отдельном потоке или пуле потоков.

2.14.1. Поддержка contextvars#

Все нижеописанные декоратоы и функции поддерживают модуль contextvars, из PyPI для python моложе 3.7 так и встроенный в стандартную библиотеку модуль для python 3.7.

import asyncio
import aiomisc
import contextvars
import random
import struct


user_id = contextvars.ContextVar("user_id")

record_struct = struct.Struct(">I")


@aiomisc.threaded
def write_user():
    with open("/tmp/audit.bin", 'ab') as fp:
        fp.write(record_struct.pack(user_id.get()))


@aiomisc.threaded
def read_log():
    with open("/tmp/audit.bin", "rb") as fp:
        for chunk in iter(lambda: fp.read(record_struct.size), b''):
            yield record_struct.unpack(chunk)[0]


async def main():
    futures = []
    for _ in range(5):
        user_id.set(random.randint(1, 65535))
        futures.append(write_user())

    await asyncio.gather(*futures)

    async for data in read_log():
        print(data)


if __name__ == '__main__':
    with aiomisc.entrypoint() as loop:
        loop.run_until_complete(main())

Пример вывода

6621
33012
1590
45008
56844

Примечание

contextvars нужны для другого случая нежели класс Context. contextvars хорошо применимы для передачи контекстных переменных сквозь стек вызовов, однако дочерние задачи не смогут модифицировать контекстные переменные родительских задач потому, что contextvars делает их «легкие копии» перед запуском новой задачи. Класс Context наоборот позволяет модификацию отовсюду так как не копирует ничего.

2.14.2. @aiomisc.threaded#

Оборачивает блокирующую функцию и запускает ее в пуле потоков.

import asyncio
import time
from aiomisc import new_event_loop, threaded


@threaded
def blocking_function():
    time.sleep(1)


async def main():
    # Параллельный запуск
    await asyncio.gather(
        blocking_function(),
        blocking_function(),
    )


if __name__ == '__main__':
    loop = new_event_loop()
    loop.run_until_complete(main())

В случае если функция это генератор тогда декоратор @threaded вернет IteratorWrapper (см Threaded generator decorator).

2.14.3. @aiomisc.threaded_separate#

Оборачивает блокирующую функцию и запускает ее в отдельном новом потоке.Крайне рекомендовано для длительных фоновых задач:

import asyncio
import time
import threading
import aiomisc


@aiomisc.threaded
def blocking_function():
    time.sleep(1)


@aiomisc.threaded_separate
def long_blocking_function(event: threading.Event):
    while not event.is_set():
        print("Running")
        time.sleep(1)
    print("Выходим")


async def main():
    stop_event = threading.Event()

    loop = asyncio.get_event_loop()
    loop.call_later(10, stop_event.set)

    # Параллельный запуск
    await asyncio.gather(
        blocking_function(),
        # Будет запущен отдельный новый поток
        long_blocking_function(stop_event),
    )


with aiomisc.entrypoint() as loop:
    loop.run_until_complete(main())

2.14.4. Threaded iterator decorator#

Оборачивает блокирующую функцию-генератор и запусакет ее в текущем пуле потоков или новом отдельном потоке.

Следующий пример читает свой собственный код, и обновляет хеш для каждой следующей строки от предидущих строк, и отправляет все это по TCP:

import asyncio
import hashlib

import aiomisc

# Мой первый блокчейн

@aiomisc.threaded_iterable
def blocking_reader(fname):
    with open(fname, "r+") as fp:
        md5_hash = hashlib.md5()
        for line in fp:
            bytes_line = line.encode()
            md5_hash.update(bytes_line)
            yield bytes_line, md5_hash.hexdigest().encode()


async def main():
    reader, writer = await asyncio.open_connection("127.0.0.1", 2233)
    async with blocking_reader(__file__) as gen:
        async for line, digest in gen:
            writer.write(digest)
            writer.write(b'\t')
            writer.write(line)
            await writer.drain()


with aiomisc.entrypoint() as loop
    loop.run_until_complete(main())

Запустим TCP сервер с помощью netcat в терминале, и после запустим этот пример.

$ netcat -v -l -p 2233
Connection from 127.0.0.1:54734
dc80feba2326979f8976e387fbbc8121   import asyncio
78ec3bcb1c441614ede4af5e5b28f638   import hashlib
b7df4a0a4eac401b2f835447e5fc4139
f0a94eb3d7ad23d96846c8cb5e327454   import aiomisc
0c05dde8ac593bad97235e6ae410cb58
e4d639552b78adea6b7c928c5ebe2b67   # Мой первый блокчейн
5f04aef64f4cacce39170142fe45e53e
c0019130ba5210b15db378caf7e9f1c9   @aiomisc.threaded_iterable
a720db7e706d10f55431a921cdc1cd4c   def blocking_reader(fname):
0895d7ca2984ea23228b7d653d0b38f2       with open(fname, "r+") as fp:
0feca8542916af0b130b2d68ade679cf           md5_hash = hashlib.md5()
4a9ddfea3a0344cadd7a80a8b99ff85c           for line in fp:
f66fa1df3d60b7ac8991244455dff4ee               bytes_line = line.encode()
aaac23a5aa34e0f5c448a8d7e973f036               md5_hash.update(bytes_line)
2040bcaab6137b60e51ae6bd1e279546               yield bytes_line, md5_hash.hexdigest().encode()
7346740fdcde6f07d42ecd2d6841d483
14dfb2bae89fa0d7f9b6cba2b39122c4
d69cc5fe0779f0fa800c6ec0e2a7cbbd   async def main():
ead8ef1571e6b4727dcd9096a3ade4da       reader, writer = await asyncio.open_connection("127.0.0.1", 2233)
275eb71a6b6fb219feaa5dc2391f47b7       async with blocking_reader(__file__) as gen:
110375ba7e8ab3716fd38a6ae8ec8b83           async for line, digest in gen:
c26894b38440dbdc31f77765f014f445               writer.write(digest)
27659596bd880c55e2bc72b331dea948               writer.write(b'\t')
8bb9e27b43a9983c9621c6c5139a822e               writer.write(line)
2659fbe434899fc66153decf126fdb1c               await writer.drain()
6815f69821da8e1fad1d60ac44ef501e
5acc73f7a490dcc3b805e75fb2534254
0f29ad9505d1f5e205b0cbfef572ab0e   if __name__ == '__main__':
8b04db9d80d8cda79c3b9c4640c08928       loop = aiomisc.new_event_loop()
9cc5f29f81e15cb262a46cf96b8788ba       loop.run_until_complete(main())

Придется использовать асинхронный контекст-менеджер в случае если генератор бесконечный или придется явно вызвать и дождаться метода .close() если вы избегаете использование асинхронных контекст-менеджеров.

import asyncio
import aiomisc


# Настраиваем буфер на 2 элемента вперед
@aiomisc.threaded_iterable(max_size=2)
def urandom_reader():
    with open('/dev/urandom', "rb") as fp:
        while True:
            yield fp.read(8)


# Бесконечный буфер в отдельном потоке
@aiomisc.threaded_iterable_separate
def blocking_reader(fname):
    with open(fname, "r") as fp:
        yield from fp


async def main():
    reader, writer = await asyncio.open_connection("127.0.0.1", 2233)
    async for line in blocking_reader(__file__):
        writer.write(line.encode())

    await writer.drain()

    # Будем "кушать" белый шум
    gen = urandom_reader()
    counter = 0
    async for line in gen:
        writer.write(line)
        counter += 1

        if counter == 10:
            break

    await writer.drain()

    # Останавливаем запущенный генератор
    await gen.close()

    # Тоже самое, только через контекст менеджер
    async with urandom_reader() as gen:
        counter = 0
        async for line in gen:
            writer.write(line)
            counter += 1

            if counter == 10:
                break

    await writer.drain()


with aiomisc.entrypoint() as loop:
    loop.run_until_complete(main())

2.14.5. aiomisc.IteratorWrapper#

Запускает блокирующий итератор в пуле потоков:

import concurrent.futures
import hashlib
import aiomisc


def urandom_reader():
    with open('/dev/urandom', "rb") as fp:
        while True:
            yield fp.read(1024)


async def main():
    # Создаем новый пул потоков
    pool = concurrent.futures.ThreadPoolExecutor(1)
    wrapper = aiomisc.IteratorWrapper(
        urandom_reader,
        executor=pool,
        max_size=2
    )

    async with wrapper as gen:
        md5_hash = hashlib.md5(b'')
        counter = 0
        async for item in gen:
            md5_hash.update(item)
            counter += 1

            if counter >= 100:
                break

    pool.shutdown()
    print(md5_hash.hexdigest())


if __name__ == '__main__':
    with aiomisc.entrypoint() as loop:
        loop.run_until_complete(main())

2.14.6. aiomisc.IteratorWrapperSeparate#

Запускает блокирующий итераторы в отдельном новом потоке:

import concurrent.futures
import hashlib
import aiomisc


def urandom_reader():
    with open('/dev/urandom', "rb") as fp:
        while True:
            yield fp.read(1024)


async def main():
    # Создаем новый пул потоков
    wrapper = aiomisc.IteratorWrapperSeparate(
        urandom_reader, max_size=2
    )

    async with wrapper as gen:
        md5_hash = hashlib.md5(b'')
        counter = 0
        async for item in gen:
            md5_hash.update(item)
            counter += 1

            if counter >= 100:
                break

    print(md5_hash.hexdigest())


if __name__ == '__main__':
    with aiomisc.entrypoint() as loop:
        loop.run_until_complete(main())

2.14.7. aiomisc.ThreadPoolExecutor#

Реализация быстрого и простого пула потоков.

Устанавливаем как пул потоков по умолчанию:

import asyncio
from aiomisc import ThreadPoolExecutor

loop = asyncio.get_event_loop()
thread_pool = ThreadPoolExecutor(4)
loop.set_default_executor(thread_pool)

Примечание

entrypoint установит это по умолчанию.

Аргумент pool_size в entrypoint ограничивает кол-во потоков п пуле.

2.14.8. aiomisc.sync_wait_coroutine#

Функции запущенные в потоке не могут вызывать и дожидаться сопрограмм по умолчанию. Эта функция это хелпер позволяет отправить сопрограмму в event loop и дождаться ее результата из текущего потока.

import asyncio
import aiomisc


async def coro():
    print("Coroutine started")
    await asyncio.sleep(1)
    print("Coroutine done")


@aiomisc.threaded
def in_thread(loop):
    print("Thread started")
    aiomisc.sync_wait_coroutine(loop, coro)
    print("Thread finished")


with aiomisc.entrypoint() as loop:
    loop.run_until_complete(in_thread(loop))