|  | """ | 
|  | torch.distributed provides an MPI-like interface for exchanging tensor | 
|  | data across multi-machine networks. It supports a few different backends | 
|  | and initialization methods. | 
|  | """ | 
|  | import torch | 
|  | import atexit | 
|  | import warnings | 
|  | from torch._utils import _flatten_dense_tensors, _unflatten_dense_tensors | 
|  |  | 
|  |  | 
class dist_backend:
    """Integer identifiers for the communication backends known to this module."""
    # UNDEFINED stands for "no backend selected"; the real backends count up from 0.
    UNDEFINED = -1
    TCP, MPI, GLOO, NCCL = 0, 1, 2, 3
|  |  | 
|  |  | 
# Values stored in ``_initialized``: 0 means "not initialized", the two
# constants below distinguish process-group mode from master-worker mode.
_INITIALIZED_PG, _INITIALIZED_MW = 1, 2
_initialized = 0
# Backend selected by ``init_process_group``; UNDEFINED until then.
_backend = dist_backend.UNDEFINED
# Namespace mapping that ``_extend_scope`` injects extra public names into.
_scope = locals()
|  |  | 
|  |  | 
def _extend_scope(module):
    """Copies every public attribute of ``module`` into the ``_scope`` mapping.

    Names starting with an underscore are skipped.
    """
    exported = {}
    for attr_name in dir(module):
        if attr_name.startswith('_'):
            continue
        exported[attr_name] = getattr(module, attr_name)
    # Apply in one step so ``_scope`` is untouched if any ``getattr`` fails.
    _scope.update(exported)
|  |  | 
|  |  | 
def is_available():
    """Returns whatever ``torch._C._has_distributed()`` reports for this build."""
    has_distributed = torch._C._has_distributed()
    return has_distributed
|  |  | 
|  |  | 
def destroy_process_group():
    """
    Destroy the initialized distributed package and reset the module state
    (``_backend`` and ``_initialized``) to its uninitialized values.
    """
    global _backend, _initialized
    torch._C._dist_destroy_process_group()
    # Only reached when the native teardown did not raise.
    _backend = dist_backend.UNDEFINED
    _initialized = 0
|  |  | 
|  |  | 
def is_initialized():
    """Checks whether the package has been initialized in process-group mode.

    Returns ``False`` both when nothing was initialized and when the package
    was initialized in master-worker mode.
    """
    in_process_group_mode = (_initialized == _INITIALIZED_PG)
    return in_process_group_mode
|  |  | 
|  |  | 
def init_process_group(backend, init_method='env://', **kwargs):
    """Initializes the distributed package.

    Arguments:
        backend (str): Name of the backend to use. Depending on build-time configuration
            valid values include: ``tcp``, ``mpi``, ``gloo`` and ``nccl``.
        init_method (str, optional): URL specifying how to initialize the package.
        world_size (int, optional): Number of processes participating in the job.
        rank (int, optional): Rank of the current process.
        group_name (str, optional): Group name. See description of init methods.

    To enable ``backend == mpi``, PyTorch needs to be built from source on a system that
    supports MPI.

    Raises:
        AssertionError: If unexpected keyword arguments are passed.
        RuntimeError: If PyTorch was built without distributed support, if the
            package is already initialized, if ``backend`` is not a known
            backend name, or if the extension initialization fails.
    """
    global _initialized
    global _backend

    world_size = kwargs.pop('world_size', -1)
    group_name = kwargs.pop('group_name', '')
    rank = kwargs.pop('rank', -1)
    assert len(kwargs) == 0, "got unexpected keyword arguments: %s" % ",".join(kwargs.keys())

    if not is_available():
        raise RuntimeError("PyTorch built without distributed support")

    if _initialized:
        raise RuntimeError("trying to initialize torch.distributed twice!")

    # Checking the distributed backend name
    backend_ids = {
        "tcp": dist_backend.TCP,
        "mpi": dist_backend.MPI,
        "gloo": dist_backend.GLOO,
        "nccl": dist_backend.NCCL,
    }
    try:
        selected_backend = backend_ids.get(backend)
    except TypeError:
        # Unhashable values cannot be valid backend names either.
        selected_backend = None
    if selected_backend is None:
        # ``format`` keeps this a RuntimeError even when ``backend`` is not a string.
        raise RuntimeError("Invalid distributed backend name: {}".format(backend))

    torch._C._dist_init_process_group(backend, init_method, world_size,
                                      group_name, rank)
    # Commit module state only after the native initialization succeeded, so a
    # failed attempt does not leave ``_backend`` set while ``_initialized`` is 0.
    _backend = selected_backend
    _initialized = _INITIALIZED_PG

    if _backend == dist_backend.NCCL:
        # Have the process group destroyed automatically at interpreter exit.
        atexit.register(destroy_process_group)

    if not torch._C._dist_init_extension(False, reduce_op, group):
        raise RuntimeError("distributed module initialization failed")
