Asynchronous file operations

Asynchronous file operations, backed by a thread pool under the hood.

import aiomisc
import tempfile
from pathlib import Path


async def file_write():
    """Write three pieces to a temp file asynchronously, then read them back.

    Demonstrates ``aiomisc.io.async_open``: every file operation is awaited
    and executed in the thread pool behind the scenes.
    """
    with tempfile.TemporaryDirectory() as tmp:
        path = Path(tmp) / 'test.txt'

        async with aiomisc.io.async_open(path, 'w+') as handle:
            # Same three writes as before, just driven by a loop.
            for piece in ("Hello", " ", "world"):
                await handle.write(piece)

            # Rewind and echo the whole file content.
            await handle.seek(0)
            print(await handle.read())


# Start an aiomisc entrypoint (event loop + default thread pool) and run
# the example coroutine to completion.
with aiomisc.entrypoint() as loop:
    loop.run_until_complete(file_write())

This is the thread-based way of working with files. It is very similar to the aiofiles project and has the same limitations.

Of course, you can use the aiofile project for this. But it is not a silver bullet and has OS API-related limitations.

In general, for light loads, I would advise you to adhere to the following rules:

  • If reading and writing small or big chunks from files with random access is the main task in your project, use aiofile.

  • Otherwise, use this module or aiofiles.

  • If the main task is to read large chunks of files for processing, neither of the above methods is optimal, because you will switch context on each IO operation; this is often bad for the file cache, and you will lose execution time on context switches. With a thread-based IO executor implementation, the total cost of thread context switches can exceed the time spent on the IO operations themselves.

    Just try to pack all the blocking stuff into separate functions and call them in a thread pool; see the example below:

    import os
    import aiomisc
    import hashlib
    import tempfile
    from pathlib import Path
    
    
    @aiomisc.threaded
    def hash_file(filename, chunk_size=65535, hash_func=hashlib.blake2b):
        """Hash a file's contents chunk by chunk, running in a worker thread.

        Returns the hex digest produced by ``hash_func`` (blake2b by default).
        """
        digest = hash_func()

        with open(filename, "rb") as stream:
            # Explicit read loop: pull fixed-size chunks until EOF.
            while True:
                block = stream.read(chunk_size)
                if not block:
                    break
                digest.update(block)

        return digest.hexdigest()
    
    
    @aiomisc.threaded
    def fill_random_file(filename, size, chunk_size=65535):
        """Fill *filename* with exactly *size* random bytes, in a worker thread.

        Returns the number of bytes written (i.e. the final file size).
        """
        with open(filename, "wb") as fp:
            while fp.tell() < size:
                # Clamp the final chunk so the file is exactly `size` bytes;
                # the original always wrote a full chunk_size and could
                # overshoot the requested size by up to chunk_size - 1 bytes.
                fp.write(os.urandom(min(chunk_size, size - fp.tell())))

            return fp.tell()
    
    
    async def main(path):
        """Create two equally-sized random files and check their hashes differ.

        Each blocking step (filling, hashing) runs in the thread pool via the
        ``@aiomisc.threaded`` helpers above.
        """
        digests = []
        for name in ("one", "two"):
            target = path / name
            await fill_random_file(target, 1024 * 1024)
            digests.append(await hash_file(target))

        # Two files of random content must virtually never collide.
        assert digests[0] != digests[1]
    
    
    # Create a scratch directory, then run the example coroutine to
    # completion; the directory is removed automatically afterwards.
    with tempfile.TemporaryDirectory(prefix="random.") as path:
       aiomisc.run(
           main(Path(path))
       )