API Reference#

  • smap – Return a mapping of f over the sequence(s).

  • starmap – Map a function over a sequence of argument tuples.

  • arange – Sequential equivalent of Python built-in range.

  • gather – Return a view on the sequence reordered by indexes.

  • take – Alias for seqtools.gather().

  • concatenate – Return a view on the concatenated sequences.

  • collate – Return a view on the collated/pasted/stacked sequences.

  • interleave – Interleave elements from several sequences into one.

  • cycle – Return a repeated view of a sequence.

  • repeat – Make a sequence by repeating a value.

  • uniter – Simulate an indexable sequence from an iterable.

  • switch – Combine the values of two sequences based on condition.

  • case – Switch between different sequences based on selector value.

  • batch – Return a view of a sequence in groups of k items.

  • unbatch – Return a view on the concatenation of batched items.

  • split – Split a sequence into a succession of subsequences.

  • add_cache – Add a caching mechanism over a sequence.

  • prefetch – Wrap a sequence to prefetch values ahead using background workers.

  • SerializableFunc – Decorate a function to become independent from its source file.

  • EvaluationError – Raised when evaluating an element fails.

  • seterr – Set how errors are handled.

Mapping#

seqtools.smap(f, *sequences)[source]#

Return a mapping of f over the sequence(s).

Equivalent to [f(x) for x in sequence] with on-demand evaluation.

If several sequences are passed, they will be zipped together and their items will be passed as distinct arguments to f: [f(*x) for x in zip(*sequences)]

Example

>>> a = [1, 2, 3, 4]
>>> print([v + 2 for v in a])
[3, 4, 5, 6]
>>> m = seqtools.smap(lambda x: x + 2, a)
>>> print([v for v in m])
[3, 4, 5, 6]
>>> def do(y, z):
...     print("computing now")
...     return y + z
...
>>> a, b = [1, 2, 3, 4], [4, 3, 2, 1]
>>> m = seqtools.smap(do, a, b)
>>> print([v for v in m])
computing now
computing now
computing now
computing now
[5, 5, 5, 5]
seqtools.starmap(f, sequence)[source]#

Map a function over a sequence of argument tuples.

A sequential equivalent of itertools.starmap().
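As a sketch of the semantics, the following eager list comprehension produces the same values that a starmap view yields lazily:

```python
import operator

# Eager equivalent of the lazy view built by seqtools.starmap:
pairs = [(1, 2), (3, 4), (5, 6)]
eager = [operator.add(x, y) for x, y in pairs]
print(eager)  # [3, 7, 11]
```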

Indexing and reshaping#

seqtools.arange(start, stop=None, step=None)[source]#

Sequential equivalent of Python built-in range.
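As a sketch of the intended semantics, an arange view should yield the same values as the built-in range with matching arguments:

```python
# Values an arange(2, 10, 3)-style view is expected to produce,
# shown here with the built-in range for reference:
expected = list(range(2, 10, 3))
print(expected)  # [2, 5, 8]
```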

seqtools.gather(sequence, indexes)[source]#

Return a view on the sequence reordered by indexes.

Example

>>> arr = ['d', 'e', 'h', 'l', 'o', 'r', 'w', ' ']
>>> idx = [2, 1, 3, 3, 4, 7, 6, 4, 5, 3, 0]
>>> list(seqtools.gather(arr, idx))
['h', 'e', 'l', 'l', 'o', ' ', 'w', 'o', 'r', 'l', 'd']
seqtools.take(sequence, indexes)[source]#

Alias for seqtools.gather().

seqtools.concatenate(sequences)[source]#

Return a view on the concatenated sequences.

Example

>>> data1 = [0, 1, 2, 3]
>>> data2 = [4, 5]
>>> data3 = [6, 7, 8, 9, 10, 11]
>>> cat = seqtools.concatenate([data1, data2, data3])
>>> [cat[i] for i in range(12)]
[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11]
seqtools.collate(sequences)[source]#

Return a view on the collated/pasted/stacked sequences.

The n’th element is a tuple of the n’th elements from each sequence.

Example

>>> arr = collate([[ 1,   2,   3,   4],
...                ['a', 'b', 'c', 'd'],
...                [ 5,   6,   7,   8]])
>>> arr[2]
(3, 'c', 7)
seqtools.interleave(*sequences)[source]#

Interleave elements from several sequences into one.

Sequences need not have the same length; cycling continues among whichever sequences remain.

Example

>>> arr1 = [ 1,   2,   3,   4,   5]
>>> arr2 = ['a', 'b', 'c']
>>> arr3 = [.1,  .2,  .3,  .4]
>>> list(interleave(arr1, arr2, arr3))
[1, 'a', 0.1, 2, 'b', 0.2, 3, 'c', 0.3, 4, 0.4, 5]
seqtools.cycle(sequence, limit=None)[source]#

Return a repeated view of a sequence.

Parameters:
  • sequence (Sequence) – The sequence to be repeated.

  • limit (Optional[int]) – An optional size limit.

Example

>>> data = ['a', 'b', 'c']
>>> loop = seqtools.cycle(data)
>>> loop[3]
'a'
>>> loop[3 * 10 ** 9 + 1]  # unbounded sequence
'b'
>>> loop = seqtools.cycle(data, 7)
>>> list(loop)
['a', 'b', 'c', 'a', 'b', 'c', 'a']
seqtools.repeat(value, times=None)[source]#

Make a sequence by repeating a value.

Parameters:
  • value (Any) – Value to be (virtually) replicated.

  • times (Optional[int]) – Optional size limit.

Example

>>> item = 3
>>> repetition = seqtools.repeat(item, 10)
>>> list(repetition)
[3, 3, 3, 3, 3, 3, 3, 3, 3, 3]
seqtools.uniter(iterable, cache_size=0, n_parallel=1, size=None)[source]#

Simulate an indexable sequence from an iterable.

Parameters:
  • iterable – an iterable

  • cache_size (int) – number of cached values

  • n_parallel (int) – number of simultaneously active iterators

  • size (int, optional) – Optional value to use as the sequence length; otherwise len will raise NotImplementedError.

This works by starting, incrementing and restarting one or several iterators to reach requested items. To avoid wasting steps, a cache is implemented and multiple iterators can run in parallel so that one is always closer than the others to requested items.

Example

>>> iterable = range(5000)
>>> a = seqtools.uniter(iterable, cache_size=10, n_parallel=5, size=5000)
>>> a[10]
10
>>> a[1]
1
seqtools.batch(sequence, k, drop_last=False, pad=None, collate_fn=None)[source]#

Return a view of a sequence in groups of k items.

Parameters:
  • sequence (Sequence) – The input sequence.

  • k (int) – Number of items by block.

  • drop_last (bool) – Whether the last block should be dropped if it contains fewer than k items (default False).

  • pad (Optional[Any]) – Padding value used to extend the last block to k elements; set to None to disable padding and return an incomplete last block (default None).

  • collate_fn (Callable[[Sequence], Sequence]) – An optional function that takes a sequence of batch items and returns a consolidated batch, for example numpy.array().

Returns:

A sequence of batches.

Return type:

Sequence

Example

>>> data = [i for i in range(25)]
>>> batches = seqtools.batch(data, 4, pad=-1, collate_fn=list)
>>> batches[0]
[0, 1, 2, 3]
>>> batches[-1]  # final batch uses padding
[24, -1, -1, -1]
seqtools.unbatch(sequence, batch_size, last_batch_size=None)[source]#

Return a view on the concatenation of batched items.

Reverses the effect of batch().

Parameters:
  • sequence (Sequence[Sequence]) – A sequence of batches.

  • batch_size (int) – The size of the batches, except possibly the last one, which may be smaller.

  • last_batch_size (Optional[int]) – The size of the last batch when batch_size does not divide the sequence length evenly (default None).

Returns:

The concatenation of all batches in sequence.

Return type:

Sequence
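The index arithmetic behind unbatching can be sketched in plain Python (an illustration of the semantics, not the actual implementation):

```python
batches = [[0, 1, 2], [3, 4, 5], [6, 7]]  # last batch is smaller
batch_size = 3

def unbatched_item(i):
    # Element i of the flattened view lives in batch i // batch_size
    # at offset i % batch_size.
    return batches[i // batch_size][i % batch_size]

flat = [unbatched_item(i) for i in range(8)]
print(flat)  # [0, 1, 2, 3, 4, 5, 6, 7]
```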

seqtools.split(sequence, edges)[source]#

Split a sequence into a succession of subsequences.

Parameters:
  • sequence (Sequence) – Input sequence.

  • edges (Sequence[int] or int or Sequence[Tuple[int, int]]) –

    edges specifies how to split the sequence

    • A 1D array that contains the indexes where the sequence should be cut; the beginning and the end of the sequence are implicit.

    • An int specifies how many cuts of equal size should be made, in which case edges + 1 must divide the length of the sequence.

    • A sequence of int tuples specifies the limits of the subsequences.

Returns:

A sequence of subsequences split accordingly.

Return type:

Sequence

Example

>>> data = ['aa', 'ab', 'ac', 'ad',
...         'ba', 'bb',
...         'ca', 'cb', 'cc', 'cd']
>>> chunks = seqtools.split(data, [4, 6])
>>> list(chunks)
[['aa', 'ab', 'ac', 'ad'], ['ba', 'bb'], ['ca', 'cb', 'cc', 'cd']]
>>> chunks = seqtools.split(data, [(0, 2), (4, 6), (6, 8)])
>>> list(chunks)
[['aa', 'ab'], ['ba', 'bb'], ['ca', 'cb']]
seqtools.switch(condition, x, y)[source]#

Combine the values of two sequences based on condition.

Parameters:
  • condition (Sequence[bool]) – a sequence of booleans

  • x (Sequence) – values when condition is true

  • y (Sequence) – values when condition is false
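An eager sketch of the semantics (the actual function builds the same values as a lazy view):

```python
condition = [True, False, True, False]
x = [1, 2, 3, 4]
y = [10, 20, 30, 40]
# Pick from x where condition holds, from y otherwise:
combined = [xi if c else yi for c, xi, yi in zip(condition, x, y)]
print(combined)  # [1, 20, 3, 40]
```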

seqtools.case(selector, *values)[source]#

Switch between different sequences based on selector value.

Parameters:
  • selector (Sequence[int]) – indexes of the selected sequence

  • values (Sequence) – data sequences
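An eager sketch of the semantics: item i of the result comes from the sequence chosen by selector[i].

```python
selector = [0, 1, 0, 2]
values = [['a0', 'a1', 'a2', 'a3'],
          ['b0', 'b1', 'b2', 'b3'],
          ['c0', 'c1', 'c2', 'c3']]
# Item i is taken from the sequence selected by selector[i]:
chosen = [values[s][i] for i, s in enumerate(selector)]
print(chosen)  # ['a0', 'b1', 'a2', 'c3']
```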

Evaluation and buffering#

seqtools.add_cache(arr, cache_size=1, cache=None)[source]#

Add a caching mechanism over a sequence.

A reference of the most recently accessed items will be kept and reused when possible.

Parameters:
  • arr (Sequence) – Sequence to provide a cache for.

  • cache_size (int) – Maximum number of cached values (default 1).

  • cache (Optional[Dict[int, Any]]) – Dictionary-like container to use as cache. Defaults to a standard dict.

Returns:

The sequence wrapped with a cache.

Return type:

Sequence

Notes

The default cache is thread safe but won’t help when multiple processes try to use it.

Example

>>> def process(x):
...     print("working")
...     return x * 2
>>>
>>> data = [0, 1, 2, 3, 4, 5, 6]
>>> result = seqtools.smap(process, data)
>>> cached = seqtools.add_cache(result)
>>> result[3]
working
6
>>> result[3]  # smap uses systematic on-demand computations
working
6
>>> cached[3]
working
6
>>> cached[3]  # skips computation
6
seqtools.prefetch(seq, nworkers=0, method='thread', max_buffered=10, start_hook=None, shm_size=0)[source]#

Wrap a sequence to prefetch values ahead using background workers.

Every time an element of this container is accessed, the following ones are queued for evaluation by background workers. This is ideally placed at the end of a transformation pipeline when all items are to be evaluated in succession.

Parameters:
  • seq (Sequence) – The data source.

  • nworkers (int) – Number of workers; zero or negative values indicate the number of CPU cores to spare (default 0).

  • method (str) –

    Type of workers (default ‘thread’):

    • ‘thread’ uses threading.Thread, which has low overhead but allows only one active worker at a time; ideal for IO-bound operations.

    • ‘process’ uses multiprocessing.Process, which provides full parallelism but adds communication overhead between workers and the parent process.

  • max_buffered (Optional[int]) – limit on the number of prefetched values at any time (default 10).

  • start_hook (Optional[Callable]) – Optional callback run by workers on start.

  • shm_size (int) – Size of shared memory (in bytes) used to accelerate the transfer of buffer objects (e.g. np.ndarray) when method=‘process’. Set this to a value large enough to fit the buffers of max_buffered items. Make sure to delete or copy the returned items, otherwise the allocated shared memory will be depleted quickly. Requires Python >= 3.8.

Returns:

The wrapped sequence.

Return type:

Sequence
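The general idea can be sketched with a thread pool that keeps a bounded number of evaluations in flight. This is an illustration only; the real implementation is index-based and also supports processes and shared memory.

```python
from collections import deque
from concurrent.futures import ThreadPoolExecutor

def prefetch_sketch(seq, nworkers=2, max_buffered=4):
    # Keep up to max_buffered evaluations queued ahead of the consumer.
    with ThreadPoolExecutor(nworkers) as pool:
        pending = deque()
        for i in range(len(seq)):
            pending.append(pool.submit(seq.__getitem__, i))
            if len(pending) >= max_buffered:
                yield pending.popleft().result()
        while pending:  # drain the remaining buffered items
            yield pending.popleft().result()

data = list(range(10))
out = list(prefetch_sketch(data))
print(out)  # [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
```

Results are yielded in submission order, so the view preserves the order of the source sequence even when workers finish out of order.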

class seqtools.SerializableFunc(func)[source]#

Decorate a function to become independent from its source file.

Should one pickle a mapping object involving a decorated function, the unpickled mapping will use the original source code for the function regardless of subsequent modifications to the file on disk.

Warning

This is a hackish solution where only the source file containing the function is saved, regenerated and reloaded. Use with care.

Errors#

Please, consult the tutorial on error management for detailed explanations.

class seqtools.EvaluationError[source]#

Raised when evaluating an element fails.

seqtools.seterr(evaluation=None)[source]#

Set how errors are handled.

Parameters:

evaluation (str) –

how errors from user code triggered by SeqTools are propagated:

  • ‘wrap’: raise EvaluationError with the original error as its cause.

  • ‘passthrough’: let the error propagate through SeqTools code, which may facilitate step-by-step debugging.

  • None: leave the setting unchanged and return its current value.

Returns:

The setting value.
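A minimal sketch of how the two modes change error propagation, using a stand-in exception class for illustration:

```python
class WrappedError(Exception):
    """Stand-in for seqtools.EvaluationError, for illustration only."""

def evaluate(getter, mode="wrap"):
    # Sketch of the two propagation settings:
    try:
        return getter()
    except Exception as err:
        if mode == "wrap":
            # 'wrap': raise a wrapper exception with the original as cause
            raise WrappedError("evaluation failed") from err
        raise  # 'passthrough': the original error propagates unchanged

try:
    evaluate(lambda: 1 // 0, mode="wrap")
except WrappedError as e:
    cause = type(e.__cause__).__name__
print(cause)  # ZeroDivisionError
```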

Tools#

seqtools.instrument.debug(sequence, func, max_calls=None, max_rate=None)[source]#

Wrap a sequence to trigger a function on each read.

Parameters:
  • sequence (Sequence) – Source sequence.

  • func (Callable) – A function to call whenever an item is read, must take the index and value of the items.

  • max_calls (Optional[int]) – An optional count limit on how many times func is invoked (default None).

  • max_rate (Optional[int]) – An optional rate limit to avoid spamming func.

Returns:

The wrapped sequence.

Return type:

Sequence

Example

>>> sequence = [1, 2, 3, 4, 5]
>>> watchthis = seqtools.instrument.debug(
...     sequence, lambda i, v: print(v), 2)
>>> x = watchthis[0]
1
>>> y = watchthis[2]
3
>>> z = watchthis[3]
seqtools.instrument.monitor_throughput(sequence)[source]#

Wrap sequence to monitor throughput.

The resulting sequence has three additional methods:

  • read_delay() the average time it takes to read an item.

  • throughput() the inverse of the above.

  • reset() to reset the accumulated statistics.

Example

>>> def process(x):
...     time.sleep(0.1)
...     return x
>>>
>>> data = [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
>>> result = seqtools.smap(process, data)
>>> result = seqtools.instrument.monitor_throughput(result)
>>> list(result)
[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
>>> print("{:.1f}".format(result.read_delay()))
0.1