diff --git a/BUILD.bazel b/BUILD.bazel index 03c002fc1256..49efc0b87835 100644 --- a/BUILD.bazel +++ b/BUILD.bazel @@ -2403,11 +2403,43 @@ ray_cc_test( ) ray_cc_test( - name = "gcs_export_event_test", + name = "gcs_job_manager_export_event_test", size = "small", - srcs = glob([ - "src/ray/gcs/gcs_server/test/export_api/*.cc", - ]), + srcs = ["src/ray/gcs/gcs_server/test/export_api/gcs_job_manager_export_event_test.cc"], + tags = [ + "no_windows", + "team:core" + ], + deps = [ + ":gcs_server_lib", + ":gcs_server_test_util", + ":gcs_test_util_lib", + ":ray_mock", + "@com_google_googletest//:gtest_main", + ], +) + +ray_cc_test( + name = "gcs_actor_manager_export_event_test", + size = "small", + srcs = ["src/ray/gcs/gcs_server/test/export_api/gcs_actor_manager_export_event_test.cc"], + tags = [ + "no_windows", + "team:core" + ], + deps = [ + ":gcs_server_lib", + ":gcs_server_test_util", + ":gcs_test_util_lib", + ":ray_mock", + "@com_google_googletest//:gtest_main", + ], +) + +ray_cc_test( + name = "gcs_node_manager_export_event_test", + size = "small", + srcs = ["src/ray/gcs/gcs_server/test/export_api/gcs_node_manager_export_event_test.cc"], tags = [ "no_windows", "team:core" diff --git a/src/mock/ray/gcs/gcs_server/gcs_node_manager.h b/src/mock/ray/gcs/gcs_server/gcs_node_manager.h index 3a8f22949fae..7a3efe197529 100644 --- a/src/mock/ray/gcs/gcs_server/gcs_node_manager.h +++ b/src/mock/ray/gcs/gcs_server/gcs_node_manager.h @@ -18,7 +18,11 @@ namespace gcs { class MockGcsNodeManager : public GcsNodeManager { public: - MockGcsNodeManager() : GcsNodeManager(nullptr, nullptr, nullptr, ClusterID::Nil()) {} + MockGcsNodeManager() + : GcsNodeManager(/*gcs_publisher=*/nullptr, + /*gcs_table_storage=*/nullptr, + /*raylet_client_pool=*/nullptr, + /*cluster_id=*/ClusterID::Nil()) {} MOCK_METHOD(void, HandleRegisterNode, (rpc::RegisterNodeRequest request, diff --git a/src/ray/gcs/gcs_server/gcs_actor_manager.h b/src/ray/gcs/gcs_server/gcs_actor_manager.h index dadb90498379..afdf55be80d6 100644 --- a/src/ray/gcs/gcs_server/gcs_actor_manager.h +++ b/src/ray/gcs/gcs_server/gcs_actor_manager.h @@ -321,7 +321,7 @@ class GcsActorManager : public rpc::ActorInfoHandler { std::function destroy_owned_placement_group_if_needed, const rpc::CoreWorkerClientFactoryFn &worker_client_factory = nullptr); - ~GcsActorManager() = default; + ~GcsActorManager() override = default; void HandleRegisterActor(rpc::RegisterActorRequest request, rpc::RegisterActorReply *reply, diff --git a/src/ray/gcs/gcs_server/gcs_actor_scheduler.cc b/src/ray/gcs/gcs_server/gcs_actor_scheduler.cc index f562c0f9034e..5daa1a992257 100644 --- a/src/ray/gcs/gcs_server/gcs_actor_scheduler.cc +++ b/src/ray/gcs/gcs_server/gcs_actor_scheduler.cc @@ -27,17 +27,17 @@ GcsActorScheduler::GcsActorScheduler( instrumented_io_context &io_context, GcsActorTable &gcs_actor_table, const GcsNodeManager &gcs_node_manager, - std::shared_ptr cluster_task_manager, + ClusterTaskManager &cluster_task_manager, GcsActorSchedulerFailureCallback schedule_failure_handler, GcsActorSchedulerSuccessCallback schedule_success_handler, - std::shared_ptr raylet_client_pool, + rpc::NodeManagerClientPool &raylet_client_pool, rpc::CoreWorkerClientFactoryFn client_factory, std::function normal_task_resources_changed_callback) : io_context_(io_context), gcs_actor_table_(gcs_actor_table), gcs_node_manager_(gcs_node_manager), - cluster_task_manager_(std::move(cluster_task_manager)), + cluster_task_manager_(cluster_task_manager), schedule_failure_handler_(std::move(schedule_failure_handler)), schedule_success_handler_(std::move(schedule_success_handler)), raylet_client_pool_(raylet_client_pool), @@ -97,11 +97,11 @@ void GcsActorScheduler::ScheduleByGcs(std::shared_ptr actor) { const auto &owner_node = gcs_node_manager_.GetAliveNode(actor->GetOwnerNodeID()); RayTask task(actor->GetCreationTaskSpecification(), owner_node.has_value() ? actor->GetOwnerNodeID().Binary() : std::string()); - cluster_task_manager_->QueueAndScheduleTask(task, - /*grant_or_reject*/ false, - /*is_selected_based_on_locality*/ false, - /*reply*/ reply.get(), - send_reply_callback); + cluster_task_manager_.QueueAndScheduleTask(task, + /*grant_or_reject*/ false, + /*is_selected_based_on_locality*/ false, + /*reply*/ reply.get(), + send_reply_callback); } void GcsActorScheduler::ScheduleByRaylet(std::shared_ptr actor) { @@ -218,7 +218,7 @@ std::vector GcsActorScheduler::CancelOnNode(const NodeID &node_id) { } } - raylet_client_pool_->Disconnect(node_id); + raylet_client_pool_.Disconnect(node_id); return actor_ids; } @@ -531,7 +531,7 @@ void GcsActorScheduler::DoRetryCreatingActorOnWorker( std::shared_ptr GcsActorScheduler::GetOrConnectLeaseClient( const rpc::Address &raylet_address) { - return raylet_client_pool_->GetOrConnectByAddress(raylet_address); + return raylet_client_pool_.GetOrConnectByAddress(raylet_address); } bool GcsActorScheduler::KillActorOnWorker(const rpc::Address &worker_address, @@ -664,13 +664,13 @@ void GcsActorScheduler::HandleWorkerLeaseRejectedReply( void GcsActorScheduler::OnActorDestruction(std::shared_ptr actor) { if (!actor->GetAcquiredResources().IsEmpty()) { ReturnActorAcquiredResources(actor); - cluster_task_manager_->ScheduleAndDispatchTasks(); + cluster_task_manager_.ScheduleAndDispatchTasks(); } } void GcsActorScheduler::ReturnActorAcquiredResources(std::shared_ptr actor) { auto &cluster_resource_manager = - cluster_task_manager_->GetClusterResourceScheduler()->GetClusterResourceManager(); + cluster_task_manager_.GetClusterResourceScheduler()->GetClusterResourceManager(); cluster_resource_manager.AddNodeAvailableResources( scheduling::NodeID(actor->GetNodeID().Binary()), actor->GetAcquiredResources().GetResourceSet()); @@ -678,14 +678,13 @@ void GcsActorScheduler::ReturnActorAcquiredResources(std::shared_ptr a } size_t GcsActorScheduler::GetPendingActorsCount() const { - return cluster_task_manager_->GetInfeasibleQueueSize() + - cluster_task_manager_->GetPendingQueueSize(); + return cluster_task_manager_.GetInfeasibleQueueSize() + + cluster_task_manager_.GetPendingQueueSize(); } bool GcsActorScheduler::CancelInFlightActorScheduling( const std::shared_ptr &actor) { - return cluster_task_manager_->CancelTask( - actor->GetCreationTaskSpecification().TaskId()); + return cluster_task_manager_.CancelTask(actor->GetCreationTaskSpecification().TaskId()); } } // namespace gcs diff --git a/src/ray/gcs/gcs_server/gcs_actor_scheduler.h b/src/ray/gcs/gcs_server/gcs_actor_scheduler.h index 1ea66d0ddbe0..048d1da8939c 100644 --- a/src/ray/gcs/gcs_server/gcs_actor_scheduler.h +++ b/src/ray/gcs/gcs_server/gcs_actor_scheduler.h @@ -129,14 +129,14 @@ class GcsActorScheduler : public GcsActorSchedulerInterface { instrumented_io_context &io_context, GcsActorTable &gcs_actor_table, const GcsNodeManager &gcs_node_manager, - std::shared_ptr cluster_task_manager_, + ClusterTaskManager &cluster_task_manager_, GcsActorSchedulerFailureCallback schedule_failure_handler, GcsActorSchedulerSuccessCallback schedule_success_handler, - std::shared_ptr raylet_client_pool, + rpc::NodeManagerClientPool &raylet_client_pool, rpc::CoreWorkerClientFactoryFn client_factory = nullptr, std::function normal_task_resources_changed_callback = nullptr); - virtual ~GcsActorScheduler() = default; + ~GcsActorScheduler() override = default; /// Schedule the specified actor. /// If there is no available nodes then the actor would be queued in the @@ -377,7 +377,7 @@ class GcsActorScheduler : public GcsActorSchedulerInterface { /// Reference of GcsNodeManager. const GcsNodeManager &gcs_node_manager_; /// The cluster task manager. - std::shared_ptr cluster_task_manager_; + ClusterTaskManager &cluster_task_manager_; /// The handler to handle the scheduling failures. GcsActorSchedulerFailureCallback schedule_failure_handler_; /// The handler to handle the successful scheduling. @@ -385,7 +385,7 @@ class GcsActorScheduler : public GcsActorSchedulerInterface { /// The nodes which are releasing unused workers. absl::flat_hash_set nodes_of_releasing_unused_workers_; /// The cached raylet clients used to communicate with raylet. - std::shared_ptr raylet_client_pool_; + rpc::NodeManagerClientPool &raylet_client_pool_; /// The cached core worker clients which are used to communicate with leased worker. rpc::CoreWorkerClientPool core_worker_clients_; diff --git a/src/ray/gcs/gcs_server/gcs_autoscaler_state_manager.cc b/src/ray/gcs/gcs_server/gcs_autoscaler_state_manager.cc index e4ef13371b53..c166dbbf6398 100644 --- a/src/ray/gcs/gcs_server/gcs_autoscaler_state_manager.cc +++ b/src/ray/gcs/gcs_server/gcs_autoscaler_state_manager.cc @@ -28,12 +28,12 @@ GcsAutoscalerStateManager::GcsAutoscalerStateManager( GcsNodeManager &gcs_node_manager, GcsActorManager &gcs_actor_manager, const GcsPlacementGroupManager &gcs_placement_group_manager, - std::shared_ptr raylet_client_pool) + rpc::NodeManagerClientPool &raylet_client_pool) : session_name_(session_name), gcs_node_manager_(gcs_node_manager), gcs_actor_manager_(gcs_actor_manager), gcs_placement_group_manager_(gcs_placement_group_manager), - raylet_client_pool_(std::move(raylet_client_pool)), + raylet_client_pool_(raylet_client_pool), last_cluster_resource_state_version_(0), last_seen_autoscaler_state_version_(0) {} @@ -396,7 +396,7 @@ void GcsAutoscalerStateManager::HandleDrainNode( raylet_address.set_ip_address(node->node_manager_address()); raylet_address.set_port(node->node_manager_port()); - const auto raylet_client = raylet_client_pool_->GetOrConnectByAddress(raylet_address); + const auto raylet_client = raylet_client_pool_.GetOrConnectByAddress(raylet_address); raylet_client->DrainRaylet( request.reason(), request.reason_message(), diff --git a/src/ray/gcs/gcs_server/gcs_autoscaler_state_manager.h b/src/ray/gcs/gcs_server/gcs_autoscaler_state_manager.h index c00d8d465202..c592a7a484d6 100644 --- a/src/ray/gcs/gcs_server/gcs_autoscaler_state_manager.h +++ b/src/ray/gcs/gcs_server/gcs_autoscaler_state_manager.h @@ -29,12 +29,11 @@ class GcsResourceManager; class GcsAutoscalerStateManager : public rpc::autoscaler::AutoscalerStateHandler { public: - GcsAutoscalerStateManager( - const std::string &session_name, - GcsNodeManager &gcs_node_manager, - GcsActorManager &gcs_actor_manager, - const GcsPlacementGroupManager &gcs_placement_group_manager, - std::shared_ptr raylet_client_pool); + GcsAutoscalerStateManager(const std::string &session_name, + GcsNodeManager &gcs_node_manager, + GcsActorManager &gcs_actor_manager, + const GcsPlacementGroupManager &gcs_placement_group_manager, + rpc::NodeManagerClientPool &raylet_client_pool); void HandleGetClusterResourceState( rpc::autoscaler::GetClusterResourceStateRequest request, @@ -152,7 +151,7 @@ class GcsAutoscalerStateManager : public rpc::autoscaler::AutoscalerStateHandler const GcsPlacementGroupManager &gcs_placement_group_manager_; /// Raylet client pool. - std::shared_ptr raylet_client_pool_; + rpc::NodeManagerClientPool &raylet_client_pool_; // The default value of the last seen version for the request is 0, which indicates // no version has been reported. So the first reported version should be 1. diff --git a/src/ray/gcs/gcs_server/gcs_node_manager.cc b/src/ray/gcs/gcs_server/gcs_node_manager.cc index a232ecf10903..c57e8ea2fd08 100644 --- a/src/ray/gcs/gcs_server/gcs_node_manager.cc +++ b/src/ray/gcs/gcs_server/gcs_node_manager.cc @@ -29,14 +29,13 @@ namespace ray { namespace gcs { ////////////////////////////////////////////////////////////////////////////////////////// -GcsNodeManager::GcsNodeManager( - std::shared_ptr gcs_publisher, - std::shared_ptr gcs_table_storage, - std::shared_ptr raylet_client_pool, - const ClusterID &cluster_id) +GcsNodeManager::GcsNodeManager(std::shared_ptr gcs_publisher, + std::shared_ptr gcs_table_storage, + rpc::NodeManagerClientPool *raylet_client_pool, + const ClusterID &cluster_id) : gcs_publisher_(std::move(gcs_publisher)), gcs_table_storage_(std::move(gcs_table_storage)), - raylet_client_pool_(std::move(raylet_client_pool)), + raylet_client_pool_(raylet_client_pool), cluster_id_(cluster_id) {} void GcsNodeManager::WriteNodeExportEvent(rpc::GcsNodeInfo node_info) const { diff --git a/src/ray/gcs/gcs_server/gcs_node_manager.h b/src/ray/gcs/gcs_server/gcs_node_manager.h index db258d4cb00c..b924fec264c9 100644 --- a/src/ray/gcs/gcs_server/gcs_node_manager.h +++ b/src/ray/gcs/gcs_server/gcs_node_manager.h @@ -50,7 +50,7 @@ class GcsNodeManager : public rpc::NodeInfoHandler { /// \param gcs_table_storage GCS table external storage accessor. explicit GcsNodeManager(std::shared_ptr gcs_publisher, std::shared_ptr gcs_table_storage, - std::shared_ptr raylet_client_pool, + rpc::NodeManagerClientPool *raylet_client_pool, const ClusterID &cluster_id); /// Handle register rpc request come from raylet. @@ -248,7 +248,7 @@ class GcsNodeManager : public rpc::NodeInfoHandler { /// Storage for GCS tables. std::shared_ptr gcs_table_storage_; /// Raylet client pool. - std::shared_ptr raylet_client_pool_; + rpc::NodeManagerClientPool *raylet_client_pool_ = nullptr; /// Cluster ID to be shared with clients when connecting. const ClusterID cluster_id_; diff --git a/src/ray/gcs/gcs_server/gcs_placement_group_manager.cc b/src/ray/gcs/gcs_server/gcs_placement_group_manager.cc index c60bcd43cc45..1aec60e9603c 100644 --- a/src/ray/gcs/gcs_server/gcs_placement_group_manager.cc +++ b/src/ray/gcs/gcs_server/gcs_placement_group_manager.cc @@ -14,6 +14,8 @@ #include "ray/gcs/gcs_server/gcs_placement_group_manager.h" +#include + #include "ray/common/asio/asio_util.h" #include "ray/common/asio/instrumented_io_context.h" #include "ray/common/ray_config.h" @@ -181,15 +183,15 @@ rpc::PlacementGroupStats *GcsPlacementGroup::GetMutableStats() { GcsPlacementGroupManager::GcsPlacementGroupManager( instrumented_io_context &io_context, - std::shared_ptr scheduler, + GcsPlacementGroupSchedulerInterface *scheduler, std::shared_ptr gcs_table_storage, GcsResourceManager &gcs_resource_manager, std::function get_ray_namespace) : io_context_(io_context), - gcs_placement_group_scheduler_(std::move(scheduler)), + gcs_placement_group_scheduler_(scheduler), gcs_table_storage_(std::move(gcs_table_storage)), gcs_resource_manager_(gcs_resource_manager), - get_ray_namespace_(get_ray_namespace) { + get_ray_namespace_(std::move(get_ray_namespace)) { placement_group_state_counter_.reset( new CounterMap()); placement_group_state_counter_->SetOnChangeCallback( diff --git a/src/ray/gcs/gcs_server/gcs_placement_group_manager.h b/src/ray/gcs/gcs_server/gcs_placement_group_manager.h index d90fdccf3a8a..a7d91388e264 100644 --- a/src/ray/gcs/gcs_server/gcs_placement_group_manager.h +++ b/src/ray/gcs/gcs_server/gcs_placement_group_manager.h @@ -237,7 +237,7 @@ class GcsPlacementGroupManager : public rpc::PlacementGroupInfoHandler { /// \param gcs_resource_manager Reference of GcsResourceManager. /// \param get_ray_namespace A callback to get the ray namespace. GcsPlacementGroupManager(instrumented_io_context &io_context, - std::shared_ptr scheduler, + GcsPlacementGroupSchedulerInterface *scheduler, std::shared_ptr gcs_table_storage, GcsResourceManager &gcs_resource_manager, std::function get_ray_namespace); @@ -480,8 +480,8 @@ class GcsPlacementGroupManager : public rpc::PlacementGroupInfoHandler { std::deque> infeasible_placement_groups_; /// The scheduler to schedule all registered placement_groups. - std::shared_ptr - gcs_placement_group_scheduler_; + /// Scheduler's lifecycle lies in [GcsServer]. + gcs::GcsPlacementGroupSchedulerInterface *gcs_placement_group_scheduler_ = nullptr; /// Used to update placement group information upon creation, deletion, etc. std::shared_ptr gcs_table_storage_; diff --git a/src/ray/gcs/gcs_server/gcs_placement_group_scheduler.cc b/src/ray/gcs/gcs_server/gcs_placement_group_scheduler.cc index 6bc2737c14a6..85a94f863598 100644 --- a/src/ray/gcs/gcs_server/gcs_placement_group_scheduler.cc +++ b/src/ray/gcs/gcs_server/gcs_placement_group_scheduler.cc @@ -26,13 +26,13 @@ GcsPlacementGroupScheduler::GcsPlacementGroupScheduler( std::shared_ptr gcs_table_storage, const gcs::GcsNodeManager &gcs_node_manager, ClusterResourceScheduler &cluster_resource_scheduler, - std::shared_ptr raylet_client_pool) + rpc::NodeManagerClientPool &raylet_client_pool) : io_context_(io_context), return_timer_(io_context), gcs_table_storage_(std::move(gcs_table_storage)), gcs_node_manager_(gcs_node_manager), cluster_resource_scheduler_(cluster_resource_scheduler), - raylet_client_pool_(std::move(raylet_client_pool)) {} + raylet_client_pool_(raylet_client_pool) {} void GcsPlacementGroupScheduler::ScheduleUnplacedBundles( const SchedulePgRequest &request) { @@ -279,7 +279,7 @@ void GcsPlacementGroupScheduler::CancelResourceReserve( std::shared_ptr GcsPlacementGroupScheduler::GetOrConnectLeaseClient(const rpc::Address &raylet_address) { - return raylet_client_pool_->GetOrConnectByAddress(raylet_address); + return raylet_client_pool_.GetOrConnectByAddress(raylet_address); } std::shared_ptr diff --git a/src/ray/gcs/gcs_server/gcs_placement_group_scheduler.h b/src/ray/gcs/gcs_server/gcs_placement_group_scheduler.h index ec7ac53941bd..df16f025d082 100644 --- a/src/ray/gcs/gcs_server/gcs_placement_group_scheduler.h +++ b/src/ray/gcs/gcs_server/gcs_placement_group_scheduler.h @@ -290,12 +290,11 @@ class GcsPlacementGroupScheduler : public GcsPlacementGroupSchedulerInterface { /// \param cluster_resource_scheduler The resource scheduler which is used when /// scheduling. /// \param lease_client_factory Factory to create remote lease client. - GcsPlacementGroupScheduler( - instrumented_io_context &io_context, - std::shared_ptr gcs_table_storage, - const GcsNodeManager &gcs_node_manager, - ClusterResourceScheduler &cluster_resource_scheduler, - std::shared_ptr raylet_client_pool); + GcsPlacementGroupScheduler(instrumented_io_context &io_context, + std::shared_ptr gcs_table_storage, + const GcsNodeManager &gcs_node_manager, + ClusterResourceScheduler &cluster_resource_scheduler, + rpc::NodeManagerClientPool &raylet_client_pool); virtual ~GcsPlacementGroupScheduler() = default; @@ -502,7 +501,7 @@ class GcsPlacementGroupScheduler : public GcsPlacementGroupSchedulerInterface { placement_group_leasing_in_progress_; /// The cached raylet clients used to communicate with raylets. - std::shared_ptr raylet_client_pool_; + rpc::NodeManagerClientPool &raylet_client_pool_; /// The nodes which are releasing unused bundles. absl::flat_hash_set nodes_of_releasing_unused_bundles_; diff --git a/src/ray/gcs/gcs_server/gcs_resource_manager.h b/src/ray/gcs/gcs_server/gcs_resource_manager.h index dff95380cd21..47ecf9fff3a3 100644 --- a/src/ray/gcs/gcs_server/gcs_resource_manager.h +++ b/src/ray/gcs/gcs_server/gcs_resource_manager.h @@ -67,7 +67,7 @@ class GcsResourceManager : public rpc::NodeResourceInfoHandler, NodeID local_node_id, std::shared_ptr cluster_task_manager = nullptr); - virtual ~GcsResourceManager() {} + virtual ~GcsResourceManager() = default; /// Handle the resource update. void ConsumeSyncMessage(std::shared_ptr message) override; diff --git a/src/ray/gcs/gcs_server/gcs_server.cc b/src/ray/gcs/gcs_server/gcs_server.cc index c51c14bbb0dd..a8ad05ff3c9d 100644 --- a/src/ray/gcs/gcs_server/gcs_server.cc +++ b/src/ray/gcs/gcs_server/gcs_server.cc @@ -64,7 +64,7 @@ GcsServer::GcsServer(const ray::gcs::GcsServerConfig &config, ClusterID::Nil(), RayConfig::instance().gcs_server_rpc_client_thread_num()), raylet_client_pool_( - std::make_shared(client_call_manager_)), + std::make_unique(client_call_manager_)), pubsub_periodical_runner_(io_context_provider_.GetIOContext()), periodical_runner_(io_context_provider_.GetDefaultIOContext()), is_started_(false), @@ -289,7 +289,7 @@ void GcsServer::InitGcsNodeManager(const GcsInitData &gcs_init_data) { RAY_CHECK(gcs_table_storage_ && gcs_publisher_); gcs_node_manager_ = std::make_unique(gcs_publisher_, gcs_table_storage_, - raylet_client_pool_, + raylet_client_pool_.get(), rpc_server_.GetClusterId()); // Initialize by gcs tables data. gcs_node_manager_->Initialize(gcs_init_data); @@ -323,7 +323,7 @@ void GcsServer::InitGcsHealthCheckManager(const GcsInitData &gcs_init_data) { void GcsServer::InitGcsResourceManager(const GcsInitData &gcs_init_data) { RAY_CHECK(cluster_resource_scheduler_ && cluster_task_manager_); - gcs_resource_manager_ = std::make_shared( + gcs_resource_manager_ = std::make_unique( io_context_provider_.GetDefaultIOContext(), cluster_resource_scheduler_->GetClusterResourceManager(), *gcs_node_manager_, @@ -446,25 +446,25 @@ void GcsServer::InitGcsActorManager(const GcsInitData &gcs_init_data) { const rpc::PushTaskReply &reply) { gcs_actor_manager_->OnActorCreationSuccess(std::move(actor), reply); }; - auto client_factory = [this](const rpc::Address &address) { - return std::make_shared(address, client_call_manager_); - }; RAY_CHECK(gcs_resource_manager_ && cluster_task_manager_); scheduler = std::make_unique( io_context_provider_.GetDefaultIOContext(), gcs_table_storage_->ActorTable(), *gcs_node_manager_, - cluster_task_manager_, + *cluster_task_manager_, schedule_failure_handler, schedule_success_handler, - raylet_client_pool_, - client_factory, + *raylet_client_pool_, + /*factory=*/ + [this](const rpc::Address &address) { + return std::make_shared(address, client_call_manager_); + }, /*normal_task_resources_changed_callback=*/ [this](const NodeID &node_id, const rpc::ResourcesData &resources) { gcs_resource_manager_->UpdateNodeNormalTaskResources(node_id, resources); }); - gcs_actor_manager_ = std::make_shared( + gcs_actor_manager_ = std::make_unique( std::move(scheduler), gcs_table_storage_, gcs_publisher_, @@ -480,23 +480,23 @@ void GcsServer::InitGcsActorManager(const GcsInitData &gcs_init_data) { // Initialize by gcs tables data. gcs_actor_manager_->Initialize(gcs_init_data); // Register service. - actor_info_service_.reset(new rpc::ActorInfoGrpcService( - io_context_provider_.GetDefaultIOContext(), *gcs_actor_manager_)); + actor_info_service_ = std::make_unique( + io_context_provider_.GetDefaultIOContext(), *gcs_actor_manager_); rpc_server_.RegisterService(*actor_info_service_); } void GcsServer::InitGcsPlacementGroupManager(const GcsInitData &gcs_init_data) { RAY_CHECK(gcs_table_storage_ && gcs_node_manager_); - gcs_placement_group_scheduler_ = std::make_shared( + gcs_placement_group_scheduler_ = std::make_unique( io_context_provider_.GetDefaultIOContext(), gcs_table_storage_, *gcs_node_manager_, *cluster_resource_scheduler_, - raylet_client_pool_); + *raylet_client_pool_); - gcs_placement_group_manager_ = std::make_shared( + gcs_placement_group_manager_ = std::make_unique( io_context_provider_.GetDefaultIOContext(), - gcs_placement_group_scheduler_, + gcs_placement_group_scheduler_.get(), gcs_table_storage_, *gcs_resource_manager_, [this](const JobID &job_id) { @@ -671,7 +671,7 @@ void GcsServer::InitGcsAutoscalerStateManager(const GcsInitData &gcs_init_data) *gcs_node_manager_, *gcs_actor_manager_, *gcs_placement_group_manager_, - raylet_client_pool_); + *raylet_client_pool_); gcs_autoscaler_state_manager_->Initialize(gcs_init_data); autoscaler_state_service_.reset(new rpc::autoscaler::AutoscalerStateGrpcService( @@ -826,7 +826,7 @@ std::shared_ptr GcsServer::GetOrConnectRedis() { RAY_CHECK(status.ok()) << "Failed to init redis gcs client as " << status; // Init redis failure detector. - gcs_redis_failure_detector_ = std::make_shared( + gcs_redis_failure_detector_ = std::make_unique( io_context_provider_.GetDefaultIOContext(), redis_client_, []() { RAY_LOG(FATAL) << "Redis connection failed. Shutdown GCS."; }); diff --git a/src/ray/gcs/gcs_server/gcs_server.h b/src/ray/gcs/gcs_server/gcs_server.h index 22ece4dda229..6c37e9d8210c 100644 --- a/src/ray/gcs/gcs_server/gcs_server.h +++ b/src/ray/gcs/gcs_server/gcs_server.h @@ -14,6 +14,8 @@ #pragma once +#include + #include "ray/common/asio/asio_util.h" #include "ray/common/asio/instrumented_io_context.h" #include "ray/common/ray_syncer/ray_syncer.h" @@ -218,9 +220,9 @@ class GcsServer { /// The `ClientCallManager` object that is shared by all `NodeManagerWorkerClient`s. rpc::ClientCallManager client_call_manager_; /// Node manager client pool. - std::shared_ptr raylet_client_pool_; + std::unique_ptr raylet_client_pool_; /// The gcs resource manager. - std::shared_ptr gcs_resource_manager_; + std::unique_ptr gcs_resource_manager_; /// The cluster resource scheduler. std::shared_ptr cluster_resource_scheduler_; /// The cluster task manager. @@ -230,15 +232,17 @@ class GcsServer { /// The gcs node manager. std::unique_ptr gcs_node_manager_; /// The health check manager. - std::shared_ptr gcs_healthcheck_manager_; + std::unique_ptr gcs_healthcheck_manager_; /// The gcs redis failure detector. - std::shared_ptr gcs_redis_failure_detector_; + std::unique_ptr gcs_redis_failure_detector_; /// The gcs actor manager. - std::shared_ptr gcs_actor_manager_; + std::unique_ptr gcs_actor_manager_; /// The gcs placement group scheduler. - std::shared_ptr gcs_placement_group_scheduler_; + /// [gcs_placement_group_scheduler_] depends on [raylet_client_pool_]. + std::unique_ptr gcs_placement_group_scheduler_; /// The gcs placement group manager. - std::shared_ptr gcs_placement_group_manager_; + /// [gcs_placement_group_manager_] depends on [gcs_placement_group_scheduler_]. + std::unique_ptr gcs_placement_group_manager_; /// Job info handler and service. std::unique_ptr gcs_job_manager_; std::unique_ptr job_info_service_; diff --git a/src/ray/gcs/gcs_server/test/export_api/gcs_node_manager_export_event_test.cc b/src/ray/gcs/gcs_server/test/export_api/gcs_node_manager_export_event_test.cc index 742716da4b45..61d2d0e8b932 100644 --- a/src/ray/gcs/gcs_server/test/export_api/gcs_node_manager_export_event_test.cc +++ b/src/ray/gcs/gcs_server/test/export_api/gcs_node_manager_export_event_test.cc @@ -41,7 +41,7 @@ class GcsNodeManagerExportAPITest : public ::testing::Test { public: GcsNodeManagerExportAPITest() { raylet_client_ = std::make_shared(); - client_pool_ = std::make_shared( + client_pool_ = std::make_unique( [this](const rpc::Address &) { return raylet_client_; }); gcs_publisher_ = std::make_shared( std::make_unique()); @@ -72,7 +72,7 @@ class GcsNodeManagerExportAPITest : public ::testing::Test { protected: std::shared_ptr gcs_table_storage_; std::shared_ptr raylet_client_; - std::shared_ptr client_pool_; + std::unique_ptr client_pool_; std::shared_ptr gcs_publisher_; instrumented_io_context io_service_; std::string log_dir_; @@ -81,7 +81,7 @@ class GcsNodeManagerExportAPITest : public ::testing::Test { TEST_F(GcsNodeManagerExportAPITest, TestExportEventRegisterNode) { // Test export event is written when a node is added with HandleRegisterNode gcs::GcsNodeManager node_manager( - gcs_publisher_, gcs_table_storage_, client_pool_, ClusterID::Nil()); + gcs_publisher_, gcs_table_storage_, client_pool_.get(), ClusterID::Nil()); auto node = Mocker::GenNodeInfo(); rpc::RegisterNodeRequest register_request; @@ -103,7 +103,7 @@ TEST_F(GcsNodeManagerExportAPITest, TestExportEventRegisterNode) { TEST_F(GcsNodeManagerExportAPITest, TestExportEventUnregisterNode) { // Test export event is written when a node is removed with HandleUnregisterNode gcs::GcsNodeManager node_manager( - gcs_publisher_, gcs_table_storage_, client_pool_, ClusterID::Nil()); + gcs_publisher_, gcs_table_storage_, client_pool_.get(), ClusterID::Nil()); auto node = Mocker::GenNodeInfo(); auto node_id = NodeID::FromBinary(node->node_id()); node_manager.AddNode(node); diff --git a/src/ray/gcs/gcs_server/test/gcs_actor_scheduler_mock_test.cc b/src/ray/gcs/gcs_server/test/gcs_actor_scheduler_mock_test.cc index 6beeb8b7504c..aca66ca39c09 100644 --- a/src/ray/gcs/gcs_server/test/gcs_actor_scheduler_mock_test.cc +++ b/src/ray/gcs/gcs_server/test/gcs_actor_scheduler_mock_test.cc @@ -42,7 +42,7 @@ class GcsActorSchedulerMockTest : public Test { std::make_unique(nullptr, nullptr, nullptr, ClusterID::Nil()); raylet_client = std::make_shared(); core_worker_client = std::make_shared(); - client_pool = std::make_shared( + client_pool = std::make_unique( [this](const rpc::Address &) { return raylet_client; }); local_node_id = NodeID::FromRandom(); auto cluster_resource_scheduler = std::make_shared( @@ -52,7 +52,7 @@ class GcsActorSchedulerMockTest : public Test { /*is_node_available_fn=*/ [](auto) { return true; }, /*is_local_node_with_raylet=*/false); - auto cluster_task_manager = std::make_shared( + cluster_task_manager = std::make_unique( local_node_id, cluster_resource_scheduler, /*get_node_info=*/ @@ -70,10 +70,10 @@ class GcsActorSchedulerMockTest : public Test { io_context, *actor_table, *gcs_node_manager, - cluster_task_manager, + *cluster_task_manager, [this](auto a, auto b, auto c) { schedule_failure_handler(a); }, [this](auto a, const rpc::PushTaskReply) { schedule_success_handler(a); }, - client_pool, + *client_pool, [this](const rpc::Address &) { return core_worker_client; }); auto node_info = std::make_shared(); node_info->set_state(rpc::GcsNodeInfo::ALIVE); @@ -82,14 +82,16 @@ class GcsActorSchedulerMockTest : public Test { worker_id = WorkerID::FromRandom(); gcs_node_manager->AddNode(node_info); } + std::shared_ptr raylet_client; instrumented_io_context io_context; std::shared_ptr store_client; std::unique_ptr actor_table; - std::unique_ptr actor_scheduler; std::unique_ptr gcs_node_manager; + std::unique_ptr cluster_task_manager; + std::unique_ptr actor_scheduler; std::shared_ptr core_worker_client; - std::shared_ptr client_pool; + std::unique_ptr client_pool; std::shared_ptr>> counter; MockCallback schedule_failure_handler; diff --git a/src/ray/gcs/gcs_server/test/gcs_actor_scheduler_test.cc b/src/ray/gcs/gcs_server/test/gcs_actor_scheduler_test.cc index c14497db7eaa..652129e10807 100644 --- a/src/ray/gcs/gcs_server/test/gcs_actor_scheduler_test.cc +++ b/src/ray/gcs/gcs_server/test/gcs_actor_scheduler_test.cc @@ -12,10 +12,11 @@ // See the License for the specific language governing permissions and // limitations under the License. +#include + #include // clang-format off -#include "gtest/gtest.h" #include "ray/common/asio/instrumented_io_context.h" #include "ray/gcs/gcs_server/gcs_actor_scheduler.h" #include "ray/gcs/gcs_server/test/gcs_server_test_util.h" @@ -39,7 +40,7 @@ class GcsActorSchedulerTest : public ::testing::Test { store_client_ = std::make_shared(io_service_); gcs_table_storage_ = std::make_shared(io_service_); gcs_node_manager_ = std::make_shared( - gcs_publisher_, gcs_table_storage_, raylet_client_pool_, ClusterID::Nil()); + gcs_publisher_, gcs_table_storage_, raylet_client_pool_.get(), ClusterID::Nil()); gcs_actor_table_ = std::make_shared(store_client_); local_node_id_ = NodeID::FromRandom(); @@ -73,7 +74,7 @@ class GcsActorSchedulerTest : public ::testing::Test { io_service_, *gcs_actor_table_, *gcs_node_manager_, - cluster_task_manager_, + *cluster_task_manager_, /*schedule_failure_handler=*/ [this](std::shared_ptr actor, const rpc::RequestWorkerLeaseReply::SchedulingFailureType failure_type, @@ -84,7 +85,7 @@ class GcsActorSchedulerTest : public ::testing::Test { [this](std::shared_ptr actor, const rpc::PushTaskReply &reply) { success_actors_.emplace_back(std::move(actor)); }, - raylet_client_pool_, + *raylet_client_pool_, /*client_factory=*/ [this](const rpc::Address &address) { return worker_client_; }, /*normal_task_resources_changed_callback=*/ diff --git a/src/ray/gcs/gcs_server/test/gcs_autoscaler_state_manager_test.cc b/src/ray/gcs/gcs_server/test/gcs_autoscaler_state_manager_test.cc index b0b31b182d77..e83a5eb1173a 100644 --- a/src/ray/gcs/gcs_server/test/gcs_autoscaler_state_manager_test.cc +++ b/src/ray/gcs/gcs_server/test/gcs_autoscaler_state_manager_test.cc @@ -61,7 +61,7 @@ class GcsAutoscalerStateManagerTest : public ::testing::Test { void SetUp() override { raylet_client_ = std::make_shared(); - client_pool_ = std::make_shared( + client_pool_ = std::make_unique( [this](const rpc::Address &) { return raylet_client_; }); cluster_resource_manager_ = std::make_unique(io_service_); gcs_node_manager_ = std::make_shared(); @@ -86,7 +86,7 @@ class GcsAutoscalerStateManagerTest : public ::testing::Test { *gcs_node_manager_, *gcs_actor_manager_, *gcs_placement_group_manager_, - client_pool_)); + *client_pool_)); } public: diff --git a/src/ray/gcs/gcs_server/test/gcs_node_manager_test.cc b/src/ray/gcs/gcs_server/test/gcs_node_manager_test.cc index a8a0157e0d54..a124daf50ad2 100644 --- a/src/ray/gcs/gcs_server/test/gcs_node_manager_test.cc +++ b/src/ray/gcs/gcs_server/test/gcs_node_manager_test.cc @@ -28,7 +28,7 @@ class GcsNodeManagerTest : public ::testing::Test { public: GcsNodeManagerTest() { raylet_client_ = std::make_shared(); - client_pool_ = std::make_shared( + client_pool_ = std::make_unique( [this](const rpc::Address &) { return raylet_client_; }); gcs_publisher_ = std::make_shared( std::make_unique()); @@ -37,13 +37,13 @@ class GcsNodeManagerTest : public ::testing::Test { protected: std::shared_ptr gcs_table_storage_; std::shared_ptr raylet_client_; - std::shared_ptr client_pool_; + std::unique_ptr client_pool_; std::shared_ptr gcs_publisher_; }; TEST_F(GcsNodeManagerTest, TestManagement) { gcs::GcsNodeManager node_manager( - gcs_publisher_, gcs_table_storage_, client_pool_, ClusterID::Nil()); + gcs_publisher_, gcs_table_storage_, client_pool_.get(), ClusterID::Nil()); // Test Add/Get/Remove functionality. auto node = Mocker::GenNodeInfo(); auto node_id = NodeID::FromBinary(node->node_id()); @@ -58,7 +58,7 @@ TEST_F(GcsNodeManagerTest, TestManagement) { TEST_F(GcsNodeManagerTest, TestListener) { gcs::GcsNodeManager node_manager( - gcs_publisher_, gcs_table_storage_, client_pool_, ClusterID::Nil()); + gcs_publisher_, gcs_table_storage_, client_pool_.get(), ClusterID::Nil()); // Test AddNodeAddedListener. int node_count = 1000; std::vector> added_nodes; diff --git a/src/ray/gcs/gcs_server/test/gcs_placement_group_manager_mock_test.cc b/src/ray/gcs/gcs_server/test/gcs_placement_group_manager_mock_test.cc index 6cfd689ac168..1e3ef61060c8 100644 --- a/src/ray/gcs/gcs_server/test/gcs_placement_group_manager_mock_test.cc +++ b/src/ray/gcs/gcs_server/test/gcs_placement_group_manager_mock_test.cc @@ -47,7 +47,7 @@ class GcsPlacementGroupManagerMockTest : public Test { gcs_placement_group_manager_ = std::make_unique(io_context_, - gcs_placement_group_scheduler_, + gcs_placement_group_scheduler_.get(), gcs_table_storage_, *resource_manager_, [](auto &) { return ""; }); diff --git a/src/ray/gcs/gcs_server/test/gcs_placement_group_manager_test.cc b/src/ray/gcs/gcs_server/test/gcs_placement_group_manager_test.cc index 268096815cbe..e98bd2ec6e35 100644 --- a/src/ray/gcs/gcs_server/test/gcs_placement_group_manager_test.cc +++ b/src/ray/gcs/gcs_server/test/gcs_placement_group_manager_test.cc @@ -89,7 +89,7 @@ class GcsPlacementGroupManagerTest : public ::testing::Test { io_service_, cluster_resource_manager_, *gcs_node_manager_, NodeID::FromRandom()); gcs_placement_group_manager_.reset(new gcs::GcsPlacementGroupManager( io_service_, - mock_placement_group_scheduler_, + mock_placement_group_scheduler_.get(), gcs_table_storage_, *gcs_resource_manager_, [this](const JobID &job_id) { return job_namespace_table_[job_id]; })); diff --git a/src/ray/gcs/gcs_server/test/gcs_placement_group_scheduler_test.cc b/src/ray/gcs/gcs_server/test/gcs_placement_group_scheduler_test.cc index 5d3f11ed39b0..adcc78dca25f 100644 --- a/src/ray/gcs/gcs_server/test/gcs_placement_group_scheduler_test.cc +++ b/src/ray/gcs/gcs_server/test/gcs_placement_group_scheduler_test.cc @@ -55,21 +55,21 @@ class GcsPlacementGroupSchedulerTest : public ::testing::Test { [](auto) { return true; }, /*is_local_node_with_raylet=*/false); gcs_node_manager_ = std::make_shared( - gcs_publisher_, gcs_table_storage_, raylet_client_pool_, ClusterID::Nil()); + gcs_publisher_, gcs_table_storage_, raylet_client_pool_.get(), ClusterID::Nil()); gcs_resource_manager_ = std::make_shared( io_service_, cluster_resource_scheduler_->GetClusterResourceManager(), *gcs_node_manager_, local_node_id); store_client_ = std::make_shared(io_service_); - raylet_client_pool_ = std::make_shared( + raylet_client_pool_ = std::make_unique( [this](const rpc::Address &addr) { return raylet_clients_[addr.port()]; }); scheduler_ = std::make_shared( io_service_, gcs_table_storage_, *gcs_node_manager_, *cluster_resource_scheduler_, - raylet_client_pool_); + *raylet_client_pool_); counter_.reset(new CounterMap()); } @@ -296,7 +296,7 @@ class GcsPlacementGroupSchedulerTest : public ::testing::Test { ABSL_GUARDED_BY(placement_group_requests_mutex_); std::shared_ptr gcs_publisher_; std::shared_ptr gcs_table_storage_; - std::shared_ptr raylet_client_pool_; + std::unique_ptr raylet_client_pool_; std::shared_ptr> counter_; }; diff --git a/src/ray/raylet/scheduling/cluster_task_manager.h b/src/ray/raylet/scheduling/cluster_task_manager.h index 058c40f97fcf..752cbd6b3e98 100644 --- a/src/ray/raylet/scheduling/cluster_task_manager.h +++ b/src/ray/raylet/scheduling/cluster_task_manager.h @@ -161,6 +161,7 @@ class ClusterTaskManager : public ClusterTaskManagerInterface { const NodeID &self_node_id_; /// Responsible for resource tracking/view of the cluster. + /// TODO(hjiang): Use reference instead of shared pointer. std::shared_ptr cluster_resource_scheduler_; /// Function to get the node information of a given node id.