|  |  | 
|  |  | 
def init_master_worker(backend, init_method='env://', **kwargs):
    """Initializes the distributed package in the experimental master-worker mode.

    Emits a warning about the experimental status on every call.

    Arguments:
        backend (str): Name of the backend, forwarded unchanged to the native
            initialization call.
        init_method (str, optional): URL specifying how to initialize the package.
        world_size (int, optional): Number of processes participating in the job.
        rank (int, optional): Rank of the current process.
        group_name (str, optional): Group name.
    """
    global _initialized

    warnings.warn("""
================================================================================
WARNING
================================================================================
Master-worker mode is still experimental. The API will change without
notice and we're can't guarantee full correctness and expected performance yet.
We'll announce it once it's ready.
""")

    world_size = kwargs.pop('world_size', -1)
    group_name = kwargs.pop('group_name', '')
    rank = kwargs.pop('rank', -1)
    assert not kwargs, "got unexpected keyword arguments: %s" % ",".join(kwargs)

    if not is_available():
        raise RuntimeError("PyTorch built without distributed support")
    if _initialized:
        raise RuntimeError("trying to initialize torch.distributed twice!")

    torch._C._dist_init_master_worker(
        backend, init_method, world_size, group_name, rank)
    _initialized = _INITIALIZED_MW

    # These modules are imported only here, after master-worker initialization;
    # their public names are then merged into this module's scope.
    import torch.distributed.collectives as collectives
    import torch.distributed.remote_types as remote_types
    for extra_module in (collectives, remote_types):
        _extend_scope(extra_module)

    extension_ok = torch._C._dist_init_extension(True, reduce_op, group)
    if not extension_ok:
        raise RuntimeError("distributed module initialization failed")
|  |  | 
|  |  | 
class reduce_op(object):
    """Namespace of distinct sentinel objects naming the reduction operations."""
    SUM, PRODUCT, MAX, MIN = object(), object(), object(), object()
|  |  | 
|  |  | 
class group(object):
    """Namespace for process-group handles."""
    # Sentinel used as the default ``group`` argument of the collectives.
    WORLD = object()
|  |  | 
|  |  | 
|  | class _DistributedRequest(object): | 
|  | def __init__(self, request): | 
|  | self.request = request | 
|  |  | 
|  | def is_completed(self): | 
|  | return torch._C._dist_request_is_completed(self.request) | 
|  |  | 
|  | def wait(self): | 
|  | torch._C._dist_request_wait(self.request) | 
|  |  | 
|  |  | 
def get_rank():
    """Returns the rank of current process.

    Rank is a unique identifier assigned to each process within a distributed
    group. They are always consecutive integers ranging from 0 to
    ``world_size - 1``.
    """
    # Valid in either initialization mode; only "not initialized" is rejected.
    assert torch.distributed._initialized
    return torch._C._dist_get_rank()
|  |  | 
|  |  | 
def get_world_size():
    """Returns the number of processes in the distributed group."""
    # Valid in either initialization mode; only "not initialized" is rejected.
    assert torch.distributed._initialized
    num_processes = torch._C._dist_get_num_processes()
    return num_processes
|  |  | 
|  |  | 
def isend(tensor, dst):
    """Starts an asynchronous send of a tensor.

    Arguments:
        tensor (Tensor): Tensor to send.
        dst (int): Destination rank.

    Returns:
        A distributed request object.
    """
    assert torch.distributed._initialized == _INITIALIZED_PG, (
        "collective only supported in process-group mode"
    )
    pending = torch._C._dist_isend(tensor, dst)
    return _DistributedRequest(pending)
|  |  | 
|  |  | 
def irecv(tensor, src):
    """Starts an asynchronous receive into a tensor.

    Arguments:
        tensor (Tensor): Tensor to fill with received data.
        src (int): Source rank.

    Returns:
        A distributed request object.
    """
    assert torch.distributed._initialized == _INITIALIZED_PG, (
        "collective only supported in process-group mode"
    )
    pending = torch._C._dist_irecv(tensor, src)
    return _DistributedRequest(pending)
