API Documentation

Exception

flutes.exception.register_ipython_excepthook() → None[source]

Register an exception hook that launches an interactive IPython session upon uncaught exceptions.

flutes.exception.log_exception(e, user_msg: Optional[str] = None, **kwargs)[source]

Convenience function to log an exception using the logging interface.

Parameters:
  • e – The exception instance.
  • user_msg – An optional user message to print.
  • kwargs – Additional arguments for log().
flutes.exception.exception_wrapper(handler_fn=None)[source]

Function decorator that calls the specified handler function when a exception occurs inside the decorated function. By default, handler_fn is None, and log_exception() will be called to print the exception details.

A custom handler function takes the following arguments:

  • A positional argument for the exception object. This must be the first argument of the method.
  • Arguments with matching names to arguments in the wrapped method. These arguments will be filled with values passed to the wrapped method. These arguments cannot take default values.
  • Arguments without matching names. These arguments must take default values.
  • An optional variadic keyword argument (**kwargs). This will be filled with remaining argument name-value pairs that are not captured by other arguments.

For example:

def handler_fn(e, three, one, args, my_arg=None, **kw): ...

@exception_wrapper(handler_fn)
def foo(one, two, *args, three=None, **kwargs): ...

foo(1, "2", "arg1", "arg2", four=4)

Assuming a ValueError is thrown, the argument values for handler_fn would be:

e:      <ValueError>
three:  None
one:    1
args:   ("arg1", "arg2")
my_arg: None
kw:     {"two": "2",
         "kwargs": {"four": 4}}

File System

flutes.fs.get_folder_size(path: PathType) → int[source]

Get disk usage of given path in bytes.

flutes.fs.readable_size(size: float) → str[source]

Represent file size in human-readable format.

>>> readable_size(1024 * 1024)
"1.00M"
Parameters:size – File size in bytes.
flutes.fs.get_file_lines(path: PathType) → int[source]

Get number of lines in text file.

flutes.fs.remove_prefix(s: str, prefix: str) → str[source]

Remove the specified prefix from a string. If only parts of the prefix match, then only that part is removed.

>>> remove_prefix("https://github.com/huzecong/flutes", "https://")
"github.com/huzecong/flutes"

>>> remove_prefix("preface", "prefix")
"face"
Parameters:
  • s – The string whose prefix we want to remove.
  • prefix – The prefix to remove.
flutes.fs.copy_tree(src: PathType, dst: PathType, overwrite: bool = False) → None[source]

Copy contents of folder src to folder dst. The dst folder can exist or whatever (looking at you, shutil.copytree()).

Parameters:
  • src – The source directory.
  • dst – The destination directory. If it doesn’t exist, it will be created.
  • overwrite – If True, files in dst will be overwritten if a file with the same relative path exists in src. If False, these files are not copied. Defaults to False.
flutes.fs.cache(path: Optional[PathType], verbose: bool = True, name: Optional[str] = None)[source]

A function decorator that caches the output of the function to disk. If the cache file exists, it is loaded from disk and the function will not be executed.

Parameters:
  • path – Path to the cache file. If None, no cache is loaded or saved.
  • verbose – If True, will print to log.
  • name – Name of the object to load. Only used for logging purposes.
flutes.fs.scandir(path: PathType) → Iterator[PathType][source]

Lazily iterate over all files and directories under a directory. The returned path is the absolute path of the child file or directory, with the same type as path (pathlib.Path or str).

Parameters:path – Path to the directory.
Returns:An iterator over children paths.

I/O

flutes.io.shut_up(stderr: bool = True, stdout: bool = False)[source]

Suppress output (probably generated by external script or badly-written libraries) for stderr or stdout. This method can be used as a decorator, or a context manager:

@shut_up(stderr=True)
def verbose_func(...):
    ...

with shut_up(stderr=True):
    ... # verbose stuff
Parameters:
  • stderr – If True, suppress output from stderr. Defaults to True.
  • stdout – If True, suppress output from stdout. Defaults to False.
flutes.io.progress_open(path, mode='r', *, encoding='utf-8', verbose=True, buffer_size=8192, bar_fn: Optional[Callable[[...], tqdm.std.tqdm]] = None, **kwargs)[source]

A replacement for open() that shows the progress of reading the file:

with progress_open(path, mode="r") as f:
    # `f` is just what you'd get with `open(path)`, now with a progress bar
    bar = f.progress_bar  # type: tqdm.tqdm
