Skip to content

Commit

Permalink
cleanup gcs server shared pointer
Browse files Browse the repository at this point in the history
Signed-off-by: hjiang <[email protected]>
  • Loading branch information
dentiny committed Dec 5, 2024
1 parent 4282e73 commit ef27439
Show file tree
Hide file tree
Showing 10 changed files with 34 additions and 38 deletions.
2 changes: 1 addition & 1 deletion src/ray/gcs/gcs_server/gcs_actor_scheduler.cc
Original file line number Diff line number Diff line change
Expand Up @@ -670,7 +670,7 @@ void GcsActorScheduler::OnActorDestruction(std::shared_ptr<GcsActor> actor) {

void GcsActorScheduler::ReturnActorAcquiredResources(std::shared_ptr<GcsActor> actor) {
auto &cluster_resource_manager =
cluster_task_manager_.GetClusterResourceScheduler()->GetClusterResourceManager();
cluster_task_manager_.GetClusterResourceScheduler().GetClusterResourceManager();
cluster_resource_manager.AddNodeAvailableResources(
scheduling::NodeID(actor->GetNodeID().Binary()),
actor->GetAcquiredResources().GetResourceSet());
Expand Down
15 changes: 7 additions & 8 deletions src/ray/gcs/gcs_server/gcs_resource_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -20,17 +20,16 @@
namespace ray {
namespace gcs {

GcsResourceManager::GcsResourceManager(
instrumented_io_context &io_context,
ClusterResourceManager &cluster_resource_manager,
GcsNodeManager &gcs_node_manager,
NodeID local_node_id,
std::shared_ptr<ClusterTaskManager> cluster_task_manager)
GcsResourceManager::GcsResourceManager(instrumented_io_context &io_context,
ClusterResourceManager &cluster_resource_manager,
GcsNodeManager &gcs_node_manager,
NodeID local_node_id,
ClusterTaskManager *cluster_task_manager)
: io_context_(io_context),
cluster_resource_manager_(cluster_resource_manager),
gcs_node_manager_(gcs_node_manager),
local_node_id_(std::move(local_node_id)),
cluster_task_manager_(std::move(cluster_task_manager)) {}
cluster_task_manager_(cluster_task_manager) {}

void GcsResourceManager::ConsumeSyncMessage(
std::shared_ptr<const syncer::RaySyncMessage> message) {
Expand Down Expand Up @@ -197,7 +196,7 @@ void GcsResourceManager::HandleGetAllResourceUsage(
batch.add_batch()->CopyFrom(usage.second);
}

if (cluster_task_manager_) {
if (cluster_task_manager_ != nullptr) {
// Fill the gcs info when gcs actor scheduler is enabled.
rpc::ResourcesData gcs_resources_data;
cluster_task_manager_->FillPendingActorInfo(gcs_resources_data);
Expand Down
13 changes: 6 additions & 7 deletions src/ray/gcs/gcs_server/gcs_resource_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -60,12 +60,11 @@ class GcsResourceManager : public rpc::NodeResourceInfoHandler,
public syncer::ReceiverInterface {
public:
/// Create a GcsResourceManager.
explicit GcsResourceManager(
instrumented_io_context &io_context,
ClusterResourceManager &cluster_resource_manager,
GcsNodeManager &gcs_node_manager,
NodeID local_node_id,
std::shared_ptr<ClusterTaskManager> cluster_task_manager = nullptr);
explicit GcsResourceManager(instrumented_io_context &io_context,
ClusterResourceManager &cluster_resource_manager,
GcsNodeManager &gcs_node_manager,
NodeID local_node_id,
ClusterTaskManager *cluster_task_manager = nullptr);

virtual ~GcsResourceManager() = default;

Expand Down Expand Up @@ -199,7 +198,7 @@ class GcsResourceManager : public rpc::NodeResourceInfoHandler,
ClusterResourceManager &cluster_resource_manager_;
GcsNodeManager &gcs_node_manager_;
NodeID local_node_id_;
std::shared_ptr<ClusterTaskManager> cluster_task_manager_;
ClusterTaskManager *cluster_task_manager_;
/// Num of alive nodes in the cluster.
size_t num_alive_nodes_ = 0;
};
Expand Down
6 changes: 3 additions & 3 deletions src/ray/gcs/gcs_server/gcs_server.cc
Original file line number Diff line number Diff line change
Expand Up @@ -329,7 +329,7 @@ void GcsServer::InitGcsResourceManager(const GcsInitData &gcs_init_data) {
cluster_resource_scheduler_->GetClusterResourceManager(),
*gcs_node_manager_,
kGCSNodeID,
cluster_task_manager_);
cluster_task_manager_.get());

// Initialize by gcs tables data.
gcs_resource_manager_->Initialize(gcs_init_data);
Expand Down Expand Up @@ -396,9 +396,9 @@ void GcsServer::InitClusterResourceScheduler() {

void GcsServer::InitClusterTaskManager() {
RAY_CHECK(cluster_resource_scheduler_);
cluster_task_manager_ = std::make_shared<ClusterTaskManager>(
cluster_task_manager_ = std::make_unique<ClusterTaskManager>(
kGCSNodeID,
cluster_resource_scheduler_,
*cluster_resource_scheduler_,
/*get_node_info=*/
[this](const NodeID &node_id) {
auto node = gcs_node_manager_->GetAliveNode(node_id);
Expand Down
6 changes: 3 additions & 3 deletions src/ray/gcs/gcs_server/gcs_server.h
Original file line number Diff line number Diff line change
Expand Up @@ -226,12 +226,12 @@ class GcsServer {
rpc::ClientCallManager client_call_manager_;
/// Node manager client pool.
std::unique_ptr<rpc::NodeManagerClientPool> raylet_client_pool_;
/// The gcs resource manager.
std::unique_ptr<GcsResourceManager> gcs_resource_manager_;
/// The cluster resource scheduler.
std::shared_ptr<ClusterResourceScheduler> cluster_resource_scheduler_;
/// The cluster task manager.
std::shared_ptr<ClusterTaskManager> cluster_task_manager_;
std::unique_ptr<ClusterTaskManager> cluster_task_manager_;
/// The gcs resource manager.
std::unique_ptr<GcsResourceManager> gcs_resource_manager_;
/// The autoscaler state manager.
std::unique_ptr<GcsAutoscalerStateManager> gcs_autoscaler_state_manager_;
/// The gcs node manager.
Expand Down
2 changes: 1 addition & 1 deletion src/ray/gcs/gcs_server/test/gcs_actor_scheduler_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -696,7 +696,7 @@ TEST_F(GcsActorSchedulerTest, TestScheduleAndDestroyOneActor) {
scheduling::NodeID scheduling_node_id(node->node_id());
ASSERT_EQ(1, gcs_node_manager_->GetAllAliveNodes().size());
const auto &cluster_resource_manager =
cluster_task_manager_->GetClusterResourceScheduler()->GetClusterResourceManager();
cluster_task_manager_->GetClusterResourceScheduler().GetClusterResourceManager();
auto resource_view_before_scheduling = cluster_resource_manager.GetResourceView();
ASSERT_TRUE(resource_view_before_scheduling.contains(scheduling_node_id));

Expand Down
2 changes: 1 addition & 1 deletion src/ray/raylet/node_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -359,7 +359,7 @@ NodeManager::NodeManager(
max_task_args_memory);
cluster_task_manager_ = std::make_shared<ClusterTaskManager>(
self_node_id_,
std::dynamic_pointer_cast<ClusterResourceScheduler>(cluster_resource_scheduler_),
*std::dynamic_pointer_cast<ClusterResourceScheduler>(cluster_resource_scheduler_),
get_node_info_func,
announce_infeasible_task,
*local_task_manager_);
Expand Down
17 changes: 8 additions & 9 deletions src/ray/raylet/scheduling/cluster_task_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ namespace raylet {

ClusterTaskManager::ClusterTaskManager(
const NodeID &self_node_id,
std::shared_ptr<ClusterResourceScheduler> cluster_resource_scheduler,
ClusterResourceScheduler &cluster_resource_scheduler,
internal::NodeInfoGetter get_node_info,
std::function<void(const RayTask &)> announce_infeasible_task,
ILocalTaskManager &local_task_manager,
Expand Down Expand Up @@ -156,7 +156,7 @@ void ClusterTaskManager::ScheduleAndDispatchTasks() {
RayTask task = work->task;
RAY_LOG(DEBUG) << "Scheduling pending task "
<< task.GetTaskSpecification().TaskId();
auto scheduling_node_id = cluster_resource_scheduler_->GetBestSchedulableNode(
auto scheduling_node_id = cluster_resource_scheduler_.GetBestSchedulableNode(
task.GetTaskSpecification(),
/*preferred_node_id*/ work->PrioritizeLocalNode() ? self_node_id_.Binary()
: task.GetPreferredNodeID(),
Expand All @@ -175,7 +175,7 @@ void ClusterTaskManager::ScheduleAndDispatchTasks() {
!task.GetTaskSpecification().GetNodeAffinitySchedulingStrategySoft()) {
// This can only happen if the target node doesn't exist or is infeasible.
// The task will never be schedulable in either case so we should fail it.
if (cluster_resource_scheduler_->IsLocalNodeWithRaylet()) {
if (cluster_resource_scheduler_.IsLocalNodeWithRaylet()) {
ReplyCancelled(
*work,
rpc::RequestWorkerLeaseReply::SCHEDULING_CANCELLED_UNSCHEDULABLE,
Expand Down Expand Up @@ -251,7 +251,7 @@ void ClusterTaskManager::TryScheduleInfeasibleTask() {
RAY_LOG(DEBUG) << "Check if the infeasible task is schedulable in any node. task_id:"
<< task.GetTaskSpecification().TaskId();
bool is_infeasible;
cluster_resource_scheduler_->GetBestSchedulableNode(
cluster_resource_scheduler_.GetBestSchedulableNode(
task.GetTaskSpecification(),
/*preferred_node_id*/ work->PrioritizeLocalNode() ? self_node_id_.Binary()
: task.GetPreferredNodeID(),
Expand Down Expand Up @@ -291,7 +291,7 @@ void ClusterTaskManager::FillResourceUsage(rpc::ResourcesData &data) {
scheduler_resource_reporter_.FillResourceUsage(data);
// This populates usage information.
syncer::ResourceViewSyncMessage resource_view_sync_message;
cluster_resource_scheduler_->GetLocalResourceManager().PopulateResourceViewSyncMessage(
cluster_resource_scheduler_.GetLocalResourceManager().PopulateResourceViewSyncMessage(
resource_view_sync_message);
(*data.mutable_resources_total()) =
std::move(resource_view_sync_message.resources_total());
Expand Down Expand Up @@ -357,7 +357,7 @@ bool ClusterTaskManager::AnyPendingTasksForResourceAcquisition(

void ClusterTaskManager::RecordMetrics() const {
internal_stats_.RecordMetrics();
cluster_resource_scheduler_->GetLocalResourceManager().RecordMetrics();
cluster_resource_scheduler_.GetLocalResourceManager().RecordMetrics();
}

std::string ClusterTaskManager::DebugStr() const {
Expand Down Expand Up @@ -385,7 +385,7 @@ void ClusterTaskManager::ScheduleOnNode(const NodeID &spillback_to,
const auto &task_spec = task.GetTaskSpecification();
RAY_LOG(DEBUG) << "Spilling task " << task_spec.TaskId() << " to node " << spillback_to;

if (!cluster_resource_scheduler_->AllocateRemoteTaskResources(
if (!cluster_resource_scheduler_.AllocateRemoteTaskResources(
scheduling::NodeID(spillback_to.Binary()),
task_spec.GetRequiredResources().GetResourceMap())) {
RAY_LOG(DEBUG) << "Tried to allocate resources for request " << task_spec.TaskId()
Expand All @@ -405,8 +405,7 @@ void ClusterTaskManager::ScheduleOnNode(const NodeID &spillback_to,
send_reply_callback();
}

std::shared_ptr<ClusterResourceScheduler>
ClusterTaskManager::GetClusterResourceScheduler() const {
ClusterResourceScheduler &ClusterTaskManager::GetClusterResourceScheduler() const {
return cluster_resource_scheduler_;
}

Expand Down
7 changes: 3 additions & 4 deletions src/ray/raylet/scheduling/cluster_task_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ class ClusterTaskManager : public ClusterTaskManagerInterface {
/// \param get_time_ms: A callback which returns the current time in milliseconds.
ClusterTaskManager(
const NodeID &self_node_id,
std::shared_ptr<ClusterResourceScheduler> cluster_resource_scheduler,
ClusterResourceScheduler &cluster_resource_scheduler,
internal::NodeInfoGetter get_node_info,
std::function<void(const RayTask &)> announce_infeasible_task,
ILocalTaskManager &local_task_manager,
Expand Down Expand Up @@ -132,7 +132,7 @@ class ClusterTaskManager : public ClusterTaskManagerInterface {
/// The helper to dump the debug state of the cluster task manager.
std::string DebugStr() const override;

std::shared_ptr<ClusterResourceScheduler> GetClusterResourceScheduler() const;
ClusterResourceScheduler &GetClusterResourceScheduler() const;

/// Get the count of tasks in `infeasible_tasks_`.
size_t GetInfeasibleQueueSize() const;
Expand Down Expand Up @@ -161,8 +161,7 @@ class ClusterTaskManager : public ClusterTaskManagerInterface {

const NodeID &self_node_id_;
/// Responsible for resource tracking/view of the cluster.
/// TODO(hjiang): Use reference instead of shared pointer.
std::shared_ptr<ClusterResourceScheduler> cluster_resource_scheduler_;
ClusterResourceScheduler &cluster_resource_scheduler_;

/// Function to get the node information of a given node id.
internal::NodeInfoGetter get_node_info_;
Expand Down
2 changes: 1 addition & 1 deletion src/ray/raylet/scheduling/scheduler_stats.cc
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,7 @@ std::string SchedulerStats::ComputeAndReportDebugStr() {
buffer << "num_tasks_waiting_for_workers: " << num_tasks_waiting_for_workers_ << "\n";
buffer << "num_cancelled_tasks: " << num_cancelled_tasks_ << "\n";
buffer << "cluster_resource_scheduler state: "
<< cluster_task_manager_.cluster_resource_scheduler_->DebugString() << "\n";
<< cluster_task_manager_.cluster_resource_scheduler_.DebugString() << "\n";
local_task_manager_.DebugStr(buffer);

buffer << "==================================================\n";
Expand Down

0 comments on commit ef27439

Please sign in to comment.