Worker Pool

В Python есть модуль multiprocessing в котором реализован класс Pool, это аналог этого модуля, за единственным исключением - IPC в этом случае полностью синхронный. Этот модуль реализует Worker Pool на основе процессов, но IPC, при этом, полностью асинхронный на вызывающей стороне, при этом рабочие процессы не асинхронны.

Пример

Это полезно, когда вы хотите обрабатывать данные в отдельном процессе, при этом входные и выходные данные не велики. В противном случае это, конечно, будет работать нормально, но вам придется тратить время на передачу данных по IPC.

Хорошим примером является параллельная обработка изображений. Конечно, вы можете передавать байты изображений через IPC рабочего пула, но в общем случае передача имени файла будет лучше. Исключением будут случаи, когда изображение очень маленькое меньше, к примеру 1 КБ.

Давайте напишем программу, которая принимает изображения в формате JPEG и создает миниатюры. В этом случае у вас есть файл с исходным изображением, и вы должны сгенерировать выходной путь для функции «thumbnail».

Примечание

Придется установить Pillow - библиотеку для работы с изображениями, чтобы запустить этот код.

Установка через pip:

pip install Pillow
import asyncio
import sys
from multiprocessing import cpu_count
from typing import Tuple
from pathlib import Path
from PIL import Image
from aiomisc import entrypoint, WorkerPool


def thumbnail(src_path: str, dest_path: str, box: Tuple[int, int]):
    img = Image.open(src_path)
    img.thumbnail(box)
    img.save(
        dest_path, "JPEG", quality=65,
        optimize=True,
        icc_profile=img.info.get('icc_profile'),
        exif=img.info.get('exif'),
    )
    return img.size


sizes = [
    (1024, 1024),
    (512, 512),
    (256, 256),
    (128, 128),
    (64, 64),
    (32, 32),
]


async def amain(path: Path):
    # Create directories
    for size in sizes:
        size_dir = "x".join(map(str, size))
        size_path = (path / 'thumbnails' / size_dir)
        size_path.mkdir(parents=True, exist_ok=True)

    # Create and run WorkerPool
    async with WorkerPool(cpu_count()) as pool:
        tasks = []
        for image in path.iterdir():
            if not image.name.endswith(".jpg"):
                continue

            if image.is_relative_to(path / 'thumbnails'):
                continue

            for size in sizes:
                rel_path = image.relative_to(path).parent
                size_dir = "x".join(map(str, size))
                dest_path = (
                    path / rel_path /
                    'thumbnails' / size_dir /
                    image.name
                )

                tasks.append(
                    pool.create_task(
                        thumbnail,
                        str(image),
                        str(dest_path),
                        size
                    )
                )

        await asyncio.gather(*tasks)


if __name__ == '__main__':
    with entrypoint() as loop:
        image_dir = Path(sys.argv[1])
        loop.run_until_complete(amain(image_dir))

В этом примере каталог изображений используется в качестве первого аргумента командной строки и создает каталоги для эскизов. После этого запускается WorkerPool с таким количеством процессов, сколько ядер у процессора.

Главный процесс создает задачи для рабочих, каждая задача это конвертация одного изображения в один размер, и все эти задачи передаются в WorkerPool

WorkerPool обрабатывает задачи конкурентно, но не более одной задачи на одного рабочего в один момент времени