Parameters:
  • path – Path to the file.
  • mode – The file open mode. When progress bar is enabled, only read modes "r" and "rb" are supported (write progress doesn’t make a lot of sense). Defaults to "r".
  • encoding – Encoding for the file. Only required for "r" mode. Defaults to "utf-8".
  • verbose – If False, the progress bar is not displayed and a normal file object is returned. Defaults to True.
  • buffer_size – The size of the file buffer. Defaults to io.DEFAULT_BUFFER_SIZE.
  • bar_fn

    An optional callable that constructs a progress bar when called. This is useful when you want to override the default progress bar, for instance, to use with ProgressBarManager:

    def process(path: str, bar: flutes.ProgressBarManager.Proxy):
        with flutes.progress_open(path, bar_fn=bar.new) as f:
            
    
  • kwargs – Additional arguments to pass to tqdm initializer.
Returns:

A file object.

flutes.io.reverse_open(path: PathType, *, encoding: str = 'utf-8', allow_empty_lines: bool = False, buffer_size: int = 8192)[source]

A generator that returns the lines of a file in reverse order. Usage and syntax is the same as built-in method open().

Parameters:
  • path – Path to file.
  • encoding – Encoding of file. Defaults to "utf-8".
  • allow_empty_lines – If False, empty lines are skipped. Defaults to False.
  • buffer_size – Buffer size. You probably won’t need to change this for most cases. Defaults to io.DEFAULT_BUFFER_SIZE.

Iterator

class flutes.iterator.LazyList(iterable: Iterable[T])[source]

A wrapper over an iterable to allow lazily converting it into a list. The iterable is only iterated up to the accessed indices.

Parameters:iterable – The iterable to wrap.
class flutes.iterator.Range(*args)[source]

A replacement for built-in range with support for indexing operators. For example:

>>> r = Range(10)         # (end)
>>> r = Range(1, 10 + 1)  # (start, end)
>>> r = Range(1, 11, 2)   # (start, end, step)
>>> print(r[0], r[2], r[4])
1 5 9
class flutes.iterator.MapList(func: Callable[[T], R], lst: Sequence[T])[source]

A wrapper over a list that allows lazily performing transformations on the list elements. It’s basically the built-in map() function, with support for indexing operators. An example use case:

>>> import bisect

>>> # Find index of the first element in `a` whose square is >= 10.
... a = [1, 2, 3, 4, 5]
... pos = bisect.bisect_left(MapList(lambda x: x * x, a), 10)
3

>>> # Find the first index `i` such that `a[i] * b[i]` is >= 10.
... b = [2, 3, 4, 5, 6]
... pos = bisect.bisect_left(MapList(lambda i: a[i] * b[i], Range(len(a))), 10)
2
Parameters:
  • func – The transformation to perform on list elements.
  • lst – The list to wrap.
flutes.iterator.chunk(n: int, iterable: Iterable[T]) → Iterator[List[T]][source]

Split the iterable into chunks, with each chunk containing no more than n elements.

>>> list(chunk(3, range(10)))
[[0, 1, 2], [3, 4, 5], [6, 7, 8], [9]]
Parameters:
  • n – The maximum number of elements in one chunk.
  • iterable – The iterable.
Returns:

An iterator over chunks.

flutes.iterator.take(n: int, iterable: Iterable[T]) → Iterator[T][source]

Take the first n elements from an iterable.

>>> list(take(5, range(1000000)))
[0, 1, 2, 3, 4]
Parameters:
  • n – The number of elements to take.
  • iterable – The iterable.
Returns:

An iterator returning the first n elements from the iterable.

flutes.iterator.drop(n: int, iterable: Iterable[T]) → Iterator[T][source]

Drop the first n elements from an iterable, and return the rest as an iterator.

>>> next(drop(5, range(1000000)))
5
Parameters:
  • n – The number of elements to drop.
  • iterable – The iterable.
Returns:

An iterator returning the remaining part of the iterable after the first n elements.

flutes.iterator.drop_until(pred_fn: Callable[[T], bool], iterable: Iterable[T]) → Iterator[T][source]

Drop elements from the iterable until an element that satisfies the predicate is encountered. Similar to the built-in filter() function, but only applied to a prefix of the iterable.