|  |  | 
|  |  | 
def send(tensor, dst):
    """Sends a tensor synchronously.

    Arguments:
        tensor (Tensor): Tensor to send.
        dst (int): Destination rank.

    Returns:
        Whatever ``torch._C._dist_send`` returns.
    """
    assert torch.distributed._initialized == _INITIALIZED_PG, (
        "collective only supported in process-group mode"
    )
    outcome = torch._C._dist_send(tensor, dst)
    return outcome
|  |  | 
|  |  | 
def recv(tensor, src=None):
    """Receives a tensor synchronously.

    Arguments:
        tensor (Tensor): Tensor to fill with received data.
        src (int, optional): Source rank. Will receive from any
            process if unspecified.

    Returns:
        Sender rank.
    """
    assert torch.distributed._initialized == _INITIALIZED_PG, (
        "collective only supported in process-group mode"
    )
    if src is not None:
        return torch._C._dist_recv(tensor, src)
    # No source given: accept a message from any process.
    return torch._C._dist_recv_any_source(tensor)
|  |  | 
|  |  | 
def broadcast_multigpu(tensor_list, src, group=group.WORLD):
    """Broadcasts the tensor to the whole group with multiple GPU tensors
    per node.

    The tensors must have the same number of elements on all the GPUs of
    all processes participating in the collective. Each tensor in the list
    must be on a different GPU.

    Only nccl backend is currently supported;
    tensors should only be GPU tensors.

    Arguments:
        tensor_list (List[Tensor]): Tensors that participate in the collective
            operation. If ``src`` is the rank of the current process, then the
            first element of tensor_list (tensor_list[0]) will be broadcasted
            to all other tensors (on different GPUs) in the src process and
            to all tensors in tensor_list of the non-src processes.

            You also need to make sure that len(tensor_list) is the same for
            all the distributed processes calling this function.

        src (int): Source rank.
        group (optional): Group of the collective.
    """
    assert torch.distributed._initialized == _INITIALIZED_PG, (
        "collective only supported in process-group mode"
    )
    outcome = torch._C._dist_broadcast_multigpu(tensor_list, src, group)
    return outcome
|  |  | 
|  |  | 
def broadcast(tensor, src, group=group.WORLD):
    """Broadcasts the tensor to the whole group.

    ``tensor`` must have the same number of elements in all processes
    participating in the collective.

    Arguments:
        tensor (Tensor): Data to be sent if ``src`` is the rank of current
            process, and tensor to be used to save received data otherwise.
        src (int): Source rank.
        group (optional): Group of the collective.
    """
    assert torch.distributed._initialized == _INITIALIZED_PG, (
        "collective only supported in process-group mode"
    )
    outcome = torch._C._dist_broadcast(tensor, src, group)
    return outcome
|  |  | 
|  |  | 
def all_reduce_multigpu(tensor_list, op=reduce_op.SUM, group=group.WORLD):
    """Reduces the tensor data across all machines in such a way that all get
    the final result. This function reduces a number of tensors on every node,
    while each tensor resides on a different GPU.
    Therefore, the input tensors in the tensor list need to be GPU tensors.
    Also, each tensor in the tensor list needs to reside on a different GPU.

    After the call, all tensors in the tensor list are going to be bitwise
    identical in all processes.

    Only nccl backend is currently supported;
    tensors should only be GPU tensors.

    Arguments:
        tensor_list (List[Tensor]): List of input and output tensors of
            the collective. The function operates in-place and requires
            each tensor to be a GPU tensor on a different GPU.

            You also need to make sure that len(tensor_list) is the same for
            all the distributed processes calling this function.

        op (optional): One of the values from ``torch.distributed.reduce_op``
            enum.  Specifies an operation used for element-wise reductions.
        group (optional): Group of the collective.
    """
    assert torch.distributed._initialized == _INITIALIZED_PG, (
        "collective only supported in process-group mode"
    )
    outcome = torch._C._dist_all_reduce_multigpu(tensor_list, op, group)
    return outcome
|  |  | 
|  |  | 
def all_reduce(tensor, op=reduce_op.SUM, group=group.WORLD):
    """Reduces the tensor data across all machines in such a way that all get
    the final result.

    After the call ``tensor`` is going to be bitwise identical in all processes.

    Arguments:
        tensor (Tensor): Input and output of the collective. The function
            operates in-place.
        op (optional): One of the values from ``torch.distributed.reduce_op``
            enum.  Specifies an operation used for element-wise reductions.
        group (optional): Group of the collective.
    """
    assert torch.distributed._initialized == _INITIALIZED_PG, (
        "collective only supported in process-group mode"
    )
    outcome = torch._C._dist_all_reduce(tensor, op, group)
    return outcome
