Skip to content

Commit

Permalink
[core] Prestart worker with runtime env (#49994)
Browse files Browse the repository at this point in the history
Signed-off-by: dentiny <[email protected]>
  • Loading branch information
dentiny authored Jan 27, 2025
1 parent ef9867e commit 2f859d0
Show file tree
Hide file tree
Showing 3 changed files with 69 additions and 22 deletions.
66 changes: 49 additions & 17 deletions src/ray/raylet/worker_pool.cc
Original file line number Diff line number Diff line change
Expand Up @@ -177,7 +177,12 @@ void WorkerPool::Start() {
}

if (RayConfig::instance().enable_worker_prestart()) {
PrestartDefaultCpuWorkers(Language::PYTHON, num_prestart_python_workers);
rpc::TaskSpec rpc_task_spec;
rpc_task_spec.set_language(Language::PYTHON);
rpc_task_spec.mutable_runtime_env_info()->set_serialized_runtime_env("{}");

TaskSpecification task_spec{std::move(rpc_task_spec)};
PrestartWorkersInternal(task_spec, num_prestart_python_workers);
}
}

Expand Down Expand Up @@ -898,7 +903,12 @@ Status WorkerPool::RegisterDriver(const std::shared_ptr<WorkerInterface> &driver
if (!first_job_registered_ && RayConfig::instance().prestart_worker_first_driver() &&
!RayConfig::instance().enable_worker_prestart()) {
RAY_LOG(DEBUG) << "PrestartDefaultCpuWorkers " << num_prestart_python_workers;
PrestartDefaultCpuWorkers(Language::PYTHON, num_prestart_python_workers);
rpc::TaskSpec rpc_task_spec;
rpc_task_spec.set_language(Language::PYTHON);
rpc_task_spec.mutable_runtime_env_info()->set_serialized_runtime_env("{}");

TaskSpecification task_spec{std::move(rpc_task_spec)};
PrestartWorkersInternal(task_spec, num_prestart_python_workers);
}

// Invoke the `send_reply_callback` later to only finish driver
Expand Down Expand Up @@ -1448,10 +1458,8 @@ void WorkerPool::PrestartWorkers(const TaskSpecification &task_spec,
<< task_spec.DebugString() << " has runtime env "
<< task_spec.HasRuntimeEnv();
if ((task_spec.IsActorCreationTask() && !task_spec.DynamicWorkerOptions().empty()) ||
task_spec.HasRuntimeEnv() || task_spec.GetLanguage() != ray::Language::PYTHON) {
task_spec.GetLanguage() != ray::Language::PYTHON) {
return; // Not handled.
// TODO(architkulkarni): We'd eventually like to prestart workers with the same
// runtime env to improve initial startup performance.
}

auto &state = GetStateForLanguage(task_spec.GetLanguage());
Expand All @@ -1470,21 +1478,45 @@ void WorkerPool::PrestartWorkers(const TaskSpecification &task_spec,
<< backlog_size << " and available CPUs " << num_available_cpus
<< " num idle workers " << state.idle.size()
<< " num registered workers " << state.registered_workers.size();
PrestartDefaultCpuWorkers(task_spec.GetLanguage(), num_needed);
PrestartWorkersInternal(task_spec, num_needed);
}
}

void WorkerPool::PrestartDefaultCpuWorkers(ray::Language language, int64_t num_needed) {
// default workers don't use runtime env.
RAY_LOG(DEBUG) << "PrestartDefaultCpuWorkers " << num_needed;
for (int i = 0; i < num_needed; i++) {
PopWorkerStatus status;
StartWorkerProcess(language,
rpc::WorkerType::WORKER,
JobID::Nil(),
&status,
/*dynamic_options*/ {},
CalculateRuntimeEnvHash("{}"));
/// Attempt to start `num_needed` worker processes ahead of demand for `task_spec`.
///
/// If the task spec's serialized runtime env is empty (as judged by
/// `IsRuntimeEnvEmpty`), each worker process is started directly. Otherwise the
/// runtime env is requested through `GetOrCreateRuntimeEnv` once per worker and
/// the worker process is started from that request's callback; when the request
/// reports failure an error is logged and that worker is not started.
///
/// \param task_spec Supplies the language, job id and runtime env of the workers.
/// \param num_needed Number of worker processes to attempt to start.
void WorkerPool::PrestartWorkersInternal(const TaskSpecification &task_spec,
                                         int64_t num_needed) {
  RAY_LOG(DEBUG) << "PrestartWorkers " << num_needed;

  // The runtime env of `task_spec` does not change between iterations, so decide
  // once which start-up path to take.
  const bool runtime_env_is_empty = IsRuntimeEnvEmpty(task_spec.SerializedRuntimeEnv());

  // The counter has the same type as `num_needed` so it cannot overflow before
  // reaching the bound.
  for (int64_t ii = 0; ii < num_needed; ++ii) {
    // Prestart worker with no runtime env.
    if (runtime_env_is_empty) {
      PopWorkerStatus status;
      StartWorkerProcess(
          task_spec.GetLanguage(), rpc::WorkerType::WORKER, task_spec.JobId(), &status);
      continue;
    }

    // Prestart worker with runtime env.
    // The task spec is copied into the closure so the callback does not depend on
    // the caller's reference staying alive.
    GetOrCreateRuntimeEnv(
        task_spec.SerializedRuntimeEnv(),
        task_spec.RuntimeEnvConfig(),
        task_spec.JobId(),
        [this, task_spec = task_spec](bool successful,
                                      const std::string &serialized_runtime_env_context,
                                      const std::string &setup_error_message) {
          if (!successful) {
            RAY_LOG(ERROR) << "Fails to create or get runtime env "
                           << setup_error_message;
            return;
          }
          PopWorkerStatus status;
          StartWorkerProcess(task_spec.GetLanguage(),
                             rpc::WorkerType::WORKER,
                             task_spec.JobId(),
                             &status,
                             /*dynamic_options=*/{},
                             task_spec.GetRuntimeEnvHash(),
                             serialized_runtime_env_context,
                             task_spec.RuntimeEnvInfo());
        });
  }
}

Expand Down
4 changes: 1 addition & 3 deletions src/ray/raylet/worker_pool.h
Original file line number Diff line number Diff line change
Expand Up @@ -426,9 +426,7 @@ class WorkerPool : public WorkerPoolInterface, public IOWorkerPoolInterface {
/// We aim to prestart 1 worker per CPU, up to the backlog size.
void PrestartWorkers(const TaskSpecification &task_spec, int64_t backlog_size);

/// Try to prestart a number of CPU workers with the given language.
///
void PrestartDefaultCpuWorkers(ray::Language language, int64_t num_needed);
void PrestartWorkersInternal(const TaskSpecification &task_spec, int64_t num_needed);

/// Return the current size of the worker pool for the requested language. Counts only
/// idle workers.
Expand Down
21 changes: 19 additions & 2 deletions src/ray/raylet/worker_pool_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -651,6 +651,24 @@ TEST_F(WorkerPoolDriverRegisteredTest, TestPrestartingWorkers) {
ASSERT_EQ(worker_pool_->NumWorkersStarting(), POOL_SIZE_SOFT_LIMIT);
}

TEST_F(WorkerPoolDriverRegisteredTest, TestPrestartingWorkersWithRuntimeEnv) {
  // Build a task spec whose runtime env is non-empty (a single env var).
  auto spec_with_env = ExampleTaskSpec();
  auto *env_info = spec_with_env.GetMutableMessage().mutable_runtime_env_info();
  env_info->set_serialized_runtime_env("{\"env_vars\": {\"FOO\": \"bar\"}}");

  // A backlog of 2 leaves two workers starting.
  worker_pool_->PrestartWorkers(spec_with_env, /*backlog_size=*/2);
  ASSERT_EQ(worker_pool_->NumWorkersStarting(), 2);

  // Raising the backlog to 3 adds exactly one more starting worker.
  worker_pool_->PrestartWorkers(spec_with_env, /*backlog_size=*/3);
  ASSERT_EQ(worker_pool_->NumWorkersStarting(), 3);

  // A smaller backlog does not change the number of starting workers.
  worker_pool_->PrestartWorkers(spec_with_env, /*backlog_size=*/1);
  ASSERT_EQ(worker_pool_->NumWorkersStarting(), 3);

  // A large backlog is capped at the pool's soft limit.
  worker_pool_->PrestartWorkers(spec_with_env, /*backlog_size=*/20);
  ASSERT_EQ(worker_pool_->NumWorkersStarting(), POOL_SIZE_SOFT_LIMIT);
}

TEST_F(WorkerPoolDriverRegisteredTest, HandleWorkerPushPop) {
std::shared_ptr<WorkerInterface> popped_worker;
const auto task_spec = ExampleTaskSpec();
Expand Down Expand Up @@ -2124,8 +2142,7 @@ TEST_F(WorkerPoolDriverRegisteredTest, TestIOWorkerFailureAndSpawn) {

TEST_F(WorkerPoolDriverRegisteredTest, WorkerReuseForPrestartedWorker) {
const auto task_spec = ExampleTaskSpec();

worker_pool_->PrestartDefaultCpuWorkers(ray::Language::PYTHON, 1);
worker_pool_->PrestartWorkersInternal(task_spec, /*num_needed=*/1);
worker_pool_->PushWorkers(0, task_spec.JobId());
// One worker process has been prestarted.
ASSERT_EQ(worker_pool_->GetProcessSize(), 1);
Expand Down

0 comments on commit 2f859d0

Please sign in to comment.