Source code for flutes.io

import contextlib
import io
import os
import sys
from typing import Any, AnyStr, Callable, Dict, IO, Optional, overload

from tqdm import tqdm
from typing_extensions import Literal

from .types import PathType

# Names exported by ``from <this module> import *``; each one is defined below.
__all__ = [
    "shut_up",
    "progress_open",
    "reverse_open",
]


@contextlib.contextmanager
def shut_up(stderr: bool = True, stdout: bool = False):
    r"""Suppress output (probably generated by external script or badly-written libraries) for
    :py:data:`~sys.stderr` or :py:data:`~sys.stdout`. This method can be used as a decorator, or a context manager:

    .. code:: python

        @shut_up(stderr=True)
        def verbose_func(...):
            ...

        with shut_up(stderr=True):
            ...  # verbose stuff

    The redirection happens at the file-descriptor level, so output written by child processes or C extensions
    through the same descriptors is suppressed as well. The original descriptors are restored even if the wrapped
    code raises.

    :param stderr: If ``True``, suppress output from :py:data:`~sys.stderr`. Defaults to ``True``.
    :param stdout: If ``True``, suppress output from :py:data:`~sys.stdout`. Defaults to ``False``.
    """
    streams = ([sys.stdout] if stdout else []) + ([sys.stderr] if stderr else [])
    with contextlib.ExitStack() as stack:
        for stream in streams:
            # Push out anything buffered *before* the redirect so it is not swallowed later.
            stream.flush()
            fd = stream.fileno()

            # Keep a duplicate of the real target so it can be restored afterwards.
            saved_fd = os.dup(fd)
            stack.callback(os.close, saved_fd)

            null_fd = os.open(os.devnull, os.O_RDWR)
            stack.callback(os.close, null_fd)

            # Redirect output to /dev/null.
            os.dup2(null_fd, fd)
            # Callbacks run in reverse order of registration: first flush what was buffered while
            # suppressed (so it still goes to /dev/null), then restore the descriptor, and only
            # then close the two helper descriptors registered above.
            stack.callback(os.dup2, saved_fd, fd)
            stack.callback(stream.flush)
        yield
class _ProgressBufferedReader(io.BufferedReader, IO[bytes]):
    r"""A buffered binary reader that reports how many bytes have been consumed to a progress bar.

    The progress bar total is the size of the underlying file as reported by :py:func:`os.fstat`.

    :param raw: The raw stream to wrap. It must expose a valid ``fileno()``.
    :param buffer_size: Size of the read buffer.
    :param bar_fn: Callable that builds the progress bar; it is called with ``total=<file size>`` plus
        everything in ``bar_kwargs``.
    :param bar_kwargs: Extra keyword arguments forwarded to ``bar_fn``.
    """

    def __init__(self, raw: io.RawIOBase, buffer_size: int = io.DEFAULT_BUFFER_SIZE, *,
                 bar_fn: Callable[..., tqdm], bar_kwargs: Dict[str, Any]):
        super().__init__(raw, buffer_size)
        file_size = os.fstat(raw.fileno()).st_size
        self._read_bytes = 0  # logical position that has been reported to the progress bar
        self.progress_bar = bar_fn(total=file_size, **bar_kwargs)

    def __enter__(self):
        self.progress_bar.__enter__()
        return super().__enter__()

    def __exit__(self, exc_type, exc_val, exc_tb):
        if super().__exit__(exc_type, exc_val, exc_tb):
            return True
        return self.progress_bar.__exit__(exc_type, exc_val, exc_tb)

    def close(self) -> None:
        """Close the progress bar and the underlying buffered stream."""
        # `close` can run on a partially-initialised object (e.g. when `__init__` raised before the
        # bar was created), so do not assume the attribute exists.
        bar = getattr(self, "progress_bar", None)
        try:
            if bar is not None:
                bar.close()
        finally:
            # Without this the wrapped file would stay open after `close()` / `with`.
            super().close()

    def _advance(self, chunk):
        """Record ``chunk`` as consumed, update the bar, and hand the chunk back unchanged."""
        consumed = 0 if chunk is None else len(chunk)
        self._read_bytes += consumed
        self.progress_bar.update(consumed)
        return chunk

    def read(self, size: Optional[int] = -1) -> bytes:
        return self._advance(super().read(size))

    def read1(self, size: int = -1) -> bytes:
        return self._advance(super().read1(size))

    def readline(self, size: int = -1) -> bytes:
        return self._advance(super().readline(size))

    def seek(self, offset: int, whence: int = io.SEEK_SET) -> int:
        ret = super().seek(offset, whence)
        # Move the bar by the signed distance between the new position and the recorded one.
        self.progress_bar.update(ret - self._read_bytes)
        self._read_bytes = ret
        return ret


