2.13. асинхронные операции с файлами

Асинхронные файловые операции, включая поточную компрессию данных. Работают в пуле потоков «под капотом».

import aiomisc
import tempfile
from pathlib import Path


async def file_write():
    with tempfile.TemporaryDirectory() as tmp:
        fname = Path(tmp) / 'test.txt'

        # Некоторые инструменты, такие как mypy, не смогут вывести тип
        # из функции async_open основываясь на переданном символе `b` в режим.
        # Придется тут подсказать тип явно.
        afp: aiomisc.io.AsyncTextIO

        async with aiomisc.io.async_open(fname, 'w+') as afp:
            await afp.write("Hello")
            await afp.write(" ")
            await afp.write("world")

            await afp.seek(0)
            print(await afp.read())


with aiomisc.entrypoint() as loop:
    loop.run_until_complete(file_write())

Этот способ работы с файлами основан на потоках. Это очень похоже на то как сделано в библиотеке aiofiles с теми-же ограничениями.

Разумеется вы можете использовать библиотеку aiofile для этого. Но это не панацея, так как имеет ограничения связанные с API операционной системы.

В основном, для небольших нагрузок приложений, я рекомендую придерживаться следующих правил:

  • Если чтение и запись маленьких или больших кусочков в файлы со случайным доступом основная задача приложения - стоит использовать aiofile.

  • Иначе можно взять этот модуль или aiofiles

  • Если основная задача читать большие куски файлов для дальнейшей их обработки оба вышеописанных метода будут не оптимальны, так как переключения контекста каждую IO операцию - это скорее всего не будет оптимально для файлового кеша и можно потерять бóльшую часть времени исполнения на переключение контекста исполнения. В случае имплементации асинхронного IO на основе потоков цена переключения контектса между потоками может оказаться выше чем суммарно время исполнения всех IO операций.

    Просто попробуйте завернуть все блокирующие вызовы в отдельные функции и вызывайте их используя пул потоков (см. пример ниже):

    import os
    import aiomisc
    import hashlib
    import tempfile
    from pathlib import Path
    
    
    @aiomisc.threaded
    def hash_file(filename, chunk_size=65535, hash_func=hashlib.blake2b):
        hasher = hash_func()
    
        with open(filename, "rb") as fp:
            for chunk in iter(lambda: fp.read(chunk_size), b""):
               hasher.update(chunk)
    
        return hasher.hexdigest()
    
    
    @aiomisc.threaded
    def fill_random_file(filename, size, chunk_size=65535):
        with open(filename, "wb") as fp:
           while fp.tell() < size:
               fp.write(os.urandom(chunk_size))
    
           return fp.tell()
    
    
    async def main(path):
        filename = path / "one"
        await fill_random_file(filename, 1024 * 1024)
        first_hash = await hash_file(filename)
    
        filename = path / "two"
        await fill_random_file(filename, 1024 * 1024)
        second_hash = await hash_file(filename)
    
        assert first_hash != second_hash
    
    
    with tempfile.TemporaryDirectory(prefix="random.") as path:
       aiomisc.run(
           main(Path(path))
       )
    

2.13.1. Поточное сжатие

Чтобы включить поточное сжатие, нужно передать аргумент compression в функцию async_open.

Поддерживаемые алгоритмы сжатия:

  • aiomisc.io.Compression.NONE

  • aiomisc.io.Compression.GZIP

  • aiomisc.io.Compression.BZ2

  • aiomisc.io.Compression.LZMA

Пример использования

import tempfile
from aiomisc import run
from aiomisc.io import async_open, Compression
from pathlib import Path


async def file_write():
    with tempfile.TemporaryDirectory() as tmp:
        fname = Path(tmp) / 'test.txt'

        async with async_open(
            fname, 'w+', compression=Compression.GZIP
        ) as afp:
            for _ in range(10000):
                await afp.write("Hello World\n")

        file_size = fname.stat().st_size
        assert file_size < 10000, f"File too large {file_size} bytes"

run(file_write())