|  |  | 
|  |  | 
def reduce_multigpu(tensor_list, dst, op=reduce_op.SUM, group=group.WORLD):
    """Reduces the tensor data on multiple GPUs across all machines. Each tensor
    in tensor_list should reside on a separate GPU.

    Only the GPU of tensor_list[0] on the process with rank ``dst`` is
    going to receive the final result.

    Only nccl backend is currently supported;
    tensors should only be GPU tensors.

    Arguments:
        tensor_list (List[Tensor]): Input and output GPU tensors of the
            collective. The function operates in-place.

            You also need to make sure that len(tensor_list) is the same for
            all the distributed processes calling this function.

        dst (int): Destination rank
        op (optional): One of the values from ``torch.distributed.reduce_op``
            enum.  Specifies an operation used for element-wise reductions.
        group (optional): Group of the collective.
    """
    assert torch.distributed._initialized == _INITIALIZED_PG, (
        "collective only supported in process-group mode"
    )
    outcome = torch._C._dist_reduce_multigpu(tensor_list, dst, op, group)
    return outcome
|  |  | 
|  |  | 
def reduce(tensor, dst, op=reduce_op.SUM, group=group.WORLD):
    """Reduces the tensor data across all machines.

    Only the process with rank ``dst`` is going to receive the final result.

    Arguments:
        tensor (Tensor): Input and output of the collective. The function
            operates in-place.
        dst (int): Destination rank
        op (optional): One of the values from ``torch.distributed.reduce_op``
            enum.  Specifies an operation used for element-wise reductions.
        group (optional): Group of the collective.
    """
    assert torch.distributed._initialized == _INITIALIZED_PG, (
        "collective only supported in process-group mode"
    )
    outcome = torch._C._dist_reduce(tensor, dst, op, group)
    return outcome
|  |  | 
|  |  | 
def all_gather_multigpu(output_tensor_lists,
                        input_tensor_list,
                        group=group.WORLD):
    """Gathers tensors from the whole group in a list.
    Each tensor in input_tensor_list should reside on a separate GPU.

    Only nccl backend is currently supported;
    tensors should only be GPU tensors.

    Arguments:
        output_tensor_lists (List[List[Tensor]]): Output lists. It should
            contain correctly-sized tensors on each GPU to be used for output of
            the collective.

            e.g. output_tensor_lists[i] contains the all_gather
            result that resides on the GPU of input_tensor_list[i].

            Note that each element of output_tensor_lists has the size of
            world_size * len(input_tensor_list), since the function all gathers
            the result from every single GPU in the group. To interpret each
            element of output_tensor_lists[i], note that input_tensor_list[j] of
            rank k will appear in
            output_tensor_lists[i][k * world_size + j]

            Also note that len(output_tensor_lists), and the size of each
            element in output_tensor_lists (each element is a list,
            therefore len(output_tensor_lists[i])), need to be the same
            for all the distributed processes calling this function.

        input_tensor_list (List[Tensor]): List of tensors (on different GPUs) to
            be broadcast from current process.
            Note that len(input_tensor_list) needs to be the same for
            all the distributed processes calling this function.
        group (optional): Group of the collective.
    """
    assert torch.distributed._initialized == _INITIALIZED_PG, (
        "collective only supported in process-group mode"
    )

    # The native call receives one flattened buffer per output list.
    flat_buffers = [_flatten_dense_tensors(outputs)
                    for outputs in output_tensor_lists]

    result = torch._C._dist_all_gather_multigpu(flat_buffers,
                                                input_tensor_list,
                                                group)

    # Copy the gathered data from each flat buffer back into the caller's tensors.
    for outputs, flat_buffer in zip(output_tensor_lists, flat_buffers):
        gathered = _unflatten_dense_tensors(flat_buffer, outputs)
        for destination, value in zip(outputs, gathered):
            destination.copy_(value)

    return result
|  |  | 
|  |  | 
def all_gather(tensor_list, tensor, group=group.WORLD):
    """Gathers tensors from the whole group in a list.

    With the NCCL backend the call is routed through
    ``all_gather_multigpu`` using single-element lists.

    Arguments:
        tensor_list (list[Tensor]): Output list. It should contain
            correctly-sized tensors to be used for output of the collective.
        tensor (Tensor): Tensor to be broadcast from current process.
        group (optional): Group of the collective.
    """
    assert torch.distributed._initialized == _INITIALIZED_PG, (
        "collective only supported in process-group mode"
    )
    if _backend == dist_backend.NCCL:
        return all_gather_multigpu([tensor_list], [tensor], group)
    return torch._C._dist_all_gather(tensor_list, tensor, group)
