data.data_loader

Distributed, deterministic, stateful data loaders used by the Trainer.

class olmo_core.data.data_loader.DataLoaderBase(*, work_dir, global_batch_size, dp_world_size=1, dp_rank=0, fs_local_rank=None)[source]

Bases: ABC

An abstract base class for data loaders used by the Trainer.

Warning

When using a DataLoaderBase directly (outside of the Trainer), you must call reshuffle() before starting a new epoch (i.e. before calling __iter__()) and you must call reset() after each epoch (i.e. after the iterator returned from __iter__() has been exhausted). Failure to do so will result in incorrect data order. For example:

# Prepare for the epoch.
data_loader.reshuffle(epoch=1)

for batch in data_loader:
    # process batch
    pass

# Reset internal bookkeeping.
data_loader.reset()

Parameters:
  • work_dir (Union[Path, PathLike, str]) – The working directory. Should be a local directory shared among local ranks.

  • global_batch_size (int) – The global batch size. The units for this depend on the data loader implementation.

  • dp_world_size (int, default: 1) – The data parallel world size.

  • dp_rank (int, default: 0) – The local data parallel rank.

  • fs_local_rank (Optional[int], default: None) – The filesystem-local rank relative to the working directory.

batches_processed

The total number of batches processed so far in the current epoch.

property epoch: int

Get the current epoch (1-based).

Warning

Accessing this before reshuffle() is called will raise an error.

abstract property total_batches: int | None

The total number of batches that the dataset will produce over the course of the current epoch, if known. Otherwise this should return None.

batches_in_epoch(epoch)[source]

By default this is the same as total_batches, though some data loaders might generate a different number of batches per epoch.

Return type:

Optional[int]

__len__()[source]

Returns the total number of batches in the current epoch (same as total_batches) if known; otherwise raises a TypeError.

Return type:

int
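Because __len__() raises when the batch count is unknown, callers that only want an optional length may prefer to guard the call. The following is a minimal stand-alone sketch of that behavior: ToyLoader and safe_len are hypothetical names used for illustration and are not part of olmo_core.

```python
from typing import Optional


class ToyLoader:
    """Stand-in that mimics the documented __len__ contract (not olmo_core code)."""

    def __init__(self, total_batches: Optional[int]):
        self._total_batches = total_batches

    @property
    def total_batches(self) -> Optional[int]:
        # None means the number of batches in the epoch is unknown.
        return self._total_batches

    def __len__(self) -> int:
        if self.total_batches is None:
            raise TypeError("data loader length is unknown")
        return self.total_batches


def safe_len(loader) -> Optional[int]:
    """Hypothetical helper: return len(loader), or None if it is unknown."""
    try:
        return len(loader)
    except TypeError:
        return None


print(safe_len(ToyLoader(10)))
print(safe_len(ToyLoader(None)))
```

With a real data loader, reading total_batches directly avoids the exception entirely.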

__iter__()[source]

Iterate over the local rank batches.

Return type:

Iterator[Dict[str, Any]]

property rank_batch_size: int

The batch size per rank.
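As a rough illustration of how the per-rank value relates to global_batch_size, the sketch below assumes the global batch is split evenly across data parallel ranks. That even split and the helper name split_global_batch_size are assumptions made for this example; the actual property may apply different checks.

```python
def split_global_batch_size(global_batch_size: int, dp_world_size: int) -> int:
    """Hypothetical helper: per-rank batch size under an assumed even split."""
    if dp_world_size < 1:
        raise ValueError("dp_world_size must be at least 1")
    if global_batch_size % dp_world_size != 0:
        # Assumption: an uneven split is treated as a configuration error.
        raise ValueError("global_batch_size must be divisible by dp_world_size")
    return global_batch_size // dp_world_size


rank_batch_size = split_global_batch_size(global_batch_size=1024, dp_world_size=8)
print(rank_batch_size)
```

The units follow global_batch_size, so for the text-based loaders on this page the result would be in tokens.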

abstract state_dict()[source]

Get a state dictionary for checkpointing.

Return type:

Dict[str, Any]

abstract load_state_dict(state_dict)[source]

Load a state dict from state_dict() to restore the data loader’s state.

abstract reshuffle(epoch=None, **kwargs)[source]

Reshuffle for a new epoch. Should be called before starting the epoch, regardless of whether or not you’ve called load_state_dict().

Parameters:

epoch (Optional[int], default: None) – The epoch number.
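The interplay between state_dict(), load_state_dict(), and reshuffle() can be sketched with a toy loader that resumes in the middle of an epoch. Everything in this block (ToyStatefulLoader, its state keys, and the seed-plus-epoch shuffling rule) is a hypothetical stand-in chosen for illustration, not the olmo_core implementation.

```python
import random
from typing import Any, Dict, Iterator, List, Optional


class ToyStatefulLoader:
    """Toy loader mirroring the documented call order (not olmo_core code)."""

    def __init__(self, data: List[int], seed: int = 0):
        self.data = data
        self.seed = seed
        self.batches_processed = 0
        self._epoch: Optional[int] = None
        self._order: Optional[List[int]] = None

    def state_dict(self) -> Dict[str, Any]:
        # Assumed state layout for this sketch only.
        return {
            "epoch": self._epoch,
            "batches_processed": self.batches_processed,
            "seed": self.seed,
        }

    def load_state_dict(self, state_dict: Dict[str, Any]) -> None:
        self._epoch = state_dict["epoch"]
        self.batches_processed = state_dict["batches_processed"]
        self.seed = state_dict["seed"]

    def reshuffle(self, epoch: Optional[int] = None) -> None:
        if epoch is None:
            epoch = 1 if self._epoch is None else self._epoch
        self._epoch = epoch
        order = list(self.data)
        # Deterministic order per (seed, epoch), so a resumed run matches.
        random.Random(self.seed + epoch).shuffle(order)
        self._order = order

    def __iter__(self) -> Iterator[int]:
        if self._order is None:
            raise RuntimeError("call reshuffle() before iterating")
        # Skip whatever was already processed before the checkpoint.
        for item in self._order[self.batches_processed:]:
            self.batches_processed += 1
            yield item

    def reset(self) -> None:
        self.batches_processed = 0
        self._order = None


loader = ToyStatefulLoader(list(range(6)), seed=42)
loader.reshuffle(epoch=1)
it = iter(loader)
first_two = [next(it), next(it)]
checkpoint = loader.state_dict()

# Resume in a fresh loader: load the state first, then reshuffle, then iterate.
resumed = ToyStatefulLoader(list(range(6)))
resumed.load_state_dict(checkpoint)
resumed.reshuffle(epoch=checkpoint["epoch"])
rest = list(resumed)
resumed.reset()
```

In this sketch first_two followed by rest reproduces the order of an uninterrupted epoch, which is the property the reshuffle-after-load rule is meant to protect.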

abstract _iter_batches()[source]

Returns an iterable over all batches in the epoch.

Important

This should account for data parallelism in that only the local rank’s portion of each batch should be generated from this method.

Return type:

Iterable[Dict[str, Any]]

Returns:

All batches in the epoch, where each batch just contains the local rank’s portion of the batch, which should have size exactly rank_batch_size.
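The requirement that each rank yields only its own slice of every global batch can be sketched as follows. This is a simplified illustration that counts batch size in instances rather than tokens and drops any trailing partial batch; both choices, along with the function name iter_rank_batches and the "index" key, are assumptions for the example.

```python
from typing import Any, Dict, Iterable, List


def iter_rank_batches(
    instances: List[int],
    global_batch_size: int,
    dp_world_size: int,
    dp_rank: int,
) -> Iterable[Dict[str, Any]]:
    """Hypothetical generator yielding only the local rank's part of each batch."""
    rank_batch_size = global_batch_size // dp_world_size
    # Assumption: instances that do not fill a whole global batch are dropped.
    num_batches = len(instances) // global_batch_size
    for batch_idx in range(num_batches):
        start = batch_idx * global_batch_size + dp_rank * rank_batch_size
        yield {"index": instances[start : start + rank_batch_size]}


instances = list(range(8))
rank0 = [b["index"] for b in iter_rank_batches(instances, 4, 2, dp_rank=0)]
rank1 = [b["index"] for b in iter_rank_batches(instances, 4, 2, dp_rank=1)]
print(rank0)  # [[0, 1], [4, 5]]
print(rank1)  # [[2, 3], [6, 7]]
```

Taken together, the two ranks cover each global batch exactly once, and every local batch has exactly rank_batch_size items.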

reset()[source]

Reset epoch bookkeeping. Should be called at the end of an epoch.

abstract get_mock_batch()[source]

Return a batch with arbitrary data. This can just be random data as it’s only used by the trainer to do a dry-run of the forward and backward pass before training officially starts.

Return type:

Dict[str, Any]

global_num_tokens_in_batch(batch)[source]

For text-based data loaders this should return the total (global) number of tokens in the batch. This is used by the trainer for bookkeeping.

Return type:

Optional[int]
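One plausible way to derive a global token count from a local batch is to count the local tokens and scale by the data parallel world size. The sketch below assumes every rank holds the same number of tokens and that token IDs live under an "input_ids" key as nested lists; neither assumption is confirmed by this page, and the function here is a stand-alone illustration rather than the library method.

```python
from typing import Any, Dict, Optional


def global_num_tokens_in_batch(batch: Dict[str, Any], dp_world_size: int) -> Optional[int]:
    """Hypothetical stand-alone version: local token count times world size."""
    input_ids = batch.get("input_ids")
    if input_ids is None:
        # Not a text batch, so there is no token count to report.
        return None
    local_tokens = sum(len(row) for row in input_ids)
    # Assumption: every rank's local batch has the same number of tokens.
    return local_tokens * dp_world_size


batch = {"input_ids": [[1, 2, 3, 4], [5, 6, 7, 8]]}
print(global_num_tokens_in_batch(batch, dp_world_size=4))  # 32
```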

class olmo_core.data.data_loader.TextDataLoaderBase(*, collator, work_dir, global_batch_size, dp_world_size=1, dp_rank=0, fs_local_rank=None)[source]

Bases: DataLoaderBase

An abstract base class for text-based data loaders.

Parameters:
  • collator (DataCollator) – The data collator to use to create batches from instances.

  • work_dir (Union[Path, PathLike, str]) – The working directory. Should be shared among local ranks.

  • global_batch_size (int) – The global batch size in tokens.

  • dp_world_size (int, default: 1) – The data parallel world size.

  • dp_rank (int, default: 0) – The local data parallel rank.

  • fs_local_rank (Optional[int], default: None) – The filesystem-local rank.

collator

The data collator.

tokens_processed: int

The total number of tokens processed so far in the current epoch.

__iter__()[source]

Iterate over the local rank batches.

Return type:

Iterator[Dict[str, Any]]

reset()[source]

Reset epoch bookkeeping. Should be called at the end of an epoch.

global_num_tokens_in_batch(batch)[source]

For text-based data loaders this should return the total (global) number of tokens in the batch. This is used by the trainer for bookkeeping.

Return type:

Optional[int]

class olmo_core.data.data_loader.NumpyDataLoaderBase(dataset, *, collator, global_batch_size, work_dir, seed=0, shuffle=True, num_threads=None, num_workers=0, prefetch_factor=None, target_device_type='cpu', dp_world_size=1, dp_rank=0, fs_local_rank=None, ignore_fingerprint_mismatch=False)[source]

Bases: TextDataLoaderBase

A distributed, deterministic, stateful data loader base class for use with NumpyDatasetBase dataset classes.

Parameters:
  • dataset (NumpyDatasetBase) – The dataset to wrap / load from.

  • collator (DataCollator) – The data collator to use to create batches from instances.

  • global_batch_size (int) – The global batch size in tokens.

  • work_dir (Union[Path, PathLike, str]) – The working directory. Should be shared among local ranks.

  • seed (int, default: 0) – The seed to use for shuffling / sampling data.

  • epoch – The epoch to start from.

  • shuffle (bool, default: True) – Whether or not to shuffle the data instances.

  • num_threads (Optional[int], default: None) – The number of threads to use when loading instances.

  • num_workers (int, default: 0) – The number of workers to use when loading batches.

  • prefetch_factor (Optional[int], default: None) – The number of batches to prefetch from each worker.

  • target_device_type (str, default: 'cpu') – The target device type, i.e. the type of device where the data will ultimately end up. Note that this data loader does not move batches to any device; it only uses this value to optimize certain settings.

  • dp_world_size (int, default: 1) – The data parallel world size.

  • dp_rank (int, default: 0) – The local data parallel rank.

  • fs_local_rank (Optional[int], default: None) – The filesystem-local rank.

classmethod wrap_numpy_dataset(dataset, *, global_batch_size, collator, work_dir=None, seed=0, dp_world_size=1, dp_rank=0, fs_local_rank=None, num_threads=None, num_workers=0, prefetch_factor=None, target_device_type='cpu', shuffle=True, ignore_fingerprint_mismatch=False)[source]

Construct the corresponding NumpyDataLoaderBase instance for the given NumpyDatasetBase.

Parameters:

dataset (NumpyDatasetBase) – The dataset to wrap.

Return type:

NumpyDataLoaderBase

state_dict()[source]

Get a state dictionary for checkpointing.

Return type:

Dict[str, Any]

load_state_dict(state_dict)[source]

Load a state dict from state_dict() to restore the data loader’s state.

reshuffle(epoch=None, in_memory=False, **kwargs)[source]

Reshuffle for a new epoch. Should be called before starting the epoch, regardless of whether or not you’ve called load_state_dict().

Parameters:

epoch (Optional[int], default: None) – The epoch number.

get_mock_batch()[source]

Return a batch with arbitrary data. This can just be random data as it’s only used by the trainer to do a dry-run of the forward and backward pass before training officially starts.

Return type:

Dict[str, Any]

_iter_batches()[source]

Returns an iterable over all batches in the epoch.

Important

This should account for data parallelism in that only the local rank’s portion of each batch should be generated from this method.

Return type:

Iterable[Dict[str, Any]]

Returns:

All batches in the epoch, where each batch just contains the local rank’s portion of the batch, which should have size exactly rank_batch_size.

class olmo_core.data.data_loader.NumpyFSLDataLoader(dataset, *, chunk_size=1, **kwargs)[source]

Bases: NumpyDataLoaderBase

A fixed sequence length DataLoaderBase for use with a NumpyFSLDataset.

property total_size: int

The total number of instances that the dataset will produce over the course of an epoch.

property total_batches: int

The total number of batches that the dataset will produce over the course of the current epoch, if known. Otherwise this should return None.

state_dict()[source]

Get a state dictionary for checkpointing.

Return type:

Dict[str, Any]

load_state_dict(state_dict)[source]

Load a state dict from state_dict() to restore the data loader’s state.

class olmo_core.data.data_loader.NumpyVSLDataLoader(dataset, **kwargs)[source]

Bases: NumpyDataLoaderBase

A variable sequence length DataLoaderBase for use with a NumpyVSLDataset.

This implements a sequence length-based curriculum as introduced in Dataset Decomposition: Faster LLM Training with Variable Sequence Length Curriculum.

property total_batches: int

The total number of batches that the dataset will produce over the course of the current epoch, if known. Otherwise this should return None.

state_dict()[source]

Get a state dictionary for checkpointing.

Return type:

Dict[str, Any]

load_state_dict(state_dict)[source]

Load a state dict from state_dict() to restore the data loader’s state.

class olmo_core.data.data_loader.NumpyDataLoaderConfig(global_batch_size, seed, work_dir=None, num_threads=None, num_workers=0, prefetch_factor=None, target_device_type=None, ignore_fingerprint_mismatch=False, *, type=None)[source]

Bases: DataLoaderConfig[NumpyDataLoaderBase]

A configuration class for building NumpyDataLoaderBase data loaders.

registered_base

alias of DataLoaderConfig

build(dataset, *, collator=None, mesh=None, dp_process_group=None)[source]

Construct the NumpyDataLoaderBase.

Parameters:
  • dataset (NumpyDatasetBase) – The dataset.

  • mesh (Optional[DeviceMesh], default: None) – An optional DeviceMesh that defines the data parallel dimensions. Ideally you should create this mesh using build_world_mesh(). Alternatively you can pass the dp_process_group instead.

  • dp_process_group (Optional[ProcessGroup], default: None) – The data parallel process group.

Return type:

NumpyDataLoaderBase