class ProgressReader(IO[AnyStr]):
    # stub for `progress_open`: a file object that additionally carries its progress bar
    progress_bar: tqdm


# Typing-only overloads: the element type of the returned reader follows the literal ``mode``.
@overload
def progress_open(path: PathType, mode: Literal['r'] = 'r', *, encoding: str = ..., verbose: bool = ...,
                  buffer_size: int = ..., **kwargs) -> ProgressReader[str]: ...


@overload
def progress_open(path: PathType, mode: Literal['rb'], *, encoding: str = ..., verbose: bool = ...,
                  buffer_size: int = ..., **kwargs) -> ProgressReader[bytes]: ...


@overload
def progress_open(path: PathType, mode: str, *, encoding: str = ..., verbose: bool = ...,
                  buffer_size: int = ..., **kwargs) -> ProgressReader[Any]: ...
def progress_open(path, mode="r", *, encoding='utf-8', verbose=True, buffer_size=io.DEFAULT_BUFFER_SIZE,
                  bar_fn: Optional[Callable[..., "tqdm"]] = None, **kwargs):
    r"""A replacement for :py:func:`open` that shows the progress of reading the file:

    .. code:: python

        with progress_open(path, mode="r") as f:
            # `f` is just what you'd get with `open(path)`, now with a progress bar
            bar = f.progress_bar  # type: tqdm.tqdm

    :param path: Path to the file.
    :param mode: The file open mode. When progress bar is enabled, only read modes ``"r"`` and ``"rb"`` are
        supported (write progress doesn't make a lot of sense). Defaults to ``"r"``.
    :param encoding: Encoding for the file. Only required for ``"r"`` mode. Defaults to ``"utf-8"``.
    :param verbose: If ``False``, the progress bar is not displayed and a normal file object is returned.
        Defaults to ``True``.
    :param buffer_size: The size of the file buffer. Defaults to :py:data:`io.DEFAULT_BUFFER_SIZE`.
    :param bar_fn: An optional callable that constructs a progress bar when called. This is useful when you want to
        override the default progress bar, for instance, to use with :class:`~flutes.ProgressBarManager`:

        .. code:: python

            def process(path: str, bar: flutes.ProgressBarManager.Proxy):
                with flutes.progress_open(path, bar_fn=bar.new) as f:
                    ...

    :param kwargs: Additional arguments to pass to `tqdm <https://tqdm.github.io/>`_ initializer.
    :raises ValueError: If ``verbose`` is ``True`` and ``mode`` is neither ``"r"`` nor ``"rb"``.
    :return: A file object.
    """
    if not verbose:
        # Honour `encoding` for text modes; binary modes reject an encoding argument.
        return open(path, mode, encoding=None if "b" in mode else encoding)
    if mode not in ["r", "rb"]:
        raise ValueError(f"Unsupported mode '{mode}'. Only read modes ('r', 'rb') are supported")

    kwargs.setdefault("bar_format", "{l_bar}{bar}| [{elapsed}<{remaining}{postfix}]")
    raw = io.FileIO(str(path), mode="r")
    try:
        buffer = f = _ProgressBufferedReader(raw, buffer_size, bar_fn=bar_fn or tqdm, bar_kwargs=kwargs)
    except BaseException:
        # Do not leave the raw file open if the progress reader could not be built.
        raw.close()
        raise
    if mode == "r":
        f = io.TextIOWrapper(f, encoding=encoding)  # type: ignore[assignment]
        # Expose the bar on the text wrapper as well, so callers see it in both modes.
        f.progress_bar = buffer.progress_bar
    return f
