Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[core] Prestart worker with runtime env #49994

Merged
66 changes: 49 additions & 17 deletions src/ray/raylet/worker_pool.cc
Original file line number Diff line number Diff line change
Expand Up @@ -177,7 +177,12 @@ void WorkerPool::Start() {
}

if (RayConfig::instance().enable_worker_prestart()) {
PrestartDefaultCpuWorkers(Language::PYTHON, num_prestart_python_workers);
rpc::TaskSpec rpc_task_spec;
rpc_task_spec.set_language(Language::PYTHON);
rpc_task_spec.mutable_runtime_env_info()->set_serialized_runtime_env("{}");

TaskSpecification task_spec{std::move(rpc_task_spec)};
PrestartWorkersInternal(task_spec, num_prestart_python_workers);
}
}

Expand Down Expand Up @@ -898,7 +903,12 @@ Status WorkerPool::RegisterDriver(const std::shared_ptr<WorkerInterface> &driver
if (!first_job_registered_ && RayConfig::instance().prestart_worker_first_driver() &&
!RayConfig::instance().enable_worker_prestart()) {
RAY_LOG(DEBUG) << "PrestartDefaultCpuWorkers " << num_prestart_python_workers;
PrestartDefaultCpuWorkers(Language::PYTHON, num_prestart_python_workers);
rpc::TaskSpec rpc_task_spec;
rpc_task_spec.set_language(Language::PYTHON);
rpc_task_spec.mutable_runtime_env_info()->set_serialized_runtime_env("{}");

TaskSpecification task_spec{std::move(rpc_task_spec)};
PrestartWorkersInternal(task_spec, num_prestart_python_workers);
}

// Invoke the `send_reply_callback` later to only finish driver
Expand Down Expand Up @@ -1448,10 +1458,8 @@ void WorkerPool::PrestartWorkers(const TaskSpecification &task_spec,
<< task_spec.DebugString() << " has runtime env "
<< task_spec.HasRuntimeEnv();
if ((task_spec.IsActorCreationTask() && !task_spec.DynamicWorkerOptions().empty()) ||
task_spec.HasRuntimeEnv() || task_spec.GetLanguage() != ray::Language::PYTHON) {
task_spec.GetLanguage() != ray::Language::PYTHON) {
return; // Not handled.
// TODO(architkulkarni): We'd eventually like to prestart workers with the same
// runtime env to improve initial startup performance.
}

auto &state = GetStateForLanguage(task_spec.GetLanguage());
Expand All @@ -1470,21 +1478,45 @@ void WorkerPool::PrestartWorkers(const TaskSpecification &task_spec,
<< backlog_size << " and available CPUs " << num_available_cpus
<< " num idle workers " << state.idle.size()
<< " num registered workers " << state.registered_workers.size();
PrestartDefaultCpuWorkers(task_spec.GetLanguage(), num_needed);
PrestartWorkersInternal(task_spec, num_needed);
}
}

void WorkerPool::PrestartDefaultCpuWorkers(ray::Language language, int64_t num_needed) {
// default workers don't use runtime env.
RAY_LOG(DEBUG) << "PrestartDefaultCpuWorkers " << num_needed;
for (int i = 0; i < num_needed; i++) {
PopWorkerStatus status;
StartWorkerProcess(language,
rpc::WorkerType::WORKER,
JobID::Nil(),
&status,
/*dynamic_options*/ {},
CalculateRuntimeEnvHash("{}"));
void WorkerPool::PrestartWorkersInternal(const TaskSpecification &task_spec,
int64_t num_needed) {
RAY_LOG(DEBUG) << "PrestartWorkers " << num_needed;
for (int ii = 0; ii < num_needed; ++ii) {
dentiny marked this conversation as resolved.
Show resolved Hide resolved
// Prestart worker with no runtime env.
if (IsRuntimeEnvEmpty(task_spec.SerializedRuntimeEnv())) {
PopWorkerStatus status;
StartWorkerProcess(
task_spec.GetLanguage(), rpc::WorkerType::WORKER, task_spec.JobId(), &status);
continue;
}

// Prestart worker with runtime env.
GetOrCreateRuntimeEnv(
dentiny marked this conversation as resolved.
Show resolved Hide resolved
task_spec.SerializedRuntimeEnv(),
task_spec.RuntimeEnvConfig(),
JobID::Nil(),
dentiny marked this conversation as resolved.
Show resolved Hide resolved
dentiny marked this conversation as resolved.
Show resolved Hide resolved
[this, task_spec = task_spec](bool successful,
const std::string &serialized_runtime_env_context,
const std::string &setup_error_message) {
if (!successful) {
RAY_LOG(ERROR) << "Fails to create or get runtime env "
<< setup_error_message;
return;
}
PopWorkerStatus status;
StartWorkerProcess(task_spec.GetLanguage(),
rpc::WorkerType::WORKER,
task_spec.JobId(),
&status,
/*dynamic_options=*/{},
task_spec.GetRuntimeEnvHash(),
serialized_runtime_env_context,
task_spec.RuntimeEnvInfo());
});
}
}

Expand Down
4 changes: 1 addition & 3 deletions src/ray/raylet/worker_pool.h
Original file line number Diff line number Diff line change
Expand Up @@ -426,9 +426,7 @@ class WorkerPool : public WorkerPoolInterface, public IOWorkerPoolInterface {
/// We aim to prestart 1 worker per CPU, up to the backlog size.
void PrestartWorkers(const TaskSpecification &task_spec, int64_t backlog_size);

/// Try to prestart a number of CPU workers with the given language.
///
void PrestartDefaultCpuWorkers(ray::Language language, int64_t num_needed);
void PrestartWorkersInternal(const TaskSpecification &task_spec, int64_t num_needed);

/// Return the current size of the worker pool for the requested language. Counts only
/// idle workers.
Expand Down
Loading