Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[core] Fix gcs server using shared pointer #48888

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 36 additions & 4 deletions BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -2403,11 +2403,43 @@ ray_cc_test(
)

ray_cc_test(
name = "gcs_export_event_test",
name = "gcs_job_manager_export_event_test",
size = "small",
srcs = glob([
"src/ray/gcs/gcs_server/test/export_api/*.cc",
]),
srcs = ["src/ray/gcs/gcs_server/test/export_api/gcs_job_manager_export_event_test.cc"],
tags = [
"no_windows",
"team:core"
],
deps = [
":gcs_server_lib",
":gcs_server_test_util",
":gcs_test_util_lib",
":ray_mock",
"@com_google_googletest//:gtest_main",
],
)

ray_cc_test(
name = "gcs_actor_manager_export_event_test",
size = "small",
srcs = ["src/ray/gcs/gcs_server/test/export_api/gcs_actor_manager_export_event_test.cc"],
tags = [
"no_windows",
"team:core"
],
deps = [
":gcs_server_lib",
":gcs_server_test_util",
":gcs_test_util_lib",
":ray_mock",
"@com_google_googletest//:gtest_main",
],
)

ray_cc_test(
name = "gcs_node_manager_export_event_test",
size = "small",
srcs = ["src/ray/gcs/gcs_server/test/export_api/gcs_node_manager_export_event_test.cc"],
tags = [
"no_windows",
"team:core"
Expand Down
6 changes: 5 additions & 1 deletion src/mock/ray/gcs/gcs_server/gcs_node_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,11 @@ namespace gcs {

class MockGcsNodeManager : public GcsNodeManager {
public:
MockGcsNodeManager() : GcsNodeManager(nullptr, nullptr, nullptr, ClusterID::Nil()) {}
MockGcsNodeManager()
: GcsNodeManager(/*gcs_publisher=*/nullptr,
/*gcs_table_storage=*/nullptr,
/*raylet_client_pool=*/nullptr,
/*cluster_id=*/ClusterID::Nil()) {}
MOCK_METHOD(void,
HandleRegisterNode,
(rpc::RegisterNodeRequest request,
Expand Down
2 changes: 1 addition & 1 deletion src/ray/gcs/gcs_server/gcs_actor_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -321,7 +321,7 @@ class GcsActorManager : public rpc::ActorInfoHandler {
std::function<void(const ActorID &)> destroy_owned_placement_group_if_needed,
const rpc::CoreWorkerClientFactoryFn &worker_client_factory = nullptr);

~GcsActorManager() = default;
~GcsActorManager() override = default;

void HandleRegisterActor(rpc::RegisterActorRequest request,
rpc::RegisterActorReply *reply,
Expand Down
31 changes: 15 additions & 16 deletions src/ray/gcs/gcs_server/gcs_actor_scheduler.cc
Original file line number Diff line number Diff line change
Expand Up @@ -27,17 +27,17 @@ GcsActorScheduler::GcsActorScheduler(
instrumented_io_context &io_context,
GcsActorTable &gcs_actor_table,
const GcsNodeManager &gcs_node_manager,
std::shared_ptr<ClusterTaskManager> cluster_task_manager,
ClusterTaskManager &cluster_task_manager,
GcsActorSchedulerFailureCallback schedule_failure_handler,
GcsActorSchedulerSuccessCallback schedule_success_handler,
std::shared_ptr<rpc::NodeManagerClientPool> raylet_client_pool,
rpc::NodeManagerClientPool &raylet_client_pool,
rpc::CoreWorkerClientFactoryFn client_factory,
std::function<void(const NodeID &, const rpc::ResourcesData &)>
normal_task_resources_changed_callback)
: io_context_(io_context),
gcs_actor_table_(gcs_actor_table),
gcs_node_manager_(gcs_node_manager),
cluster_task_manager_(std::move(cluster_task_manager)),
cluster_task_manager_(cluster_task_manager),
schedule_failure_handler_(std::move(schedule_failure_handler)),
schedule_success_handler_(std::move(schedule_success_handler)),
raylet_client_pool_(raylet_client_pool),
Expand Down Expand Up @@ -97,11 +97,11 @@ void GcsActorScheduler::ScheduleByGcs(std::shared_ptr<GcsActor> actor) {
const auto &owner_node = gcs_node_manager_.GetAliveNode(actor->GetOwnerNodeID());
RayTask task(actor->GetCreationTaskSpecification(),
owner_node.has_value() ? actor->GetOwnerNodeID().Binary() : std::string());
cluster_task_manager_->QueueAndScheduleTask(task,
/*grant_or_reject*/ false,
/*is_selected_based_on_locality*/ false,
/*reply*/ reply.get(),
send_reply_callback);
cluster_task_manager_.QueueAndScheduleTask(task,
/*grant_or_reject*/ false,
/*is_selected_based_on_locality*/ false,
/*reply*/ reply.get(),
send_reply_callback);
}

void GcsActorScheduler::ScheduleByRaylet(std::shared_ptr<GcsActor> actor) {
Expand Down Expand Up @@ -218,7 +218,7 @@ std::vector<ActorID> GcsActorScheduler::CancelOnNode(const NodeID &node_id) {
}
}

raylet_client_pool_->Disconnect(node_id);
raylet_client_pool_.Disconnect(node_id);

return actor_ids;
}
Expand Down Expand Up @@ -531,7 +531,7 @@ void GcsActorScheduler::DoRetryCreatingActorOnWorker(

std::shared_ptr<WorkerLeaseInterface> GcsActorScheduler::GetOrConnectLeaseClient(
const rpc::Address &raylet_address) {
return raylet_client_pool_->GetOrConnectByAddress(raylet_address);
return raylet_client_pool_.GetOrConnectByAddress(raylet_address);
}

bool GcsActorScheduler::KillActorOnWorker(const rpc::Address &worker_address,
Expand Down Expand Up @@ -664,28 +664,27 @@ void GcsActorScheduler::HandleWorkerLeaseRejectedReply(
void GcsActorScheduler::OnActorDestruction(std::shared_ptr<GcsActor> actor) {
if (!actor->GetAcquiredResources().IsEmpty()) {
ReturnActorAcquiredResources(actor);
cluster_task_manager_->ScheduleAndDispatchTasks();
cluster_task_manager_.ScheduleAndDispatchTasks();
}
}

void GcsActorScheduler::ReturnActorAcquiredResources(std::shared_ptr<GcsActor> actor) {
auto &cluster_resource_manager =
cluster_task_manager_->GetClusterResourceScheduler()->GetClusterResourceManager();
cluster_task_manager_.GetClusterResourceScheduler()->GetClusterResourceManager();
cluster_resource_manager.AddNodeAvailableResources(
scheduling::NodeID(actor->GetNodeID().Binary()),
actor->GetAcquiredResources().GetResourceSet());
actor->SetAcquiredResources(ResourceRequest());
}

size_t GcsActorScheduler::GetPendingActorsCount() const {
return cluster_task_manager_->GetInfeasibleQueueSize() +
cluster_task_manager_->GetPendingQueueSize();
return cluster_task_manager_.GetInfeasibleQueueSize() +
cluster_task_manager_.GetPendingQueueSize();
}

bool GcsActorScheduler::CancelInFlightActorScheduling(
const std::shared_ptr<GcsActor> &actor) {
return cluster_task_manager_->CancelTask(
actor->GetCreationTaskSpecification().TaskId());
return cluster_task_manager_.CancelTask(actor->GetCreationTaskSpecification().TaskId());
}

} // namespace gcs
Expand Down
10 changes: 5 additions & 5 deletions src/ray/gcs/gcs_server/gcs_actor_scheduler.h
Original file line number Diff line number Diff line change
Expand Up @@ -129,14 +129,14 @@ class GcsActorScheduler : public GcsActorSchedulerInterface {
instrumented_io_context &io_context,
GcsActorTable &gcs_actor_table,
const GcsNodeManager &gcs_node_manager,
std::shared_ptr<ClusterTaskManager> cluster_task_manager_,
ClusterTaskManager &cluster_task_manager_,
GcsActorSchedulerFailureCallback schedule_failure_handler,
GcsActorSchedulerSuccessCallback schedule_success_handler,
std::shared_ptr<rpc::NodeManagerClientPool> raylet_client_pool,
rpc::NodeManagerClientPool &raylet_client_pool,
rpc::CoreWorkerClientFactoryFn client_factory = nullptr,
std::function<void(const NodeID &, const rpc::ResourcesData &)>
normal_task_resources_changed_callback = nullptr);
virtual ~GcsActorScheduler() = default;
~GcsActorScheduler() override = default;

/// Schedule the specified actor.
/// If there is no available nodes then the actor would be queued in the
Expand Down Expand Up @@ -377,15 +377,15 @@ class GcsActorScheduler : public GcsActorSchedulerInterface {
/// Reference of GcsNodeManager.
const GcsNodeManager &gcs_node_manager_;
/// The cluster task manager.
std::shared_ptr<ClusterTaskManager> cluster_task_manager_;
ClusterTaskManager &cluster_task_manager_;
/// The handler to handle the scheduling failures.
GcsActorSchedulerFailureCallback schedule_failure_handler_;
/// The handler to handle the successful scheduling.
GcsActorSchedulerSuccessCallback schedule_success_handler_;
/// The nodes which are releasing unused workers.
absl::flat_hash_set<NodeID> nodes_of_releasing_unused_workers_;
/// The cached raylet clients used to communicate with raylet.
std::shared_ptr<rpc::NodeManagerClientPool> raylet_client_pool_;
rpc::NodeManagerClientPool &raylet_client_pool_;
/// The cached core worker clients which are used to communicate with leased worker.
rpc::CoreWorkerClientPool core_worker_clients_;

Expand Down
6 changes: 3 additions & 3 deletions src/ray/gcs/gcs_server/gcs_autoscaler_state_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -28,12 +28,12 @@ GcsAutoscalerStateManager::GcsAutoscalerStateManager(
GcsNodeManager &gcs_node_manager,
GcsActorManager &gcs_actor_manager,
const GcsPlacementGroupManager &gcs_placement_group_manager,
std::shared_ptr<rpc::NodeManagerClientPool> raylet_client_pool)
rpc::NodeManagerClientPool &raylet_client_pool)
: session_name_(session_name),
gcs_node_manager_(gcs_node_manager),
gcs_actor_manager_(gcs_actor_manager),
gcs_placement_group_manager_(gcs_placement_group_manager),
raylet_client_pool_(std::move(raylet_client_pool)),
raylet_client_pool_(raylet_client_pool),
last_cluster_resource_state_version_(0),
last_seen_autoscaler_state_version_(0) {}

Expand Down Expand Up @@ -396,7 +396,7 @@ void GcsAutoscalerStateManager::HandleDrainNode(
raylet_address.set_ip_address(node->node_manager_address());
raylet_address.set_port(node->node_manager_port());

const auto raylet_client = raylet_client_pool_->GetOrConnectByAddress(raylet_address);
const auto raylet_client = raylet_client_pool_.GetOrConnectByAddress(raylet_address);
raylet_client->DrainRaylet(
request.reason(),
request.reason_message(),
Expand Down
13 changes: 6 additions & 7 deletions src/ray/gcs/gcs_server/gcs_autoscaler_state_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,12 +29,11 @@ class GcsResourceManager;

class GcsAutoscalerStateManager : public rpc::autoscaler::AutoscalerStateHandler {
public:
GcsAutoscalerStateManager(
const std::string &session_name,
GcsNodeManager &gcs_node_manager,
GcsActorManager &gcs_actor_manager,
const GcsPlacementGroupManager &gcs_placement_group_manager,
std::shared_ptr<rpc::NodeManagerClientPool> raylet_client_pool);
GcsAutoscalerStateManager(const std::string &session_name,
GcsNodeManager &gcs_node_manager,
GcsActorManager &gcs_actor_manager,
const GcsPlacementGroupManager &gcs_placement_group_manager,
rpc::NodeManagerClientPool &raylet_client_pool);

void HandleGetClusterResourceState(
rpc::autoscaler::GetClusterResourceStateRequest request,
Expand Down Expand Up @@ -152,7 +151,7 @@ class GcsAutoscalerStateManager : public rpc::autoscaler::AutoscalerStateHandler
const GcsPlacementGroupManager &gcs_placement_group_manager_;

/// Raylet client pool.
std::shared_ptr<rpc::NodeManagerClientPool> raylet_client_pool_;
rpc::NodeManagerClientPool &raylet_client_pool_;

// The default value of the last seen version for the request is 0, which indicates
// no version has been reported. So the first reported version should be 1.
Expand Down
11 changes: 5 additions & 6 deletions src/ray/gcs/gcs_server/gcs_node_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -29,14 +29,13 @@ namespace ray {
namespace gcs {

//////////////////////////////////////////////////////////////////////////////////////////
GcsNodeManager::GcsNodeManager(
std::shared_ptr<GcsPublisher> gcs_publisher,
std::shared_ptr<gcs::GcsTableStorage> gcs_table_storage,
std::shared_ptr<rpc::NodeManagerClientPool> raylet_client_pool,
const ClusterID &cluster_id)
GcsNodeManager::GcsNodeManager(std::shared_ptr<GcsPublisher> gcs_publisher,
std::shared_ptr<gcs::GcsTableStorage> gcs_table_storage,
rpc::NodeManagerClientPool *raylet_client_pool,
const ClusterID &cluster_id)
: gcs_publisher_(std::move(gcs_publisher)),
gcs_table_storage_(std::move(gcs_table_storage)),
raylet_client_pool_(std::move(raylet_client_pool)),
raylet_client_pool_(raylet_client_pool),
cluster_id_(cluster_id) {}

void GcsNodeManager::WriteNodeExportEvent(rpc::GcsNodeInfo node_info) const {
Expand Down
4 changes: 2 additions & 2 deletions src/ray/gcs/gcs_server/gcs_node_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ class GcsNodeManager : public rpc::NodeInfoHandler {
/// \param gcs_table_storage GCS table external storage accessor.
explicit GcsNodeManager(std::shared_ptr<GcsPublisher> gcs_publisher,
std::shared_ptr<gcs::GcsTableStorage> gcs_table_storage,
std::shared_ptr<rpc::NodeManagerClientPool> raylet_client_pool,
rpc::NodeManagerClientPool *raylet_client_pool,
const ClusterID &cluster_id);

/// Handle register rpc request come from raylet.
Expand Down Expand Up @@ -248,7 +248,7 @@ class GcsNodeManager : public rpc::NodeInfoHandler {
/// Storage for GCS tables.
std::shared_ptr<gcs::GcsTableStorage> gcs_table_storage_;
/// Raylet client pool.
std::shared_ptr<rpc::NodeManagerClientPool> raylet_client_pool_;
rpc::NodeManagerClientPool *raylet_client_pool_ = nullptr;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is never nullptr, so use&, not * ?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

afaik most style guides suggest pointers over references because of flexibility, moving, copying, it doesn't apply to this, but not bad to be consistent, but can we do absl::NotNull, I don't remember if we need cpp upgrade or absl upgrade for it?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1 for using reference if it's guaranteed to be not null.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

IMHO our use case here does not require flexibility or so: we won't reassign, copy or move them. I'd say we like the fact that it can't (easiliy) be moved by using reference. The only caution is that we need to make sure the references must outlive the objects holding those refs, which is the case since all these classes share a lifetime with a GcsServer (up until a reset call).

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Because we have use cases passing nullptr:

gcs_node_manager =
std::make_unique<GcsNodeManager>(nullptr, nullptr, nullptr, ClusterID::Nil());

/// Cluster ID to be shared with clients when connecting.
const ClusterID cluster_id_;

Expand Down
8 changes: 5 additions & 3 deletions src/ray/gcs/gcs_server/gcs_placement_group_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@

#include "ray/gcs/gcs_server/gcs_placement_group_manager.h"

#include <utility>

#include "ray/common/asio/asio_util.h"
#include "ray/common/asio/instrumented_io_context.h"
#include "ray/common/ray_config.h"
Expand Down Expand Up @@ -181,15 +183,15 @@ rpc::PlacementGroupStats *GcsPlacementGroup::GetMutableStats() {

GcsPlacementGroupManager::GcsPlacementGroupManager(
instrumented_io_context &io_context,
std::shared_ptr<GcsPlacementGroupSchedulerInterface> scheduler,
GcsPlacementGroupSchedulerInterface *scheduler,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is never nullptr, so use&, not * ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same here, we have places passing in nullptr.

std::shared_ptr<gcs::GcsTableStorage> gcs_table_storage,
GcsResourceManager &gcs_resource_manager,
std::function<std::string(const JobID &)> get_ray_namespace)
: io_context_(io_context),
gcs_placement_group_scheduler_(std::move(scheduler)),
gcs_placement_group_scheduler_(scheduler),
gcs_table_storage_(std::move(gcs_table_storage)),
gcs_resource_manager_(gcs_resource_manager),
get_ray_namespace_(get_ray_namespace) {
get_ray_namespace_(std::move(get_ray_namespace)) {
placement_group_state_counter_.reset(
new CounterMap<rpc::PlacementGroupTableData::PlacementGroupState>());
placement_group_state_counter_->SetOnChangeCallback(
Expand Down
6 changes: 3 additions & 3 deletions src/ray/gcs/gcs_server/gcs_placement_group_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -237,7 +237,7 @@ class GcsPlacementGroupManager : public rpc::PlacementGroupInfoHandler {
/// \param gcs_resource_manager Reference of GcsResourceManager.
/// \param get_ray_namespace A callback to get the ray namespace.
GcsPlacementGroupManager(instrumented_io_context &io_context,
std::shared_ptr<GcsPlacementGroupSchedulerInterface> scheduler,
GcsPlacementGroupSchedulerInterface *scheduler,
std::shared_ptr<gcs::GcsTableStorage> gcs_table_storage,
GcsResourceManager &gcs_resource_manager,
std::function<std::string(const JobID &)> get_ray_namespace);
Expand Down Expand Up @@ -480,8 +480,8 @@ class GcsPlacementGroupManager : public rpc::PlacementGroupInfoHandler {
std::deque<std::shared_ptr<GcsPlacementGroup>> infeasible_placement_groups_;

/// The scheduler to schedule all registered placement_groups.
std::shared_ptr<gcs::GcsPlacementGroupSchedulerInterface>
gcs_placement_group_scheduler_;
/// Scheduler's lifecycle lies in [GcsServer].
gcs::GcsPlacementGroupSchedulerInterface *gcs_placement_group_scheduler_ = nullptr;

/// Used to update placement group information upon creation, deletion, etc.
std::shared_ptr<gcs::GcsTableStorage> gcs_table_storage_;
Expand Down
6 changes: 3 additions & 3 deletions src/ray/gcs/gcs_server/gcs_placement_group_scheduler.cc
Original file line number Diff line number Diff line change
Expand Up @@ -26,13 +26,13 @@ GcsPlacementGroupScheduler::GcsPlacementGroupScheduler(
std::shared_ptr<gcs::GcsTableStorage> gcs_table_storage,
const gcs::GcsNodeManager &gcs_node_manager,
ClusterResourceScheduler &cluster_resource_scheduler,
std::shared_ptr<rpc::NodeManagerClientPool> raylet_client_pool)
rpc::NodeManagerClientPool &raylet_client_pool)
: io_context_(io_context),
return_timer_(io_context),
gcs_table_storage_(std::move(gcs_table_storage)),
gcs_node_manager_(gcs_node_manager),
cluster_resource_scheduler_(cluster_resource_scheduler),
raylet_client_pool_(std::move(raylet_client_pool)) {}
raylet_client_pool_(raylet_client_pool) {}

void GcsPlacementGroupScheduler::ScheduleUnplacedBundles(
const SchedulePgRequest &request) {
Expand Down Expand Up @@ -279,7 +279,7 @@ void GcsPlacementGroupScheduler::CancelResourceReserve(

std::shared_ptr<ResourceReserveInterface>
GcsPlacementGroupScheduler::GetOrConnectLeaseClient(const rpc::Address &raylet_address) {
return raylet_client_pool_->GetOrConnectByAddress(raylet_address);
return raylet_client_pool_.GetOrConnectByAddress(raylet_address);
}

std::shared_ptr<ResourceReserveInterface>
Expand Down
13 changes: 6 additions & 7 deletions src/ray/gcs/gcs_server/gcs_placement_group_scheduler.h
Original file line number Diff line number Diff line change
Expand Up @@ -290,12 +290,11 @@ class GcsPlacementGroupScheduler : public GcsPlacementGroupSchedulerInterface {
/// \param cluster_resource_scheduler The resource scheduler which is used when
/// scheduling.
/// \param lease_client_factory Factory to create remote lease client.
GcsPlacementGroupScheduler(
instrumented_io_context &io_context,
std::shared_ptr<gcs::GcsTableStorage> gcs_table_storage,
const GcsNodeManager &gcs_node_manager,
ClusterResourceScheduler &cluster_resource_scheduler,
std::shared_ptr<rpc::NodeManagerClientPool> raylet_client_pool);
GcsPlacementGroupScheduler(instrumented_io_context &io_context,
std::shared_ptr<gcs::GcsTableStorage> gcs_table_storage,
const GcsNodeManager &gcs_node_manager,
ClusterResourceScheduler &cluster_resource_scheduler,
rpc::NodeManagerClientPool &raylet_client_pool);

virtual ~GcsPlacementGroupScheduler() = default;

Expand Down Expand Up @@ -502,7 +501,7 @@ class GcsPlacementGroupScheduler : public GcsPlacementGroupSchedulerInterface {
placement_group_leasing_in_progress_;

/// The cached raylet clients used to communicate with raylets.
std::shared_ptr<rpc::NodeManagerClientPool> raylet_client_pool_;
rpc::NodeManagerClientPool &raylet_client_pool_;

/// The nodes which are releasing unused bundles.
absl::flat_hash_set<NodeID> nodes_of_releasing_unused_bundles_;
Expand Down
2 changes: 1 addition & 1 deletion src/ray/gcs/gcs_server/gcs_resource_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ class GcsResourceManager : public rpc::NodeResourceInfoHandler,
NodeID local_node_id,
std::shared_ptr<ClusterTaskManager> cluster_task_manager = nullptr);

virtual ~GcsResourceManager() {}
virtual ~GcsResourceManager() = default;

/// Handle the resource update.
void ConsumeSyncMessage(std::shared_ptr<const syncer::RaySyncMessage> message) override;
Expand Down
Loading