>>> list(drop_until(lambda x: x > 5, range(10)))
[6, 7, 8, 9]
Parameters:
  • pred_fn – The predicate that returned elements should satisfy.
  • iterable – The iterable.
Returns:

The iterator after dropping elements.

flutes.iterator.split_by(iterable: Iterable[A], empty_segments: bool = False, *, criterion=None, separator=None) → Iterator[List[A]][source]

Split a list into sub-lists by dropping certain elements. Exactly one of criterion and separator must be specified. For example:

>>> list(split_by(range(10), criterion=lambda x: x % 3 == 0))
[[1, 2], [4, 5], [7, 8]]

>>> list(split_by(" Split by: ", empty_segments=True, separator='.'))
[[], ['S', 'p', 'l', 'i', 't'], ['b', 'y', ':'], []]
Parameters:
  • iterable – The list to split.
  • empty_segments – If True, include an empty list in cases where two adjacent elements satisfy the criterion.
  • criterion – The criterion to decide whether to drop an element.
  • separator – The separator for sub-lists. An element is dropped if it is equal to parameter.
Returns:

List of sub-lists.

flutes.iterator.scanl(func, iterable, *args)[source]

Computes the intermediate results of reduce(). Equivalent to Haskell’s scanl. For example:

>>> list(scanl(operator.add, [1, 2, 3, 4], 0))
[0, 1, 3, 6, 10]
>>> list(scanl(lambda s, x: x + s, ['a', 'b', 'c', 'd']))
['a', 'ba', 'cba', 'dcba']

Learn more at Learn You a Haskell: Higher Order Functions.

Parameters:
  • func – The function to apply. This should be a binary function where the arguments are: the accumulator, and the current element.
  • iterable – The list of elements to iteratively apply the function to.
  • initial – The initial value for the accumulator. If not supplied, the first element in the list is used.
Returns:

The intermediate results at each step.

flutes.iterator.scanr(func, iterable, *args)[source]

Computes the intermediate results of reduce() applied in reverse. Equivalent to Haskell’s scanr. For example:

>>> scanr(operator.add, [1, 2, 3, 4], 0)
[10, 9, 7, 4, 0]
>>> scanr(lambda s, x: x + s, ['a', 'b', 'c', 'd'])
['abcd', 'bcd', 'cd', 'd']

Learn more at Learn You a Haskell: Higher Order Functions.

Parameters:
  • func – The function to apply. This should be a binary function where the arguments are: the accumulator, and the current element.
  • iterable – The list of elements to iteratively apply the function to.
  • initial – The initial value for the accumulator. If not supplied, the first element in the list is used.
Returns:

The intermediate results at each step, starting from the end.

Logging

flutes.log.get_worker_id() → Optional[int][source]

Return the ID of the pool worker process, or None if the current process is not a pool worker.

flutes.log.get_logging_levels() → List[str][source]

Return a list of logging levels that the logging system supports.

flutes.log.set_log_file(path: PathType, fmt: str = '%(asctime)s %(levelname)s: %(message)s') → None[source]

Set the path of the log file.

Parameters:
  • path – Path to the log file.
  • fmt – Logging format.
flutes.log.log(msg: str, level: str = 'info', force_console: bool = False, timestamp: bool = True, include_proc_id: bool = True) → None[source]

Write a line of log with the specified logging level.

Parameters:
  • msg – Message to log.
  • level – Logging level. Available options are success, warning, error, and info. Defaults to info.
  • force_console – If True, will write to console regardless of logging level setting. Defaults to False.
  • timestamp

    If True, will add a timestamp to the console logging output. Defaults to True.

    ..note::
    The logging level colors apply to the timestamp only, so if timestamp is set to False, then it’s not possible to differentiate between different logging levels under console.
  • include_proc_id – If True, will include the process ID for multiprocessing pool workers. Defaults to True.
flutes.log.set_logging_level(level: str, console: bool = True, file: bool = True) → None[source]

Set the global logging level to the specified level.

Parameters:
  • level – Logging level.
  • console – If True, the specified logging level applies to console output.
  • file – If True, the specified logging level applies to file output.
flutes.log.set_console_logging_function(log_fn: Callable[[str], None]) → None[source]

Set the console logging function for current process only.

Math

flutes.math.ceil_div(a: int, b: int) → int[source]

Integer division that rounds up.

Multi-Processing

