2.13. асинхронные операции с файлами¶
Асинхронные файловые операции, включая поточную компрессию данных. Работают в пуле потоков «под капотом».
import aiomisc
import tempfile
from pathlib import Path
async def file_write():
with tempfile.TemporaryDirectory() as tmp:
fname = Path(tmp) / 'test.txt'
# Некоторые инструменты, такие как mypy, не смогут вывести тип
# из функции async_open основываясь на переданном символе `b` в режим.
# Придется тут подсказать тип явно.
afp: aiomisc.io.AsyncTextIO
async with aiomisc.io.async_open(fname, 'w+') as afp:
await afp.write("Hello")
await afp.write(" ")
await afp.write("world")
await afp.seek(0)
print(await afp.read())
with aiomisc.entrypoint() as loop:
loop.run_until_complete(file_write())
Этот способ работы с файлами основан на потоках. Это очень похоже на то как сделано в библиотеке aiofiles с теми-же ограничениями.
Разумеется вы можете использовать библиотеку aiofile для этого. Но это не панацея, так как имеет ограничения связанные с API операционной системы.
В основном, для небольших нагрузок приложений, я рекомендую придерживаться следующих правил:
Если чтение и запись маленьких или больших кусочков в файлы со случайным доступом основная задача приложения - стоит использовать aiofile.
Иначе можно взять этот модуль или aiofiles
Если основная задача читать большие куски файлов для дальнейшей их обработки оба вышеописанных метода будут не оптимальны, так как переключения контекста каждую IO операцию - это скорее всего не будет оптимально для файлового кеша и можно потерять бóльшую часть времени исполнения на переключение контекста исполнения. В случае имплементации асинхронного IO на основе потоков цена переключения контектса между потоками может оказаться выше чем суммарно время исполнения всех IO операций.
Просто попробуйте завернуть все блокирующие вызовы в отдельные функции и вызывайте их используя пул потоков (см. пример ниже):
import os import aiomisc import hashlib import tempfile from pathlib import Path @aiomisc.threaded def hash_file(filename, chunk_size=65535, hash_func=hashlib.blake2b): hasher = hash_func() with open(filename, "rb") as fp: for chunk in iter(lambda: fp.read(chunk_size), b""): hasher.update(chunk) return hasher.hexdigest() @aiomisc.threaded def fill_random_file(filename, size, chunk_size=65535): with open(filename, "wb") as fp: while fp.tell() < size: fp.write(os.urandom(chunk_size)) return fp.tell() async def main(path): filename = path / "one" await fill_random_file(filename, 1024 * 1024) first_hash = await hash_file(filename) filename = path / "two" await fill_random_file(filename, 1024 * 1024) second_hash = await hash_file(filename) assert first_hash != second_hash with tempfile.TemporaryDirectory(prefix="random.") as path: aiomisc.run( main(Path(path)) )
2.13.1. Поточное сжатие¶
Чтобы включить поточное сжатие, нужно передать аргумент compression в функцию async_open.
Поддерживаемые алгоритмы сжатия:
aiomisc.io.Compression.NONE
aiomisc.io.Compression.GZIP
aiomisc.io.Compression.BZ2
aiomisc.io.Compression.LZMA
Пример использования
import tempfile
from aiomisc import run
from aiomisc.io import async_open, Compression
from pathlib import Path
async def file_write():
with tempfile.TemporaryDirectory() as tmp:
fname = Path(tmp) / 'test.txt'
async with async_open(
fname, 'w+', compression=Compression.GZIP
) as afp:
for _ in range(10000):
await afp.write("Hello World\n")
file_size = fname.stat().st_size
assert file_size < 10000, f"File too large {file_size} bytes"
run(file_write())