data.utils

olmo_core.data.utils.split_batch(batch, num_microbatch_instances)[source]

Split a batch (such as one generated by the DataCollator) into a list of micro-batches.

Return type:

List[Dict[str, Any]]
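As a rough illustration of the idea (not the library's implementation), the sketch below splits a batch whose values are plain Python lists into micro-batches of at most num_microbatch_instances instances each. The helper name split_batch_sketch and the assumption that every value is a list indexed by instance are hypothetical; real batches typically hold tensors.

```python
from typing import Any, Dict, List


def split_batch_sketch(
    batch: Dict[str, List[Any]], num_microbatch_instances: int
) -> List[Dict[str, List[Any]]]:
    """Hypothetical stand-in: slice every per-instance list into micro-batches."""
    if num_microbatch_instances <= 0:
        raise ValueError("num_microbatch_instances must be positive")
    # Assumption: all values share the same number of instances.
    batch_size = len(next(iter(batch.values())))
    micro_batches = []
    for start in range(0, batch_size, num_microbatch_instances):
        end = start + num_microbatch_instances
        micro_batches.append({key: value[start:end] for key, value in batch.items()})
    return micro_batches


batch = {"input_ids": [[1, 2], [3, 4], [5, 6], [7, 8]], "index": [0, 1, 2, 3]}
micro_batches = split_batch_sketch(batch, 2)
```

Here four instances are divided into two micro-batches of two instances each, and every key of the original batch is present in each micro-batch.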

olmo_core.data.utils.melt_batch(batch, target_sequence_length)[source]

“Melts” a batch by shortening the sequence length and proportionally increasing the number of instances.

Return type:

Dict[str, Any]
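To make the "melting" idea concrete, the following sketch reshapes rows of token IDs into more, shorter rows. It is only an illustration on nested lists: the helper melt_rows_sketch is hypothetical, and requiring the original sequence length to be a multiple of target_sequence_length is an assumption, not documented behavior.

```python
from typing import List


def melt_rows_sketch(rows: List[List[int]], target_sequence_length: int) -> List[List[int]]:
    """Hypothetical stand-in: cut each row into consecutive pieces of the target length."""
    melted = []
    for row in rows:
        # Assumption: the sequence length divides evenly by the target length.
        if len(row) % target_sequence_length != 0:
            raise ValueError("sequence length must be a multiple of target_sequence_length")
        for start in range(0, len(row), target_sequence_length):
            melted.append(row[start : start + target_sequence_length])
    return melted


input_ids = [[1, 2, 3, 4], [5, 6, 7, 8]]  # 2 instances of length 4
melted = melt_rows_sketch(input_ids, 2)  # 4 instances of length 2
```

The total number of tokens is unchanged: halving the sequence length doubles the number of instances.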

olmo_core.data.utils.truncate_batch(batch, target_sequence_length)[source]

Truncate the instances in a batch to target_sequence_length.

Return type:

Dict[str, Any]

olmo_core.data.utils.write_document_indices(data_path, *, dtype, eos_token_id)[source]

Given a local “.npy” data path from the Dolma toolkit, write a metadata file with start/end indices of each document within the array.

Return type:

Path

olmo_core.data.utils.iter_document_indices(data_path, *, local_cache=None, use_array_if_local=None, eos_token_id=None, bos_token_id=None, dtype=None)[source]

Given a “.npy” data path from the Dolma toolkit, get the list of document start/end indices within the array.

Parameters:
  • data_path (Union[Path, PathLike, str]) – Path to a “.npy” Dolma toolkit data file.

  • local_cache (Union[Path, PathLike, str, None], default: None) – Local directory to put downloads into.

  • use_array_if_local (Optional[bool], default: None) – Use the numpy data array to find the document indices if the array is on the local filesystem and both eos_token_id and dtype are provided, which can be much faster. Otherwise the metadata file is used.

  • eos_token_id (Optional[int], default: None) – The EOS token ID. Required to use the local data array instead of the metadata file.

  • dtype (default: None) – The data type of the numpy data array. Required to use the local data array instead of the metadata file.

Return type:

Generator[Tuple[int, int], None, None]
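The sketch below shows one way document boundaries could be recovered from a flat token array by scanning for the EOS token. It is a simplified illustration, not the library's code: the helper name is hypothetical, and treating the EOS token as the last token of its document (with an exclusive end index) and ignoring trailing tokens without an EOS are assumptions.

```python
from typing import Iterator, Sequence, Tuple


def iter_document_indices_sketch(
    token_ids: Sequence[int], eos_token_id: int
) -> Iterator[Tuple[int, int]]:
    """Hypothetical stand-in: yield (start, end) pairs, end exclusive, EOS included."""
    start = 0
    for position, token_id in enumerate(token_ids):
        if token_id == eos_token_id:
            yield start, position + 1
            start = position + 1
    # Assumption: trailing tokens with no closing EOS are not reported.


tokens = [5, 6, 0, 7, 0, 8, 9, 0]  # 0 plays the role of the EOS token here
indices = list(iter_document_indices_sketch(tokens, eos_token_id=0))
```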

olmo_core.data.utils.iter_document_indices_with_max_sequence_length(data_path, max_sequence_length, *, local_cache=None, use_array_if_local=None, eos_token_id=None, bos_token_id=None, dtype=None, long_doc_strategy='truncate')[source]

Like iter_document_indices() but will either truncate or split documents that are longer than max_sequence_length.

Return type:

Generator[Tuple[int, int], None, None]
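The difference between truncating and splitting a long document can be sketched on a single (start, end) pair. The helper below is hypothetical; the strategy name "fragment" is borrowed from the long_doc_strategy description of pack_documents_into_instances() on this page, and whether this function handles it identically is an assumption.

```python
from typing import Iterator, Tuple


def limit_document_sketch(
    start: int, end: int, max_sequence_length: int, long_doc_strategy: str = "truncate"
) -> Iterator[Tuple[int, int]]:
    """Hypothetical stand-in: apply a long-document strategy to one document span."""
    if long_doc_strategy not in ("truncate", "fragment"):
        raise ValueError(f"unknown strategy: {long_doc_strategy}")
    if end - start <= max_sequence_length or long_doc_strategy == "truncate":
        # Short documents pass through; long ones lose their excess tokens.
        yield start, min(end, start + max_sequence_length)
    else:
        # "fragment": keep every token, but spread it over several spans.
        for piece_start in range(start, end, max_sequence_length):
            yield piece_start, min(end, piece_start + max_sequence_length)


truncated = list(limit_document_sketch(0, 10, 4, "truncate"))
fragmented = list(limit_document_sketch(0, 10, 4, "fragment"))
```

With a 10-token document and a limit of 4, truncation keeps only the first 4 tokens, while fragmenting produces spans of 4, 4, and 2 tokens.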

olmo_core.data.utils.get_document_indices(data_path, local_cache=None)[source]

Like iter_document_indices() but returns a list.

Return type:

List[Tuple[int, int]]

olmo_core.data.utils.load_array_slice(path, start_idx, end_idx, dtype)[source]

Load a slice from a numpy array on disk.

Parameters:
  • path (Union[Path, PathLike, str]) – The path/URL to the array.

  • start_idx (int) – The start index (0-based) of the slice within the array.

  • end_idx (int) – The end index (0-based, exclusive) of the slice within the array.

  • dtype (Union[Type[uint8], Type[uint16], Type[uint32], Type[uint64], Type[bool]]) – The numpy datatype of the array.

Return type:

ndarray
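Reading a slice without loading the whole array comes down to seeking to a byte offset and reading a fixed number of bytes. The sketch below shows this with the standard library only. It is not the library's implementation: the helper name is hypothetical, and it assumes a headerless file of little-endian uint16 values on the local filesystem.

```python
import os
import struct
import tempfile
from typing import List


def load_uint16_slice_sketch(path: str, start_idx: int, end_idx: int) -> List[int]:
    """Hypothetical stand-in: read items [start_idx, end_idx) from a raw uint16 file."""
    item_size = 2  # bytes per uint16 item
    with open(path, "rb") as f:
        f.seek(start_idx * item_size)
        raw = f.read((end_idx - start_idx) * item_size)
    # "<" = little-endian, "H" = unsigned 16-bit integer.
    return list(struct.unpack(f"<{len(raw) // item_size}H", raw))


with tempfile.TemporaryDirectory() as tmp_dir:
    path = os.path.join(tmp_dir, "tokens.bin")
    with open(path, "wb") as f:
        f.write(struct.pack("<6H", 10, 11, 12, 13, 14, 15))
    chunk = load_uint16_slice_sketch(path, 2, 5)
```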

olmo_core.data.utils.load_array_slice_into_tensor(path, start_idx, end_idx, dtype)[source]

Read a chunk from a numpy array, returning the chunk as a torch.Tensor.

Parameters:
  • path (Union[Path, PathLike, str]) – The path/URL to the array.

  • start_idx (int) – The start index (0-based) of the chunk within the array.

  • end_idx (int) – The end index (0-based, exclusive) of the chunk within the array.

  • dtype (Union[Type[uint8], Type[uint16], Type[uint32], Type[uint64], Type[bool]]) – The numpy datatype of the array.

Return type:

Tensor

olmo_core.data.utils.get_document_lengths(input_ids, eos_token_id, bos_token_id=None)[source]

Get the length of documents.

Parameters:
  • input_ids (Union[Tensor, ndarray]) – An integer-type tensor of token IDs.

  • eos_token_id (int) – The ID of the EOS token (used to denote document boundaries).

  • bos_token_id (Optional[int], default: None) – The ID of the BOS token (used to denote document boundaries). When provided, every document must start with a BOS token.

Return type:

Tensor

olmo_core.data.utils.get_cumulative_document_lengths(doc_lens)[source]

Transform a batched tensor of document lengths into a 1D tensor of cumulative document lengths for the whole batch.

Parameters:
  • doc_lens (Tensor) – The document lengths, such as those returned by get_document_lengths().

Return type:

Tensor
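The transformation can be pictured as flattening the per-instance lengths and taking a running sum. The sketch below works on nested lists rather than tensors. The helper name is hypothetical, and two details are assumptions rather than documented behavior: that zero entries are padding to be dropped, and that the result starts with a leading 0 (the layout commonly used for cumulative sequence lengths in variable-length attention kernels).

```python
from itertools import accumulate
from typing import List


def cumulative_document_lengths_sketch(doc_lens: List[List[int]]) -> List[int]:
    """Hypothetical stand-in: flatten, drop zero padding, prefix-sum with a leading 0."""
    flat = [length for instance in doc_lens for length in instance if length != 0]
    return [0] + list(accumulate(flat))


doc_lens = [[3, 5, 0], [4, 2, 2]]  # two instances of 8 tokens each
cu_doc_lens = cumulative_document_lengths_sketch(doc_lens)
```

Each consecutive pair in the result delimits one document within the flattened batch, so the final entry equals the total number of tokens.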

olmo_core.data.utils.memmap_to_write(path, *, shape, dtype)[source]

A context manager for safely writing a numpy memory-mapped array to disk. The memory-mapped ndarray returned by the context manager will be mapped to a temporary file until the context exits successfully.

Return type:

Generator[ndarray, None, None]
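The "write to a temporary file, then move it into place" pattern described above can be sketched with the standard library. This is an illustration of the pattern only: the helper name is hypothetical, it yields an ordinary binary file handle instead of a memory-mapped ndarray, and cleaning up the temporary file on failure is an assumption about sensible behavior.

```python
import os
import tempfile
from contextlib import contextmanager
from typing import BinaryIO, Iterator


@contextmanager
def write_via_temp_file_sketch(path: str) -> Iterator[BinaryIO]:
    """Hypothetical stand-in: write to a temp file, move it into place only on success."""
    directory = os.path.dirname(os.path.abspath(path))
    fd, tmp_path = tempfile.mkstemp(dir=directory, suffix=".tmp")
    try:
        with os.fdopen(fd, "wb") as f:
            yield f
        # Reached only if the body finished without raising.
        os.replace(tmp_path, path)
    except BaseException:
        if os.path.exists(tmp_path):
            os.remove(tmp_path)
        raise


with tempfile.TemporaryDirectory() as tmp_dir:
    target = os.path.join(tmp_dir, "array.bin")
    with write_via_temp_file_sketch(target) as f:
        f.write(b"\x01\x02\x03")
        existed_during_write = os.path.exists(target)
    with open(target, "rb") as f:
        written = f.read()
```

Because the target path only appears after a successful exit, readers never observe a partially written file.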

olmo_core.data.utils.write_array_to_disk(arr, path)[source]

Write a numpy array to disk in the same simple format that np.memmap uses.

olmo_core.data.utils.bucket_documents(path, target, *, buckets, eos_token_id, dtype, indices_dtype=<class 'numpy.uint32'>)[source]

Bucket documents by sequence length in powers of 2, saving the indices of the bucketed documents to target.

Returns the number of original documents and the number of new bucketed documents.

Return type:

Tuple[int, int]

olmo_core.data.utils.segment_documents_into_instances(path, target, *, max_sequence_length, eos_token_id, dtype, indices_dtype=<class 'numpy.uint32'>, bos_token_id=None, sample=None)[source]

Segment documents into instances of at most max_sequence_length tokens, saving the indices of the instances to target.

Sample a subset of the instances if sample is provided as a tuple of (max_instances, seed).

Returns the number of original documents and the number of resulting instances.

Return type:

Tuple[int, int]

olmo_core.data.utils.find_end_first_consecutive_true(arr)[source]

Find the end position of the first consecutive sequence of True values in an array.

Return type:

int

olmo_core.data.utils.find_start_last_consecutive_true(arr)[source]

Find the start position of the last consecutive sequence of True values in an array.

Return type:

int

olmo_core.data.utils.group_consecutive_values(arr, stepsize=1)[source]

Group consecutive values in an array.

Return type:

List[ndarray]
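A plain-Python sketch of the grouping is shown below. It is not the library's implementation: the helper name is hypothetical, it returns lists instead of ndarrays, and reading "consecutive" as "each value exceeds the previous one by exactly stepsize" is an interpretation of the description above.

```python
from typing import List


def group_consecutive_values_sketch(values: List[int], stepsize: int = 1) -> List[List[int]]:
    """Hypothetical stand-in: start a new group whenever the step between neighbors changes."""
    groups: List[List[int]] = []
    for value in values:
        if groups and value - groups[-1][-1] == stepsize:
            groups[-1].append(value)  # continues the current run
        else:
            groups.append([value])  # starts a new run
    return groups


groups = group_consecutive_values_sketch([1, 2, 3, 7, 8, 10])
```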

class olmo_core.data.utils.RepetitionTuple(start: int, end: int, period: int, times: int)[source]

Bases: NamedTuple

Tuple to store information about a periodic sequence.

start: int

Alias for field number 0

end: int

Alias for field number 1

period: int

Alias for field number 2

times: int

Alias for field number 3

olmo_core.data.utils.find_periodic_sequences(arr, max_period, min_period=1, mask_value=-1)[source]

Find periodic sequences in an array.

This function sweeps through the array and checks for sequences of length [min_period, max_period] that repeat at least 3 times. To do so, it reshapes the array into a matrix with period columns and checks if each row is equal to the previous row. Blocks of repeating rows indicate repeating sequences.

Because there’s no guarantee that the sequences start at the beginning of each row, it can only detect sequences that repeat at least 3 times. To account for the fact that sequences may not start at the beginning of each row (or end at the end of each row), we check the end of the previous row and the start of the next row to determine the actual start and end positions of the sequence.

Parameters:
  • arr (np.ndarray) – The array to search for periodic sequences.

  • max_period (int) – The maximum period to check for.

  • min_period (int, optional) – The minimum period to check for. Defaults to 1.

  • mask_value (int, optional) – The value to use to pad the array. Defaults to -1.

Return type:

Generator[RepetitionTuple, None, None]
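The row-comparison step described above can be sketched for a single period. This is a simplified illustration on Python lists, not the library's implementation: the helper name is hypothetical, and it only flags rows that equal the previous row, without the boundary adjustment that recovers the exact start and end positions.

```python
from typing import List


def repeated_rows_sketch(values: List[int], period: int, mask_value: int = -1) -> List[bool]:
    """Hypothetical stand-in: flag rows of the period-wide matrix that equal the previous row."""
    # Pad so the length is a multiple of the period, then cut into rows.
    padding = (-len(values)) % period
    padded = values + [mask_value] * padding
    rows = [padded[i : i + period] for i in range(0, len(padded), period)]
    if not rows:
        return []
    # The first row has no predecessor, so it is never flagged.
    return [False] + [rows[i] == rows[i - 1] for i in range(1, len(rows))]


# The pattern 1, 2, 3 repeats four times, starting at offset 1.
values = [9, 1, 2, 3, 1, 2, 3, 1, 2, 3, 1, 2, 3, 8]
flags = repeated_rows_sketch(values, period=3)
```

In this example the rows are [9, 1, 2], [3, 1, 2], [3, 1, 2], [3, 1, 2], [3, 8, -1]. The repetition shows up as a block of flagged rows, but since it starts mid-row, the first and last rows match only partially, which is why the neighboring rows must be inspected to locate the true boundaries.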

olmo_core.data.utils.chunked(iterable, n)[source]

Group items in the iterable into chunks of at most n items. This is equivalent to the function of the same name from more-itertools with strict=False.

Return type:

Iterable[List[TypeVar(T)]]
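A minimal sketch of this behavior, using only the standard library, is shown below. The helper name chunked_sketch is hypothetical and the code is an illustration rather than the library's implementation.

```python
from itertools import islice
from typing import Iterable, Iterator, List, TypeVar

T = TypeVar("T")


def chunked_sketch(iterable: Iterable[T], n: int) -> Iterator[List[T]]:
    """Hypothetical stand-in: yield lists of up to n items until the input is exhausted."""
    iterator = iter(iterable)
    while True:
        chunk = list(islice(iterator, n))
        if not chunk:
            return
        yield chunk


chunks = list(chunked_sketch(range(7), 3))
```

The final chunk may be shorter than n when the number of items is not a multiple of n.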

class olmo_core.data.utils.SegmentTreeNode(weight=0, parent=None, children=None, leaf_id=None)[source]

Bases: object

olmo_core.data.utils.pack_documents_into_instances(*paths, max_sequence_length, eos_token_id, dtype, bos_token_id=None, indices_dtype=<class 'numpy.uint64'>, long_doc_strategy='truncate')[source]

Pack documents from source files into instances of at most max_sequence_length tokens using the best-fit-decreasing algorithm described in https://arxiv.org/pdf/2404.10830.

Parameters:
  • paths (Union[Path, PathLike, str]) – Paths/URLs to the source files of token IDs. When multiple sources are given, they’ll be treated as if they’ve been concatenated together into a single source file.

  • max_sequence_length (int) – The maximum sequence length of each instance.

  • eos_token_id (int) – The EOS token ID, used to find document boundaries.

  • bos_token_id (Optional[int], default: None) – The BOS token ID, used to find document boundaries in conjunction with the EOS token ID.

  • dtype (Union[Type[uint8], Type[uint16], Type[uint32], Type[uint64]]) – The numpy datatype of the source file.

  • indices_dtype (Union[Type[uint8], Type[uint16], Type[uint32], Type[uint64]], default: <class 'numpy.uint64'>) – The numpy datatype to use for document indices.

  • long_doc_strategy (LongDocStrategy, default: 'truncate') – Specifies how to handle documents that are longer than max_sequence_length. If set to “truncate”, those documents are truncated to max_sequence_length and the excess tokens are discarded. If set to “fragment”, those documents are split into smaller documents so that no tokens are discarded, at the cost of fragmented documents.

Return type:

Tuple[List[List[int]], ndarray, int]

Returns:

A list of instances, where each instance is a list of document IDs, a 2D array of the corresponding document start and end indices, with shape (num_documents, 2), and the total number of tokens packed into instances.
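Best-fit-decreasing packing can be sketched in a few lines: visit documents from longest to shortest, and place each one in the instance with the least remaining space that still fits it. The code below is a naive illustration working directly on document lengths. The helper name is hypothetical, the linear scan over instances is chosen for clarity rather than speed, and it assumes long documents have already been truncated or fragmented.

```python
from typing import List, Optional


def best_fit_decreasing_sketch(doc_lengths: List[int], max_sequence_length: int) -> List[List[int]]:
    """Hypothetical stand-in: naive best-fit-decreasing; returns document IDs per instance."""
    instances: List[List[int]] = []
    remaining: List[int] = []  # free space left in each instance
    # Visit documents from longest to shortest (ties broken by document ID).
    order = sorted(range(len(doc_lengths)), key=lambda doc_id: (-doc_lengths[doc_id], doc_id))
    for doc_id in order:
        length = doc_lengths[doc_id]
        if length > max_sequence_length:
            raise ValueError("truncate or fragment long documents before packing")
        # Best fit: the instance with the least free space that still fits.
        best: Optional[int] = None
        for instance_id, space in enumerate(remaining):
            if length <= space and (best is None or space < remaining[best]):
                best = instance_id
        if best is None:
            instances.append([doc_id])
            remaining.append(max_sequence_length - length)
        else:
            instances[best].append(doc_id)
            remaining[best] -= length
    return instances


instances = best_fit_decreasing_sketch([5, 3, 4, 2, 6], max_sequence_length=8)
```

Here documents of lengths 6 and 2 fill one instance exactly, as do those of lengths 5 and 3, and the remaining document of length 4 gets an instance of its own.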

olmo_core.data.utils.attention_mask_to_cache_leftpad(attention_mask)[source]

Convert a left-padding attention mask into a cache leftpad for Flash-Attention.

The mask is expected to be a boolean or 0/1 tensor of shape (batch, seq_len) where True/1 indicates a valid token and the padding is on the left side of the sequence (i.e. all padding tokens come before all valid tokens).

Returns:

A tensor of shape (batch_size,) and dtype torch.int32 holding, for each sequence, the index at which the KV cache starts.

Return type:

Tensor
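Since all padding sits on the left, the index at which valid tokens begin equals the number of padding positions in the row. The sketch below illustrates this on nested lists. The helper name is hypothetical, and the real function operates on tensors.

```python
from typing import List, Sequence


def cache_leftpad_sketch(attention_mask: Sequence[Sequence[int]]) -> List[int]:
    """Hypothetical stand-in: count the left-padding positions in each row."""
    leftpad = []
    for row in attention_mask:
        num_valid = sum(1 for value in row if value)
        # With left padding, the padding count is also the first valid index.
        leftpad.append(len(row) - num_valid)
    return leftpad


mask = [[0, 0, 1, 1], [1, 1, 1, 1], [0, 1, 1, 1]]
cache_leftpad = cache_leftpad_sketch(mask)
```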