flutes.multiproc.safe_pool(processes, *args, state_class=None, init_args=(), closing=None, suppress_exceptions=False, **kwargs)[source]

A wrapper over multiprocessing.Pool with additional functionalities:

  • Fallback to sequential execution when processes == 0.
  • Stateful processes: Functions run in the pool will have access to a mutable state class. See PoolState for details.
  • Handles exceptions gracefully.
  • All pool methods support args and kwds, which allows passing arguments to the called function.

Please see PoolType (non-stateful) and StatefulPoolType for supported methods of the pool instance.

Parameters:
  • processes – The number of worker processes to run. A value of 0 means sequential execution in the current process.
  • state_class – The class of the pool state. This allows functions run by the pool to access a mutable process-local state. The state_class must be a subclass of PoolState. Defaults to None.
  • init_args

    Arguments to the initializer of the pool state. The state will be constructed with:

    state = state_class(*init_args)
    
  • closing

    An optional list of objects to close at exit, routines to run at exit. For each element obj:

    • If it is a callable, obj is called with no arguments.
    • If it has an close() method, obj.close() is invoked.
    • Otherwise, an exception is raised before the pool is constructed.
  • suppress_exceptions – If True, exceptions raised within the lifetime of the pool are suppressed. Defaults to False.
Returns:

A context manager that can be used in a with statement.

class flutes.multiproc.PoolState[source]

Base class for multi-processing pool states. Pool states are mutable objects stored on each worker process, it allows keeping track of an process-local internal state that persists through tasks. This extends the capabilities of pool tasks beyond pure functions — side-effects can also be recorded.

To define a pool state, subclass the PoolState class and define the __init__ method, which will be called when each worker process is spawn. Methods of the state class can then be used as pool tasks.

Here’s an comprehensive example that reads a text file and counts the frequencies for each word appearing in the file. We use a map-reduce approach that distributes tasks to each pool worker and then “reduces” (aggregates) the results.

class WordCounter(flutes.PoolState):
    def __init__(self):
        # Initializes the state; will be called when a worker process is spawn.
        self.word_cnt = collections.Counter()

    @flutes.exception_wrapper()  # prevent the worker from crashing, thus losing data
    def count_words(self, sentence):
        self.word_cnt.update(sentence.split())

def count_words_in_file(path):
    with open(path) as f:
        lines = [line for line in f]
    # Construct a process pool while specifying the pool state class we're using.
    with flutes.safe_pool(processes=4, state_class=WordCounter) as pool:
        # Map the tasks as usual.
        for _ in pool.imap_unordered(WordCounter.count_words, sentences, chunksize=1000):
            pass
        word_counter = collections.Counter()
        # Gather the states and perform the reduce step.
        for state in pool.get_states():
            word_counter.update(state.word_cnt)
    return word_counter

See also: safe_pool(), StatefulPoolType.

__return_state__()[source]

When StatefulPoolType.get_states() is invoked, this method is called for each pool worker to return its state. The default implementation returns the PoolState object itself, but it might be beneficial to override this method in cases such as:

  • The PoolState object contains unpickle-able attributes.
  • You need to dynamically compute the state before it’s retrieved.
class flutes.multiproc.PoolType(processes=None, initializer=None, initargs=(), maxtasksperchild=None, context=None)[source]

Multiprocessing stateless worker pool. See StatefulPoolType for a pool with stateful workers.

Note

This class is only a stub for type annotation and documentation purposes only, and should not be used directly. Please refer to safe_pool() for a user-facing API for constructing pool instances.

apply(fn: Callable[[...], T], args: Iterable[Any] = (), kwds: Mapping[str, Any] = {}) → T[source]

Calls fn with arguments args and keyword arguments kwds, and blocks until the result is ready.

Please refer to Python documentation on multiprocessing.pool.Pool.apply() for details.

apply_async(func: Callable[[...], T], args: Iterable[Any] = (), kwds: Mapping[str, Any] = {}, callback: Optional[Callable[[T], None]] = None, error_callback: Optional[Callable[[BaseException], None]] = None) → mp.pool.ApplyResult[T][source]

Non-blocking version of apply().

Please refer to Python documentation on multiprocessing.pool.Pool.apply_async() for details.

gather(fn: Callable[[T], Iterator[R]], iterable: Iterable[T], chunksize: int = 1, *, args: Iterable[Any] = (), kwds: Mapping[str, Any] = {}) → Iterator[R][source]

