API Reference¶
- seqtools.smap: Return a mapping of f over the sequence(s).
- seqtools.starmap: Map a function over a sequence of argument tuples.
- seqtools.arange: Sequential equivalent of the Python built-in range.
- seqtools.gather: Return a view on the sequence reordered by indexes.
- seqtools.take: Alias for seqtools.gather.
- seqtools.concatenate: Return a view on the concatenated sequences.
- seqtools.collate: Return a view on the collated/pasted/stacked sequences.
- seqtools.interleave: Interleave elements from several sequences into one.
- seqtools.cycle: Return a repeated view of a sequence.
- seqtools.repeat: Make a sequence by repeating a value.
- seqtools.uniter: Simulate an indexable sequence from an iterable.
- Combine the values of two sequences based on a condition.
- Switch between different sequences based on a selector value.
- seqtools.batch: Return a view of a sequence in groups of k items.
- seqtools.unbatch: Return a view on the concatenation of batched items.
- seqtools.split: Split a sequence into a succession of subsequences.
- seqtools.add_cache: Add a caching mechanism over a sequence.
- seqtools.prefetch: Wrap a sequence to prefetch values ahead using background workers.
- seqtools.SerializableFunc: Decorate a function to become independent from its source file.
- seqtools.EvaluationError: Raised when evaluating an element fails.
- seqtools.seterr: Set how errors are handled.
Mapping¶
- seqtools.smap(f, *sequences)[source]¶
Return a mapping of f over the sequence(s).
Equivalent to [f(x) for x in sequence] with on-demand evaluation. If several sequences are passed, they are zipped together and their items are passed as distinct arguments to f: [f(*x) for x in zip(*sequences)].
Example
>>> a = [1, 2, 3, 4]
>>> print([v + 2 for v in a])
[3, 4, 5, 6]
>>> m = seqtools.smap(lambda x: x + 2, a)
>>> print([v for v in m])
[3, 4, 5, 6]
>>> def do(y, z):
...     print("computing now")
...     return y + z
...
>>> a, b = [1, 2, 3, 4], [4, 3, 2, 1]
>>> m = seqtools.smap(do, a, b)
>>> print([v for v in m])
computing now
computing now
computing now
computing now
[5, 5, 5, 5]
- seqtools.starmap(f, sequence)[source]¶
Map a function over a sequence of argument tuples.
A sequential equivalent of itertools.starmap().
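Example
As a minimal sketch (assuming starmap mirrors its itertools counterpart over an indexable sequence):
>>> args = [(1, 2), (3, 4), (5, 6)]
>>> m = seqtools.starmap(lambda x, y: x + y, args)
>>> list(m)
[3, 7, 11]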
Indexing and reshaping¶
- seqtools.arange(start, stop=None, step=None)[source]¶
Sequential equivalent of the Python built-in range.
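Example
As a minimal sketch (assuming arange follows range semantics while remaining indexable):
>>> r = seqtools.arange(1, 10, 2)
>>> list(r)
[1, 3, 5, 7, 9]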
- seqtools.gather(sequence, indexes)[source]¶
Return a view on the sequence reordered by indexes.
Example
>>> arr = ['d', 'e', 'h', 'l', 'o', 'r', 'w', ' ']
>>> idx = [2, 1, 3, 3, 4, 7, 6, 4, 5, 3, 0]
>>> list(seqtools.gather(arr, idx))
['h', 'e', 'l', 'l', 'o', ' ', 'w', 'o', 'r', 'l', 'd']
- seqtools.take(sequence, indexes)[source]¶
Alias for seqtools.gather().
- seqtools.concatenate(sequences)[source]¶
Return a view on the concatenated sequences.
Example
>>> data1 = [0, 1, 2, 3]
>>> data2 = [4, 5]
>>> data3 = [6, 7, 8, 9, 10, 11]
>>> cat = seqtools.concatenate([data1, data2, data3])
>>> [cat[i] for i in range(12)]
[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11]
- seqtools.collate(sequences)[source]¶
Return a view on the collated/pasted/stacked sequences.
The n’th element is a tuple of the n’th elements from each sequence.
Example
>>> arr = collate([[ 1, 2, 3, 4],
...                ['a', 'b', 'c', 'd'],
...                [ 5, 6, 7, 8]])
>>> arr[2]
(3, 'c', 7)
- seqtools.interleave(*sequences)[source]¶
Interleave elements from several sequences into one.
Sequences need not have the same length; once a sequence is exhausted, cycling continues over the remaining ones.
Example
>>> arr1 = [ 1, 2, 3, 4, 5]
>>> arr2 = ['a', 'b', 'c']
>>> arr3 = [.1, .2, .3, .4]
>>> list(interleave(arr1, arr2, arr3))
[1, 'a', 0.1, 2, 'b', 0.2, 3, 'c', 0.3, 4, 0.4, 5]
- seqtools.cycle(sequence, limit=None)[source]¶
Return a repeated view of a sequence.
- Parameters
sequence (Sequence) – The sequence to be repeated.
limit (Optional[int]) – An optional size limit.
Example
>>> data = ['a', 'b', 'c']
>>> loop = seqtools.cycle(data)
>>> loop[3]
'a'
>>> loop[3 * 10 ** 9 + 1]  # unbounded sequence
'b'
>>> loop = seqtools.cycle(data, 7)
>>> list(loop)
['a', 'b', 'c', 'a', 'b', 'c', 'a']
- seqtools.repeat(value, times=None)[source]¶
Make a sequence by repeating a value.
- Parameters
value (Any) – Value to be (virtually) replicated.
times (Optional[int]) – Optional size limit.
Example
>>> item = 3
>>> repetition = seqtools.repeat(item, 10)
>>> list(repetition)
[3, 3, 3, 3, 3, 3, 3, 3, 3, 3]
- seqtools.uniter(iterable, cache_size=0, n_parallel=1, size=None)[source]¶
Simulate an indexable sequence from an iterable.
This works by starting, incrementing, and restarting one or several iterators to reach requested items. To avoid wasting steps, a cache is implemented and multiple iterators can run in parallel so that one is always closer than the others to the requested items.
Example
>>> class LineIter:
...     def __init__(self, filename):
...         self.filename = filename
...
...     def __iter__(self):
...         with open(self.filename) as f:
...             for line in f:
...                 yield line
...
>>> readme = seqtools.uniter(LineIter("LICENSE.txt"),
...                          cache_size=10, n_parallel=5)
>>> readme[3]
'1. Definitions\n'
>>> readme[1]
'==================================\n'
- seqtools.batch(sequence, k, drop_last=False, pad=None, collate_fn=None)[source]¶
Return a view of a sequence in groups of k items.
- Parameters
sequence (Sequence) – The input sequence.
k (int) – Number of items per block.
drop_last (bool) – Whether the last block should be ignored if it contains fewer than k items (default False).
pad (Optional[any]) – Padding value used to grow the last block to k elements; set to None to disable padding and return an incomplete final block (default None).
collate_fn (Callable[[Sequence], Sequence]) – An optional function that takes a sequence of batch items and returns a consolidated batch, for example numpy.array().
- Returns
A sequence of batches.
- Return type
Sequence
Example
>>> data = [i for i in range(25)]
>>> batches = seqtools.batch(data, 4, pad=-1, collate_fn=list)
>>> batches[0]
[0, 1, 2, 3]
>>> batches[-1]  # final batch uses padding
[24, -1, -1, -1]
- seqtools.unbatch(sequence, batch_size, last_batch_size=None)[source]¶
Return a view on the concatenation of batched items.
Reverses the effect of batch().
- Returns
The concatenation of all batches in sequence.
- Return type
Sequence
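Example
As a minimal sketch (assuming last_batch_size describes a smaller final batch, as produced by batch() without padding):
>>> data = list(range(10))
>>> batches = seqtools.batch(data, 4, collate_fn=list)
>>> flat = seqtools.unbatch(batches, 4, last_batch_size=2)
>>> list(flat)
[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]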
- seqtools.split(sequence, edges)[source]¶
Split a sequence into a succession of subsequences.
- Parameters
sequence (Sequence) – Input sequence.
edges (Sequence[int] or int or Sequence[Tuple[int, int]]) –
Specifies how to split the sequence:
A 1D array containing the indexes where the sequence should be cut; the beginning and the end of the sequence are implicit.
An int specifying how many cuts of equal size should be made, in which case edges + 1 must divide the length of the sequence.
A sequence of int tuples specifying the limits of the subsequences.
- Returns
A sequence of subsequences split accordingly.
- Return type
Sequence
Example
>>> data = ['aa', 'ab', 'ac', 'ad',
...         'ba', 'bb',
...         'ca', 'cb', 'cc', 'cd']
>>> chunks = seqtools.split(data, [4, 6])
>>> list(chunks)
[['aa', 'ab', 'ac', 'ad'], ['ba', 'bb'], ['ca', 'cb', 'cc', 'cd']]
>>> chunks = seqtools.split(data, [(0, 2), (4, 6), (6, 8)])
>>> list(chunks)
[['aa', 'ab'], ['ba', 'bb'], ['ca', 'cb']]
Evaluation and buffering¶
- seqtools.add_cache(arr, cache_size=1, cache=None)[source]¶
Add a caching mechanism over a sequence.
A reference of the most recently accessed items will be kept and reused when possible.
- Returns
The sequence wrapped with a cache.
- Return type
Sequence
Notes
The default cache is thread safe but won’t help when multiple processes try to use it.
Example
>>> def process(x):
...     print("working")
...     return x * 2
>>>
>>> data = [0, 1, 2, 3, 4, 5, 6]
>>> result = seqtools.smap(process, data)
>>> cached = seqtools.add_cache(result)
>>> result[3]
working
6
>>> result[3]  # smap uses systematic on-demand computations
working
6
>>> cached[3]
working
6
>>> cached[3]  # skips computation
6
- seqtools.prefetch(seq, nworkers=0, method='thread', max_buffered=10, start_hook=None, shm_size=0)[source]¶
Wrap a sequence to prefetch values ahead using background workers.
Every time an element of this container is accessed, the following ones are queued for evaluation by background workers. This is ideally placed at the end of a transformation pipeline when all items are to be evaluated in succession.
- Parameters
seq (Sequence) – The data source.
nworkers (int) – Number of workers; zero or negative values indicate the number of CPU cores to spare (default 0).
method (str) –
Type of workers (default 'thread'):
'thread' uses threading.Thread, which has low overhead but allows only one active worker at a time; ideal for IO-bound operations.
'process' uses multiprocessing.Process, which provides full parallelism but adds communication overhead between workers and the parent process.
max_buffered (Optional[int]) – Limit on the number of prefetched values at any time (default 10).
start_hook (Optional[Callable]) – Optional callback run by workers on start.
shm_size (int) – Size of shared memory (in bytes) used to accelerate the transfer of buffer objects (for example np.ndarray) when method='process'. Set this to a value large enough to fit the buffers from max_buffered items. Make sure to delete or copy returned items, otherwise the allocated shared memory will be depleted quickly. Requires Python >= 3.8.
- Returns
The wrapped sequence.
- Return type
Sequence
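Example
As a minimal sketch (slow_double stands in for any expensive user function; default thread workers assumed) showing a slow element-wise transformation overlapped with prefetching:
>>> import time
>>> def slow_double(x):
...     time.sleep(0.01)  # simulate an expensive computation
...     return 2 * x
>>>
>>> data = list(range(100))
>>> result = seqtools.prefetch(seqtools.smap(slow_double, data),
...                            nworkers=2, max_buffered=10)
>>> list(result) == [2 * x for x in data]  # order and values are preserved
True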
- class seqtools.SerializableFunc(func)[source]¶
Decorate a function to become independent from its source file.
Should one pickle a mapping object involving a decorated function, the unpickled mapping will use the original source code for the function regardless of subsequent modifications to the file on disk.
Warning
This is a hackish solution where only the source file containing the function is saved, regenerated and reloaded. Use with care.
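Example
As a minimal usage sketch (assuming the class is applied as a plain decorator and the wrapper stays callable):
>>> @seqtools.SerializableFunc
... def f(x):
...     return x + 1
...
>>> m = seqtools.smap(f, [1, 2, 3])  # pickling m embeds f's source
>>> m[0]  # behaves like the undecorated function
2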
Errors¶
Please consult the tutorial on error management for detailed explanations.
- seqtools.seterr(evaluation=None)[source]¶
Set how errors are handled.
- Parameters
evaluation (str) –
How errors raised by user code called from SeqTools are propagated:
'wrap': raise EvaluationError with the original error as its cause.
'passthrough': let the error propagate through SeqTools code, which may facilitate step-by-step debugging.
None: leave the setting unchanged and return the current value.
- Returns
The setting value.
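Example
As a minimal sketch (assuming the call returns the newly active setting, per the Returns description above):
>>> seqtools.seterr('passthrough')  # user errors now propagate unchanged
'passthrough'
>>> seqtools.seterr()  # None (the default) only reports the current setting
'passthrough'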
Tools¶
- seqtools.instrument.debug(sequence, func, max_calls=None, max_rate=None)[source]¶
Wrap a sequence to trigger a function on each read.
- Parameters
sequence (Sequence) – Source sequence.
func (Callable) – A function to call whenever an item is read; it receives the item's index and value.
max_calls (Optional[int]) – An optional count limit on how many times func is invoked (default None).
max_rate (Optional[int]) – An optional rate limit to avoid spamming func.
- Returns
The wrapped sequence.
- Return type
Sequence
Example
>>> sequence = [1, 2, 3, 4, 5]
>>> watchthis = seqtools.instrument.debug(
...     sequence, lambda i, v: print(v), 2)
>>> x = watchthis[0]
1
>>> y = watchthis[2]
3
>>> z = watchthis[3]
- seqtools.instrument.monitor_throughput(sequence)[source]¶
Wrap a sequence to monitor throughput.
The resulting sequence has three additional methods:
read_delay(): the average time it takes to read an item.
throughput(): the inverse of the above.
reset(): resets the accumulated statistics.
Example
>>> def process(x):
...     time.sleep(0.1)
...     return x
>>>
>>> data = [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
>>> result = seqtools.smap(process, data)
>>> result = seqtools.instrument.monitor_throughput(result)
>>> list(result)
[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
>>> print("{:.1f}".format(result.read_delay()))
0.1