Source code for flutes.fs

import functools
import os
import pickle
import platform
import shutil
import subprocess
from pathlib import Path
from typing import Iterator, Optional

from .log import log
from .types import PathType

__all__ = [
    "get_folder_size",
    "readable_size",
    "get_file_lines",
    "remove_prefix",
    "copy_tree",
    "cache",
    "scandir",
]

if platform.system() == "Darwin":
    def get_folder_size(path: PathType) -> int:
        # Credit: https://stackoverflow.com/a/25574638/4909228
        r"""Get disk usage of given path in bytes."""
        return int(subprocess.check_output(['du', '-s', str(path)],
                                           env={"BLOCKSIZE": "512"}).split()[0].decode('utf-8')) * 512
else:
[docs] def get_folder_size(path: PathType) -> int: # Credit: https://stackoverflow.com/a/25574638/4909228 r"""Get disk usage of given path in bytes.""" return int(subprocess.check_output(['du', '-bs', str(path)]).split()[0].decode('utf-8'))
[docs]def readable_size(size: float) -> str: r"""Represent file size in human-readable format. .. code:: python >>> readable_size(1024 * 1024) "1.00M" :param size: File size in bytes. """ units = ["", "K", "M", "G", "T", "P"] for unit in units[:-1]: if size < 1024: return f"{size:.2f}{unit}" size /= 1024 return f"{size:.2f}{units[-1]}"
[docs]def get_file_lines(path: PathType) -> int: r"""Get number of lines in text file. """ return int(subprocess.check_output(['wc', '-l', str(path)]).split()[0].decode('utf-8'))
[docs]def remove_prefix(s: str, prefix: str) -> str: r"""Remove the specified prefix from a string. If only parts of the prefix match, then only that part is removed. .. code:: python >>> remove_prefix("https://github.com/huzecong/flutes", "https://") "github.com/huzecong/flutes" >>> remove_prefix("preface", "prefix") "face" :param s: The string whose prefix we want to remove. :param prefix: The prefix to remove. """ length = min(len(s), len(prefix)) prefix_len = next((idx for idx in range(length) if s[idx] != prefix[idx]), length) return s[prefix_len:]
[docs]def copy_tree(src: PathType, dst: PathType, overwrite: bool = False) -> None: r"""Copy contents of folder ``src`` to folder ``dst``. The ``dst`` folder can exist or whatever (looking at you, :py:func:`shutil.copytree`). :param src: The source directory. :param dst: The destination directory. If it doesn't exist, it will be created. :param overwrite: If ``True``, files in ``dst`` will be overwritten if a file with the same relative path exists in ``src``. If ``False``, these files are not copied. Defaults to ``False``. """ os.makedirs(dst, exist_ok=True) for file in os.listdir(src): src_path = os.path.join(src, file) dst_path = os.path.join(dst, file) if os.path.isdir(src_path): copy_tree(src_path, dst_path, overwrite=overwrite) elif overwrite or not os.path.exists(dst_path): shutil.copy2(src_path, dst_path) shutil.copystat(src, dst)
[docs]def cache(path: Optional[PathType], verbose: bool = True, name: Optional[str] = None): r"""A function decorator that caches the output of the function to disk. If the cache file exists, it is loaded from disk and the function will not be executed. :param path: Path to the cache file. If ``None``, no cache is loaded or saved. :param verbose: If ``True``, will print to log. :param name: Name of the object to load. Only used for logging purposes. """ name = (name or 'cache').capitalize() def decorator(func): @functools.wraps(func) def wrapped(*args, **kwargs): if path is not None and os.path.exists(path): with open(path, "rb") as f: ret = pickle.load(f) if verbose: log(f"{name} loaded from '{path}'") else: ret = func(*args, **kwargs) if path is not None: with open(path, "wb") as f: pickle.dump(ret, f) if verbose: log(f"{name} saved to '{path}'") return ret return wrapped return decorator
[docs]def scandir(path: PathType) -> Iterator[PathType]: r"""Lazily iterate over all files and directories under a directory. The returned path is the absolute path of the child file or directory, with the same type as :attr:`path` (:py:class:`pathlib.Path` or :py:class:`str`). :param path: Path to the directory. :return: An iterator over children paths. """ if isinstance(path, Path): with os.scandir(path) as it: for entry in it: yield Path(entry.path) else: with os.scandir(path) as it: for entry in it: yield entry.path