distributed.utils

Distributed helpers, most of which also work in a non-distributed context so that calling code can use one consistent API.

olmo_core.distributed.utils.init_distributed(backend='nccl', timeout=datetime.timedelta(seconds=1800), shared_filesytem=True, **kwargs)[source]

Initialize the distributed process group with the given backend(s) and check/set the relevant environment variables. This also calls torch.cuda.set_device() for backends that support CUDA.

If the process group is already initialized, this function will skip the initialization but still set the environment variables and CUDA device. This allows callers to pre-initialize torch.distributed (e.g., without device_id to avoid NCCL hangs when multiple process groups exist) before calling this function.

olmo_core.distributed.utils.validate_env_vars()[source]

Validate distributed environment variables. This is called internally by init_distributed().
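
To illustrate the kind of check this implies, here is a minimal sketch; it is not the library's implementation. It assumes the torchrun-style variables RANK, WORLD_SIZE, LOCAL_RANK, MASTER_ADDR and MASTER_PORT. The exact set that validate_env_vars() inspects is not stated in this section, and the helper name find_missing_env_vars is hypothetical.

```python
import os
from typing import List, Mapping, Optional

# Assumption: torchrun-style variables. The set actually checked by
# validate_env_vars() is not documented in this section.
ASSUMED_REQUIRED_VARS = ("RANK", "WORLD_SIZE", "LOCAL_RANK", "MASTER_ADDR", "MASTER_PORT")


def find_missing_env_vars(env: Optional[Mapping[str, str]] = None) -> List[str]:
    """Return the assumed-required variables that are unset or empty.

    Hypothetical helper for illustration only.
    """
    env = os.environ if env is None else env
    return [name for name in ASSUMED_REQUIRED_VARS if not env.get(name)]
```

Passing a plain dict instead of os.environ makes the check easy to try outside a launched job.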

olmo_core.distributed.utils.get_node_hostname()[source]

Get the hostname of the node.

Return type:

str

olmo_core.distributed.utils.is_distributed()[source]

Check if in a distributed context.

Return type:

bool

olmo_core.distributed.utils.barrier(group=None)[source]

Wait for all ranks in the group.

Return type:

None

olmo_core.distributed.utils.get_rank(group=None)[source]

Get the rank within the process group.

Return type:

int

olmo_core.distributed.utils.get_global_rank(group_rank, group=None)[source]

Translate a rank within a group into its global rank.

Return type:

int

olmo_core.distributed.utils.get_local_rank()[source]

Get the local rank within the current node.

Warning

This relies on the environment variable LOCAL_RANK being set correctly, but will always return 0 if a distributed process group has not been initialized.

Return type:

int

Returns:

The rank.

olmo_core.distributed.utils.get_fs_local_rank(group=None)[source]

Get the local rank per filesystem. Regardless of the number of nodes, if all ranks share the same filesystem then get_fs_local_rank() is equivalent to get_rank(); if nodes do not share a filesystem then get_fs_local_rank() is equivalent to get_local_rank().

Warning

This relies on some environment variables to determine the correct rank. If you are using a shared filesystem across nodes, you can simply set the environment variable OLMO_SHARED_FS=1. Otherwise you can set FS_LOCAL_RANK for each process.

This will always return 0 if a distributed process group has not been initialized.

Return type:

int

Returns:

The rank.
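
The rules described above can be sketched in plain Python. This is an illustrative sketch only, not the library's code: the function name sketch_fs_local_rank, its parameters, the precedence between OLMO_SHARED_FS and FS_LOCAL_RANK, and the fallback to the local rank are all assumptions.

```python
import os
from typing import Mapping, Optional


def sketch_fs_local_rank(
    global_rank: int,
    local_rank: int,
    distributed: bool = True,
    env: Optional[Mapping[str, str]] = None,
) -> int:
    """Pick a per-filesystem rank from the documented environment variables.

    Hypothetical helper; the order of the checks below is an assumption.
    """
    env = os.environ if env is None else env
    if not distributed:
        # Documented behavior: always 0 without an initialized process group.
        return 0
    if env.get("OLMO_SHARED_FS") == "1":
        # One filesystem shared by every node: same as the global rank.
        return global_rank
    if "FS_LOCAL_RANK" in env:
        # Explicit per-process override.
        return int(env["FS_LOCAL_RANK"])
    # Assumed fallback: each node has its own filesystem.
    return local_rank
```

For example, with OLMO_SHARED_FS=1 only global rank 0 gets filesystem-local rank 0, so a "write once per filesystem" guard runs on a single process in the whole job.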

olmo_core.distributed.utils.get_world_size(group=None)[source]

Get the world size of the default distributed process group.

Warning

This will always return 1 if a distributed group has not been initialized.

Return type:

int

olmo_core.distributed.utils.get_local_world_size()[source]

Get the local world size within the default distributed process group.

Warning

This relies on the ‘LOCAL_WORLD_SIZE’ env var but will always return 1 if a distributed process group has not been initialized.

Return type:

int

Returns:

The local world size.

olmo_core.distributed.utils.get_num_nodes()[source]

Get the number of nodes in the default distributed process group.

Warning

This relies on either the ‘NUM_NODES’ or ‘LOCAL_WORLD_SIZE’ env var, but will always return 1 if a distributed process group has not been initialized.

Return type:

int

Returns:

The number of nodes.
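
One way the two environment variables could yield a node count is sketched below. This is an assumption-laden illustration, not the library's implementation: the name sketch_num_nodes, the preference for NUM_NODES over LOCAL_WORLD_SIZE, and the final fallback of 1 are all hypothetical.

```python
from typing import Mapping


def sketch_num_nodes(world_size: int, env: Mapping[str, str], distributed: bool = True) -> int:
    """Derive a node count from NUM_NODES or LOCAL_WORLD_SIZE (hypothetical helper)."""
    if not distributed:
        # Documented behavior: always 1 without an initialized process group.
        return 1
    if "NUM_NODES" in env:
        # Assumed precedence: an explicit node count wins.
        return int(env["NUM_NODES"])
    if "LOCAL_WORLD_SIZE" in env:
        # Total ranks divided by ranks per node, assuming equally sized nodes.
        return world_size // int(env["LOCAL_WORLD_SIZE"])
    # Assumed fallback when neither variable is set.
    return 1
```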

olmo_core.distributed.utils.synchronize_value(value, device, src=0, group=None)[source]

Synchronize a value across the distributed process group.

Return type:

TypeVar(V, bool, int, float, Tensor)

olmo_core.distributed.utils.synchronize_flag(flag, device, group=None)[source]

Synchronize a boolean across the distributed process group.

Return type:

bool

olmo_core.distributed.utils.all_reduce_value(value, device, op=<RedOpType.SUM: 0>, group=None)[source]

All reduce a value across the distributed process group.

Return type:

TypeVar(V, bool, int, float, Tensor)

olmo_core.distributed.utils.broadcast_object(obj, src=0, group=None)[source]

Broadcast an object using pickle to all ranks in the process group.

Return type:

TypeVar(T)

olmo_core.distributed.utils.all_gather(tensor, group=None)[source]

All-gather tensors from the whole group into a list.

Return type:

List[Tensor]

olmo_core.distributed.utils.all_gather_object(obj, group=None)[source]

All-gather an object using pickle to all ranks in a process group.

Return type:

List[TypeVar(T)]

olmo_core.distributed.utils.get_mesh_coordinates(mesh, rank=None)[source]

Calculate the coordinates of a global rank on a device mesh.

Parameters:
  • mesh (DeviceMesh) – The device mesh.

  • rank (Optional[int], default: None) – The global rank. If None, the current global rank is used.

Return type:

Optional[List[int]]

Returns:

The coordinates or None if the rank is not part of the mesh.
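
The idea of "coordinates of a rank on a mesh" can be shown without a real DeviceMesh. The sketch below models a mesh as a nested list of global ranks, which is an assumption made for illustration; find_coordinates is a hypothetical helper and does not reflect how get_mesh_coordinates() is implemented.

```python
from typing import List, Optional, Sequence, Union


def find_coordinates(mesh: Union[int, Sequence], rank: int) -> Optional[List[int]]:
    """Return the index path to `rank` in a nested list of ranks, or None if absent."""
    if isinstance(mesh, int):
        # Leaf: an empty path means "found here".
        return [] if mesh == rank else None
    for index, sub_mesh in enumerate(mesh):
        coords = find_coordinates(sub_mesh, rank)
        if coords is not None:
            return [index] + coords
    return None


# A 2 x 4 mesh: two rows of four ranks each.
example_mesh = [[0, 1, 2, 3], [4, 5, 6, 7]]
coords_of_rank_6 = find_coordinates(example_mesh, 6)  # row 1, column 2
```

As with the documented function, a rank that is not part of the mesh yields None rather than raising.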

olmo_core.distributed.utils.backend_supports_cuda(backend=None)[source]

Check if a distributed backend supports CUDA tensors.

Return type:

bool

olmo_core.distributed.utils.backend_supports_cpu(backend=None)[source]

Check if a distributed backend supports CPU tensors.

Return type:

bool

olmo_core.distributed.utils.do_n_at_a_time(f, *, n=None, process_group=None, world_size=None, local_rank=None)[source]

Call a function f in a distributed context from at most n ranks at a time.

All ranks will eventually call the given function exactly once, at which point this function will return.

Parameters:
  • f (Callable[[], TypeVar(T)]) – The function to call from each rank.

  • n (Optional[int], default: None) – The level of concurrency, i.e. how many ranks are allowed to call f at once. This defaults to the number of nodes, in which case one rank from each node will call f at a time.

  • process_group (Optional[ProcessGroup], default: None) – The process group to use.

Return type:

TypeVar(T)
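
The "at most n ranks at a time" behavior can be pictured as a sequence of turns. The sketch below only computes which ranks would run in each turn; it assumes ranks are batched consecutively by rank number, which this section does not state, and sketch_turns is a hypothetical helper rather than part of the library.

```python
from typing import List


def sketch_turns(world_size: int, n: int) -> List[List[int]]:
    """Group ranks 0..world_size-1 into consecutive turns of at most n ranks."""
    if n < 1:
        raise ValueError("n must be at least 1")
    return [
        list(range(start, min(start + n, world_size)))
        for start in range(0, world_size, n)
    ]


# Eight ranks, at most three calling f at once.
example_turns = sketch_turns(8, 3)
```

Under this assumption every rank appears in exactly one turn, which matches the guarantee that each rank calls f exactly once.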