Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add distributed collective op broadcast/allgather/reduce_scatter/barrier #1202

Merged
merged 31 commits into from
Jan 8, 2025

Conversation

Chao1Han
Copy link
Contributor

@Chao1Han Chao1Han commented Dec 23, 2024

Motivation:

implement collectives op broadcast, allreduce_coalesced, allgather, _allgather_base, allgather_coalesced, allgather_into_tensor_coalesced, reduce_scatter, _reduce_scatter_base, reduce_scatter_tensor_coalesced, barrier.

},
[](at::xpu::XPUStream&,
c10::intrusive_ptr<ProcessGroupXCCL::WorkXCCL>&) {
ccl::group_end();

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think groupStart/groupEnd wraps ccl::group_start/ccl::group_end. Then should you call the wrapped API?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

xcclActiveGroupCounter_ affect batchP2P choice. lets use origin api like nccl

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

added comment here

return true;
}

void check_xpu_single_tensor(

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should you follow the same naming format like checkSingleTensor?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

modified

}
}
}

int64_t check_xpu_tensors_same_device(const std::vector<at::Tensor>& tensors) {

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should you follow the same naming format like checkTensorOnSameDevice?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

modified

@@ -62,6 +109,10 @@ ccl::reduction getXcclReduceOp(const ReduceOp& reduceOp, at::Tensor& input) {
// Map sum to max for bool tensors to avoid overflow issues with sum.
return ccl::reduction::max;
}
// WA due to oneCCL not support AVG
if (reduceOp == ReduceOp::AVG) {

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The WA does not mean simply replacing avg with sum, but using sum collective and div SYCL kernel to simulate avg. Please update your comment.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

modified

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please also add comments that oneCCL is expected to support avg in basekit 2025.2 release.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

@@ -31,22 +31,69 @@ const std::map<at::ScalarType, ccl::datatype> xcclDatatypes = {
{at::kFloat8_e5m2fnuz, ccl::datatype::uint8},
};

void checkXPUTensor(at::Tensor& tensor) {
bool check_same_size(const std::vector<at::Tensor>& input_tensors) {

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please refine the API name.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

Base automatically changed from chao/xccl to main January 7, 2025 01:47
TORCH_CHECK(
!isFloat8Type(type) && is_reduction_op,
"Float8 dtypes are not currenlty supported for XCCL reductions");
if (is_reduction_op)

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why do you need to change the check format?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fix the logical error for non-reduction operations; the previous implementation blocked all non-reduction operations.

std::make_shared<xcclComm_t>(std::move(comms[0]));
XCCLComm = std::make_shared<xcclComm_t>(std::move(comms[0]));

RECORD_PARAM_COMMS(

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please check this logic here to record params.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ccl comm create should also record like https://github.com/pytorch/pytorch/blob/168c2cb3f3211e5fc2110b5f1e982793a04437a4/torch/csrc/distributed/c10d/ProcessGroupNCCL.cpp#L2627, and seq=0 means first init ccl comm in standalone collective.

@Chao1Han Chao1Han changed the title [wip] group op Add distributed collective op broadcast/allgather/reduce_scatter/barrier Jan 7, 2025
@zhangxiaoli73 zhangxiaoli73 added this pull request to the merge queue Jan 8, 2025
Merged via the queue into main with commit af8622f Jan 8, 2025
3 of 4 checks passed
@zhangxiaoli73 zhangxiaoli73 deleted the chao/xccl2 branch January 8, 2025 05:58
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants