From ef274399d7590d738b8759ff88ab02409813e282 Mon Sep 17 00:00:00 2001 From: hjiang Date: Thu, 5 Dec 2024 03:37:01 +0000 Subject: [PATCH 1/3] cleanup gcs server shared pointer Signed-off-by: hjiang --- src/ray/gcs/gcs_server/gcs_actor_scheduler.cc | 2 +- src/ray/gcs/gcs_server/gcs_resource_manager.cc | 15 +++++++-------- src/ray/gcs/gcs_server/gcs_resource_manager.h | 13 ++++++------- src/ray/gcs/gcs_server/gcs_server.cc | 6 +++--- src/ray/gcs/gcs_server/gcs_server.h | 6 +++--- .../gcs_server/test/gcs_actor_scheduler_test.cc | 2 +- src/ray/raylet/node_manager.cc | 2 +- .../raylet/scheduling/cluster_task_manager.cc | 17 ++++++++--------- .../raylet/scheduling/cluster_task_manager.h | 7 +++---- src/ray/raylet/scheduling/scheduler_stats.cc | 2 +- 10 files changed, 34 insertions(+), 38 deletions(-) diff --git a/src/ray/gcs/gcs_server/gcs_actor_scheduler.cc b/src/ray/gcs/gcs_server/gcs_actor_scheduler.cc index 5daa1a9922578..a159966f7c83f 100644 --- a/src/ray/gcs/gcs_server/gcs_actor_scheduler.cc +++ b/src/ray/gcs/gcs_server/gcs_actor_scheduler.cc @@ -670,7 +670,7 @@ void GcsActorScheduler::OnActorDestruction(std::shared_ptr actor) { void GcsActorScheduler::ReturnActorAcquiredResources(std::shared_ptr actor) { auto &cluster_resource_manager = - cluster_task_manager_.GetClusterResourceScheduler()->GetClusterResourceManager(); + cluster_task_manager_.GetClusterResourceScheduler().GetClusterResourceManager(); cluster_resource_manager.AddNodeAvailableResources( scheduling::NodeID(actor->GetNodeID().Binary()), actor->GetAcquiredResources().GetResourceSet()); diff --git a/src/ray/gcs/gcs_server/gcs_resource_manager.cc b/src/ray/gcs/gcs_server/gcs_resource_manager.cc index 7a6f2ba8ffcdf..d5fccc8e622f5 100644 --- a/src/ray/gcs/gcs_server/gcs_resource_manager.cc +++ b/src/ray/gcs/gcs_server/gcs_resource_manager.cc @@ -20,17 +20,16 @@ namespace ray { namespace gcs { -GcsResourceManager::GcsResourceManager( - instrumented_io_context &io_context, - ClusterResourceManager &cluster_resource_manager, - GcsNodeManager &gcs_node_manager, - NodeID local_node_id, - std::shared_ptr cluster_task_manager) +GcsResourceManager::GcsResourceManager(instrumented_io_context &io_context, + ClusterResourceManager &cluster_resource_manager, + GcsNodeManager &gcs_node_manager, + NodeID local_node_id, + ClusterTaskManager *cluster_task_manager) : io_context_(io_context), cluster_resource_manager_(cluster_resource_manager), gcs_node_manager_(gcs_node_manager), local_node_id_(std::move(local_node_id)), - cluster_task_manager_(std::move(cluster_task_manager)) {} + cluster_task_manager_(cluster_task_manager) {} void GcsResourceManager::ConsumeSyncMessage( std::shared_ptr message) { @@ -197,7 +196,7 @@ void GcsResourceManager::HandleGetAllResourceUsage( batch.add_batch()->CopyFrom(usage.second); } - if (cluster_task_manager_) { + if (cluster_task_manager_ != nullptr) { // Fill the gcs info when gcs actor scheduler is enabled. rpc::ResourcesData gcs_resources_data; cluster_task_manager_->FillPendingActorInfo(gcs_resources_data); diff --git a/src/ray/gcs/gcs_server/gcs_resource_manager.h b/src/ray/gcs/gcs_server/gcs_resource_manager.h index 47ecf9fff3a37..2cc42c37ee5ba 100644 --- a/src/ray/gcs/gcs_server/gcs_resource_manager.h +++ b/src/ray/gcs/gcs_server/gcs_resource_manager.h @@ -60,12 +60,11 @@ class GcsResourceManager : public rpc::NodeResourceInfoHandler, public syncer::ReceiverInterface { public: /// Create a GcsResourceManager. - explicit GcsResourceManager( - instrumented_io_context &io_context, - ClusterResourceManager &cluster_resource_manager, - GcsNodeManager &gcs_node_manager, - NodeID local_node_id, - std::shared_ptr cluster_task_manager = nullptr); + explicit GcsResourceManager(instrumented_io_context &io_context, + ClusterResourceManager &cluster_resource_manager, + GcsNodeManager &gcs_node_manager, + NodeID local_node_id, + ClusterTaskManager *cluster_task_manager = nullptr); virtual ~GcsResourceManager() = default; @@ -199,7 +198,7 @@ class GcsResourceManager : public rpc::NodeResourceInfoHandler, ClusterResourceManager &cluster_resource_manager_; GcsNodeManager &gcs_node_manager_; NodeID local_node_id_; - std::shared_ptr cluster_task_manager_; + ClusterTaskManager *cluster_task_manager_; /// Num of alive nodes in the cluster. size_t num_alive_nodes_ = 0; }; diff --git a/src/ray/gcs/gcs_server/gcs_server.cc b/src/ray/gcs/gcs_server/gcs_server.cc index 7bf977fcdb8a2..977c045cbd3f1 100644 --- a/src/ray/gcs/gcs_server/gcs_server.cc +++ b/src/ray/gcs/gcs_server/gcs_server.cc @@ -329,7 +329,7 @@ void GcsServer::InitGcsResourceManager(const GcsInitData &gcs_init_data) { cluster_resource_scheduler_->GetClusterResourceManager(), *gcs_node_manager_, kGCSNodeID, - cluster_task_manager_); + cluster_task_manager_.get()); // Initialize by gcs tables data. gcs_resource_manager_->Initialize(gcs_init_data); @@ -396,9 +396,9 @@ void GcsServer::InitClusterResourceScheduler() { void GcsServer::InitClusterTaskManager() { RAY_CHECK(cluster_resource_scheduler_); - cluster_task_manager_ = std::make_shared( + cluster_task_manager_ = std::make_unique( kGCSNodeID, - cluster_resource_scheduler_, + *cluster_resource_scheduler_, /*get_node_info=*/ [this](const NodeID &node_id) { auto node = gcs_node_manager_->GetAliveNode(node_id); diff --git a/src/ray/gcs/gcs_server/gcs_server.h b/src/ray/gcs/gcs_server/gcs_server.h index 5ea0e503d7966..a43476c1638a4 100644 --- a/src/ray/gcs/gcs_server/gcs_server.h +++ b/src/ray/gcs/gcs_server/gcs_server.h @@ -226,12 +226,12 @@ class GcsServer { rpc::ClientCallManager client_call_manager_; /// Node manager client pool. std::unique_ptr raylet_client_pool_; - /// The gcs resource manager. - std::unique_ptr gcs_resource_manager_; /// The cluster resource scheduler. std::shared_ptr cluster_resource_scheduler_; /// The cluster task manager. - std::shared_ptr cluster_task_manager_; + std::unique_ptr cluster_task_manager_; + /// The gcs resource manager. + std::unique_ptr gcs_resource_manager_; /// The autoscaler state manager. std::unique_ptr gcs_autoscaler_state_manager_; /// The gcs node manager. diff --git a/src/ray/gcs/gcs_server/test/gcs_actor_scheduler_test.cc b/src/ray/gcs/gcs_server/test/gcs_actor_scheduler_test.cc index 21ad1aa8c144b..cfd421331a8c5 100644 --- a/src/ray/gcs/gcs_server/test/gcs_actor_scheduler_test.cc +++ b/src/ray/gcs/gcs_server/test/gcs_actor_scheduler_test.cc @@ -696,7 +696,7 @@ TEST_F(GcsActorSchedulerTest, TestScheduleAndDestroyOneActor) { scheduling::NodeID scheduling_node_id(node->node_id()); ASSERT_EQ(1, gcs_node_manager_->GetAllAliveNodes().size()); const auto &cluster_resource_manager = - cluster_task_manager_->GetClusterResourceScheduler()->GetClusterResourceManager(); + cluster_task_manager_->GetClusterResourceScheduler().GetClusterResourceManager(); auto resource_view_before_scheduling = cluster_resource_manager.GetResourceView(); ASSERT_TRUE(resource_view_before_scheduling.contains(scheduling_node_id)); diff --git a/src/ray/raylet/node_manager.cc b/src/ray/raylet/node_manager.cc index e494b2c6887a8..7dba65624577c 100644 --- a/src/ray/raylet/node_manager.cc +++ b/src/ray/raylet/node_manager.cc @@ -359,7 +359,7 @@ NodeManager::NodeManager( max_task_args_memory); cluster_task_manager_ = std::make_shared( self_node_id_, - std::dynamic_pointer_cast(cluster_resource_scheduler_), + *std::dynamic_pointer_cast(cluster_resource_scheduler_), get_node_info_func, announce_infeasible_task, *local_task_manager_); diff --git a/src/ray/raylet/scheduling/cluster_task_manager.cc b/src/ray/raylet/scheduling/cluster_task_manager.cc index a7123eeb80a04..b240e7cc4046a 100644 --- a/src/ray/raylet/scheduling/cluster_task_manager.cc +++ b/src/ray/raylet/scheduling/cluster_task_manager.cc @@ -26,7 +26,7 @@ namespace raylet { ClusterTaskManager::ClusterTaskManager( const NodeID &self_node_id, - std::shared_ptr cluster_resource_scheduler, + ClusterResourceScheduler &cluster_resource_scheduler, internal::NodeInfoGetter get_node_info, std::function announce_infeasible_task, ILocalTaskManager &local_task_manager, @@ -156,7 +156,7 @@ void ClusterTaskManager::ScheduleAndDispatchTasks() { RayTask task = work->task; RAY_LOG(DEBUG) << "Scheduling pending task " << task.GetTaskSpecification().TaskId(); - auto scheduling_node_id = cluster_resource_scheduler_->GetBestSchedulableNode( + auto scheduling_node_id = cluster_resource_scheduler_.GetBestSchedulableNode( task.GetTaskSpecification(), /*preferred_node_id*/ work->PrioritizeLocalNode() ? self_node_id_.Binary() : task.GetPreferredNodeID(), @@ -175,7 +175,7 @@ void ClusterTaskManager::ScheduleAndDispatchTasks() { !task.GetTaskSpecification().GetNodeAffinitySchedulingStrategySoft()) { // This can only happen if the target node doesn't exist or is infeasible. // The task will never be schedulable in either case so we should fail it. - if (cluster_resource_scheduler_->IsLocalNodeWithRaylet()) { + if (cluster_resource_scheduler_.IsLocalNodeWithRaylet()) { ReplyCancelled( *work, rpc::RequestWorkerLeaseReply::SCHEDULING_CANCELLED_UNSCHEDULABLE, @@ -251,7 +251,7 @@ void ClusterTaskManager::TryScheduleInfeasibleTask() { RAY_LOG(DEBUG) << "Check if the infeasible task is schedulable in any node. task_id:" << task.GetTaskSpecification().TaskId(); bool is_infeasible; - cluster_resource_scheduler_->GetBestSchedulableNode( + cluster_resource_scheduler_.GetBestSchedulableNode( task.GetTaskSpecification(), /*preferred_node_id*/ work->PrioritizeLocalNode() ? self_node_id_.Binary() : task.GetPreferredNodeID(), @@ -291,7 +291,7 @@ void ClusterTaskManager::FillResourceUsage(rpc::ResourcesData &data) { scheduler_resource_reporter_.FillResourceUsage(data); // This populates usage information. syncer::ResourceViewSyncMessage resource_view_sync_message; - cluster_resource_scheduler_->GetLocalResourceManager().PopulateResourceViewSyncMessage( + cluster_resource_scheduler_.GetLocalResourceManager().PopulateResourceViewSyncMessage( resource_view_sync_message); (*data.mutable_resources_total()) = std::move(resource_view_sync_message.resources_total()); @@ -357,7 +357,7 @@ bool ClusterTaskManager::AnyPendingTasksForResourceAcquisition( void ClusterTaskManager::RecordMetrics() const { internal_stats_.RecordMetrics(); - cluster_resource_scheduler_->GetLocalResourceManager().RecordMetrics(); + cluster_resource_scheduler_.GetLocalResourceManager().RecordMetrics(); } std::string ClusterTaskManager::DebugStr() const { @@ -385,7 +385,7 @@ void ClusterTaskManager::ScheduleOnNode(const NodeID &spillback_to, const auto &task_spec = task.GetTaskSpecification(); RAY_LOG(DEBUG) << "Spilling task " << task_spec.TaskId() << " to node " << spillback_to; - if (!cluster_resource_scheduler_->AllocateRemoteTaskResources( + if (!cluster_resource_scheduler_.AllocateRemoteTaskResources( scheduling::NodeID(spillback_to.Binary()), task_spec.GetRequiredResources().GetResourceMap())) { RAY_LOG(DEBUG) << "Tried to allocate resources for request " << task_spec.TaskId() @@ -405,8 +405,7 @@ void ClusterTaskManager::ScheduleOnNode(const NodeID &spillback_to, send_reply_callback(); } -std::shared_ptr -ClusterTaskManager::GetClusterResourceScheduler() const { +ClusterResourceScheduler &ClusterTaskManager::GetClusterResourceScheduler() const { return cluster_resource_scheduler_; } diff --git a/src/ray/raylet/scheduling/cluster_task_manager.h b/src/ray/raylet/scheduling/cluster_task_manager.h index 588536cbf654e..2b0886663bd5c 100644 --- a/src/ray/raylet/scheduling/cluster_task_manager.h +++ b/src/ray/raylet/scheduling/cluster_task_manager.h @@ -51,7 +51,7 @@ class ClusterTaskManager : public ClusterTaskManagerInterface { /// \param get_time_ms: A callback which returns the current time in milliseconds. ClusterTaskManager( const NodeID &self_node_id, - std::shared_ptr cluster_resource_scheduler, + ClusterResourceScheduler &cluster_resource_scheduler, internal::NodeInfoGetter get_node_info, std::function announce_infeasible_task, ILocalTaskManager &local_task_manager, @@ -132,7 +132,7 @@ class ClusterTaskManager : public ClusterTaskManagerInterface { /// The helper to dump the debug state of the cluster task manater. std::string DebugStr() const override; - std::shared_ptr GetClusterResourceScheduler() const; + ClusterResourceScheduler &GetClusterResourceScheduler() const; /// Get the count of tasks in `infeasible_tasks_`. size_t GetInfeasibleQueueSize() const; @@ -161,8 +161,7 @@ class ClusterTaskManager : public ClusterTaskManagerInterface { const NodeID &self_node_id_; /// Responsible for resource tracking/view of the cluster. - /// TODO(hjiang): Use reference instead of shared pointer. - std::shared_ptr cluster_resource_scheduler_; + ClusterResourceScheduler &cluster_resource_scheduler_; /// Function to get the node information of a given node id. internal::NodeInfoGetter get_node_info_; diff --git a/src/ray/raylet/scheduling/scheduler_stats.cc b/src/ray/raylet/scheduling/scheduler_stats.cc index 467f1c6bfc585..8a338e23a5b7e 100644 --- a/src/ray/raylet/scheduling/scheduler_stats.cc +++ b/src/ray/raylet/scheduling/scheduler_stats.cc @@ -173,7 +173,7 @@ std::string SchedulerStats::ComputeAndReportDebugStr() { buffer << "num_tasks_waiting_for_workers: " << num_tasks_waiting_for_workers_ << "\n"; buffer << "num_cancelled_tasks: " << num_cancelled_tasks_ << "\n"; buffer << "cluster_resource_scheduler state: " - << cluster_task_manager_.cluster_resource_scheduler_->DebugString() << "\n"; + << cluster_task_manager_.cluster_resource_scheduler_.DebugString() << "\n"; local_task_manager_.DebugStr(buffer); buffer << "==================================================\n"; From aa22b434547145b2d821f59b131bea7af21c579e Mon Sep 17 00:00:00 2001 From: hjiang Date: Thu, 5 Dec 2024 06:49:30 +0000 Subject: [PATCH 2/3] put dependency into comment Signed-off-by: hjiang --- src/ray/gcs/gcs_server/gcs_server.h | 2 ++ 1 file changed, 2 insertions(+) diff --git a/src/ray/gcs/gcs_server/gcs_server.h b/src/ray/gcs/gcs_server/gcs_server.h index a43476c1638a4..5c9e87b6c03d0 100644 --- a/src/ray/gcs/gcs_server/gcs_server.h +++ b/src/ray/gcs/gcs_server/gcs_server.h @@ -228,8 +228,10 @@ class GcsServer { std::unique_ptr raylet_client_pool_; /// The cluster resource scheduler. std::shared_ptr cluster_resource_scheduler_; + /// [cluster_task_manager_] depends on [cluster_resource_scheduler_]. /// The cluster task manager. std::unique_ptr cluster_task_manager_; + /// [gcs_resource_manager_] depends on [cluster_task_manager_]. /// The gcs resource manager. std::unique_ptr gcs_resource_manager_; /// The autoscaler state manager. From 79d472a9d2623fee1f46671dd449fc52bd644e78 Mon Sep 17 00:00:00 2001 From: hjiang Date: Thu, 5 Dec 2024 21:26:26 +0000 Subject: [PATCH 3/3] fix unit test Signed-off-by: hjiang --- .../gcs_server/test/gcs_actor_scheduler_mock_test.cc | 2 +- .../gcs/gcs_server/test/gcs_actor_scheduler_test.cc | 10 ++++++---- src/ray/raylet/scheduling/cluster_task_manager_test.cc | 2 +- 3 files changed, 8 insertions(+), 6 deletions(-) diff --git a/src/ray/gcs/gcs_server/test/gcs_actor_scheduler_mock_test.cc b/src/ray/gcs/gcs_server/test/gcs_actor_scheduler_mock_test.cc index 5b01a5b0d7f98..20e441e7f5360 100644 --- a/src/ray/gcs/gcs_server/test/gcs_actor_scheduler_mock_test.cc +++ b/src/ray/gcs/gcs_server/test/gcs_actor_scheduler_mock_test.cc @@ -55,7 +55,7 @@ class GcsActorSchedulerMockTest : public Test { local_task_manager_ = std::make_unique(); cluster_task_manager = std::make_unique( local_node_id, - cluster_resource_scheduler, + *cluster_resource_scheduler, /*get_node_info=*/ [this](const NodeID &node_id) { auto node = gcs_node_manager->GetAliveNode(node_id); diff --git a/src/ray/gcs/gcs_server/test/gcs_actor_scheduler_test.cc b/src/ray/gcs/gcs_server/test/gcs_actor_scheduler_test.cc index cfd421331a8c5..1e696d5f9c907 100644 --- a/src/ray/gcs/gcs_server/test/gcs_actor_scheduler_test.cc +++ b/src/ray/gcs/gcs_server/test/gcs_actor_scheduler_test.cc @@ -46,7 +46,7 @@ class GcsActorSchedulerTest : public ::testing::Test { gcs_actor_table_ = std::make_shared(store_client_); local_node_id_ = NodeID::FromRandom(); - auto cluster_resource_scheduler = std::make_shared( + cluster_resource_scheduler_ = std::make_unique( io_service_, scheduling::NodeID(local_node_id_.Binary()), NodeResources(), @@ -58,7 +58,7 @@ class GcsActorSchedulerTest : public ::testing::Test { local_task_manager_ = std::make_unique(); cluster_task_manager_ = std::make_unique( local_node_id_, - cluster_resource_scheduler, + *cluster_resource_scheduler_, /*get_node_info=*/ [this](const NodeID &node_id) { auto node = gcs_node_manager_->GetAliveNode(node_id); @@ -68,7 +68,7 @@ class GcsActorSchedulerTest : public ::testing::Test { /*local_task_manager=*/*local_task_manager_); auto gcs_resource_manager = std::make_shared( io_service_, - cluster_resource_scheduler->GetClusterResourceManager(), + cluster_resource_scheduler_->GetClusterResourceManager(), *gcs_node_manager_, local_node_id_); gcs_actor_scheduler_ = std::make_shared( @@ -96,7 +96,8 @@ class GcsActorSchedulerTest : public ::testing::Test { }); gcs_node_manager_->AddNodeAddedListener( - [cluster_resource_scheduler](std::shared_ptr node) { + [cluster_resource_scheduler = + cluster_resource_scheduler_.get()](std::shared_ptr node) { scheduling::NodeID node_id(node->node_id()); auto &cluster_resource_manager = cluster_resource_scheduler->GetClusterResourceManager(); @@ -150,6 +151,7 @@ class GcsActorSchedulerTest : public ::testing::Test { std::shared_ptr worker_client_; std::shared_ptr gcs_node_manager_; std::unique_ptr local_task_manager_; + std::unique_ptr cluster_resource_scheduler_; std::shared_ptr cluster_task_manager_; std::shared_ptr gcs_actor_scheduler_; std::shared_ptr>> diff --git a/src/ray/raylet/scheduling/cluster_task_manager_test.cc b/src/ray/raylet/scheduling/cluster_task_manager_test.cc index 70385421d19f8..216c2a083101c 100644 --- a/src/ray/raylet/scheduling/cluster_task_manager_test.cc +++ b/src/ray/raylet/scheduling/cluster_task_manager_test.cc @@ -279,7 +279,7 @@ class ClusterTaskManagerTest : public ::testing::Test { /*get_time=*/[this]() { return current_time_ms_; })), task_manager_( id_, - scheduler_, + *scheduler_, /* get_node_info= */ [this](const NodeID &node_id) -> const rpc::GcsNodeInfo * { node_info_calls_++;