Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[core] [1/N] Ray syncer set observer #49122

Merged
merged 13 commits into from
Dec 16, 2024

Conversation

dentiny
Copy link
Contributor

@dentiny dentiny commented Dec 6, 2024

Resolves issue: #48837

This PR is the first part for reducing communication between GCS/raylet.
The functionality is:

  • Provide an observer callable to ray syncer and node state
  • Every time bidirectional grpc is completed, invoke the registered callback
  • As for the callback, I plan to pass in functor to update <node id, updated timestamp> which is stored in health check manager (in next PR)

@dentiny dentiny requested a review from a team as a code owner December 6, 2024 05:02
@dentiny dentiny force-pushed the hjiang/ray-syncer-set-observer branch 2 times, most recently from 137a067 to 637ff45 Compare December 6, 2024 05:29
@dentiny dentiny force-pushed the hjiang/ray-syncer-set-observer branch from 637ff45 to 5f88358 Compare December 6, 2024 05:30
@dentiny dentiny changed the title Hjiang/ray syncer set observer [core] Ray syncer set observer Dec 6, 2024
@dentiny dentiny changed the title [core] Ray syncer set observer [core] [1/N] Ray syncer set observer Dec 6, 2024
@dentiny dentiny requested review from jjyao and rynewang December 6, 2024 05:33
@dentiny dentiny added the go add ONLY when ready to merge, run all tests label Dec 6, 2024
@dentiny
Copy link
Contributor Author

dentiny commented Dec 11, 2024

Hi @jjyao , could you please help review this PR? Thank you!

Comment on lines 30 to 31
using RaySyncMsgObserver =
std::function<void(const ::ray::rpc::syncer::RaySyncMessage &)>;
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we instead define a generic

// A callback is called whenver a rpc completes between the current process and the given raylet regardless of whether the response indicates success or failure
using RayletCompletedRpcCallback(void(const NodeID &))

and this callback can be set on any components that have rpcs with raylet.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

regardless of whether the response indicates success or failure

I'm not sure this is correct. For example, if you get an error status on grpc (i.e. UNAVAILABLE for grpc status), it actually means communication between gcs and raylet is broken, which we shouldn't record completion.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Here Completed means client sends the request and then receives the response so if this is UNAVAILABLE, it's not completed. This is the naming suggested by chatgpt lol.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Or we can think Completed == GrpcStatus::OK

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

But feel free to give a better name.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Updated.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I removed Raylet from the comment and type alias, the whole RaySyncer is pretty general-purpose, which doesn't couple with any component in terms of implementation.

src/ray/common/ray_syncer/ray_syncer.h Outdated Show resolved Hide resolved
Comment on lines 76 to 78
if (on_raylet_rpc_completion_) {
on_raylet_rpc_completion_(NodeID::FromBinary(message->node_id()));
}
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think the best place to trigger the callback is RaySyncerBidiReactorBase::OnReadDone() because NodeState::ConsumeSyncMessage can be called in different settings.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sounds good, I think it's good to update right after a successful grpc call.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok I recall why I put it in the NodeState at the first place, reactors are stored per-node as a map inside of RaySyncer:

/// Manage connections. Here the key is the NodeID in binary form.
absl::flat_hash_map<std::string, RaySyncerBidiReactor *> sync_reactors_;
/// The local node state
std::unique_ptr<NodeState> node_state_;

which means if you tie the logic grpc callback, you have to store it somewhere and pass it to every reactor construction.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Implementation updated.

@dentiny dentiny force-pushed the hjiang/ray-syncer-set-observer branch from 4c8bb2c to 9a9fd9e Compare December 12, 2024 09:45
@dentiny dentiny requested a review from jjyao December 12, 2024 09:45
src/ray/common/ray_syncer/common.h Outdated Show resolved Hide resolved
src/ray/common/ray_syncer/ray_syncer.cc Outdated Show resolved Hide resolved
@dentiny dentiny requested a review from jjyao December 12, 2024 19:06
@dentiny dentiny requested a review from jjyao December 12, 2024 21:24
Copy link
Collaborator

@jjyao jjyao left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LG

@@ -93,11 +94,24 @@ class RaySyncerBidiReactor {
}
};

/// Set rpc completion callback, which is called after rpc read finishes.
/// This function is expected to call only once, repeated invocations throws exception.
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
/// This function is expected to call only once, repeated invocations throws exception.
/// This function is expected to call only once, repeated invocations will check fail.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure why "will check fail" is a better wording?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Anyway I updated the wording, but would like to hear the reasoning.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

check fail will just exit the process so no c++ exception is thrown?

src/ray/common/ray_syncer/common.h Outdated Show resolved Hide resolved
Signed-off-by: dentiny <[email protected]>
Signed-off-by: dentiny <[email protected]>
@dentiny dentiny requested a review from jjyao December 16, 2024 07:31
@jjyao jjyao merged commit 9f51eb5 into ray-project:master Dec 16, 2024
5 checks passed
ujjawal-khare pushed a commit to ujjawal-khare-27/ray that referenced this pull request Dec 17, 2024
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
go add ONLY when ready to merge, run all tests
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants