Revert "[2/N] Add flag to control which rank should perform NaN check (#134345)"
This reverts commit be7752ead3824e79f5ede6a2f59715b415a2f776.
Reverted https://github.com/pytorch/pytorch/pull/134345 on behalf of https://github.com/facebook-github-bot due to Diff reverted internally ([comment](https://github.com/pytorch/pytorch/pull/134345#issuecomment-2316133024))
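This revert restores the pre-#134345 behavior: the NaN checker is gated only by `TORCH_NCCL_NAN_CHECK` and runs on both send and receive buffers on every rank, rather than only on the sending/root side. A minimal sketch of the user-visible consequence (not part of the patch; assumes a 2-rank NCCL job launched via torchrun, one CUDA device per rank):

```python
import os
import torch
import torch.distributed as dist

# Must be set before the process group is created for the checker to see it.
os.environ["TORCH_NCCL_NAN_CHECK"] = "1"
dist.init_process_group(backend="nccl")
rank = dist.get_rank()
device = torch.device("cuda:%d" % rank)

t = torch.ones(3, 4, dtype=torch.bfloat16, device=device)
if rank == 0:
    dist.send(t, dst=1)
else:
    # With this revert, the checker scans recv buffers too, so a stale NaN
    # in the destination tensor would abort the job (the removed
    # test_nan_p2p asserted the opposite). Zero-filling stays safe.
    t.zero_()
    dist.recv(t, src=0)
dist.destroy_process_group()
```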
diff --git a/test/distributed/test_c10d_nccl.py b/test/distributed/test_c10d_nccl.py
index 55b785a..64c51f7 100644
--- a/test/distributed/test_c10d_nccl.py
+++ b/test/distributed/test_c10d_nccl.py
@@ -367,28 +367,6 @@
os.environ["TORCH_NCCL_NAN_CHECK"] = "0"
@requires_nccl()
- @skip_if_lt_x_gpu(2)
- def test_nan_p2p(self):
- # Putting NaN at recv buffer, program should not fail as NaN checker
- # should not check on receive buffer
- os.environ["TORCH_NCCL_NAN_CHECK"] = "1"
- store = c10d.FileStore(self.file_name, self.world_size)
- device = torch.device("cuda:%d" % self.rank)
- c10d.init_process_group(
- backend="nccl", store=store, rank=self.rank, world_size=self.world_size
- )
- t = torch.ones(3, 4, dtype=torch.bfloat16, device=device)
- if self.rank == 0:
- c10d.send(t, 1)
- elif self.rank == 1:
- # Putting NaN at recv buffer
- t[1, 1] = float("nan")
- c10d.recv(t, 0)
- c10d.destroy_process_group()
- # reset env
- os.environ["TORCH_NCCL_NAN_CHECK"] = "0"
-
- @requires_nccl()
@skip_but_pass_in_sandcastle_if(not TEST_MULTIGPU, "NCCL test requires 2+ GPUs")
def test_destruct_before_terminate_pg(self):
# Disable ASYNC_ERROR_HANDLING for this test to ensure we can programmatically
diff --git a/torch/csrc/distributed/c10d/ProcessGroupNCCL.cpp b/torch/csrc/distributed/c10d/ProcessGroupNCCL.cpp
index 291eb79..abcf493 100644
--- a/torch/csrc/distributed/c10d/ProcessGroupNCCL.cpp
+++ b/torch/csrc/distributed/c10d/ProcessGroupNCCL.cpp
@@ -2637,12 +2637,9 @@
PostProcess post,
OpType opType,
const char* profilingTitle,
- bool avoidRecordStreams,
- bool nanCheck) {
+ bool avoidRecordStreams) {
// Environment setting by the user may add onto collective call's option
avoidRecordStreams |= avoidRecordStreams_;
- nanCheck &= enableNanCheck_;
-
c10::cuda::CaptureStatus capture_status =
c10::cuda::currentStreamCaptureStatusMayInitCtx();
errorIfCapturingNonCapturableNCCL(capture_status);
@@ -2696,7 +2693,7 @@
at::cuda::OptionalCUDAGuard gpuGuard;
- if (nanCheck) {
+ if (enableNanCheck_) {
checkForNan(input, ncclStream);
}
@@ -3129,9 +3126,7 @@
// is gpuGuard needed for the if block below, or can i swap them
at::cuda::OptionalCUDAGuard gpuGuard;
- // Only check for NaN for send ops, for recv ops `tensor` can be a random
- // placeholder
- if (enableNanCheck_ && opType == OpType::SEND) {
+ if (enableNanCheck_) {
checkForNan(tensor, ncclStream);
}
@@ -3228,8 +3223,7 @@
Fn fn,
OpType opType,
const char* profilingTitle,
- bool avoidRecordStreams,
- bool nanCheck) {
+ bool avoidRecordStreams) {
return collective(
input,
output,
@@ -3240,8 +3234,7 @@
c10::intrusive_ptr<ProcessGroupNCCL::WorkNCCL>& work) {},
opType,
profilingTitle,
- avoidRecordStreams,
- nanCheck);
+ avoidRecordStreams);
}
template <typename Fn>
@@ -3491,9 +3484,6 @@
// avoidRecordStreams_ note: collective() will stash tensors.
bool avoidRecordStreams = avoidRecordStreams_ || (!opts.asyncOp);
- const auto root = opts.rootRank + opts.rootTensor;
- bool nanCheck = (root == rank_);
-
return collective(
tensor,
tensor,
@@ -3501,6 +3491,7 @@
at::Tensor& output,
ncclComm_t comm,
at::cuda::CUDAStream& stream) {
+ const auto root = opts.rootRank + opts.rootTensor;
return ncclBcast(
input.data_ptr(),
input.numel(),
@@ -3511,8 +3502,7 @@
},
OpType::BROADCAST,
"nccl:broadcast",
- avoidRecordStreams,
- nanCheck);
+ avoidRecordStreams);
}
// _broadcast_oop adds an out-of-place broadcast in PGNCCL
@@ -3532,9 +3522,6 @@
"Tensor input and output of _broadcast_oop must have the same number of elements ");
}
- const auto root = opts.rootRank + opts.rootTensor;
- bool nanCheck = (root == rank_);
-
return collective(
inputTensor,
outputTensor,
@@ -3542,6 +3529,7 @@
at::Tensor& output,
ncclComm_t comm,
at::cuda::CUDAStream& stream) {
+ const auto root = opts.rootRank + opts.rootTensor;
return ncclBroadcast(
input.data_ptr(),
output.data_ptr(),
@@ -3552,9 +3540,7 @@
stream.stream());
},
OpType::BROADCAST,
- "nccl:_broadcast_oop",
- /*avoidRecordStreams=*/false,
- nanCheck);
+ "nccl:_broadcast_oop");
}
c10::intrusive_ptr<Work> ProcessGroupNCCL::reduce(
@@ -4505,9 +4491,6 @@
// inputs, which == inputTensors[0] on the root rank where it matters.
bool avoidRecordStreams = avoidRecordStreams_ || (!opts.asyncOp);
- const auto root = opts.rootRank;
- bool nanCheck = (rank_ == root);
-
return collective(
outputTensor,
inputs[0], // just to fit the collective interface
@@ -4515,6 +4498,7 @@
at::Tensor& /* unused */,
ncclComm_t comm,
at::cuda::CUDAStream& stream) {
+ const auto root = opts.rootRank;
if (getRank() == root) {
if (!avoidRecordStreams) {
for (auto input : inputs) {
@@ -4528,8 +4512,7 @@
},
OpType::SCATTER,
"nccl:scatter",
- avoidRecordStreams,
- nanCheck);
+ avoidRecordStreams);
}
c10::intrusive_ptr<Work> ProcessGroupNCCL::recvAnysource(
diff --git a/torch/csrc/distributed/c10d/ProcessGroupNCCL.hpp b/torch/csrc/distributed/c10d/ProcessGroupNCCL.hpp
index b663515..2ba68cd 100644
--- a/torch/csrc/distributed/c10d/ProcessGroupNCCL.hpp
+++ b/torch/csrc/distributed/c10d/ProcessGroupNCCL.hpp
@@ -762,8 +762,7 @@
Fn fn,
OpType opType,
const char* profilingTitle = nullptr,
- bool avoidRecordStreams = false,
- bool nanCheck = true);
+ bool avoidRecordStreams = false);
template <typename Fn, typename PreProcess, typename PostProcess>
c10::intrusive_ptr<Work> collective(
@@ -774,8 +773,7 @@
PostProcess post,
OpType opType,
const char* profilingTitle = nullptr,
- bool avoidRecordStreams = false,
- bool nanCheck = true);
+ bool avoidRecordStreams = false);
template <typename Fn>
c10::intrusive_ptr<Work> collectiveCoalesced(