API Documentation
Exception
-
flutes.exception.
register_ipython_excepthook
() → None
Register an exception hook that launches an interactive IPython session upon uncaught exceptions.
-
flutes.exception.
log_exception
(e, user_msg: Optional[str] = None, **kwargs)
Convenience function to log an exception using the logging interface.
Parameters:
- e – The exception instance.
- user_msg – An optional user message to print.
- kwargs – Additional arguments for log().
-
flutes.exception.
exception_wrapper
(handler_fn=None)
Function decorator that calls the specified handler function when an exception occurs inside the decorated function. By default, handler_fn is None, and log_exception() will be called to print the exception details.
A custom handler function takes the following arguments:
- A positional argument for the exception object. This must be the first argument of the handler function.
- Arguments with matching names to arguments in the wrapped function. These arguments will be filled with values passed to the wrapped function. These arguments cannot take default values.
- Arguments without matching names. These arguments must take default values.
- An optional variadic keyword argument (**kwargs). This will be filled with remaining argument name-value pairs that are not captured by other arguments.
For example:
    def handler_fn(e, three, one, args, my_arg=None, **kw):
        ...

    @exception_wrapper(handler_fn)
    def foo(one, two, *args, three=None, **kwargs):
        ...

    foo(1, "2", "arg1", "arg2", four=4)
Assuming a ValueError is thrown, the argument values for handler_fn would be:
    e: <ValueError>
    three: None
    one: 1
    args: ("arg1", "arg2")
    my_arg: None
    kw: {"two": "2", "kwargs": {"four": 4}}
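The argument-matching rules above can be reproduced with the standard inspect module. The following is a minimal sketch, not the library's actual implementation: exception_wrapper_sketch is a hypothetical name, and only the matching rules listed above are modeled (the default log_exception() fallback is left out).

```python
import functools
import inspect


def exception_wrapper_sketch(handler_fn):
    """Hypothetical stand-in that only models the argument-matching rules."""
    # Skip the first handler parameter: it receives the exception object.
    handler_params = list(inspect.signature(handler_fn).parameters.values())[1:]
    accepts_var_kw = any(
        p.kind is inspect.Parameter.VAR_KEYWORD for p in handler_params)

    def decorator(fn):
        fn_sig = inspect.signature(fn)

        @functools.wraps(fn)
        def wrapped(*args, **kwargs):
            try:
                return fn(*args, **kwargs)
            except Exception as e:
                # Recover the name -> value mapping of the failed call.
                bound = fn_sig.bind(*args, **kwargs)
                bound.apply_defaults()
                remaining = dict(bound.arguments)
                matched = {}
                for p in handler_params:
                    if p.kind is inspect.Parameter.VAR_KEYWORD:
                        continue
                    if p.name in remaining:
                        matched[p.name] = remaining.pop(p.name)
                if accepts_var_kw:
                    # Everything not captured by name ends up in **kw.
                    matched.update(remaining)
                return handler_fn(e, **matched)

        return wrapped

    return decorator


def handler_fn(e, three, one, args, my_arg=None, **kw):
    return {"e": type(e).__name__, "three": three, "one": one,
            "args": args, "my_arg": my_arg, "kw": kw}


@exception_wrapper_sketch(handler_fn)
def foo(one, two, *args, three=None, **kwargs):
    raise ValueError("something went wrong")


result = foo(1, "2", "arg1", "arg2", four=4)
```

Here result holds the same values as the listing above: the variadic arguments of foo are passed under their parameter names (args and kwargs), which is why kwargs shows up as a nested dictionary inside kw.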
File System
-
flutes.fs.
readable_size
(size: float) → str
Represent file size in human-readable format.
    >>> readable_size(1024 * 1024)
    "1.00M"
Parameters: size – File size in bytes.
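One plausible way to produce such a string is to divide by 1024 until the value fits the current unit. This is a sketch only: readable_size_sketch is a hypothetical name, and the unit list beyond "M" is an assumption, since the text above only shows the "M" suffix.

```python
def readable_size_sketch(size: float) -> str:
    # Assumed unit suffixes; only "M" is confirmed by the example above.
    units = ["", "K", "M", "G", "T"]
    for unit in units[:-1]:
        if size < 1024:
            return f"{size:.2f}{unit}"
        size /= 1024
    # Anything larger is reported in the last unit.
    return f"{size:.2f}{units[-1]}"


one_megabyte = readable_size_sketch(1024 * 1024)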
-
flutes.fs.
remove_prefix
(s: str, prefix: str) → str
Remove the specified prefix from a string. If only part of the prefix matches, only the matching part is removed.
    >>> remove_prefix("https://github.com/huzecong/flutes", "https://")
    "github.com/huzecong/flutes"
    >>> remove_prefix("preface", "prefix")
    "face"
Parameters:
- s – The string whose prefix we want to remove.
- prefix – The prefix to remove.
-
flutes.fs.
copy_tree
(src: PathType, dst: PathType, overwrite: bool = False) → None
Copy the contents of folder src to folder dst. The dst folder may or may not already exist (looking at you, shutil.copytree()).
Parameters:
- src – The source directory.
- dst – The destination directory. If it doesn’t exist, it will be created.
- overwrite – If True, files in dst will be overwritten if a file with the same relative path exists in src. If False, these files are not copied. Defaults to False.
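The overwrite semantics described above can be illustrated with pathlib and shutil. This is a sketch under simplifying assumptions (regular files and directories only, no symlink handling); copy_tree_sketch is a hypothetical name and not the library's implementation.

```python
import shutil
import tempfile
from pathlib import Path


def copy_tree_sketch(src, dst, overwrite=False):
    src, dst = Path(src), Path(dst)
    dst.mkdir(parents=True, exist_ok=True)  # dst may or may not exist already
    for path in src.rglob("*"):
        target = dst / path.relative_to(src)
        if path.is_dir():
            target.mkdir(parents=True, exist_ok=True)
        elif overwrite or not target.exists():
            target.parent.mkdir(parents=True, exist_ok=True)
            shutil.copy2(path, target)


with tempfile.TemporaryDirectory() as tmp:
    src, dst = Path(tmp, "src"), Path(tmp, "dst")
    (src / "sub").mkdir(parents=True)
    (src / "sub" / "a.txt").write_text("new")
    (dst / "sub").mkdir(parents=True)
    (dst / "sub" / "a.txt").write_text("old")

    copy_tree_sketch(src, dst)                  # existing file is kept
    kept = (dst / "sub" / "a.txt").read_text()
    copy_tree_sketch(src, dst, overwrite=True)  # existing file is replaced
    replaced = (dst / "sub" / "a.txt").read_text()
```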
-
flutes.fs.
cache
(path: Optional[PathType], verbose: bool = True, name: Optional[str] = None)
A function decorator that caches the output of the function to disk. If the cache file exists, it is loaded from disk and the function will not be executed.
Parameters:
- path – Path to the cache file. If None, no cache is loaded or saved.
- verbose – If True, will print to log.
- name – Name of the object to load. Only used for logging purposes.
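The caching behavior can be sketched as follows. The serialization format is an assumption (pickle is used here; the text above does not say which format the library uses), cache_sketch is a hypothetical name, and the verbose and name parameters are omitted.

```python
import functools
import os
import pickle
import tempfile


def cache_sketch(path):
    def decorator(fn):
        @functools.wraps(fn)
        def wrapped(*args, **kwargs):
            if path is not None and os.path.exists(path):
                with open(path, "rb") as f:
                    return pickle.load(f)  # cache hit: fn is not executed
            result = fn(*args, **kwargs)
            if path is not None:
                with open(path, "wb") as f:
                    pickle.dump(result, f)
            return result
        return wrapped
    return decorator


calls = []

with tempfile.TemporaryDirectory() as tmp:
    @cache_sketch(os.path.join(tmp, "squares.pkl"))
    def compute_squares(n):
        calls.append(n)
        return [i * i for i in range(n)]

    first = compute_squares(5)   # computed and written to disk
    second = compute_squares(5)  # loaded from disk
```

Note that in this sketch the cache is keyed by the file path alone, so calling the function with different arguments would still return the stored result.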
-
flutes.fs.
scandir
(path: PathType) → Iterator[PathType]
Lazily iterate over all files and directories under a directory. Each returned path is the absolute path of a child file or directory, with the same type as path (pathlib.Path or str).
Parameters: path – Path to the directory.
Returns: An iterator over children paths.
I/O
-
flutes.io.
shut_up
(stderr: bool = True, stdout: bool = False)
Suppress output (probably generated by external scripts or badly written libraries) for stderr or stdout. This method can be used as a decorator, or as a context manager:
    @shut_up(stderr=True)
    def verbose_func(...):
        ...

    with shut_up(stderr=True):
        ...  # verbose stuff
-
flutes.io.
progress_open
(path, mode='r', *, encoding='utf-8', verbose=True, buffer_size=8192, bar_fn: Optional[Callable[[...], tqdm.std.tqdm]] = None, **kwargs)
A replacement for open() that shows the progress of reading the file:
    with progress_open(path, mode="r") as f:
        # `f` is just what you'd get with `open(path)`, now with a progress bar
        bar = f.progress_bar  # type: tqdm.tqdm
Parameters:
- path – Path to the file.
- mode – The file open mode. When the progress bar is enabled, only read modes "r" and "rb" are supported (write progress doesn’t make a lot of sense). Defaults to "r".
- encoding – Encoding for the file. Only required for "r" mode. Defaults to "utf-8".
- verbose – If False, the progress bar is not displayed and a normal file object is returned. Defaults to True.
- buffer_size – The size of the file buffer. Defaults to io.DEFAULT_BUFFER_SIZE.
- bar_fn – An optional callable that constructs a progress bar when called. This is useful when you want to override the default progress bar, for instance, to use with ProgressBarManager:
    def process(path: str, bar: flutes.ProgressBarManager.Proxy):
        with flutes.progress_open(path, bar_fn=bar.new) as f:
            …
- kwargs – Additional arguments to pass to the tqdm initializer.
Returns: A file object.
-
flutes.io.
reverse_open
(path: PathType, *, encoding: str = 'utf-8', allow_empty_lines: bool = False, buffer_size: int = 8192)
A generator that returns the lines of a file in reverse order. Usage and syntax are the same as for the built-in function open().
Parameters:
- path – Path to the file.
- encoding – Encoding of the file. Defaults to "utf-8".
- allow_empty_lines – If False, empty lines are skipped. Defaults to False.
- buffer_size – Buffer size. You probably won’t need to change this in most cases. Defaults to io.DEFAULT_BUFFER_SIZE.
Iterator
-
class
flutes.iterator.
LazyList
(iterable: Iterable[T])
A wrapper over an iterable to allow lazily converting it into a list. The iterable is only iterated up to the accessed indices.
Parameters: iterable – The iterable to wrap.
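The "iterated only up to the accessed indices" behavior can be sketched with a small buffer class. LazyListSketch is a hypothetical name, and this sketch handles non-negative integer indices only; the library class may support more.

```python
class LazyListSketch:
    def __init__(self, iterable):
        self._iter = iter(iterable)
        self._items = []  # elements fetched so far

    def __getitem__(self, idx: int):
        # Only non-negative integer indices are handled in this sketch.
        while len(self._items) <= idx:
            try:
                self._items.append(next(self._iter))
            except StopIteration:
                raise IndexError("index out of range") from None
        return self._items[idx]


consumed = []


def numbers():
    for i in range(100):
        consumed.append(i)  # record how far the generator has advanced
        yield i


lazy = LazyListSketch(numbers())
third = lazy[2]
```

After accessing index 2, only the first three elements of the generator have been consumed.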
-
class
flutes.iterator.
Range
(*args)
A replacement for built-in range with support for indexing operators. For example:
    >>> r = Range(10)  # (end)
    >>> r = Range(1, 10 + 1)  # (start, end)
    >>> r = Range(1, 11, 2)  # (start, end, step)
    >>> print(r[0], r[2], r[4])
    1 5 9
-
class
flutes.iterator.
MapList
(func: Callable[[T], R], lst: Sequence[T])
A wrapper over a list that allows lazily performing transformations on the list elements. It’s basically the built-in map() function, with support for indexing operators. An example use case:
    >>> import bisect
    >>> # Find index of the first element in `a` whose square is >= 10.
    ... a = [1, 2, 3, 4, 5]
    ... pos = bisect.bisect_left(MapList(lambda x: x * x, a), 10)
    3
    >>> # Find the first index `i` such that `a[i] * b[i]` is >= 10.
    ... b = [2, 3, 4, 5, 6]
    ... pos = bisect.bisect_left(MapList(lambda i: a[i] * b[i], Range(len(a))), 10)
    2
Parameters:
- func – The transformation to perform on list elements.
- lst – The list to wrap.
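A lazily mapped sequence of this kind needs little more than __getitem__ and __len__. The sketch below (MapListSketch is a hypothetical name, not the library class) is enough for bisect to binary-search the transformed values without materializing them; the built-in range is used for the index sequence.

```python
import bisect
from collections.abc import Sequence


class MapListSketch(Sequence):
    def __init__(self, func, lst):
        self._func = func
        self._lst = lst

    def __getitem__(self, idx):
        # The transformation is applied only to the elements being accessed.
        if isinstance(idx, slice):
            return [self._func(x) for x in self._lst[idx]]
        return self._func(self._lst[idx])

    def __len__(self):
        return len(self._lst)


a = [1, 2, 3, 4, 5]
b = [2, 3, 4, 5, 6]
pos_square = bisect.bisect_left(MapListSketch(lambda x: x * x, a), 10)
pos_product = bisect.bisect_left(
    MapListSketch(lambda i: a[i] * b[i], range(len(a))), 10)
```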
-
flutes.iterator.
chunk
(n: int, iterable: Iterable[T]) → Iterator[List[T]]
Split the iterable into chunks, with each chunk containing no more than n elements.
    >>> list(chunk(3, range(10)))
    [[0, 1, 2], [3, 4, 5], [6, 7, 8], [9]]
Parameters:
- n – The maximum number of elements in one chunk.
- iterable – The iterable.
Returns: An iterator over chunks.
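Equivalent behavior can be obtained from the standard library with itertools.islice. This is a sketch of the idea (chunk_sketch is a hypothetical name), not necessarily how the library implements it:

```python
import itertools


def chunk_sketch(n, iterable):
    it = iter(iterable)
    while True:
        block = list(itertools.islice(it, n))  # take up to n elements
        if not block:
            return
        yield block


chunks = list(chunk_sketch(3, range(10)))
```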
-
flutes.iterator.
take
(n: int, iterable: Iterable[T]) → Iterator[T]
Take the first n elements from an iterable.
    >>> list(take(5, range(1000000)))
    [0, 1, 2, 3, 4]
Parameters:
- n – The number of elements to take.
- iterable – The iterable.
Returns: An iterator returning the first n elements from the iterable.
-
flutes.iterator.
drop
(n: int, iterable: Iterable[T]) → Iterator[T]
Drop the first n elements from an iterable, and return the rest as an iterator.
    >>> next(drop(5, range(1000000)))
    5
Parameters:
- n – The number of elements to drop.
- iterable – The iterable.
Returns: An iterator returning the remaining part of the iterable after the first n elements.
-
flutes.iterator.
drop_until
(pred_fn: Callable[[T], bool], iterable: Iterable[T]) → Iterator[T]
Drop elements from the iterable until an element that satisfies the predicate is encountered. Similar to the built-in filter() function, but only applied to a prefix of the iterable.
    >>> list(drop_until(lambda x: x > 5, range(10)))
    [6, 7, 8, 9]
Parameters:
- pred_fn – The predicate that the first returned element should satisfy.
- iterable – The iterable.
Returns: The iterator after dropping elements.
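The behavior of take, drop, and drop_until described above maps directly onto itertools primitives. The following stdlib-only sketch (the *_sketch names are hypothetical) also shows how drop_until differs from filter():

```python
import itertools


def take_sketch(n, iterable):
    return itertools.islice(iterable, n)


def drop_sketch(n, iterable):
    return itertools.islice(iterable, n, None)


def drop_until_sketch(pred_fn, iterable):
    # Skip elements while the predicate is NOT satisfied, then yield the rest.
    return itertools.dropwhile(lambda x: not pred_fn(x), iterable)


first_five = list(take_sketch(5, range(1000000)))
sixth = next(drop_sketch(5, range(1000000)))
tail = list(drop_until_sketch(lambda x: x > 5, range(10)))

# Unlike filter(), later elements are kept even if they fail the predicate.
odd_onwards = list(drop_until_sketch(lambda x: x % 2 == 1, [2, 4, 5, 6]))
odd_only = list(filter(lambda x: x % 2 == 1, [2, 4, 5, 6]))
```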
-
flutes.iterator.
split_by
(iterable: Iterable[A], empty_segments: bool = False, *, criterion=None, separator=None) → Iterator[List[A]]
Split a list into sub-lists by dropping certain elements. Exactly one of criterion and separator must be specified. For example:
    >>> list(split_by(range(10), criterion=lambda x: x % 3 == 0))
    [[1, 2], [4, 5], [7, 8]]
    >>> list(split_by(" Split by: ", empty_segments=True, separator=' '))
    [[], ['S', 'p', 'l', 'i', 't'], ['b', 'y', ':'], []]
Parameters:
- iterable – The list to split.
- empty_segments – If True, include an empty list in cases where two adjacent elements satisfy the criterion.
- criterion – The criterion to decide whether to drop an element.
- separator – The separator for sub-lists. An element is dropped if it is equal to separator.
Returns: An iterator over sub-lists.
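The splitting rules can be sketched as a small generator. split_by_sketch is a hypothetical name; the sketch assumes a space as the separator in the second example, which is what the output shown above implies.

```python
def split_by_sketch(iterable, empty_segments=False, *, criterion=None, separator=None):
    if (criterion is None) == (separator is None):
        raise ValueError("Exactly one of criterion and separator must be specified")
    if criterion is None:
        criterion = lambda x: x == separator
    group = []
    for x in iterable:
        if criterion(x):
            # The matching element itself is dropped; it only closes a segment.
            if group or empty_segments:
                yield group
            group = []
        else:
            group.append(x)
    if group or empty_segments:
        yield group


by_criterion = list(split_by_sketch(range(10), criterion=lambda x: x % 3 == 0))
by_separator = list(split_by_sketch(" Split by: ", empty_segments=True, separator=" "))
```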
-
flutes.iterator.
scanl
(func, iterable, *args)
Computes the intermediate results of reduce(). Equivalent to Haskell’s scanl. For example:
    >>> list(scanl(operator.add, [1, 2, 3, 4], 0))
    [0, 1, 3, 6, 10]
    >>> list(scanl(lambda s, x: x + s, ['a', 'b', 'c', 'd']))
    ['a', 'ba', 'cba', 'dcba']
Learn more at Learn You a Haskell: Higher Order Functions.
Parameters:
- func – The function to apply. This should be a binary function where the arguments are: the accumulator, and the current element.
- iterable – The list of elements to iteratively apply the function to.
- initial – The initial value for the accumulator. If not supplied, the first element in the list is used.
Returns: The intermediate results at each step.
-
flutes.iterator.
scanr
(func, iterable, *args)
Computes the intermediate results of reduce() applied in reverse. Equivalent to Haskell’s scanr. For example:
    >>> scanr(operator.add, [1, 2, 3, 4], 0)
    [10, 9, 7, 4, 0]
    >>> scanr(lambda s, x: x + s, ['a', 'b', 'c', 'd'])
    ['abcd', 'bcd', 'cd', 'd']
Learn more at Learn You a Haskell: Higher Order Functions.
Parameters:
- func – The function to apply. This should be a binary function where the arguments are: the accumulator, and the current element.
- iterable – The list of elements to iteratively apply the function to.
- initial – The initial value for the accumulator. If not supplied, the last element in the list is used.
Returns: The intermediate results at each step, starting from the end.
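Both scans can be expressed with itertools.accumulate (the initial keyword requires Python 3.8+). This is a stdlib sketch of the semantics: the *_sketch names are hypothetical, and the optional initial value is modeled as a keyword parameter rather than the *args used in the signatures above.

```python
import itertools
import operator

_MISSING = object()  # sentinel: distinguishes "no initial value" from None


def scanl_sketch(func, iterable, initial=_MISSING):
    if initial is _MISSING:
        return itertools.accumulate(iterable, func)
    return itertools.accumulate(iterable, func, initial=initial)


def scanr_sketch(func, iterable, initial=_MISSING):
    # Scan the reversed input from the left, then reverse the results.
    return list(scanl_sketch(func, reversed(list(iterable)), initial))[::-1]


left_sums = list(scanl_sketch(operator.add, [1, 2, 3, 4], 0))
left_concat = list(scanl_sketch(lambda s, x: x + s, ['a', 'b', 'c', 'd']))
right_sums = scanr_sketch(operator.add, [1, 2, 3, 4], 0)
right_concat = scanr_sketch(lambda s, x: x + s, ['a', 'b', 'c', 'd'])
```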
Logging
-
flutes.log.
get_worker_id
() → Optional[int]
Return the ID of the pool worker process, or None if the current process is not a pool worker.
-
flutes.log.
get_logging_levels
() → List[str]
Return a list of logging levels that the logging system supports.
-
flutes.log.
set_log_file
(path: PathType, fmt: str = '%(asctime)s %(levelname)s: %(message)s') → None
Set the path of the log file.
Parameters:
- path – Path to the log file.
- fmt – Logging format.
-
flutes.log.
log
(msg: str, level: str = 'info', force_console: bool = False, timestamp: bool = True, include_proc_id: bool = True) → None
Write a line of log with the specified logging level.
Parameters:
- msg – Message to log.
- level – Logging level. Available options are success, warning, error, and info. Defaults to info.
- force_console – If True, will write to console regardless of logging level setting. Defaults to False.
- timestamp – If True, will add a timestamp to the console logging output. Defaults to True.
Note
The logging level colors apply to the timestamp only, so if timestamp is set to False, it is not possible to differentiate between logging levels in console output.
- include_proc_id – If True, will include the process ID for multiprocessing pool workers. Defaults to True.
-
flutes.log.
set_logging_level
(level: str, console: bool = True, file: bool = True) → None
Set the global logging level to the specified level.
Parameters:
- level – Logging level.
- console – If True, the specified logging level applies to console output.
- file – If True, the specified logging level applies to file output.
Multi-Processing
-
flutes.multiproc.
safe_pool
(processes, *args, state_class=None, init_args=(), closing=None, suppress_exceptions=False, **kwargs)
A wrapper over multiprocessing.Pool with additional functionalities:
- Fallback to sequential execution when processes == 0.
- Stateful processes: Functions run in the pool will have access to a mutable state class. See PoolState for details.
- Handles exceptions gracefully.
- All pool methods support args and kwds, which allows passing arguments to the called function.
Please see PoolType (non-stateful) and StatefulPoolType for supported methods of the pool instance.
Parameters:
- processes – The number of worker processes to run. A value of 0 means sequential execution in the current process.
- state_class – The class of the pool state. This allows functions run by the pool to access a mutable process-local state. The state_class must be a subclass of PoolState. Defaults to None.
- init_args – Arguments to the initializer of the pool state. The state will be constructed with:
    state = state_class(*init_args)
- closing – An optional list of objects to close, or routines to run, at exit. For each element obj:
  - If it is a callable, obj is called with no arguments.
  - If it has a close() method, obj.close() is invoked.
  - Otherwise, an exception is raised before the pool is constructed.
- suppress_exceptions – If True, exceptions raised within the lifetime of the pool are suppressed. Defaults to False.
Returns: A context manager that can be used in a with statement.
-
class
flutes.multiproc.
PoolState
Base class for multi-processing pool states. Pool states are mutable objects stored on each worker process; they allow keeping track of a process-local internal state that persists through tasks. This extends the capabilities of pool tasks beyond pure functions: side effects can also be recorded.
To define a pool state, subclass the PoolState class and define the __init__ method, which will be called when each worker process is spawned. Methods of the state class can then be used as pool tasks.
Here’s a comprehensive example that reads a text file and counts the frequencies for each word appearing in the file. We use a map-reduce approach that distributes tasks to each pool worker and then “reduces” (aggregates) the results.
    import collections
    import flutes

    class WordCounter(flutes.PoolState):
        def __init__(self):
            # Initializes the state; will be called when a worker process is spawned.
            self.word_cnt = collections.Counter()

        @flutes.exception_wrapper()  # prevent the worker from crashing, thus losing data
        def count_words(self, sentence):
            self.word_cnt.update(sentence.split())

    def count_words_in_file(path):
        with open(path) as f:
            sentences = [line for line in f]
        # Construct a process pool while specifying the pool state class we're using.
        with flutes.safe_pool(processes=4, state_class=WordCounter) as pool:
            # Map the tasks as usual.
            for _ in pool.imap_unordered(WordCounter.count_words, sentences, chunksize=1000):
                pass
            word_counter = collections.Counter()
            # Gather the states and perform the reduce step.
            for state in pool.get_states():
                word_counter.update(state.word_cnt)
        return word_counter
See also: safe_pool(), StatefulPoolType.
-
__return_state__
()
When StatefulPoolType.get_states() is invoked, this method is called for each pool worker to return its state. The default implementation returns the PoolState object itself, but it might be beneficial to override this method in cases such as:
- The PoolState object contains unpickle-able attributes.
- You need to dynamically compute the state before it’s retrieved.
-
-
class
flutes.multiproc.
PoolType
(processes=None, initializer=None, initargs=(), maxtasksperchild=None, context=None)
Multiprocessing stateless worker pool. See StatefulPoolType for a pool with stateful workers.
Note
This class is a stub for type annotation and documentation purposes only, and should not be used directly. Please refer to safe_pool() for a user-facing API for constructing pool instances.
-
apply
(fn: Callable[[...], T], args: Iterable[Any] = (), kwds: Mapping[str, Any] = {}) → T
Calls fn with arguments args and keyword arguments kwds, and blocks until the result is ready.
Please refer to the Python documentation on multiprocessing.pool.Pool.apply() for details.
-
apply_async
(func: Callable[[...], T], args: Iterable[Any] = (), kwds: Mapping[str, Any] = {}, callback: Optional[Callable[[T], None]] = None, error_callback: Optional[Callable[[BaseException], None]] = None) → mp.pool.ApplyResult[T]
Non-blocking version of apply().
Please refer to the Python documentation on multiprocessing.pool.Pool.apply_async() for details.
-
gather
(fn: Callable[[T], Iterator[R]], iterable: Iterable[T], chunksize: int = 1, *, args: Iterable[Any] = (), kwds: Mapping[str, Any] = {}) → Iterator[R]
Apply a function that returns a generator to each element in an iterable, and return an iterator over the concatenation of all elements produced by the generators. Order is not guaranteed across generators, but relative order is preserved for elements from the same generator.
This method chops the iterable into a number of chunks which it submits to the process pool as separate tasks. The (approximate) size of these chunks can be specified by setting chunksize to a positive integer.
The underlying implementation uses a managed queue to hold the results. The sequential equivalent is:
    itertools.chain.from_iterable(fn(x, *args, **kwds) for x in iterable)
Parameters:
- fn – The function returning generators.
- iterable – The iterable.
- chunksize – The (approximate) size of each chunk. Defaults to 1. A larger chunksize is beneficial for performance.
- args – Positional arguments to apply to the function.
- kwds – Keyword arguments to apply to the function.
Returns: An iterator over the concatenation of all elements produced by the generators.
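To make the sequential equivalent concrete, here is a small runnable illustration without any pool. The tokens function and its sep parameter are made up for this example.

```python
import itertools


def tokens(line, sep=" "):
    # A generator function: yields several results per input element.
    yield from line.split(sep)


lines = ["a b", "c", "d e f"]
args, kwds = (), {"sep": " "}
result = list(itertools.chain.from_iterable(
    tokens(x, *args, **kwds) for x in lines))
```

In the sequential version the output order is fully deterministic; with a pool, only the relative order of elements coming from the same generator is preserved, as noted above.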
-
imap
(fn: Callable[[T], R], iterable: Iterable[T], chunksize: int = 1, *, args: Iterable[Any] = (), kwds: Mapping[str, Any] = {}) → Iterator[R]
Lazy version of map().
Please refer to the Python documentation on multiprocessing.pool.Pool.imap() for details.
-
imap_unordered
(fn: Callable[[T], R], iterable: Iterable[T], chunksize: int = 1, *, args: Iterable[Any] = (), kwds: Mapping[str, Any] = {}) → Iterator[R]
Similar to imap(), but the ordering of the results is not guaranteed.
Please refer to the Python documentation on multiprocessing.pool.Pool.imap_unordered() for details.
-
map
(fn: Callable[[T], R], iterable: Iterable[T], chunksize: Optional[int] = None, *, args: Iterable[Any] = (), kwds: Mapping[str, Any] = {}) → List[R]
A parallel, eager, blocking equivalent of map(), with support for additional arguments. The sequential equivalent is:
    list(map(lambda x: fn(x, *args, **kwds), iterable))
Please refer to the Python documentation on multiprocessing.pool.Pool.map() for details.
-
map_async
(fn: Callable[[T], R], iterable: Iterable[T], chunksize: Optional[int] = None, callback: Optional[Callable[[T], None]] = None, error_callback: Optional[Callable[[BaseException], None]] = None, *, args: Iterable[Any] = (), kwds: Mapping[str, Any] = {}) → mp.pool.ApplyResult[List[R]]
Non-blocking version of map().
Please refer to the Python documentation on multiprocessing.pool.Pool.map_async() for details.
-
starmap
(fn: Callable[[...], R], iterable: Iterable[Iterable[Any]], chunksize: Optional[int] = None, *, args: Iterable[Any] = (), kwds: Mapping[str, Any] = {}) → List[R]
Similar to map(), except that the elements of iterable are expected to be iterables that are unpacked as arguments. The sequential equivalent is:
    list(map(lambda xs: fn(*xs, *args, **kwds), iterable))
Please refer to the Python documentation on multiprocessing.pool.Pool.starmap() for details.
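The sequential equivalents given for map() and starmap() show where args and kwds end up in each call. A small runnable illustration without a pool (scale and scale_sum are made-up functions for this example):

```python
def scale(x, factor, offset=0):
    return x * factor + offset


def scale_sum(x, y, factor, offset=0):
    return (x + y) * factor + offset


args, kwds = (10,), {"offset": 1}

# map(): each element is the first argument, followed by args and kwds.
mapped = list(map(lambda x: scale(x, *args, **kwds), [1, 2, 3]))

# starmap(): each element is unpacked first, then args and kwds follow.
starmapped = list(map(lambda xs: scale_sum(*xs, *args, **kwds), [(1, 2), (3, 4)]))
```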
-
starmap_async
(fn: Callable[[...], R], iterable: Iterable[Iterable[Any]], chunksize: Optional[int] = None, callback: Optional[Callable[[T], None]] = None, error_callback: Optional[Callable[[BaseException], None]] = None, *, args: Iterable[Any] = (), kwds: Mapping[str, Any] = {}) → mp.pool.ApplyResult[List[R]]
Non-blocking version of starmap().
Please refer to the Python documentation on multiprocessing.pool.Pool.starmap_async() for details.
-
-
class
flutes.multiproc.
StatefulPoolType
(processes=None, initializer=None, initargs=(), maxtasksperchild=None, context=None)
Multiprocessing worker pool with per-worker states.
Compared to stateless workers provided by the Python multiprocessing library, workers in a stateful pool have access to a process-local mutable state. The state is preserved throughout the lifetime of a worker process. All stateless pool methods are supported in a stateful pool. Please refer to PoolType for a list of supported methods.
The pool state class is set at construction (see safe_pool()), and must be a subclass of PoolState. A stateful pool with State as the state class supports using these functions as tasks:
- An unbound method of the State class. The unbound method will be bound to the process-local state upon dispatch.
- Any other pickle-able function. These functions will not be able to access the pool state. As a precaution, an exception will be thrown if the first argument of the function is self.
Please refer to PoolState for a comprehensive example.
Note
This class is a stub for type annotation and documentation purposes only, and should not be used directly. Please refer to safe_pool() for a user-facing API for constructing pool instances.
-
broadcast
(fn: Callable[[State], R], *, args: Iterable[Any] = (), kwds: Mapping[str, Any] = {}) → List[R]
Call the function on each worker process and gather results. It is guaranteed that the function is called on each worker process exactly once.
This function is blocking.
Parameters:
- fn – The function to call on workers. This must be an unbound method of the pool state class.
- args – Positional arguments to apply to the function.
- kwds – Keyword arguments to apply to the function.
Returns: A list of results, one from each worker process. Ordering of the results is arbitrary.
-
get_states
() → List[State]
Return the states of each pool worker. The pool state class can override the PoolState.__return_state__() method to customize the returned value.
The implementation uses the broadcast() mechanism to retrieve states. This function is blocking.
Note
get_states() must be called within the with block, before the pool terminates. Calling get_states() while iterating over results from imap(), imap_unordered(), or gather() is likely to result in deadlock or long wait periods.
Returns: A list of states, one for each worker process. Ordering of the states is arbitrary.
-
class
flutes.multiproc.
MultiprocessingFileWriter
(path: PathType, mode: str = 'a')
A multiprocessing file writer that allows multiple processes to write to the same file. Order is not guaranteed.
This is very similar to flutes.log.MultiprocessingFileHandler.
-
class
flutes.multiproc.
ProgressBarManager
(verbose: bool = True, **kwargs)
A manager for tqdm progress bars that allows maintaining multiple bars from multiple worker processes.
    import functools
    import random
    import time
    from typing import List

    import flutes

    def run(xs: List[int], *, bar) -> int:
        # Create a new progress bar for the current worker.
        bar.new(total=len(xs), desc=f"Worker {flutes.get_worker_id()}")
        # Compute-intensive stuff!
        result = 0
        for idx, x in enumerate(xs):
            result += x
            time.sleep(random.uniform(0.01, 0.2))
            bar.update(1, postfix={"sum": result})  # update progress
            if (idx + 1) % 100 == 0:
                # Logging works without messing up terminal output.
                flutes.log(f"Processed {idx + 1} samples")
        return result

    def run2(xs: List[int], *, bar) -> int:
        # An alternative way to achieve the same functionalities (though slightly slower):
        result = 0
        for idx, x in enumerate(bar.iter(xs)):
            result += x
            time.sleep(random.uniform(0.01, 0.2))
            bar.update(postfix={"sum": result})  # update progress
            if (idx + 1) % 100 == 0:
                # Logging works without messing up terminal output.
                flutes.log(f"Processed {idx + 1} samples")
        return result

    manager = flutes.ProgressBarManager()
    # Worker processes interact with the manager through proxies.
    run_fn = functools.partial(run, bar=manager.proxy)
    with flutes.safe_pool(4) as pool:
        for idx, _ in enumerate(pool.imap_unordered(run_fn, data)):
            flutes.log(f"Processed {idx + 1} arrays")
Parameters:
- verbose – If False, all progress bars are disabled. Defaults to True.
- kwargs – Default arguments for the tqdm progress bar initializer.
-
class
Proxy
(queue: mp.Queue[Event])
Proxy class for the progress bar manager. Subprocesses should communicate with the progress bar manager through this class.
-
new
(iterable=None, update_frequency=1, **kwargs)
Construct a new progress bar.
Parameters:
- iterable – The iterable to decorate with a progress bar. If None, then updates must be manually managed with calls to update().
- update_frequency – How many iterations per update. This argument only takes effect if iterable is not None:
  - If update_frequency is a float, then the progress bar is updated whenever the iterable progresses over that fraction of elements. For instance, a value of 0.01 results in an update per 1% of progress. Requires a sized iterable (having a valid __len__).
  - If update_frequency is an int, then the progress bar is updated whenever the iterable progresses over that many elements. For instance, a value of 10 results in an update per 10 elements.
- kwargs – Additional arguments for the tqdm progress bar initializer. These can override the default arguments set in the constructor of ProgressBarManager.
Returns: The wrapped iterable, or the proxy class itself.
-
update
(n: int = 0, *, postfix: Optional[Dict[str, Any]] = None) → None
Update progress for the current progress bar.
Parameters:
- n – Increment to add to the counter.
- postfix – An optional dictionary containing additional stats displayed at the end of the progress bar. See tqdm.set_postfix for more details.
-
-
proxy
Return the proxy class for the progress bar manager. Subprocesses should communicate with the manager through the proxy class.
Process Management
-
class
flutes.run.
CommandResult
(command, return_code, captured_output)
-
captured_output
The terminal output of the command. captured_output will be None unless an exception occurred, or return_output is set to True.
-
command
The executed command in its original form.
-
return_code
The return code of the executed command.
-
-
flutes.run.
run_command
(args: Union[str, List[str]], *, env: Optional[Dict[str, str]] = None, cwd: Optional[PathType] = None, timeout: Optional[float] = None, verbose: bool = False, return_output: bool = False, ignore_errors: bool = False, **kwargs) → flutes.run.CommandResult
A wrapper over subprocess.check_output that prevents deadlock caused by the combination of pipes and timeout. Output is redirected into a temporary file and returned only on exceptions or when the return code is nonzero.
If an OSError occurs, the function will retry up to 5 times with exponential back-off. If the error still occurs, it is re-raised.
Parameters:
- args – The command to run. Should be either a str or a list of str depending on whether shell is True.
- env – Environment variables to set before running the command. Defaults to None.
- cwd – The working directory of the command to run. If None, uses the default (probably user home).
- timeout – Maximum running time for the command. If running time exceeds the specified limit, subprocess.TimeoutExpired is thrown.
- verbose – If True, print out the executed command and output.
- return_output – If True, the captured output is returned. Otherwise, the return code is returned.
- ignore_errors – If True, exceptions will not be raised. A special return code of -32768 indicates a subprocess.TimeoutExpired error.
Returns: An instance of CommandResult.
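The two ideas described above, redirecting output to a temporary file instead of a pipe and retrying on OSError with exponential back-off, can be sketched with the standard library. This is not the library's implementation: run_command_sketch, max_retries, and base_delay are hypothetical names, the delay values are assumptions, and "up to 5 times" is interpreted here as five attempts in total.

```python
import subprocess
import sys
import tempfile
import time


def run_command_sketch(args, timeout=None, max_retries=5, base_delay=0.1):
    # Output goes to a temporary file rather than a pipe, so a child that
    # writes a lot cannot block on a full pipe buffer while we wait on it.
    with tempfile.TemporaryFile() as output:
        for attempt in range(max_retries):
            try:
                completed = subprocess.run(
                    args, stdout=output, stderr=subprocess.STDOUT, timeout=timeout)
                break
            except OSError:
                if attempt == max_retries - 1:
                    raise  # still failing on the last attempt: re-raise
                time.sleep(base_delay * 2 ** attempt)  # exponential back-off
        output.seek(0)
        return completed.returncode, output.read()


return_code, captured = run_command_sketch(
    [sys.executable, "-c", "print('hello')"], timeout=30)
```

subprocess.TimeoutExpired is not a subclass of OSError, so a timeout propagates immediately instead of triggering a retry.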
-
flutes.run.
error_wrapper
(err: ExcType) → ExcType
Wrap exceptions raised in subprocess so that they show the captured output by default.
Structures
-
flutes.structure.
reverse_map
(d: Dict[T, int]) → List[T]
Given a dict containing pairs of (item, id), return a list where the id-th element is item.
Note
It is assumed that the ids form a permutation.
    >>> words = ['a', 'aardvark', 'abandon', ...]
    >>> word_to_id = {word: idx for idx, word in enumerate(words)}
    >>> id_to_word = reverse_map(word_to_id)
    >>> (words == id_to_word)
    True
Parameters: d – The dictionary mapping item to id.
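Because the ids are assumed to form a permutation of 0..len(d)-1, the inverse can be built by direct placement. A minimal sketch (reverse_map_sketch is a hypothetical name):

```python
def reverse_map_sketch(d):
    # Relies on the ids being exactly 0 .. len(d) - 1, each used once.
    result = [None] * len(d)
    for item, idx in d.items():
        result[idx] = item
    return result


words = ["a", "aardvark", "abandon"]
word_to_id = {word: idx for idx, word in enumerate(words)}
id_to_word = reverse_map_sketch(word_to_id)
```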
-
flutes.structure.
register_no_map_class
(container_type: Type[T]) → None
Register a container type as non-mappable, i.e., instances of the class will be treated as singleton objects in map_structure() and map_structure_zip(), and their contents will not be traversed. This is useful for certain types that subclass built-in container types, such as torch.Size.
Parameters: container_type – The type of the container, e.g. list, dict.
-
flutes.structure.
no_map_instance
(instance: T) → T
Register a container instance as non-mappable, i.e., it will be treated as a singleton object in map_structure() and map_structure_zip(), and its contents will not be traversed.
Parameters: instance – The container instance.
-
flutes.structure.
map_structure
(fn: Callable[[T], R], obj: Collection[T]) → Collection[R]
Map a function over all elements in a (possibly nested) collection.
Parameters:
- fn – The function to call on elements.
- obj – The collection to map function over.
Returns: The collection in the same structure, with elements mapped.
-
flutes.structure.
map_structure_zip
(fn: Callable[[...], R], objs: Sequence[Collection[T]]) → Collection[R]
Map a function over tuples formed by taking one element from each (possibly nested) collection. All collections must have identical structure.
Note
Although identical structures are required, this is not enforced by assertions. The structure of the first collection is assumed to be the structure for all collections.
Parameters:
- fn – The function to call on elements.
- objs – The list of collections to map function over.
Returns: A collection with the same structure, with elements mapped.
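The recursive traversal behind map_structure() and map_structure_zip() can be sketched as follows. This is an illustration only: the *_sketch names are hypothetical, the set of traversed container types (list, tuple, dict) is an assumption, and the non-mappable registration described above is not modeled.

```python
def map_structure_sketch(fn, obj):
    if isinstance(obj, list):
        return [map_structure_sketch(fn, x) for x in obj]
    if isinstance(obj, tuple):
        return tuple(map_structure_sketch(fn, x) for x in obj)
    if isinstance(obj, dict):
        return {k: map_structure_sketch(fn, v) for k, v in obj.items()}
    return fn(obj)  # leaf element


def map_structure_zip_sketch(fn, objs):
    first = objs[0]  # the first collection dictates the structure
    if isinstance(first, list):
        return [map_structure_zip_sketch(fn, xs) for xs in zip(*objs)]
    if isinstance(first, tuple):
        return tuple(map_structure_zip_sketch(fn, xs) for xs in zip(*objs))
    if isinstance(first, dict):
        return {k: map_structure_zip_sketch(fn, [o[k] for o in objs])
                for k in first}
    return fn(*objs)  # one leaf taken from each collection


nested = {"a": [1, 2], "b": (3, {"c": 4})}
doubled = map_structure_sketch(lambda x: x * 2, nested)
summed = map_structure_zip_sketch(lambda x, y: x + y, [nested, doubled])
```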
Timing
-
flutes.timing.
work_in_progress
(desc: str = 'Work in progress')
Measure the execution time of a code block or function.
    >>> @work_in_progress("Loading file")
    ... def load_file(path):
    ...     with open(path, "rb") as f:
    ...         return pickle.load(f)
    ...
    ... obj = load_file("/path/to/some/file")
    Loading file... done. (3.52s)
    >>> with work_in_progress("Saving file"):
    ...     with open(path, "wb") as f:
    ...         pickle.dump(obj, f)
    Saving file... done. (3.78s)
Parameters: desc – Description of the task performed.
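An object that works both as a decorator and as a context manager, as in the two usages above, can be built on contextlib.ContextDecorator. The sketch below is one way to get this behavior and is not necessarily how the library does it; work_in_progress_sketch is a hypothetical name.

```python
import contextlib
import io
import time


class work_in_progress_sketch(contextlib.ContextDecorator):
    def __init__(self, desc="Work in progress"):
        self.desc = desc

    def __enter__(self):
        print(f"{self.desc}... ", end="", flush=True)
        self._start = time.perf_counter()
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        elapsed = time.perf_counter() - self._start
        print(f"done. ({elapsed:.2f}s)")
        return False  # never swallow exceptions


@work_in_progress_sketch("Sleeping")
def nap():
    time.sleep(0.05)


buffer = io.StringIO()
with contextlib.redirect_stdout(buffer):
    nap()                                   # decorator form
    with work_in_progress_sketch("Block"):  # context-manager form
        time.sleep(0.05)
printed = buffer.getvalue()
```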