distributed.utils
Distributed helpers, most of which also work in a non-distributed context for API consistency.
- olmo_core.distributed.utils.init_distributed(backend='nccl', timeout=datetime.timedelta(seconds=1800), shared_filesytem=True, **kwargs)
Initialize the distributed process group with the given backend(s) and check/set the relevant environment variables. This also calls torch.cuda.set_device() for backends that support CUDA.
If the process group is already initialized, this function will skip the initialization but still set the environment variables and CUDA device. This allows callers to pre-initialize torch.distributed (e.g., without device_id to avoid NCCL hangs when multiple process groups exist) before calling this function.
- olmo_core.distributed.utils.validate_env_vars()
Validate distributed environment variables. This is called internally by init_distributed().
- olmo_core.distributed.utils.is_distributed()
Check if in a distributed context.
- Return type:
bool
- olmo_core.distributed.utils.barrier(group=None)
Wait for all ranks in the group.
- Return type:
None
- olmo_core.distributed.utils.get_rank(group=None)
Get the rank within the process group.
- Return type:
int
- olmo_core.distributed.utils.get_global_rank(group_rank, group=None)
Translate a rank within a group into its global rank.
- Return type:
int
- olmo_core.distributed.utils.get_local_rank()
Get the local rank within the current node.
Warning
This relies on the environment variable LOCAL_RANK being set correctly, but will always return 0 if a distributed process group has not been initialized.
- Return type:
int
- Returns:
The rank.
- olmo_core.distributed.utils.get_fs_local_rank(group=None)
Get the local rank per filesystem. Regardless of the number of nodes, if all ranks share the same filesystem then get_fs_local_rank() will be equivalent to get_rank(), but if nodes do not share the same filesystem then get_fs_local_rank() will be equivalent to get_local_rank().
Warning
This relies on some environment variables to determine the correct rank. If you are using a shared filesystem across nodes, you can simply set the environment variable OLMO_SHARED_FS=1. Otherwise you can set FS_LOCAL_RANK for each process.
This will always return 0 if a distributed process group has not been initialized.
- Return type:
int
- Returns:
The rank.
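The selection rule described for get_fs_local_rank() can be illustrated in plain Python. This is only a sketch under stated assumptions, not the library's implementation: `fs_local_rank_sketch` is a hypothetical helper, the ranks are passed in explicitly instead of being read from a process group, and the precedence given to FS_LOCAL_RANK over OLMO_SHARED_FS is assumed.

```python
from typing import Mapping


def fs_local_rank_sketch(env: Mapping[str, str], global_rank: int, local_rank: int) -> int:
    """Hypothetical helper mirroring the documented behaviour of get_fs_local_rank()."""
    # Assumption: an explicit per-process override wins over everything else.
    if "FS_LOCAL_RANK" in env:
        return int(env["FS_LOCAL_RANK"])
    # Shared filesystem across nodes: every rank sees the same files,
    # so the filesystem-local rank equals the global rank.
    if env.get("OLMO_SHARED_FS") == "1":
        return global_rank
    # Otherwise each node is assumed to have its own filesystem,
    # so the filesystem-local rank equals the node-local rank.
    return local_rank


# Rank 5 overall, rank 1 on its node.
shared = fs_local_rank_sketch({"OLMO_SHARED_FS": "1"}, global_rank=5, local_rank=1)
per_node = fs_local_rank_sketch({}, global_rank=5, local_rank=1)
```

A typical use of this value is to let only the rank with filesystem-local rank 0 write or download a file that the other ranks on the same filesystem will then read.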
- olmo_core.distributed.utils.get_world_size(group=None)
Get the world size of the default distributed process group.
Warning
This will always return 1 if a distributed group has not been initialized.
- Return type:
int
- olmo_core.distributed.utils.get_local_world_size()
Get the local world size within the default distributed process group.
Warning
This relies on the ‘LOCAL_WORLD_SIZE’ env var but will always return 1 if a distributed process group has not been initialized.
- Return type:
int
- Returns:
The local world size.
- olmo_core.distributed.utils.get_num_nodes()
Get the number of nodes in the default distributed process group.
Warning
This relies on either the ‘NUM_NODES’ or ‘LOCAL_WORLD_SIZE’ env var, but will always return 1 if a distributed process group has not been initialized.
- Return type:
int
- Returns:
The number of nodes.
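One plausible way to read the dependence on NUM_NODES or LOCAL_WORLD_SIZE is sketched below. This is an illustration, not the library's code: `num_nodes_sketch` is a hypothetical helper, the preference for NUM_NODES is an assumption, and the fallback assumes every node runs the same number of ranks.

```python
from typing import Mapping


def num_nodes_sketch(env: Mapping[str, str], world_size: int) -> int:
    """Hypothetical helper: derive a node count from environment variables."""
    # Assumption: an explicit node count takes precedence when present.
    if "NUM_NODES" in env:
        return int(env["NUM_NODES"])
    # Otherwise divide the total rank count by the ranks per node,
    # assuming all nodes host the same number of ranks.
    if "LOCAL_WORLD_SIZE" in env:
        return world_size // int(env["LOCAL_WORLD_SIZE"])
    raise ValueError("neither NUM_NODES nor LOCAL_WORLD_SIZE is set")


# 32 ranks with 8 ranks per node gives 4 nodes.
derived = num_nodes_sketch({"LOCAL_WORLD_SIZE": "8"}, world_size=32)
```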
- olmo_core.distributed.utils.synchronize_value(value, device, src=0, group=None)
Synchronize a value across the distributed process group.
- olmo_core.distributed.utils.synchronize_flag(flag, device, group=None)
Synchronize a boolean across the distributed process group.
- Return type:
bool
- olmo_core.distributed.utils.all_reduce_value(value, device, op=<RedOpType.SUM: 0>, group=None)
All reduce a value across the distributed process group.
- olmo_core.distributed.utils.broadcast_object(obj, src=0, group=None)
Broadcast an object using pickle to all ranks in the process group.
- Return type:
TypeVar(T)
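Because the object travels as pickled bytes, it has to be picklable, and receiving ranks end up with a reconstructed copy rather than the original instance. The single-process simulation below illustrates that round trip with the standard library only. `simulate_broadcast` is a hypothetical stand-in and makes no torch.distributed calls.

```python
import pickle
from typing import Any, List


def simulate_broadcast(obj: Any, world_size: int, src: int = 0) -> List[Any]:
    """Hypothetical single-process stand-in for a pickle-based broadcast."""
    # The source rank serialises the object once.
    payload = pickle.dumps(obj)
    # The source keeps its own object; every other rank deserialises a copy.
    return [obj if rank == src else pickle.loads(payload) for rank in range(world_size)]


config = {"lr": 3e-4, "steps": [100, 200]}
received = simulate_broadcast(config, world_size=4)
```

Every entry of `received` compares equal to `config`, but only the entry for the source rank is the same object in memory.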
- olmo_core.distributed.utils.all_gather(tensor, group=None)
All-gather tensors from the whole group into a list.
- olmo_core.distributed.utils.all_gather_object(obj, group=None)
All-gather an object using pickle to all ranks in a process group.
- olmo_core.distributed.utils.get_mesh_coordinates(mesh, rank=None)
Calculate the coordinates of a global rank on a device mesh.
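To make "coordinates of a global rank on a device mesh" concrete, the sketch below treats a mesh as a nested list of global ranks and searches for the index of a given rank. It is a plain-Python illustration only: `mesh_coordinates_sketch` is a hypothetical helper, the nested-list mesh is a stand-in for a real device mesh, and returning None for a rank that is not on the mesh is an assumption.

```python
from typing import Optional, Tuple


def mesh_coordinates_sketch(mesh: list, rank: int) -> Optional[Tuple[int, ...]]:
    """Hypothetical helper: find the index of `rank` in a nested-list mesh."""

    def search(sub: list, prefix: Tuple[int, ...]) -> Optional[Tuple[int, ...]]:
        for i, item in enumerate(sub):
            if isinstance(item, list):
                # Descend into the next mesh dimension.
                found = search(item, prefix + (i,))
                if found is not None:
                    return found
            elif item == rank:
                return prefix + (i,)
        return None

    return search(mesh, ())


# A 2 x 4 mesh, e.g. 2 data-parallel replicas of 4 shards each.
mesh = [[0, 1, 2, 3], [4, 5, 6, 7]]
coords = mesh_coordinates_sketch(mesh, 6)  # (1, 2): second row, third column
```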
- olmo_core.distributed.utils.backend_supports_cuda(backend=None)
Check if a distributed backend supports CUDA tensors.
- Return type:
bool
- olmo_core.distributed.utils.backend_supports_cpu(backend=None)
Check if a distributed backend supports CPU tensors.
- Return type:
bool
- olmo_core.distributed.utils.do_n_at_a_time(f, *, n=None, process_group=None, world_size=None, local_rank=None)
Call a function f in a distributed context from at most n ranks at a time.
All ranks will eventually call the given function exactly once, at which point this function will return.
- Parameters:
f (Callable[[], TypeVar(T)]) – The function to call from each rank.
n (Optional[int], default: None) – The level of concurrency, i.e. how many ranks are allowed to call f at once. This defaults to the number of nodes, in which case one rank from each node will call f at a time.
process_group (Optional[ProcessGroup], default: None) – The process group to use.
- Return type:
TypeVar(T)
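The "at most n ranks at a time" behaviour can be pictured as a round-based schedule. The sketch below computes one such schedule in plain Python. It is an illustration under assumptions rather than the library's implementation: `schedule_sketch` is a hypothetical helper, and assigning rank r to round r % num_rounds is an assumed rule, chosen because it yields one rank per node per round when n equals the number of nodes and ranks are numbered contiguously within each node.

```python
import math
from typing import List


def schedule_sketch(world_size: int, n: int) -> List[List[int]]:
    """Hypothetical helper: which ranks would call f in each round."""
    # Enough rounds so that no round holds more than n ranks.
    num_rounds = math.ceil(world_size / n)
    # Assumed rule: rank r takes its turn in round r % num_rounds.
    return [
        [r for r in range(world_size) if r % num_rounds == rnd]
        for rnd in range(num_rounds)
    ]


# 2 nodes with 4 ranks each (ranks 0-3 on node 0, ranks 4-7 on node 1), n = 2.
rounds = schedule_sketch(world_size=8, n=2)
# rounds == [[0, 4], [1, 5], [2, 6], [3, 7]]
```

In a real run the rounds would presumably be separated by a synchronization point such as barrier(), so that ranks in a later round wait for the earlier ones. That detail is also an assumption here.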