Apply a function that returns a generator to each element in an iterable, and return an iterator over the concatenation of all elements produced by the generators. Order is not guaranteed across generators, but relative order is preserved for elements from the same generator.

This method chops the iterable into a number of chunks which it submits to the process pool as separate tasks. The (approximate) size of these chunks can be specified by setting chunksize to a positive integer.

The underlying implementation uses a managed queue to hold the results. The sequential equivalent is:

itertools.chain.from_iterable(fn(x, *args, **kwds) for x in iterable)
Parameters:
  • fn – The function returning generators.
  • iterable – The iterable.
  • chunksize – The (approximate) size of each chunk. Defaults to 1. A larger chunksize is beneficial for performance.
  • args – Positional arguments to apply to the function.
  • kwds – Keyword arguments to apply to the function.
Returns:

An iterator over the concatenation of all elements produced by the generators.

imap(fn: Callable[[T], R], iterable: Iterable[T], chunksize: int = 1, *, args: Iterable[Any] = (), kwds: Mapping[str, Any] = {}) → Iterator[R][source]

Lazy version of map().

Please refer to Python documentation on multiprocessing.pool.Pool.imap() for details.

imap_unordered(fn: Callable[[T], R], iterable: Iterable[T], chunksize: int = 1, *, args: Iterable[Any] = (), kwds: Mapping[str, Any] = {}) → Iterator[R][source]

Similar to imap(), but the ordering of the results are not guaranteed.

Please refer to Python documentation on multiprocessing.pool.Pool.imap_unordered() for details.

map(fn: Callable[[T], R], iterable: Iterable[T], chunksize: Optional[int] = None, *, args: Iterable[Any] = (), kwds: Mapping[str, Any] = {}) → List[R][source]

A parallel, eager, blocking equivalent of map(), with support for additional arguments. The sequential equivalent is:

list(map(lambda x: fn(x, *args, **kwds), iterable))

Please refer to Python documentation on multiprocessing.pool.Pool.map() for details.

map_async(fn: Callable[[T], R], iterable: Iterable[T], chunksize: Optional[int] = None, callback: Optional[Callable[[T], None]] = None, error_callback: Optional[Callable[[BaseException], None]] = None, *, args: Iterable[Any] = (), kwds: Mapping[str, Any] = {}) → mp.pool.ApplyResult[List[R]][source]

Non-blocking version of map().

Please refer to Python documentation on multiprocessing.pool.Pool.map_async() for details.

starmap(fn: Callable[[...], R], iterable: Iterable[Iterable[Any]], chunksize: Optional[int] = None, *, args: Iterable[Any] = (), kwds: Mapping[str, Any] = {}) → List[R][source]

Similar to map(), except that the elements of iterable are expected to be iterables that are unpacked as arguments. The sequential equivalent is:

list(map(lambda xs: fn(*xs, *args, **kwds), iterable))

Please refer to Python documentation on multiprocessing.pool.Pool.starmap() for details.

starmap_async(fn: Callable[[...], R], iterable: Iterable[Iterable[Any]], chunksize: Optional[int] = None, callback: Optional[Callable[[T], None]] = None, error_callback: Optional[Callable[[BaseException], None]] = None, *, args: Iterable[Any] = (), kwds: Mapping[str, Any] = {}) → mp.pool.ApplyResult[List[R]][source]

Non-blocking version of starmap().

Please refer to Python documentation on multiprocessing.pool.Pool.starmap_async() for details.

class flutes.multiproc.StatefulPoolType(processes=None, initializer=None, initargs=(), maxtasksperchild=None, context=None)[source]

Multiprocessing worker pool with per-worker states.

Compared to stateless workers provided by the Python multiprocessing library, workers in a stateful pool have access to a process-local mutable state. The state is preserved throughout the lifetime of a worker process. All stateless pool methods are supported in a stateful pool. Please refer to PoolType for a list of supported methods.

The pool state class is set at construction (see safe_pool()), and must be a subclass of PoolState. A stateful pool with State as the state class supports using these functions as tasks:

  • An unbound method of State class. The unbound method will be bound to the process-local state upon dispatch.
  • Any other pickle-able function. These functions will not be able to access the pool state. As a precaution, an exception will be thrown if the first argument of the function is self.

Please refer to PoolState for a comprehensive example.

Note

