Shortcuts

Source code for torch.distributed.deprecated

"""
torch.distributed.deprecated provides an MPI-like interface for exchanging tensor
data across multi-machine networks. It supports a few different backends
and initialization methods.
"""
import torch
import atexit
import warnings
from torch._utils import _flatten_dense_tensors, _unflatten_dense_tensors


class dist_backend:
    UNDEFINED = -1
    TCP = 0
    MPI = 1
    GLOO = 2
    NCCL = 3


_INITIALIZED_PG = 1
_INITIALIZED_MW = 2
_initialized = 0
_backend = dist_backend.UNDEFINED
_scope = locals()


def _extend_scope(module):
    _scope.update({k: getattr(module, k) for k in dir(module) if not k.startswith('_')})


def is_available():
    return torch._C._has_distributed()


def destroy_process_group():
    r"""Destroy the initialized distributed package
    """
    global _backend
    global _initialized
    torch._C._dist_destroy_process_group()
    _backend = dist_backend.UNDEFINED
    _initialized = 0


def is_initialized():
    r"""Checking if the process group has been initialized
    """
    return _initialized == _INITIALIZED_PG


[docs]def init_process_group(backend, init_method='env://', **kwargs): r"""Initializes the distributed package. Arguments: backend (str): Name of the backend to use. Depending on build-time configuration valid values include: ``tcp``, ``mpi``, ``gloo`` and ``nccl``. init_method (str, optional): URL specifying how to initialize the package. world_size (int, optional): Number of processes participating in the job. rank (int, optional): Rank of the current process. group_name (str, optional): Group name. See description of init methods. To enable ``backend == mpi``, PyTorch needs to built from source on a system that supports MPI. If you want to use Open MPI with CUDA-aware support, please use Open MPI major version 2 and above. .. note:: This method initializes CUDA context. Therefore, if multiple processes run on a single machine but use different GPUs, make sure to use :func:`torch.cuda.set_device` before this method to avoid unnecessarily creating context on the first visible device. """ world_size = kwargs.pop('world_size', -1) group_name = kwargs.pop('group_name', '') rank = kwargs.pop('rank', -1) assert len(kwargs) == 0, "got unexpected keyword arguments: %s" % ",".join(kwargs.keys()) if not is_available(): raise RuntimeError("PyTorch built without distributed support") global _initialized if _initialized: raise RuntimeError("trying to initialize torch.distributed.deprecated twice!") # Checking and assigning the distributed backend global _backend backend = backend.lower() if backend == "tcp": _backend = dist_backend.TCP elif backend == "mpi": _backend = dist_backend.MPI elif backend == "gloo": _backend = dist_backend.GLOO elif backend == "nccl": _backend = dist_backend.NCCL else: raise RuntimeError("Invalid distributed backend name: " + backend) torch._C._dist_init_process_group(backend, init_method, world_size, group_name, rank) _initialized = _INITIALIZED_PG if _backend == dist_backend.NCCL: atexit.register(destroy_process_group) if not torch._C._dist_init_extension(False, reduce_op, group): raise RuntimeError("distributed module initialization failed")
def init_master_worker(backend, init_method='env://', **kwargs): warnings.warn(""" ================================================================================ WARNING ================================================================================ Master-worker mode is still experimental. The API will change without notice and we do not guarantee full correctness and expected performance yet. We'll announce it once it's ready. """) world_size = kwargs.pop('world_size', -1) group_name = kwargs.pop('group_name', '') rank = kwargs.pop('rank', -1) assert len(kwargs) == 0, "got unexpected keyword arguments: %s" % ",".join(kwargs.keys()) if not is_available(): raise RuntimeError("PyTorch built without distributed support") global _initialized if _initialized: raise RuntimeError("trying to initialize torch.distributed.deprecated twice!") torch._C._dist_init_master_worker(backend, init_method, world_size, group_name, rank) _initialized = _INITIALIZED_MW import torch.distributed.deprecated.collectives as collectives import torch.distributed.deprecated.remote_types as remote_types _extend_scope(collectives) _extend_scope(remote_types) if not torch._C._dist_init_extension(True, reduce_op, group): raise RuntimeError("distributed module initialization failed") class reduce_op(object): SUM = object() PRODUCT = object() MAX = object() MIN = object() class group(object): WORLD = object() class _DistributedRequest(object): def __init__(self, request): self.request = request def is_completed(self): return torch._C._dist_request_is_completed(self.request) def wait(self): torch._C._dist_request_wait(self.request)
[docs]def get_rank(): r"""Returns the rank of current process. Rank is a unique identifier assigned to each process within a distributed group. They are always consecutive integers ranging from ``0`` to ``world_size - 1`` (inclusive). """ assert torch.distributed.deprecated._initialized return torch._C._dist_get_rank()
[docs]def get_world_size(): r"""Returns the number of processes in the distributed group.""" assert torch.distributed.deprecated._initialized return torch._C._dist_get_num_processes()
[docs]def isend(tensor, dst): r"""Sends a tensor asynchronously. Arguments: tensor (Tensor): Tensor to send. dst (int): Destination rank. Returns: A distributed request object. """ assert torch.distributed.deprecated._initialized == _INITIALIZED_PG, \ "collective only supported in process-group mode" return _DistributedRequest(torch._C._dist_isend(tensor, dst))
[docs]def irecv(tensor, src): r"""Receives a tensor asynchronously. Arguments: tensor (Tensor): Tensor to fill with received data. src (int): Source rank. Returns: A distributed request object. """ assert torch.distributed.deprecated._initialized == _INITIALIZED_PG, \ "collective only supported in process-group mode" return _DistributedRequest(torch._C._dist_irecv(tensor, src))
[docs]def send(tensor, dst): r"""Sends a tensor synchronously. Arguments: tensor (Tensor): Tensor to send. dst (int): Destination rank. """ assert torch.distributed.deprecated._initialized == _INITIALIZED_PG, \ "collective only supported in process-group mode" return torch._C._dist_send(tensor, dst)
[docs]def recv(tensor, src=None): r"""Receives a tensor synchronously. Arguments: tensor (Tensor): Tensor to fill with received data. src (int, optional): Source rank. Will receive from any process if unspecified. Returns: Sender rank. """ assert torch.distributed.deprecated._initialized == _INITIALIZED_PG, \ "collective only supported in process-group mode" if src is None: return torch._C._dist_recv_any_source(tensor) return torch._C._dist_recv(tensor, src)
[docs]def broadcast_multigpu(tensor_list, src, group=group.WORLD): r"""Broadcasts the tensor to the whole group with multiple GPU tensors per node. :attr:`tensor` must have the same number of elements in all the GPUs from all processes participating in the collective. each tensor in the list must be on a different GPU. .. note:: Only NCCL backend is currently supported. :attr:`tensor_list` should only contain GPU tensors. Arguments: tensor_list (List[Tensor]): Tensors that participate in the collective operation. if ``src`` is the rank, then the first element of ``tensor_list`` (``tensor_list[0]``) will be broadcasted to all other tensors (on different GPUs) in the src process and all tensors in ``tensor_list`` of other non-src processes. You also need to make sure that ``len(tensor_list)`` is the same for all the distributed processes calling this function. src (int): Source rank. group (optional): Group of the collective. """ assert torch.distributed.deprecated._initialized == _INITIALIZED_PG, \ "collective only supported in process-group mode" return torch._C._dist_broadcast_multigpu(tensor_list, src, group)
[docs]def broadcast(tensor, src, group=group.WORLD): r"""Broadcasts the tensor to the whole group. :attr:`tensor` must have the same number of elements in all processes participating in the collective. Arguments: tensor (Tensor): Data to be sent if :attr:`src` is the rank of current process, and tensor to be used to save received data otherwise. src (int): Source rank. group (optional): Group of the collective. """ assert torch.distributed.deprecated._initialized == _INITIALIZED_PG, \ "collective only supported in process-group mode" return torch._C._dist_broadcast(tensor, src, group)
[docs]def all_reduce_multigpu(tensor_list, op=reduce_op.SUM, group=group.WORLD): r"""Reduces the tensor data across all machines in such a way that all get the final result. This function reduces a number of tensors on every node, while each tensor resides on a different GPU. Therefore, the input tensor in the tensor list needs to be GPU tensors. Also, each tensor in the tensor list needs to reside on a different GPU. After the call, all tensors in :attr:`tensor_list` will be bitwise identical in all processes. .. note:: Only NCCL backend is currently supported. :attr:`tensor_list` should only contain GPU tensors. Arguments: tensor_list (List[Tensor]): List of input and output tensors of the collective. The function operates in-place and requires that each tensor to be a GPU tensor on different GPUs. You also need to make sure that ``len(tensor_list)`` is the same for all the distributed processes calling this function. op (optional): One of the values from ``torch.distributed.deprecated.reduce_op`` enum. Specifies an operation used for element-wise reductions. group (optional): Group of the collective. """ assert torch.distributed.deprecated._initialized == _INITIALIZED_PG, \ "collective only supported in process-group mode" return torch._C._dist_all_reduce_multigpu(tensor_list, op, group)
[docs]def all_reduce(tensor, op=reduce_op.SUM, group=group.WORLD): r"""Reduces the tensor data across all machines in such a way that all get the final result. After the call :attr:`tensor` will be bitwise identical in all processes. Arguments: tensor (Tensor): Input and output of the collective. The function operates in-place. op (optional): One of the values from ``torch.distributed.deprecated.reduce_op`` enum. Specifies an operation used for element-wise reductions. group (optional): Group of the collective. """ assert torch.distributed.deprecated._initialized == _INITIALIZED_PG, \ "collective only supported in process-group mode" return torch._C._dist_all_reduce(tensor, op, group)
[docs]def reduce_multigpu(tensor_list, dst, op=reduce_op.SUM, group=group.WORLD): r"""Reduces the tensor data on multiple GPUs across all machines. Each tensor in :attr`tensor_list` should reside on a separate GPU. Only the GPU of ``tensor_list[0]`` on the process with rank :attr:`dst` is going to receive the final result. .. note:: Only NCCL backend is currently supported. :attr:`tensor_list` should only contain GPU tensors. Arguments: tensor_list (List[Tensor]): Input and output GPU tensors of the collective. The function operates in-place. You also need to make sure that ``len(tensor_list)`` is the same for all the distributed processes calling this function. dst (int): Destination rank op (optional): One of the values from ``torch.distributed.deprecated.reduce_op`` enum. Specifies an operation used for element-wise reductions. group (optional): Group of the collective. """ assert torch.distributed.deprecated._initialized == _INITIALIZED_PG, \ "collective only supported in process-group mode" return torch._C._dist_reduce_multigpu(tensor_list, dst, op, group)
[docs]def reduce(tensor, dst, op=reduce_op.SUM, group=group.WORLD): r"""Reduces the tensor data across all machines. Only the process with rank :attr:`dst` is going to receive the final result. Arguments: tensor (Tensor): Input and output of the collective. The function operates in-place. dst (int): Destination rank op (optional): One of the values from ``torch.distributed.deprecated.reduce_op`` enum. Specifies an operation used for element-wise reductions. group (optional): Group of the collective. """ assert torch.distributed.deprecated._initialized == _INITIALIZED_PG, \ "collective only supported in process-group mode" return torch._C._dist_reduce(tensor, dst, op, group)
[docs]def all_gather_multigpu(output_tensor_lists, input_tensor_list, group=group.WORLD): r"""Gathers tensors from the whole group in a list. Each tensor in :attr:`input_tensor_list` should reside on a separate GPU. .. note:: Only NCCL backend is currently supported. :attr:`output_tensor_lists` and :attr:`input_tensor_list` should only contain GPU tensors. Arguments: output_tensor_lists (List[List[Tensor]]): Output lists. It should contain correctly-sized tensors on each GPU to be used for output of the collective. e.g. ``output_tensor_lists[i]`` contains the all_gather result that resides on the GPU of ``input_tensor_list[i]``. Note that each element of ``output_tensor_lists[i]`` has the size of ``world_size * len(input_tensor_list)``, since the function all gathers the result from every single GPU in the group. To interpret each element of ``output_tensor_list[i]``, note that ``input_tensor_list[j]`` of rank k will be appear in ``output_tensor_list[i][rank * world_size + j]`` Also note that ``len(output_tensor_lists)``, and the size of each element in ``output_tensor_lists`` (each element is a list, therefore ``len(output_tensor_lists[i])``) need to be the same for all the distributed processes calling this function. input_tensor_list (List[Tensor]): List of tensors (on different GPUs) to be broadcast from current process. Note that ``len(input_tensor_list)`` needs to be the same for all the distributed processes calling this function. group (optional): Group of the collective. """ assert torch.distributed.deprecated._initialized == _INITIALIZED_PG, \ "collective only supported in process-group mode" flatten_tensor_list = [] for output_tensor_list in output_tensor_lists: flatten_tensor_list.append(_flatten_dense_tensors(output_tensor_list)) ret = torch._C._dist_all_gather_multigpu(flatten_tensor_list, input_tensor_list, group) for output_tensor_list, flatten_tensor in zip(output_tensor_lists, flatten_tensor_list): for tensor, value in zip(output_tensor_list, _unflatten_dense_tensors(flatten_tensor, output_tensor_list)): tensor.copy_(value) return ret
[docs]def all_gather(tensor_list, tensor, group=group.WORLD): r"""Gathers tensors from the whole group in a list. Arguments: tensor_list (list[Tensor]): Output list. It should contain correctly-sized tensors to be used for output of the collective. tensor (Tensor): Tensor to be broadcast from current process. group (optional): Group of the collective. """ assert torch.distributed.deprecated._initialized == _INITIALIZED_PG, \ "collective only supported in process-group mode" if _backend != dist_backend.NCCL: return torch._C._dist_all_gather(tensor_list, tensor, group) else: return all_gather_multigpu([tensor_list], [tensor], group)
[docs]def gather(tensor, **kwargs): r"""Gathers a list of tensors in a single process. Arguments: tensor (Tensor): Input tensor. dst (int): Destination rank. Required in all processes except the one that is receiveing the data. gather_list (list[Tensor]): List of appropriately-sized tensors to use for received data. Required only in the receiving process. group (optional): Group of the collective. """ assert torch.distributed.deprecated._initialized == _INITIALIZED_PG, \ "collective only supported in process-group mode" my_rank = get_rank() dst = kwargs.pop('dst', my_rank) gather_list = kwargs.pop('gather_list', None) _group = kwargs.pop('group', group.WORLD) if kwargs: raise RuntimeError("got unexpected kwargs") if dst == my_rank: if gather_list is None: raise RuntimeError("gather_list is a required argument in gather destination") return torch._C._dist_gather_recv(gather_list, tensor, _group) else: if gather_list: raise RuntimeError("non-empty gather_list can be given only to gather destination") return torch._C._dist_gather_send(tensor, dst, _group)
[docs]def scatter(tensor, **kwargs): r"""Scatters a list of tensors to all processes in a group. Each process will receive exactly one tensor and store its data in the :attr:`tensor` argument. Arguments: tensor (Tensor): Output tensor. src (int): Source rank. Required in all processes except the one that is sending the data. scatter_list (list[Tensor]): List of tensors to scatter. Required only in the process that is sending the data. group (optional): Group of the collective. """ assert torch.distributed.deprecated._initialized == _INITIALIZED_PG, \ "collective only supported in process-group mode" my_rank = get_rank() src = kwargs.pop('src', my_rank) scatter_list = kwargs.pop('scatter_list', None) _group = kwargs.pop('group', group.WORLD) if kwargs: raise RuntimeError("got unexpected kwargs: {}".format(", ".join(kwargs.keys()))) if src == my_rank: if scatter_list is None: raise RuntimeError("scatter_list is a required argument in scatter source") return torch._C._dist_scatter_send(scatter_list, tensor, _group) else: if scatter_list: raise RuntimeError("non-empty can be given only to scatter source") return torch._C._dist_scatter_recv(tensor, src, _group)
[docs]def barrier(group=group.WORLD): r"""Synchronizes all processes. This collective blocks processes until the whole group enters this function. Arguments: group (optional): Group of the collective. """ assert torch.distributed.deprecated._initialized == _INITIALIZED_PG, \ "collective only supported in process-group mode" return torch._C._dist_barrier(group)
[docs]def new_group(ranks=None): r"""Creates a new distributed group. This function requires that all processes in the main group (i.e., all processes that are part of the distributed job) enter this function, even if they are not going to be members of the group. Additionally, groups should be created in the same order in all processes. Arguments: ranks (list[int]): List of ranks of group members. Returns: A handle of distributed group that can be given to collective calls. """ assert torch.distributed.deprecated._initialized == _INITIALIZED_PG, \ "collective only supported in process-group mode" if ranks is None: ranks = list(range(get_world_size())) return torch._C._dist_new_group(ranks)
def _clear_group_cache(group=group.WORLD): r"""Clear the created distributed group's cached resource. Only NCCL backend is currently supported. Cached resource includes NCCL communicators and CUDA events. Arguments: group (optional): Group of the collective. """ return torch._C._dist_clear_group_cache(group) def _register_stream(stream): if not _initialized: raise RuntimeError("torch.distributed.deprecated needs to be initialized first") return torch._C._dist_register_stream(stream)

Docs

Access comprehensive developer documentation for PyTorch

View Docs

Tutorials

Get in-depth tutorials for beginners and advanced developers

View Tutorials

Resources

Find development resources and get your questions answered

View Resources