2.17. WorkerPool
¶
В Python есть модуль multiprocessing
в котором реализован класс Pool
, это аналог этого модуля, за единственным исключением - IPC в этом случае полностью синхронный. Этот модуль реализует Worker Pool на основе процессов, но IPC, при этом, полностью асинхронный на вызывающей стороне, при этом рабочие процессы не асинхронны.
2.17.1. Пример¶
Это полезно, когда вы хотите обрабатывать данные в отдельном процессе, при этом входные и выходные данные не велики. В противном случае это, конечно, будет работать нормально, но вам придется тратить время на передачу данных по IPC.
Хорошим примером является параллельная обработка изображений. Конечно, вы можете передавать байты изображений через IPC рабочего пула, но в общем случае передача имени файла будет лучше. Исключением будут случаи, когда изображение очень маленькое меньше, к примеру 1 КБ.
Давайте напишем программу, которая принимает изображения в формате JPEG и создает миниатюры. В этом случае у вас есть файл с исходным изображением, и вы должны сгенерировать выходной путь для функции «thumbnail».
Примечание
Придется установить Pillow - библиотеку для работы с изображениями, чтобы запустить этот код.
Установка через pip:
pip install Pillow
import asyncio
import sys
from multiprocessing import cpu_count
from typing import Tuple
from pathlib import Path
from PIL import Image
from aiomisc import entrypoint, WorkerPool
def thumbnail(src_path: str, dest_path: str, box: Tuple[int, int]):
img = Image.open(src_path)
img.thumbnail(box)
img.save(
dest_path, "JPEG", quality=65,
optimize=True,
icc_profile=img.info.get('icc_profile'),
exif=img.info.get('exif'),
)
return img.size
sizes = [
(1024, 1024),
(512, 512),
(256, 256),
(128, 128),
(64, 64),
(32, 32),
]
async def amain(path: Path):
# Создаем директории
for size in sizes:
size_dir = "x".join(map(str, size))
size_path = (path / 'thumbnails' / size_dir)
size_path.mkdir(parents=True, exist_ok=True)
# Создаем и запускаем WorkerPool
async with WorkerPool(cpu_count()) as pool:
tasks = []
for image in path.iterdir():
if not image.name.endswith(".jpg"):
continue
if image.is_relative_to(path / 'thumbnails'):
continue
for size in sizes:
rel_path = image.relative_to(path).parent
size_dir = "x".join(map(str, size))
dest_path = (
path / rel_path /
'thumbnails' / size_dir /
image.name
)
tasks.append(
pool.create_task(
thumbnail,
str(image),
str(dest_path),
size
)
)
await asyncio.gather(*tasks)
if __name__ == '__main__':
with entrypoint() as loop:
image_dir = Path(sys.argv[1])
loop.run_until_complete(amain(image_dir))
В этом примере каталог изображений используется в качестве первого аргумента командной строки и создает каталоги для эскизов. После этого запускается WorkerPool
с таким количеством процессов, сколько ядер у процессора.
Главный процесс создает задачи для рабочих, каждая задача это конвертация одного изображения в один размер, и все эти задачи передаются в WorkerPool
WorkerPool
обрабатывает задачи конкурентно, но не более одной задачи на одного рабочего в один момент времени