API Reference

smap

Return a mapping of f over the sequence(s).

starmap

Map a function over a sequence of argument tuples.

arange

Sequential equivalent of Python built-in range.

gather

Return a view on the sequence reordered by indexes.

take

Alias for seqtools.gather().

concatenate

Return a view on the concatenated sequences.

collate

Return a view on the collated/pasted/stacked sequences.

interleave

Interleave elements from several sequences into one.

cycle

Return repeated view of a sequence.

repeat

Make a sequence by repeating a value.

switch

Combine the values of two sequences based on condition.

case

Switch between different sequences based on selector value.

batch

Return a view of a sequence in groups of k items.

unbatch

Return a view on the concatenation of batched items.

split

Split a sequence into a succession of subsequences.

add_cache

Add a caching mechanism over a sequence.

prefetch

Wrap a sequence to prefetch values ahead using background workers.

SerializableFunc

Decorate a function to become independent from its source file.

EvaluationError

Raised when evaluating an element fails.

seterr

Set how errors are handled.

Mapping

seqtools.smap(f, *sequences)[source]

Return a mapping of f over the sequence(s).

Equivalent to [f(x) for x in sequence] with on-demand evaluation.

If several sequences are passed, they will be zipped together and their items will be passed as distinct arguments to f: [f(*x) for x in zip(*sequences)]

Example

>>> a = [1, 2, 3, 4]
>>> print([v + 2 for v in a])
[3, 4, 5, 6]
>>> m = seqtools.smap(lambda x: x + 2, a)
>>> print([v for v in m])
[3, 4, 5, 6]
>>> def do(y, z):
...     print("computing now")
...     return y + z
...
>>> a, b = [1, 2, 3, 4], [4, 3, 2, 1]
>>> m = seqtools.smap(do, a, b)
>>> print([v for v in m])
computing now
computing now
computing now
computing now
[5, 5, 5, 5]
seqtools.starmap(f, sequence)[source]

Map a function over a sequence of argument tuples.

A sequential equivalent of itertools.starmap().
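The eager behavior can be sketched with the standard library's itertools.starmap; seqtools.starmap mirrors it while additionally supporting indexing and on-demand evaluation (this is an illustrative sketch, not the library's implementation):

```python
from itertools import starmap

# each tuple in the sequence is unpacked into the function's arguments
pairs = [(1, 2), (3, 4), (5, 6)]
sums = list(starmap(lambda a, b: a + b, pairs))
print(sums)  # [3, 7, 11]
```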

Indexing and reshaping

seqtools.arange(start, stop=None, step=None)[source]

Sequential equivalent of Python built-in range.
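What "sequential" means here can be illustrated with the built-in range, which already satisfies the Sequence protocol (len() and random access) that arange exposes; the sketch below shows the expected access pattern, not seqtools' code:

```python
# range supports len(), indexing, and iteration, like an arange view
r = range(2, 10, 2)
print(len(r))   # 4
print(r[1])     # 4
print(list(r))  # [2, 4, 6, 8]
```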

seqtools.gather(sequence, indexes)[source]

Return a view on the sequence reordered by indexes.

Example

>>> arr = ['d', 'e', 'h', 'l', 'o', 'r', 'w', ' ']
>>> idx = [2, 1, 3, 3, 4, 7, 6, 4, 5, 3, 0]
>>> list(seqtools.gather(arr, idx))
['h', 'e', 'l', 'l', 'o', ' ', 'w', 'o', 'r', 'l', 'd']
seqtools.take(sequence, indexes)[source]

Alias for seqtools.gather().

seqtools.concatenate(sequences)[source]

Return a view on the concatenated sequences.

Example

>>> data1 = [0, 1, 2, 3]
>>> data2 = [4, 5]
>>> data3 = [6, 7, 8, 9, 10, 11]
>>> cat = seqtools.concatenate([data1, data2, data3])
>>> [cat[i] for i in range(12)]
[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11]
seqtools.collate(sequences)[source]

Return a view on the collated/pasted/stacked sequences.

The n’th element is a tuple of the n’th elements from each sequence.

Example

>>> arr = seqtools.collate([[ 1,   2,   3,   4],
...                ['a', 'b', 'c', 'd'],
...                [ 5,   6,   7,   8]])
>>> arr[2]
(3, 'c', 7)
seqtools.interleave(*sequences)[source]

Interleave elements from several sequences into one.

Sequences do not need to have the same length; once a sequence is exhausted, cycling continues over the remaining ones.

Example

>>> arr1 = [ 1,   2,   3,   4,   5]
>>> arr2 = ['a', 'b', 'c']
>>> arr3 = [.1,  .2,  .3,  .4]
>>> list(seqtools.interleave(arr1, arr2, arr3))
[1, 'a', 0.1, 2, 'b', 0.2, 3, 'c', 0.3, 4, 0.4, 5]
seqtools.cycle(sequence, limit=None)[source]

Return repeated view of a sequence.

Parameters
  • sequence (Sequence) – The sequence to be repeated.

  • limit (Optional[int]) – An optional size limit.

Example

>>> data = ['a', 'b', 'c']
>>> loop = seqtools.cycle(data)
>>> loop[3]
'a'
>>> loop[3 * 10 ** 9 + 1]  # unbounded sequence
'b'
>>> loop = seqtools.cycle(data, 7)
>>> list(loop)
['a', 'b', 'c', 'a', 'b', 'c', 'a']
seqtools.repeat(value, times=None)[source]

Make a sequence by repeating a value.

Parameters
  • value (Any) – Value to be (virtually) replicated.

  • times (Optional[int]) – Optional size limit.

Example

>>> item = 3
>>> repetition = seqtools.repeat(item, 10)
>>> list(repetition)
[3, 3, 3, 3, 3, 3, 3, 3, 3, 3]
seqtools.batch(sequence, k, drop_last=False, pad=None, collate_fn=None)[source]

Return a view of a sequence in groups of k items.

Parameters
  • sequence (Sequence) – The input sequence.

  • k (int) – Number of items by block.

  • drop_last (bool) – Whether the last block should be dropped if it contains fewer than k items (default False).

  • pad (Optional[Any]) – Padding value used to extend the last block to k items; set to None to disable padding and return an incomplete last block (default None).

  • collate_fn (Callable[[Sequence], Sequence]) – An optional function that takes a sequence of batch items and returns a consolidated batch, for example numpy.array().

Returns

A sequence of batches.

Return type

Sequence

Example

>>> data = [i for i in range(25)]
>>> batches = seqtools.batch(data, 4, pad=-1, collate_fn=list)
>>> batches[0]
[0, 1, 2, 3]
>>> batches[-1]  # final batch uses padding
[24, -1, -1, -1]
seqtools.unbatch(sequence, batch_size, last_batch_size=None)[source]

Return a view on the concatenation of batched items.

Reverses the effect of batch().

Parameters
  • sequence (Sequence[Sequence]) – A sequence of batches.

  • batch_size (int) – The size of the batches, except for the last one which can be smaller.

  • last_batch_size (int) – The size of the last batch if the batch size does not evenly divide the length of the sequence (default 0).

Returns

The concatenation of all batches in sequence.

Return type

Sequence
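The index arithmetic behind unbatching can be sketched as follows: item i of the flattened view lives in batch i // batch_size at offset i % batch_size (an illustration of the mapping, not the library's code):

```python
def unbatch_index(i, batch_size):
    # locate item i of the flattened view inside the batched sequence
    return i // batch_size, i % batch_size

batches = [[0, 1, 2, 3], [4, 5, 6, 7], [8]]
b, off = unbatch_index(6, 4)
print(batches[b][off])  # 6
```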

seqtools.split(sequence, edges)[source]

Split a sequence into a succession of subsequences.

Parameters
  • sequence (Sequence) – Input sequence.

  • edges (Sequence[int] or int or Sequence[Tuple[int, int]]) –

    edges specifies how to split the sequence

    • A 1D array containing the indexes where the sequence should be cut; the beginning and the end of the sequence are implicit.

    • An int specifying how many cuts of equal size should be made, in which case edges + 1 must evenly divide the length of the sequence.

    • A sequence of (start, stop) int tuples specifying the boundaries of the subsequences.

Returns

A sequence of subsequences split accordingly.

Return type

Sequence

Example

>>> data = ['aa', 'ab', 'ac', 'ad',
...         'ba', 'bb',
...         'ca', 'cb', 'cc', 'cd']
>>> chunks = seqtools.split(data, [4, 6])
>>> list(chunks)
[['aa', 'ab', 'ac', 'ad'], ['ba', 'bb'], ['ca', 'cb', 'cc', 'cd']]
>>> chunks = seqtools.split(data, [(0, 2), (4, 6), (6, 8)])
>>> list(chunks)
[['aa', 'ab'], ['ba', 'bb'], ['ca', 'cb']]
seqtools.switch(condition, x, y)[source]

Combine the values of two sequences based on condition.

Parameters
  • condition (Sequence[bool]) – a sequence of booleans

  • x (Sequence) – values when condition is true

  • y (Sequence) – values when condition is false
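The element-wise selection can be sketched eagerly with a comprehension (an illustrative equivalent; seqtools.switch evaluates on demand):

```python
condition = [True, False, True, False]
x = [1, 2, 3, 4]
y = [10, 20, 30, 40]
# pick from x where condition holds, from y otherwise
combined = [xi if c else yi for c, xi, yi in zip(condition, x, y)]
print(combined)  # [1, 20, 3, 40]
```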

seqtools.case(selector, *values)[source]

Switch between different sequences based on selector value.

Parameters
  • selector (Sequence[int]) – indexes of the selected sequence

  • values (Sequence) – data sequences
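The selection logic can be sketched eagerly as follows: element i comes from the sequence chosen by selector[i] (an illustrative equivalent, not the library's implementation):

```python
selector = [0, 2, 1, 0]
a = ['a0', 'a1', 'a2', 'a3']
b = ['b0', 'b1', 'b2', 'b3']
c = ['c0', 'c1', 'c2', 'c3']
values = [a, b, c]
# element i is taken from the sequence selected by selector[i]
chosen = [values[s][i] for i, s in enumerate(selector)]
print(chosen)  # ['a0', 'c1', 'b2', 'a3']
```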

Evaluation and buffering

seqtools.add_cache(arr, cache_size=1, cache=None)[source]

Add a caching mechanism over a sequence.

References to the most recently accessed items are kept and reused when possible.

Parameters
  • arr (Sequence) – Sequence to provide a cache for.

  • cache_size (int) – Maximum number of cached values (default 1).

  • cache (Optional[Dict[int, Any]]) – Dictionary-like container to use as cache. Defaults to a standard dict.

Returns

The sequence wrapped with a cache.

Return type

(Sequence)

Notes

The default cache is thread-safe but will not help when multiple processes try to use it.

Example

>>> def process(x):
...     print("working")
...     return x * 2
>>>
>>> data = [0, 1, 2, 3, 4, 5, 6]
>>> result = seqtools.smap(process, data)
>>> cached = seqtools.add_cache(result)
>>> result[3]
working
6
>>> result[3]  # smap uses systematic on-demand computations
working
6
>>> cached[3]
working
6
>>> cached[3]  # skips computation
6
seqtools.prefetch(seq, nworkers=0, method='thread', max_buffered=10, start_hook=None)[source]

Wrap a sequence to prefetch values ahead using background workers.

Every time an element of this container is accessed, the following ones are queued for evaluation by background workers. This is ideally placed at the end of a transformation pipeline when all items are to be evaluated in succession.

Parameters
  • seq (Sequence) – The data source.

  • nworkers (int) – Number of workers; zero or negative values specify the number of CPU cores to leave spare (default 0).

  • method (str) –

    Type of workers (default 'thread'):

    • 'thread' uses threading.Thread, which has low overhead but allows only one active worker at a time; ideal for IO-bound operations.

    • 'process' uses multiprocessing.Process, which provides full parallelism but adds communication overhead between the workers and the parent process.

    • 'sharedmem' also uses processes, but with memory shared between the workers and the main process, enabling zero-copy transfers. This adds several limitations, however:

      • References to the returned items must be deleted to allow recycling of the memory slots (for example, items read as for-loop variables are released at each iteration).

      • All items must be buffers of identical shape and type (e.g. np.ndarray); tuples or dicts of such buffers are also supported.

      • A fairly large value for max_buffered is recommended to avoid draining all memory slots before the garbage collector releases them.

  • max_buffered (Optional[int]) – limit on the number of prefetched values at any time (default 10).

  • start_hook (Optional[Callable]) – Optional callback run by workers on start.

Returns

The wrapped sequence.

Return type

Sequence
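The general idea of evaluating items ahead with background workers can be sketched with the standard library's ThreadPoolExecutor (a rough analogue only; prefetch itself evaluates lazily with a bounded buffer rather than eagerly mapping the whole sequence):

```python
from concurrent.futures import ThreadPoolExecutor

def transform(x):
    # stand-in for an expensive, IO-bound transformation
    return x * 2

data = [1, 2, 3, 4]
with ThreadPoolExecutor(max_workers=2) as pool:
    # evaluate items concurrently in background threads
    results = list(pool.map(transform, data))
print(results)  # [2, 4, 6, 8]
```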

class seqtools.SerializableFunc(func)[source]

Decorate a function to become independent from its source file.

Should one pickle a mapping object involving a decorated function, the unpickled mapping will use the original source code for the function regardless of subsequent modifications to the file on disk.

Warning

This is a hackish solution where only the source file containing the function is saved, regenerated and reloaded. Use with care.

Errors

Please, consult the tutorial on error management for detailed explanations.

class seqtools.EvaluationError[source]

Raised when evaluating an element fails.

seqtools.seterr(evaluation=None)[source]

Set how errors are handled.

Parameters

evaluation (str) –

how errors from user code triggered by SeqTools are propagated:

  • 'wrap': raise EvaluationError with the original error as its cause.

  • 'passthrough': let the error propagate through SeqTools code; this may facilitate step-by-step debugging.

  • None: leave the setting unchanged and return the current value.

Returns

The setting value.
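The two propagation modes can be illustrated with a stand-in wrapper (EvaluationErrorDemo and evaluate are hypothetical names for this sketch, not part of seqtools):

```python
class EvaluationErrorDemo(RuntimeError):
    """Stand-in for seqtools.EvaluationError, for illustration only."""

def evaluate(f, x, mode="wrap"):
    try:
        return f(x)
    except Exception as exc:
        if mode == "passthrough":
            raise  # let the original error propagate unchanged
        raise EvaluationErrorDemo("item evaluation failed") from exc

def boom(_):
    raise ValueError("bad item")

try:
    evaluate(boom, 0)  # default 'wrap' mode
except EvaluationErrorDemo as e:
    print(type(e.__cause__).__name__)  # ValueError
```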

Tools

seqtools.instrument.debug(sequence, func, max_calls=None, max_rate=None)[source]

Wrap a sequence to trigger a function on each read.

Parameters
  • sequence (Sequence) – Source sequence.

  • func (Callable) – A function called whenever an item is read; it must accept the index and the value of the item.

  • max_calls (Optional[int]) – An optional count limit on how many times func is invoked (default None).

  • max_rate (Optional[int]) – An optional rate limit to avoid spamming func.

Returns

The wrapped sequence.

Return type

(Sequence)

Example

>>> sequence = [1, 2, 3, 4, 5]
>>> watchthis = seqtools.instrument.debug(
...     sequence, lambda i, v: print(v), 2)
>>> x = watchthis[0]
1
>>> y = watchthis[2]
3
>>> z = watchthis[3]
seqtools.instrument.monitor_throughput(sequence)[source]

Wrap sequence to monitor throughput.

The resulting sequence has three additional methods:

  • read_delay(): the average time taken to read an item.

  • throughput(): the inverse of the above.

  • reset(): reset the accumulated statistics.

Example

>>> def process(x):
...     time.sleep(0.1)
...     return x
>>>
>>> data = [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
>>> result = seqtools.smap(process, data)
>>> result = seqtools.instrument.monitor_throughput(result)
>>> list(result)
[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
>>> print("{:.1f}".format(result.read_delay()))
0.1