This class is only a stub for type annotation and documentation purposes only, and should not be used directly. Please refer to safe_pool() for a user-facing API for constructing pool instances.

broadcast(fn: Callable[[State], R], *, args: Iterable[Any] = (), kwds: Mapping[str, Any] = {}) → List[R][source]

Call the function on each worker process and gather results. It is guaranteed that the function is called on each worker process exactly once.

This function is blocking.

Parameters:
  • fn – The function to call on workers. This must be an unbound method of the pool state class.
  • args – Positional arguments to apply to the function.
  • kwds – Keyword arguments to apply to the function.
Returns:

A list of results, one from each worker process. Ordering of the results is arbitrary.

get_states() → List[State][source]

Return the states of each pool worker. The pool state class can override the PoolState.__return_state__() method to customize the returned value.

The implementation uses the broadcast() mechanism to retrieve states. This function is blocking.

Note

get_states() must be called within the with block, before the pool terminates. Calling get_states() while iterating over results from imap(), imap_unordered(), or gather() is likely to result in deadlock or long wait periods.

Returns:A list of state for each worker process. Ordering of the states is arbitrary.
class flutes.multiproc.MultiprocessingFileWriter(path: PathType, mode: str = 'a')[source]

A multiprocessing file writer that allows multiple processes to write to the same file. Order is not guaranteed.

This is very similar to flutes.log.MultiprocessingFileHandler.

class flutes.multiproc.ProgressBarManager(verbose: bool = True, **kwargs)[source]

A manager for tqdm progress bars that allows maintaining multiple bars from multiple worker processes.

def run(xs: List[int], *, bar) -> int:
    # Create a new progress bar for the current worker.
    bar.new(total=len(xs), desc="Worker {flutes.get_worker_id()}")
    # Compute-intensive stuff!
    result = 0
    for idx, x in enumerate(xs):
        result += x
        time.sleep(random.uniform(0.01, 0.2))
        bar.update(1, postfix={"sum": result})  # update progress
        if (idx + 1) % 100 == 0:
            # Logging works without messing up terminal output.
            flutes.log(f"Processed {idx + 1} samples")
    return result

def run2(xs: List[int], *, bar) -> int:
    # An alternative way to achieve the same functionalities (though slightly slower):
    result = 0
    for idx, x in enumerate(bar.iter(xs)):
        result += x
        time.sleep(random.uniform(0.01, 0.2))
        bar.update(postfix={"sum": result})  # update progress
        if (idx + 1) % 100 == 0:
            # Logging works without messing up terminal output.
            flutes.log(f"Processed {idx + 1} samples")
    return result

manager = flutes.ProgressBarManager()
# Worker processes interact with the manager through proxies.
run_fn = functools.partial(run, bar=manager.proxy)
with flutes.safe_pool(4) as pool:
    for idx, _ in enumerate(pool.imap_unordered(run_fn, data)):
        flutes.log(f"Processed {idx + 1} arrays")
Parameters:
  • verbose – If False, all progress bars are disabled. Defaults to True.
  • kwargs

    Default arguments for the tqdm progress bar initializer.

class Proxy(queue: mp.Queue[Event])[source]

Proxy class for the progress bar manager. Subprocesses should communicate with the progress bar manager through this class.

close() → None[source]

Close the current progress bar.

new(iterable=None, update_frequency=1, **kwargs)[source]

Construct a new progress bar.

Parameters:
  • iterable – The iterable to decorate with a progress bar. If None, then updates must be manually managed with calls to update().
  • update_frequency

    How many iterations per update. This argument only takes effect if iterable is not None:

    • If update_frequency is a float, then the progress bar is updated whenever the iterable progresses over that percentage of elements. For instance, a value of 0.01 results in an update per 1% of progress. Requires a sized iterable (having a valid __len__).
    • If update_frequency is an int, then the progress bar is updated whenever the iterable progresses over that many elements. For instance, a value of 10 results in an update per 10 elements.
  • kwargs

    Additional arguments for the tqdm progress bar initializer. These can override the default arguments set in the constructor of ProgressBarManager.

Returns:

The wrapped iterable, or the proxy class itself.

update(n: int = 0, *, postfix: Optional[Dict[str, Any]] = None) → None[source]

Update progress for the current progress bar.

Parameters:
  • n – Increment to add to the counter.
  • postfix – An optional dictionary containing additional stats displayed at the end of the progress bar. See tqdm.set_postfix for more details.