class _ReverseReadlineFile: MAX_CHAR_BYTES = 4 # Maximum length of byte sequences for any character in target encoding @staticmethod def generator(fp, *, encoding='utf-8', allow_empty_lines=False, buf_size=8192): segment = None offset = 0 fp.seek(0, os.SEEK_END) file_size = remaining_size = fp.tell() while remaining_size > 0: cur_buf_size = buf_size offset = min(file_size, offset + cur_buf_size) fp.seek(file_size - offset) buffer_bytes = fp.read(min(remaining_size, cur_buf_size)) trials = 0 while True: trials += 1 try: buffer = buffer_bytes.decode(encoding) break except UnicodeDecodeError: if trials >= _ReverseReadlineFile.MAX_CHAR_BYTES: raise buffer_bytes = buffer_bytes[1:] cur_buf_size -= 1 offset -= 1 fp.seek(file_size - offset) remaining_size -= cur_buf_size lines = buffer.split('\n') # the first line of the buffer is probably not a complete line so # we'll save it and append it to the last line of the next buffer # we read if segment is not None: # if the previous chunk starts right from the beginning of line # do not concat the segment to the last line of new chunk # instead, yield the segment first if buffer[-1] != '\n': lines[-1] += segment else: yield segment segment = lines[0] for index in range(len(lines) - 1, 0, -1): if allow_empty_lines or len(lines[index]): yield lines[index] # Don't yield None if the file was empty if segment is not None: yield segment def __init__(self, fp: IO, gen): self.fp = fp self.gen = gen def __iter__(self): return self def __next__(self): return next(self.gen) + '\n' def __enter__(self): return self def __exit__(self, exc_type, exc_val, exc_tb): self.close() def readline(self): return next(self.gen) def close(self): self.fp.close()
def reverse_open(path: PathType, *, encoding: str = 'utf-8', allow_empty_lines: bool = False,
                 buffer_size: int = io.DEFAULT_BUFFER_SIZE):
    # Credits: https://stackoverflow.com/questions/2301789/read-a-file-in-reverse-order-using-python
    r"""A generator that returns the lines of a file in reverse order. Usage and syntax is the same as built-in
    method :py:func:`open`.

    The file is opened in binary mode and decoded chunk by chunk; the returned object can be iterated over, used
    as a context manager, and closed with ``close()``.

    :param path: Path to file.
    :param encoding: Encoding of file. Defaults to ``"utf-8"``.
    :param allow_empty_lines: If ``False``, empty lines are skipped. Defaults to ``False``.
    :param buffer_size: Buffer size. You probably won't need to change this for most cases. Defaults to
        :py:data:`io.DEFAULT_BUFFER_SIZE`.
    :raises ValueError: If ``buffer_size`` is smaller than the maximum number of bytes per character.
    """
    # Each chunk must be able to hold at least one complete character, whatever its encoded length.
    if buffer_size < _ReverseReadlineFile.MAX_CHAR_BYTES:
        raise ValueError(f"`buffer_size` must be at least {_ReverseReadlineFile.MAX_CHAR_BYTES}")
    fp = open(path, "rb")
    gen = _ReverseReadlineFile.generator(fp, encoding=encoding, allow_empty_lines=allow_empty_lines,
                                         buf_size=buffer_size)
    return _ReverseReadlineFile(fp, gen)