From 2f859d0100d64f39c4c41014bf0e018f406cde06 Mon Sep 17 00:00:00 2001 From: dentiny Date: Mon, 27 Jan 2025 15:01:22 -0800 Subject: [PATCH] [core] Prestart worker with runtime env (#49994) Signed-off-by: dentiny --- src/ray/raylet/worker_pool.cc | 66 ++++++++++++++++++++++-------- src/ray/raylet/worker_pool.h | 4 +- src/ray/raylet/worker_pool_test.cc | 21 +++++++++- 3 files changed, 69 insertions(+), 22 deletions(-) diff --git a/src/ray/raylet/worker_pool.cc b/src/ray/raylet/worker_pool.cc index de4aa7e047ee1..117d0a506615f 100644 --- a/src/ray/raylet/worker_pool.cc +++ b/src/ray/raylet/worker_pool.cc @@ -177,7 +177,12 @@ void WorkerPool::Start() { } if (RayConfig::instance().enable_worker_prestart()) { - PrestartDefaultCpuWorkers(Language::PYTHON, num_prestart_python_workers); + rpc::TaskSpec rpc_task_spec; + rpc_task_spec.set_language(Language::PYTHON); + rpc_task_spec.mutable_runtime_env_info()->set_serialized_runtime_env("{}"); + + TaskSpecification task_spec{std::move(rpc_task_spec)}; + PrestartWorkersInternal(task_spec, num_prestart_python_workers); } } @@ -898,7 +903,12 @@ Status WorkerPool::RegisterDriver(const std::shared_ptr &driver if (!first_job_registered_ && RayConfig::instance().prestart_worker_first_driver() && !RayConfig::instance().enable_worker_prestart()) { RAY_LOG(DEBUG) << "PrestartDefaultCpuWorkers " << num_prestart_python_workers; - PrestartDefaultCpuWorkers(Language::PYTHON, num_prestart_python_workers); + rpc::TaskSpec rpc_task_spec; + rpc_task_spec.set_language(Language::PYTHON); + rpc_task_spec.mutable_runtime_env_info()->set_serialized_runtime_env("{}"); + + TaskSpecification task_spec{std::move(rpc_task_spec)}; + PrestartWorkersInternal(task_spec, num_prestart_python_workers); } // Invoke the `send_reply_callback` later to only finish driver @@ -1448,10 +1458,8 @@ void WorkerPool::PrestartWorkers(const TaskSpecification &task_spec, << task_spec.DebugString() << " has runtime env " << task_spec.HasRuntimeEnv(); if ((task_spec.IsActorCreationTask() && !task_spec.DynamicWorkerOptions().empty()) || - task_spec.HasRuntimeEnv() || task_spec.GetLanguage() != ray::Language::PYTHON) { + task_spec.GetLanguage() != ray::Language::PYTHON) { return; // Not handled. - // TODO(architkulkarni): We'd eventually like to prestart workers with the same - // runtime env to improve initial startup performance. } auto &state = GetStateForLanguage(task_spec.GetLanguage()); @@ -1470,21 +1478,45 @@ void WorkerPool::PrestartWorkers(const TaskSpecification &task_spec, << backlog_size << " and available CPUs " << num_available_cpus << " num idle workers " << state.idle.size() << " num registered workers " << state.registered_workers.size(); - PrestartDefaultCpuWorkers(task_spec.GetLanguage(), num_needed); + PrestartWorkersInternal(task_spec, num_needed); } } -void WorkerPool::PrestartDefaultCpuWorkers(ray::Language language, int64_t num_needed) { - // default workers don't use runtime env. - RAY_LOG(DEBUG) << "PrestartDefaultCpuWorkers " << num_needed; - for (int i = 0; i < num_needed; i++) { - PopWorkerStatus status; - StartWorkerProcess(language, - rpc::WorkerType::WORKER, - JobID::Nil(), - &status, - /*dynamic_options*/ {}, - CalculateRuntimeEnvHash("{}")); +void WorkerPool::PrestartWorkersInternal(const TaskSpecification &task_spec, + int64_t num_needed) { + RAY_LOG(DEBUG) << "PrestartWorkers " << num_needed; + for (int ii = 0; ii < num_needed; ++ii) { + // Prestart worker with no runtime env. + if (IsRuntimeEnvEmpty(task_spec.SerializedRuntimeEnv())) { + PopWorkerStatus status; + StartWorkerProcess( + task_spec.GetLanguage(), rpc::WorkerType::WORKER, task_spec.JobId(), &status); + continue; + } + + // Prestart worker with runtime env. + GetOrCreateRuntimeEnv( + task_spec.SerializedRuntimeEnv(), + task_spec.RuntimeEnvConfig(), + task_spec.JobId(), + [this, task_spec = task_spec](bool successful, + const std::string &serialized_runtime_env_context, + const std::string &setup_error_message) { + if (!successful) { + RAY_LOG(ERROR) << "Fails to create or get runtime env " + << setup_error_message; + return; + } + PopWorkerStatus status; + StartWorkerProcess(task_spec.GetLanguage(), + rpc::WorkerType::WORKER, + task_spec.JobId(), + &status, + /*dynamic_options=*/{}, + task_spec.GetRuntimeEnvHash(), + serialized_runtime_env_context, + task_spec.RuntimeEnvInfo()); + }); } } diff --git a/src/ray/raylet/worker_pool.h b/src/ray/raylet/worker_pool.h index 1c114ebca8da1..cd3af644d5122 100644 --- a/src/ray/raylet/worker_pool.h +++ b/src/ray/raylet/worker_pool.h @@ -426,9 +426,7 @@ class WorkerPool : public WorkerPoolInterface, public IOWorkerPoolInterface { /// We aim to prestart 1 worker per CPU, up to the the backlog size. void PrestartWorkers(const TaskSpecification &task_spec, int64_t backlog_size); - /// Try to prestart a number of CPU workers with the given language. - /// - void PrestartDefaultCpuWorkers(ray::Language language, int64_t num_needed); + void PrestartWorkersInternal(const TaskSpecification &task_spec, int64_t num_needed); /// Return the current size of the worker pool for the requested language. Counts only /// idle workers. diff --git a/src/ray/raylet/worker_pool_test.cc b/src/ray/raylet/worker_pool_test.cc index 73447daae2e7f..29d6d378fcc88 100644 --- a/src/ray/raylet/worker_pool_test.cc +++ b/src/ray/raylet/worker_pool_test.cc @@ -651,6 +651,24 @@ TEST_F(WorkerPoolDriverRegisteredTest, TestPrestartingWorkers) { ASSERT_EQ(worker_pool_->NumWorkersStarting(), POOL_SIZE_SOFT_LIMIT); } +TEST_F(WorkerPoolDriverRegisteredTest, TestPrestartingWorkersWithRuntimeEnv) { + auto task_spec = ExampleTaskSpec(); + task_spec.GetMutableMessage().mutable_runtime_env_info()->set_serialized_runtime_env( + "{\"env_vars\": {\"FOO\": \"bar\"}}"); + // Prestarts 2 workers. + worker_pool_->PrestartWorkers(task_spec, 2); + ASSERT_EQ(worker_pool_->NumWorkersStarting(), 2); + // Prestarts 1 more worker. + worker_pool_->PrestartWorkers(task_spec, 3); + ASSERT_EQ(worker_pool_->NumWorkersStarting(), 3); + // No more needed. + worker_pool_->PrestartWorkers(task_spec, 1); + ASSERT_EQ(worker_pool_->NumWorkersStarting(), 3); + // Capped by soft limit. + worker_pool_->PrestartWorkers(task_spec, 20); + ASSERT_EQ(worker_pool_->NumWorkersStarting(), POOL_SIZE_SOFT_LIMIT); +} + TEST_F(WorkerPoolDriverRegisteredTest, HandleWorkerPushPop) { std::shared_ptr popped_worker; const auto task_spec = ExampleTaskSpec(); @@ -2124,8 +2142,7 @@ TEST_F(WorkerPoolDriverRegisteredTest, TestIOWorkerFailureAndSpawn) { TEST_F(WorkerPoolDriverRegisteredTest, WorkerReuseForPrestartedWorker) { const auto task_spec = ExampleTaskSpec(); - - worker_pool_->PrestartDefaultCpuWorkers(ray::Language::PYTHON, 1); + worker_pool_->PrestartWorkersInternal(task_spec, /*num_needed=*/1); worker_pool_->PushWorkers(0, task_spec.JobId()); // One worker process has been prestarted. ASSERT_EQ(worker_pool_->GetProcessSize(), 1);