|  |  | 
|  |  | 
def gather(tensor, **kwargs):
    """Gathers a list of tensors in a single process.

    Arguments:
        tensor (Tensor): Input tensor.
        dst (int): Destination rank. Required in all processes except the one that
            is receiving the data; defaults to the rank of the current process.
        gather_list (list[Tensor]): List of appropriately-sized tensors to
            use for received data. Required only in the receiving process.
        group (optional): Group of the collective.
    """
    assert torch.distributed._initialized == _INITIALIZED_PG, (
        "collective only supported in process-group mode"
    )
    this_rank = get_rank()
    dst = kwargs.pop('dst', this_rank)
    gather_list = kwargs.pop('gather_list', None)
    target_group = kwargs.pop('group', group.WORLD)
    if kwargs:
        raise RuntimeError("got unexpected kwargs")

    if dst != this_rank:
        # Sending side: it must not supply buffers for received data.
        if gather_list:
            raise RuntimeError("non-empty gather_list can be given only to gather destination")
        return torch._C._dist_gather_send(tensor, dst, target_group)

    # Receiving side: the output buffers are mandatory.
    if gather_list is None:
        raise RuntimeError("gather_list is a required argument in gather destination")
    return torch._C._dist_gather_recv(gather_list, tensor, target_group)
|  |  | 
|  |  | 
def scatter(tensor, **kwargs):
    """Scatters a list of tensors to all processes in a group.

    Each process will receive exactly one tensor and store its data in the
    ``tensor`` argument.

    Arguments:
        tensor (Tensor): Output tensor.
        src (int): Source rank. Required in all processes except the one that
            is sending the data; defaults to the rank of the current process.
        scatter_list (list[Tensor]): List of tensors to scatter. Required only
            in the process that is sending the data.
        group (optional): Group of the collective.

    Raises:
        RuntimeError: If unexpected keyword arguments are given, if the source
            process does not pass ``scatter_list``, or if a non-source process
            passes a non-empty ``scatter_list``.
    """
    assert torch.distributed._initialized == _INITIALIZED_PG, \
        "collective only supported in process-group mode"
    my_rank = get_rank()
    src = kwargs.pop('src', my_rank)
    scatter_list = kwargs.pop('scatter_list', None)
    _group = kwargs.pop('group', group.WORLD)
    if kwargs:
        raise RuntimeError("got unexpected kwargs")
    if src == my_rank:
        # Sending side: the list of tensors to distribute is mandatory.
        if scatter_list is None:
            raise RuntimeError("scatter_list is a required argument in scatter source")
        return torch._C._dist_scatter_send(scatter_list, tensor, _group)
    else:
        # Receiving side: it must not supply tensors to scatter.
        if scatter_list:
            raise RuntimeError("non-empty scatter_list can be given only to scatter source")
        return torch._C._dist_scatter_recv(tensor, src, _group)
|  |  | 
|  |  | 
def barrier(group=group.WORLD):
    """Synchronizes all processes.

    This collective blocks processes until the whole group enters this function.

    Arguments:
        group (optional): Group of the collective.
    """
    assert torch.distributed._initialized == _INITIALIZED_PG, (
        "collective only supported in process-group mode"
    )
    outcome = torch._C._dist_barrier(group)
    return outcome
|  |  | 
|  |  | 
def new_group(ranks=None):
    """Creates a new distributed group.

    This function requires that all processes in the main group (i.e. all
    processes that are part of the distributed job) enter this function, even
    if they are not going to be members of the group. Additionally, groups
    should be created in the same order in all processes.

    Arguments:
        ranks (list[int]): List of ranks of group members. When omitted, the
            group contains every rank from 0 up to ``get_world_size() - 1``.

    Returns:
        A handle of distributed group that can be given to collective calls.
    """
    assert torch.distributed._initialized == _INITIALIZED_PG, (
        "collective only supported in process-group mode"
    )
    members = list(range(get_world_size())) if ranks is None else ranks
    return torch._C._dist_new_group(members)
|  |  | 
|  |  | 
def _clear_group_cache(group=group.WORLD):
    """Clear the cached resources of a created distributed group.

    Only nccl backend is currently supported.

    Cached resources include NCCL communicators and CUDA events.

    Arguments:
        group (optional): Group of the collective.
    """
    outcome = torch._C._dist_clear_group_cache(group)
    return outcome
|  |  | 
|  |  | 
def _register_stream(stream):
    """Passes ``stream`` to ``torch._C._dist_register_stream``.

    Raises:
        RuntimeError: If the package has not been initialized (in any mode).
    """
    if _initialized:
        return torch._C._dist_register_stream(stream)
    raise RuntimeError("torch.distributed needs to be initialized first")