Datasets and Data loading

Note

Most of this guide is specific to text-based data. However, the Trainer can also be used with other modalities by creating a custom data loader that subclasses DataLoaderBase (see Using a custom data loader below).

Using OLMo-core’s built-in data loading

Data preparation

OLMo-core’s built-in data loading functionality requires you to pre-tokenize your data into 1D numpy arrays of token IDs. These arrays should already include all special tokens, such as “end of sentence” (EOS) tokens, but not padding tokens.

For example:

import numpy as np

documents = ["Hello, World!", "The quick brown fox jumped over the fence"]

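# Tokenize documents.
# NOTE: `tokenizer` is assumed to be defined elsewhere. Its encode() method
# should return a list of token IDs that already includes special tokens
# such as EOS.
token_ids = []
for doc in documents:
    token_ids.extend(tokenizer.encode(doc))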

# Write token IDs to disk
data_mmap = np.memmap("data001.npy", mode="w+", dtype=np.uint32, shape=(len(token_ids),))
data_mmap[:] = token_ids
data_mmap.flush()
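
As a sanity check, the file can be read back the same way it was written. The following is a minimal, self-contained sketch: `toy_encode` and `EOS_TOKEN_ID` are hypothetical stand-ins for a real tokenizer and its EOS ID, and the file is written to a temporary directory. Note that `np.memmap` writes a raw buffer with no header, so the dtype used for reading must match the one used for writing.

```python
import os
import tempfile

import numpy as np

EOS_TOKEN_ID = 0  # hypothetical EOS ID for this toy example


def toy_encode(text):
    # Stand-in for a real tokenizer: one ID (never 0) per whitespace-separated word.
    return [1 + (sum(word.encode("utf-8")) % 1000) for word in text.split()]


documents = ["Hello, World!", "The quick brown fox jumped over the fence"]

# Concatenate all documents, terminating each one with EOS.
token_ids = []
for doc in documents:
    token_ids.extend(toy_encode(doc))
    token_ids.append(EOS_TOKEN_ID)

# Write token IDs to disk as a raw uint32 buffer.
path = os.path.join(tempfile.mkdtemp(), "data001.npy")
data_mmap = np.memmap(path, mode="w+", dtype=np.uint32, shape=(len(token_ids),))
data_mmap[:] = token_ids
data_mmap.flush()

# Read the array back; the length is inferred from the file size and dtype.
loaded = np.memmap(path, mode="r", dtype=np.uint32)
num_docs = int((loaded == EOS_TOKEN_ID).sum())
print(loaded.shape, num_docs)
```

Counting EOS tokens is a quick way to confirm that document boundaries survived the round trip.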

See also

The dolma project includes an optimized toolkit for pre-processing data into this format.

Train data loading

Once your data is pre-processed as above there are several different strategies available for loading that data for training. The built-in data loading strategies can be broadly categorized into two types: fixed sequence length (FSL) training and variable sequence length (VSL) training.

You can select between FSL and VSL training by choosing the appropriate data loader class to pass to your Trainer. For example, the NumpyFSLDataLoader or ComposableDataLoader can be used for FSL training, while the NumpyVSLDataLoader can be used for VSL training.

The Numpy*DataLoader variants take a dataset that’s a subclass of NumpyDatasetBase, which handles the details of loading and sampling from your pre-tokenized numpy data files, while the ComposableDataLoader takes one or more InstanceSource objects.

The rest of this section will focus on the Numpy*Dataset classes, but see the olmo_core.data.composable module documentation to learn more about the composable data loading API.

Numpy fixed sequence length (FSL) datasets

The following datasets are for fixed sequence length (FSL) training with NumpyFSLDataLoader, where every training instance is exactly the same length (sequence_length), possibly with document fragmentation across instances or padding within instances. They implement different strategies for how to create those training instances from your pre-tokenized numpy data files where the sequence lengths of individual documents may vary widely.

Concatenate and chunk (NumpyFSLDataset or NumpyFSLDatasetMixture):

The simplest strategy is the “concatenate and chunk” approach, which means all tokenized documents are concatenated together and then chunked into training instances of the desired sequence length. To use this method you just need to pass a NumpyFSLDataLoader with a NumpyFSLDataset to your trainer.

While this strategy is simple and efficient, it does have a couple of downsides:

  1. Documents often end up fragmented across multiple training instances. That is, the beginning of a document may be in one training instance while the end of the same document is in another training instance (and thus the model will not be allowed to attend to the entire document at once).

  2. Since each training instance may be composed of multiple documents, the model will be attending to tokens across more than one document simultaneously, which could have adverse effects on the model (see Zhao et al. (2024) for example).
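
Both downsides are easy to see in a small numpy sketch of concatenate and chunk. This is an illustration of the idea, not OLMo-core's implementation. The EOS ID, the toy documents, and the choice to drop any trailing remainder are all assumptions made for the example.

```python
import numpy as np

EOS = 0  # hypothetical EOS token ID
sequence_length = 4

# Three tokenized documents, each ending with EOS.
docs = [[5, 6, 7, EOS], [8, 9, 10, 11, 12, EOS], [13, EOS]]

# Concatenate everything into one token stream.
stream = np.concatenate([np.asarray(d, dtype=np.uint32) for d in docs])

# Cut the stream into fixed-length instances (assumption: drop any remainder).
num_instances = len(stream) // sequence_length
instances = stream[: num_instances * sequence_length].reshape(num_instances, sequence_length)

print(instances)
```

Here the second document is split across the second and third instances (downside 1), and the third instance contains tokens from two different documents (downside 2).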

Alternatively, you can use NumpyFSLDatasetMixture to create a dataset that is a fine-grained mixture of dataset sources. See the dataset mixing guide for more details.

Concatenate and chunk + intra-document masking (NumpyFSLDataset w/ generate_doc_lengths=True):

Downside #2 from above can be addressed by using intra-document masking, which you can enable by setting generate_doc_lengths=True in your NumpyFSLDataset and using a model whose forward() method accepts the parameters doc_lens and max_doc_lens. When doc_lens and max_doc_lens are provided, the model is expected to apply intra-document masking internally.

See the Transformer model implementation for an example.
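
To make the idea concrete, here is a small numpy sketch of what intra-document masking means. It derives per-document lengths from EOS positions within one instance and builds a dense causal mask that blocks attention across documents. The helper names and the EOS ID are hypothetical, and this is not how OLMo-core computes doc_lens. Production implementations usually avoid materializing a dense mask like this.

```python
import numpy as np

EOS = 0  # hypothetical EOS token ID


def doc_lengths(instance, eos_id=EOS):
    """Lengths of the document segments in one instance, split after each EOS."""
    # Segment boundaries: just after each EOS, plus the end of the instance.
    ends = np.flatnonzero(instance == eos_id) + 1
    if len(ends) == 0 or ends[-1] != len(instance):
        ends = np.append(ends, len(instance))
    return np.diff(ends, prepend=0)


def intra_doc_causal_mask(doc_lens):
    """Boolean (T, T) mask that is True where query i may attend to key j."""
    total = int(doc_lens.sum())
    # Label every position with the index of the segment it belongs to.
    doc_ids = np.repeat(np.arange(len(doc_lens)), doc_lens)
    same_doc = doc_ids[:, None] == doc_ids[None, :]
    causal = np.tril(np.ones((total, total), dtype=bool))
    return same_doc & causal


instance = np.array([12, EOS, 13, EOS, 14, 15])
lens = doc_lengths(instance)
mask = intra_doc_causal_mask(lens)
print(lens)
print(mask.astype(int))
```

The resulting mask is block-diagonal: each token can attend only to earlier tokens from its own document segment.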

Document packing (NumpyPackedFSLDataset):

An alternative to the concatenate and chunk approach that also addresses the issue of document fragmentation is document packing, which uses the Optimized Best-Fit Decreasing (OBFD) bin-packing algorithm to pack documents into instances without fragmentation (except for sequences longer than the dataset’s sequence_length) and with minimal padding.

Long documents can be handled by either truncating (and discarding) excess tokens or falling back to fragmenting across instances. See LongDocStrategy.

By default, OBFD is applied to each source file separately. This typically achieves very good compactness (minimal padding) if the .npy source files are large enough (>1 GB), and it also allows the packing process, which can be somewhat time-consuming at the start of training, to be parallelized. You can optionally pack instances from multiple consecutive source files together by setting the source_group_size parameter to a value greater than 1.
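
For intuition, the sketch below implements plain best-fit decreasing bin packing in pure Python. It is a simple quadratic-time version, not the optimized OBFD variant that the dataset uses, and the function name and return values are hypothetical. It assumes every document already fits within sequence_length.

```python
def best_fit_decreasing(doc_lengths, sequence_length):
    """Pack documents into bins (instances) of capacity `sequence_length`.

    Returns (bins, free): the document indices in each bin and the unused
    capacity of each bin, which would be filled with padding.
    """
    bins = []  # document indices per bin
    free = []  # remaining capacity per bin
    # Visit documents from longest to shortest.
    order = sorted(range(len(doc_lengths)), key=lambda i: doc_lengths[i], reverse=True)
    for idx in order:
        length = doc_lengths[idx]
        if length > sequence_length:
            raise ValueError("document longer than sequence_length")
        # Best fit: the open bin with the least free space that still fits.
        best = None
        for b, space in enumerate(free):
            if space >= length and (best is None or space < free[best]):
                best = b
        if best is None:
            # Nothing fits, so open a new bin.
            bins.append([idx])
            free.append(sequence_length - length)
        else:
            bins[best].append(idx)
            free[best] -= length
    return bins, free


lengths = [5, 3, 4, 2, 7, 1, 2]
bins, free = best_fit_decreasing(lengths, sequence_length=8)
print(bins, free)
```

In this toy case the seven documents fit into three instances with no padding at all, and no document is split.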

Document padding (NumpyPaddedFSLDataset):

This strategy creates fixed-length training instances by padding each document to the target sequence length. Documents shorter than sequence_length are padded with padding tokens, while documents longer than sequence_length are fragmented into multiple instances.

This approach ensures that tokens from different documents never appear in the same training instance, avoiding cross-document attention without requiring intra-document masking. However, it can be inefficient if your document lengths differ widely from sequence_length, as many padding tokens may be needed. In general, using NumpyPackedFSLDataset is preferred over this approach.
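
The cost of padding can be estimated with a short sketch like the one below. It illustrates the strategy as described above and is not the dataset's actual code. The padding ID and the `pad_documents` helper are hypothetical.

```python
import numpy as np

PAD = 99  # hypothetical padding token ID


def pad_documents(docs, sequence_length, pad_id=PAD):
    """Turn each document into one or more padded, fixed-length instances."""
    instances = []
    for doc in docs:
        # Long documents are fragmented into consecutive chunks.
        for start in range(0, len(doc), sequence_length):
            chunk = list(doc[start : start + sequence_length])
            # Short chunks are right-padded up to sequence_length.
            chunk += [pad_id] * (sequence_length - len(chunk))
            instances.append(chunk)
    return np.array(instances)


docs = [[5, 6, 0], [8, 9, 10, 11, 12, 0], [13, 0]]
instances = pad_documents(docs, sequence_length=4)
padding_fraction = float((instances == PAD).mean())
print(instances)
print(padding_fraction)
```

With these toy documents, 5 of the 16 token slots are padding, which shows how quickly the overhead grows when documents are short relative to sequence_length.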

Interleaved documents (NumpyInterleavedFSLDataset):

This dataset forms instances by chunking documents and then interleaving these chunks. The purpose of this approach is to force the model to attend to tokens that are far apart in the training instance, which may help with long-range dependencies. It does not support intra-document masking, as that would largely defeat the purpose of interleaving.

Numpy variable sequence length (VSL) training

The natural alternative to FSL is variable sequence length (VSL) training. You can use this approach by setting a NumpyVSLDataLoader as your trainer’s data_loader.

There is only one built-in dataset for VSL training: NumpyVSLDataset. This dataset is used to inject a sequence length-based curriculum during training as introduced in Dataset Decomposition: Faster LLM Training with Variable Sequence Length Curriculum.

When using dataset decomposition, every training instance is a unique subset of tokens from a single document. Therefore there’s no need for intra-document masking.

Using NumpyVSLDataset requires you to set a min_sequence_length and a max_sequence_length, both of which must be powers of 2 (e.g. 256 and 4096). Each training batch will be composed of instances of the same sequence length such that the total number of tokens in the batch is equal to your global_batch_size. Your model must be able to handle sequences of up to max_sequence_length.
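
The arithmetic behind these constraints can be sketched as follows. The greedy power-of-2 split below is one plausible reading of dataset decomposition, not NumpyVSLDataset's actual algorithm, and how leftover tokens shorter than min_sequence_length are treated is left open here. All function names are hypothetical.

```python
def is_power_of_two(n):
    return n > 0 and (n & (n - 1)) == 0


def decompose_length(doc_len, min_len, max_len):
    """Greedily split `doc_len` tokens into power-of-2 chunk lengths.

    Returns (chunk_lengths, leftover), where leftover < min_len.
    """
    assert is_power_of_two(min_len) and is_power_of_two(max_len) and min_len <= max_len
    chunks, remaining, size = [], doc_len, max_len
    while size >= min_len:
        # Take as many chunks of this size as fit, then try the next size down.
        count, remaining = divmod(remaining, size)
        chunks.extend([size] * count)
        size //= 2
    return chunks, remaining


def instances_per_batch(global_batch_size, sequence_length):
    # global_batch_size is measured in tokens, so longer sequences mean
    # fewer instances per batch.
    return global_batch_size // sequence_length


chunks, leftover = decompose_length(1900, min_len=256, max_len=1024)
batch_shapes = {s: instances_per_batch(8192, s) for s in (256, 512, 1024)}
print(chunks, leftover)
print(batch_shapes)
```

Because every batch holds the same number of tokens, a batch of 256-token instances contains four times as many instances as a batch of 1024-token instances.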

You can configure a VSLCurriculum to control the sampling probability of different sequence lengths over the course of an epoch.

Using a custom data loader

Using a custom data loader with the Trainer just requires implementing your own DataLoaderBase subclass. For text data you can build on TextDataLoaderBase instead, as the toy example below does.

In particular you need to take care when implementing the _iter_batches() method to ensure each batch returned only contains the rank’s local portion of the batch (with exactly rank_batch_size tokens). You should also implement state_dict() and load_state_dict() such that your data loader will pick up where it left off in an epoch after load_state_dict() is called.

For a real-world example, see the source code of NumpyDataLoaderBase. For simplicity, here is a toy example that just generates random token IDs.

import random
from typing import Any, Dict, Iterable, List, Optional

import torch

from olmo_core.aliases import PathOrStr
from olmo_core.data import DataCollator, TextDataLoaderBase


class CustomDataLoader(TextDataLoaderBase):
    """
    An example custom data loader that generates random token IDs.
    """

    def __init__(
        self,
        *,
        sequence_length: int,
        vocab_size: int,
        work_dir: PathOrStr,
        global_batch_size: int,
        dp_world_size: int = 1,
        dp_rank: int = 0,
        fs_local_rank: int = 0,
        seed: int = 0,
        total_batches: int = 2048,
    ):
        super().__init__(
            collator=DataCollator(pad_token_id=vocab_size - 1),
            work_dir=work_dir,
            global_batch_size=global_batch_size,
            dp_world_size=dp_world_size,
            dp_rank=dp_rank,
            fs_local_rank=fs_local_rank,
        )
        assert self.rank_batch_size % sequence_length == 0
        self.sequence_length = sequence_length
        self.vocab_size = vocab_size
        self.seed = seed
        self._total_batches = total_batches
        # Populated by reshuffle(); must be initialized so _iter_batches() can check it.
        self._dataset: Optional[List[torch.Tensor]] = None

    @property
    def total_batches(self) -> int:
        return self._total_batches

    def state_dict(self) -> Dict[str, Any]:
        return {
            "batches_processed": self.batches_processed,
            "seed": self.seed,
            "epoch": self._epoch,
        }

    def load_state_dict(self, state_dict: Dict[str, Any]):
        self.batches_processed = state_dict["batches_processed"]
        self.seed = state_dict["seed"]
        self._epoch = state_dict["epoch"]

    def reshuffle(self, epoch: Optional[int] = None, **kwargs):
        del kwargs  # unused

        # Set current epoch.
        if epoch is None:
            epoch = 1 if self._epoch is None else self._epoch + 1
        self._epoch = epoch

        # Generate data.
        rng = random.Random(self.seed + self.epoch)
        instances_per_batch = self.global_batch_size // self.sequence_length
        total_instances = instances_per_batch * self.total_batches
        self._dataset = [
            torch.arange(start=start_idx, end=start_idx + self.sequence_length)
            for start_idx in (
                rng.randint(0, self.vocab_size - self.sequence_length - 2)
                for _ in range(total_instances)
            )
        ]

    def get_mock_batch(self) -> Dict[str, Any]:
        num_instances = self.rank_batch_size // self.sequence_length
        input_ids = torch.randint(0, self.vocab_size, (num_instances, self.sequence_length))
        return {"input_ids": input_ids}

    def _iter_batches(self) -> Iterable[Dict[str, Any]]:
        assert self._dataset is not None, "did you forget to call 'reshuffle()'?"

        # Get global batch instance indices. Shape: (total batches, instances per batch)
        instances_per_batch = self.global_batch_size // self.sequence_length
        indices = torch.arange(len(self._dataset)).view(self.total_batches, instances_per_batch)

        # Offset by batches processed so far.
        indices = indices[self.batches_processed :]

        for batch_indices in indices:
            # Slice batch indices up by rank to create data parallel micro-batches.
            local_batch_indices = batch_indices[self.dp_rank :: self.dp_world_size]
            yield self.collator([self._dataset[idx] for idx in local_batch_indices])
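
The strided slice used in `_iter_batches()` above can be checked in isolation. The sketch below uses numpy instead of torch and made-up sizes, and it assumes that rank_batch_size equals global_batch_size divided by dp_world_size. It confirms that each rank receives exactly rank_batch_size tokens and that the ranks together cover the global batch without overlap.

```python
import numpy as np

global_batch_size = 32  # tokens per global batch
sequence_length = 4
dp_world_size = 4

instances_per_batch = global_batch_size // sequence_length
# Assumption: each rank gets an equal share of the tokens in the global batch.
rank_batch_size = global_batch_size // dp_world_size

# Instance indices of a single global batch.
batch_indices = np.arange(instances_per_batch)

# Same slicing as in the toy data loader: rank r takes every
# dp_world_size-th instance, starting at offset r.
local = {rank: batch_indices[rank::dp_world_size] for rank in range(dp_world_size)}

for rank, indices in local.items():
    print(rank, indices.tolist(), len(indices) * sequence_length)
```

This only works out evenly when instances_per_batch is divisible by dp_world_size, which is what the `rank_batch_size % sequence_length == 0` assertion in the constructor guards against.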