write(message: str) → None[source]

Write a message to console without disrupting the progress bars.

Parameters:message – The message to write.
proxy

Return the proxy class for the progress bar manager. Subprocesses should communicate with the manager through the proxy class.

Process Management

class flutes.run.CommandResult(command, return_code, captured_output)[source]
captured_output

The terminal output of the command. captured_output will be None unless an exception occurred, or return_output is set to True.

command

The executed command in its original form.

return_code

The return code of the executed command.

flutes.run.run_command(args: Union[str, List[str]], *, env: Optional[Dict[str, str]] = None, cwd: Optional[PathType] = None, timeout: Optional[float] = None, verbose: bool = False, return_output: bool = False, ignore_errors: bool = False, **kwargs) → flutes.run.CommandResult[source]

A wrapper over subprocess.check_output that prevents deadlock caused by the combination of pipes and timeout. Output is redirected into a temporary file and returned only on exceptions or when return code is nonzero.

In case an OSError occurs, the function will retry for a maximum for 5 times with exponential back-off. If error still occurs, we just re-raise it.

Parameters:
  • args – The command to run. Should be either a str or a list of str depending on whether shell is True.
  • env – Environment variables to set before running the command. Defaults to None.
  • cwd – The working directory of the command to run. If None, uses the default (probably user home).
  • timeout – Maximum running time for the command. If running time exceeds the specified limit, subprocess.TimeoutExpired is thrown.
  • verbose – If True, print out the executed command and output.
  • return_output – If True, the captured output is returned. Otherwise, the return code is returned.
  • ignore_errors – If True, exceptions will not be raised. A special return code of -32768 indicates a subprocess.TimeoutExpired error.
Returns:

An instance of CommandResult.

flutes.run.error_wrapper(err: ExcType) → ExcType[source]

Wrap exceptions raised in subprocess to output captured output by default.

Structures

flutes.structure.reverse_map(d: Dict[T, int]) → List[T][source]

Given a dict containing pairs of (item, id), return a list where the id-th element is item.

Note

It is assumed that the ids form a permutation.

>>> words = ['a', 'aardvark', 'abandon', ...]
>>> word_to_id = {word: idx for idx, word in enumerate(words)}
>>> id_to_word = reverse_map(word_to_id)
>>> (words == id_to_word)
True
Parameters:d – The dictionary mapping item to id.
flutes.structure.register_no_map_class(container_type: Type[T]) → None[source]

Register a container type as non-mappable, i.e., instances of the class will be treated as singleton objects in map_structure() and map_structure_zip(), their contents will not be traversed. This would be useful for certain types that subclass built-in container types, such as torch.Size.

Parameters:container_type – The type of the container, e.g. list, dict.
flutes.structure.no_map_instance(instance: T) → T[source]

Register a container instance as non-mappable, i.e., it will be treated as a singleton object in map_structure() and map_structure_zip(), its contents will not be traversed.

Parameters:instance – The container instance.
flutes.structure.map_structure(fn: Callable[[T], R], obj: Collection[T]) → Collection[R][source]

Map a function over all elements in a (possibly nested) collection.

Parameters:
  • fn – The function to call on elements.
  • obj – The collection to map function over.
Returns:

The collection in the same structure, with elements mapped.

flutes.structure.map_structure_zip(fn: Callable[[...], R], objs: Sequence[Collection[T]]) → Collection[R][source]

Map a function over tuples formed by taking one elements from each (possibly nested) collection. Each collection must have identical structures.

Note

Although identical structures are required, it is not enforced by assertions. The structure of the first collection is assumed to be the structure for all collections.

Parameters:
  • fn – The function to call on elements.
  • objs – The list of collections to map function over.
Returns:

A collection with the same structure, with elements mapped.

Timing

flutes.timing.work_in_progress(desc: str = 'Work in progress')[source]

Time the execution time of a code block or function.

>>> @work_in_progress("Loading file")
... def load_file(path):
...     with open(path, "rb") as f:
...         return pickle.load(f)
...
... obj = load_file("/path/to/some/file")
Loading file... done. (3.52s)

>>> with work_in_progress("Saving file"):
...     with open(path, "wb") as f:
...         pickle.dump(obj, f)
Saving file... done. (3.78s)
Parameters:desc – Description of the task performed.