Fast minibatch sampling

This example shows how to create minibatches from a dataset, a common step in a Machine Learning pipeline. A SeqTools sequence can then easily serve as the input of a data module or stand in for a torch.utils.data.Dataset.
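
Since a SeqTools sequence supports len() and integer indexing, a thin wrapper is enough to expose it through the map-style Dataset interface. The snippet below is only an illustrative sketch; it assumes PyTorch is installed, and SeqToolsDataset and my_sequence are hypothetical names.

import torch.utils.data

class SeqToolsDataset(torch.utils.data.Dataset):
    # wraps any object with __len__ and __getitem__, such as a SeqTools sequence
    def __init__(self, seq):
        self.seq = seq

    def __len__(self):
        return len(self.seq)

    def __getitem__(self, i):
        return self.seq[i]

# loader = torch.utils.data.DataLoader(SeqToolsDataset(my_sequence), batch_size=64)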

Data samples

For this example we consider a set of (X, y) data samples where X is a real vector observation and y an integer label.

The following script generates sample data and stores it on disk in chunks of chunk_size items to mock a real dataset.

[1]:
import os
import tempfile
import numpy as np

workdir = tempfile.TemporaryDirectory()
os.chdir(workdir.name)

n_samples = 18000
n_classes = 10
sample_shape = (248,)
chunk_size = 5000

# generate reference class centers
means = np.random.randn(n_classes, *sample_shape) * 3

# generate random class labels
targets = np.random.randint(n_classes, size=n_samples)
np.save('targets.npy', targets)

# generate noisy samples
n_chunks = n_samples // chunk_size + (1 if n_samples % chunk_size > 0 else 0)
for i in range(n_chunks):
    n = min((i + 1) * chunk_size, n_samples) - i * chunk_size
    chunk_file = "values_{:02d}.npy".format(i)
    values = means[targets[i * chunk_size:i * chunk_size + n]] \
        + np.random.randn(n, *sample_shape) * 0.1
    np.save(chunk_file, values)

Data loading

Now begins the actual data loading.

[2]:
import os
import seqtools

targets = np.load("targets.npy")

values_files = sorted(f for f in os.listdir() if f.startswith('values_'))
# use np.load(f, mmap_mode='r') if the data cannot fit in memory
values_chunks = [np.load(f) for f in values_files]
values = seqtools.concatenate(values_chunks)

assert len(values) == len(targets)

seqtools.concatenate consolidates the chunks back into a single sequence of items. In this particular case, we could also use values = seqtools.unbatch(values_chunks, chunk_size, n_samples % chunk_size) because all chunks (except for the last one) have the same size.
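
As a quick illustrative check, items beyond the first chunk are read from the corresponding chunk on access:

# item chunk_size + 1 of the concatenation is item 1 of the second chunk
assert np.array_equal(values[chunk_size + 1], values_chunks[1][1])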

Let’s now assemble the samples with their targets to facilitate manipulation:

[3]:
dataset = seqtools.collate([values, targets])

and split the dataset between training and testing samples:

[4]:
train_dataset = dataset[:-10000]
test_dataset = dataset[-10000:]

In this example, training will be done iteratively using small batches of data sampled from the whole dataset.

[5]:
batch_size = 64

def collate_fn(batch):
    inputs = np.stack([x for x, _ in batch])
    targets = np.stack([y for _, y in batch])
    return inputs, targets

batches = seqtools.batch(train_dataset, batch_size, collate_fn=collate_fn)
[6]:
batches[0]
[6]:
(array([[ 4.63504096e+00,  8.07777256e-02,  2.53722750e+00, ...,
          2.24142875e+00, -7.44678017e-02,  3.81318193e+00],
        [ 4.86076591e+00, -3.79859031e-03,  2.45582098e+00, ...,
          2.29143533e+00,  9.49191910e-03,  3.90597124e+00],
        [ 4.66820276e+00, -1.64713789e-01,  2.27522707e+00, ...,
          2.31626799e+00,  8.35224515e-02,  3.75431843e+00],
        ...,
        [-1.21782062e+00, -8.97894436e+00, -4.58978465e-01, ...,
         -1.39860409e+00, -3.02905992e-01,  2.51382657e-01],
        [ 2.31658324e+00, -1.00244311e+01, -4.79085817e+00, ...,
         -2.30735672e+00,  6.27102273e+00,  1.34728561e+00],
        [ 2.78044716e+00, -1.22713540e+00, -1.10584302e+00, ...,
         -6.36907195e+00,  6.48970989e-01, -9.92530189e-01]]),
 array([2, 2, 2, 5, 5, 8, 5, 4, 9, 5, 6, 1, 4, 8, 7, 9, 4, 6, 7, 3, 8, 1,
        1, 5, 9, 0, 9, 8, 2, 3, 9, 3, 6, 5, 0, 9, 3, 0, 6, 2, 0, 3, 4, 9,
        3, 2, 6, 0, 1, 2, 2, 6, 9, 6, 1, 0, 0, 7, 5, 1, 0, 7, 4, 5]))
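
The batches above follow the storage order of the dataset. To draw them in a fresh random order at each epoch, one option (an illustrative sketch built on seqtools.gather) is to reindex the training set before batching:

# reshuffle the training samples, then rebuild the batches on top of the new order
shuffle = np.random.permutation(len(train_dataset))
shuffled_batches = seqtools.batch(
    seqtools.gather(train_dataset, shuffle), batch_size, collate_fn=collate_fn)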

Training

With the minibatches ready to be used, we create a Gaussian Naive Bayes model and train it over the dataset for several epochs:

[7]:
import time
from sklearn.naive_bayes import GaussianNB

model = GaussianNB()
classes = np.arange(n_classes)
[8]:
%%time

for epoch in range(50):
    for inputs, targets in batches:
        model.partial_fit(inputs, targets, classes=classes)
CPU times: user 14.1 s, sys: 0 ns, total: 14.1 s
Wall time: 14.1 s

Since the model is very simple, building the batches actually takes more time than the training itself. While there is not much that can be done to build an individual batch faster, prefetching can help by preparing several batches concurrently on multiple CPU cores. SeqTools offers two prefetching methods:

  • 'thread' has the smallest overhead but only offers true concurrency for specific workloads, notably IO-bound operations (see the sketch after this list).

  • 'process' offers true parallelism, but values computed by the workers must be sent back to the main process, which incurs serialization costs. For buffer-like data such as numpy arrays, this can be alleviated by using shared memory (the shm_size argument, which requires Python 3.8 or later).
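
For reference, a thread-based variant would look like the following sketch (same parameters as the next cell, minus the shared memory option):

# thread-based prefetching: low overhead and no serialization cost, but true
# concurrency only for steps that release the GIL (IO, many numpy operations)
thread_batches = seqtools.prefetch(
    batches, method='thread', nworkers=2, max_buffered=40)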

[9]:
method = 'process'
prefetched_batches = seqtools.prefetch(
    batches, method=method, nworkers=2, max_buffered=40, shm_size=10000)

model = GaussianNB()
classes = np.arange(n_classes)
[10]:
%%time

for epoch in range(50):
    for inputs, targets in prefetched_batches:
        model.partial_fit(inputs, targets, classes=classes)

Testing

For completeness, we evaluate the accuracy of the results on the testing data.

[11]:
test_batches = seqtools.batch(test_dataset, batch_size, collate_fn=collate_fn)

predictions = []
targets = []

for X, y in test_batches:
    predictions.extend(model.predict(X))
    targets.extend(y)

accuracy = np.mean(np.array(predictions) == np.array(targets))
print("Accuracy: {:.0f}%".format(accuracy * 100))
Accuracy: 100%