feat: EXC-1787: Penalize canisters at the end of the round (#2601)
The logic of penalizing canisters for a full execution and distributing
the priority across the subnet has been moved to the end of the round.
This change is necessary for priority-based eviction.

Newly created canisters now join their first execution round with an
accumulated priority of zero, which breaks two existing tests but is
not a practical issue. To compensate for this, during the reset round
we set the accumulated priority to the canister's compute allocation
instead of the default zero priority.

There are also a few minor optimizations.
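For illustration, here is a minimal, self-contained sketch of the reset-round seeding described above, using hypothetical stand-ins for the scheduler types (the real code operates on `CanisterState` and `AccumulatedPriority`):

```rust
/// Hypothetical, simplified stand-in for the per-canister scheduler state.
struct SchedulerState {
    /// Priority in percent, scaled by `multiplier`.
    accumulated_priority: i64,
    priority_credit: i64,
    /// Compute allocation in percent (0..=100).
    compute_allocation_percent: i64,
}

/// On a reset round, seed each canister's accumulated priority with its
/// compute allocation instead of zero, so canisters created mid-interval
/// (which start at zero) are not ranked ahead of allocated canisters.
fn reset_round(states: &mut [SchedulerState], multiplier: i64) {
    for state in states.iter_mut() {
        state.accumulated_priority = state.compute_allocation_percent * multiplier;
        state.priority_credit = 0;
    }
}

fn main() {
    let mut states = vec![SchedulerState {
        accumulated_priority: -250,
        priority_credit: 40,
        compute_allocation_percent: 20,
    }];
    reset_round(&mut states, 2);
    assert_eq!(states[0].accumulated_priority, 40); // 20% * multiplier
    assert_eq!(states[0].priority_credit, 0);
}
```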
berestovskyy authored Nov 17, 2024
1 parent 80b90f2 commit 77164cd
Showing 2 changed files with 32 additions and 59 deletions.
78 changes: 23 additions & 55 deletions rs/execution_environment/src/scheduler/round_schedule.rs
@@ -412,12 +412,15 @@ impl RoundSchedule {
         // of canisters and their compute allocations.
         let is_reset_round = (current_round.get() % accumulated_priority_reset_interval.get()) == 0;
 
-        // Compute the priority of the canisters for this round.
+        // Collect the priority of the canisters for this round.
         let mut accumulated_priority_invariant = AccumulatedPriority::default();
         let mut accumulated_priority_deviation = 0;
         for (&canister_id, canister) in canister_states.iter_mut() {
             if is_reset_round {
-                canister.scheduler_state.accumulated_priority = Default::default();
+                // By default, each canister's accumulated priority is set to its compute allocation.
+                canister.scheduler_state.accumulated_priority =
+                    (canister.scheduler_state.compute_allocation.as_percent() as i64 * multiplier)
+                        .into();
                 canister.scheduler_state.priority_credit = Default::default();
             }
 
@@ -438,6 +441,12 @@
             accumulated_priority_invariant += accumulated_priority;
             accumulated_priority_deviation +=
                 accumulated_priority.get() * accumulated_priority.get();
+            if !canister.has_input() {
+                canister
+                    .system_state
+                    .canister_metrics
+                    .skipped_round_due_to_no_messages += 1;
+            }
         }
         // Assert there is at least `1%` of free capacity to distribute across canisters.
         // It's guaranteed by `validate_compute_allocation()`
@@ -468,15 +477,13 @@
             .saturating_sub(total_compute_allocation_percent)
             * scheduler_cores as i64;
 
-        // Fully divide the free allocation across all canisters.
+        // Compute `long_execution_compute_allocation`.
         let mut long_executions_compute_allocation = 0;
         let mut number_of_long_executions = 0;
         for rs in round_states.iter_mut() {
             // De-facto compute allocation includes bonus allocation
             let factual =
                 rs.compute_allocation.as_percent() as i64 * multiplier + free_capacity_per_canister;
-            // Increase accumulated priority by de-facto compute allocation.
-            rs.accumulated_priority += factual.into();
             // Count long executions and sum up their compute allocation.
             if rs.has_aborted_or_paused_execution {
                 // Note: factual compute allocation is multiplied by `multiplier`
@@ -485,30 +492,10 @@
             }
         }
 
-        // Optimization that makes use of accessing a canister state without an extra canister id lookup.
-        // IMPORTANT! Optimization relies on the fact that elements in `canister_states` and `round_states` follow in the same order.
-        for ((&_, canister), rs) in canister_states.iter_mut().zip(round_states.iter()) {
-            debug_assert!(
-                canister.canister_id() == rs.canister_id,
-                "Elements in canister_states and round_states must follow in the same order",
-            );
-            // Update canister state with a new accumulated_priority.
-            canister.scheduler_state.accumulated_priority = rs.accumulated_priority;
-
-            // Record a canister metric.
-            if !canister.has_input() {
-                canister
-                    .system_state
-                    .canister_metrics
-                    .skipped_round_due_to_no_messages += 1;
-            }
-        }
-
-        // Count long execution cores by dividing `long_execution_compute_allocation`
-        // by `100%` and rounding up (as one scheduler core is reserved to guarantee
-        // long executions progress).
-        // Note, the `long_execution_compute_allocation` is in percent multiplied
-        // by the `multiplier`.
+        // Compute the number of long execution cores by dividing
+        // `long_execution_compute_allocation` by `100%` and rounding up
+        // (as one scheduler core is reserved to guarantee long executions progress).
+        // The `long_execution_compute_allocation` is in multiplied percent.
         let long_execution_cores = ((long_executions_compute_allocation + 100 * multiplier - 1)
             / (100 * multiplier)) as usize;
         // If there are long executions, the `long_execution_cores` must be non-zero.
@@ -550,34 +537,15 @@
                 .collect(),
         );
 
+        for canister_id in round_schedule
+            .ordered_long_execution_canister_ids
+            .iter()
+            .take(long_execution_cores)
+        {
-        let scheduling_order = round_schedule.scheduling_order();
-        let scheduling_order = scheduling_order
-            .prioritized_long_canister_ids
-            .chain(scheduling_order.new_canister_ids)
-            .chain(scheduling_order.opportunistic_long_canister_ids);
-        // The number of active scheduler cores is limited by the number
-        // of canisters to schedule.
-        let active_cores = scheduler_cores.min(number_of_canisters);
-        for (i, canister_id) in scheduling_order.take(active_cores).enumerate() {
-            let canister_state = canister_states.get_mut(canister_id).unwrap();
-            // As top `scheduler_cores` canisters are guaranteed to be scheduled
-            // this round, their accumulated priorities must be decreased here
-            // by `capacity * multiplier / scheduler_cores`. But instead this
-            // value is accumulated in the `priority_credit`, and applied later:
-            // * For short executions, the `priority_credit` is deducted from
-            //   the `accumulated_priority` at the end of the round.
-            // * For long executions, the `priority_credit` is accumulated
-            //   for a few rounds, and deducted from the `accumulated_priority`
-            //   at the end of the long execution.
-            canister_state.scheduler_state.priority_credit +=
-                (compute_capacity_percent * multiplier / active_cores as i64).into();
-            if i < round_schedule.long_execution_cores {
-                canister_state.scheduler_state.long_execution_mode =
-                    LongExecutionMode::Prioritized;
-            }
-        }
+            let canister = canister_states.get_mut(canister_id).unwrap();
+            canister.scheduler_state.long_execution_mode = LongExecutionMode::Prioritized;
+        }
 
         round_schedule
     }
 
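As an aside, the `long_execution_cores` computation above is the standard round-up integer division; a small sketch (with an assumed `multiplier` value) shows the behavior:

```rust
/// Round-up integer division via the classic `(a + b - 1) / b` identity,
/// mirroring how the scheduler turns the summed long-execution compute
/// allocation (in percent times `multiplier`) into a core count.
fn long_execution_cores(long_executions_compute_allocation: i64, multiplier: i64) -> usize {
    ((long_executions_compute_allocation + 100 * multiplier - 1) / (100 * multiplier)) as usize
}

fn main() {
    let multiplier = 8; // assumed value; in the scheduler it depends on cores and canisters
    assert_eq!(long_execution_cores(0, multiplier), 0); // no long executions, no reserved cores
    assert_eq!(long_execution_cores(1, multiplier), 1); // any long execution reserves a core
    assert_eq!(long_execution_cores(100 * multiplier, multiplier), 1); // exactly 100%
    assert_eq!(long_execution_cores(100 * multiplier + 1, multiplier), 2); // just over 100%
}
```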
13 changes: 9 additions & 4 deletions rs/execution_environment/src/scheduler/tests.rs
@@ -1884,7 +1884,11 @@ fn scheduler_long_execution_progress_across_checkpoints() {
     // Penalize canister for a long execution.
     let message_id = test.send_ingress(penalized_long_id, ingress(message_instructions));
     assert_eq!(test.ingress_status(&message_id), IngressStatus::Unknown);
-    for _ in 0..message_instructions / slice_instructions {
+    for i in 0..message_instructions / slice_instructions {
+        // Without short executions, all idle canisters will be executed equally.
+        if let Some(canister_id) = canister_ids.get(i as usize % num_canisters) {
+            test.send_ingress(*canister_id, ingress(slice_instructions));
+        }
         test.execute_round(ExecutionRoundType::OrdinaryRound);
     }
     assert_matches!(
@@ -4543,7 +4547,7 @@ fn scheduler_respects_compute_allocation(
     let replicated_state = test.state();
     let number_of_canisters = replicated_state.canister_states.len();
     let total_compute_allocation = replicated_state.total_compute_allocation();
-    assert!(total_compute_allocation <= 100 * scheduler_cores as u64);
+    prop_assert!(total_compute_allocation <= 100 * scheduler_cores as u64);
 
     // Count, for each canister, how many times it is the first canister
     // to be executed by a thread.
@@ -4556,7 +4560,8 @@
 
     let canister_ids: Vec<_> = test.state().canister_states.iter().map(|x| *x.0).collect();
 
-    for _ in 0..number_of_rounds {
+    // Add one more round, as the accumulated priorities are now updated at the end of the round.
+    for _ in 0..=number_of_rounds {
         for canister_id in canister_ids.iter() {
             test.expect_heartbeat(*canister_id, instructions(B as u64));
         }
Expand Down Expand Up @@ -4584,7 +4589,7 @@ fn scheduler_respects_compute_allocation(
number_of_rounds / 100 * compute_allocation + 1
};

assert!(
prop_assert!(
*count >= expected_count,
"Canister {} (allocation {}) should have been scheduled \
{} out of {} rounds, was scheduled only {} rounds instead.",
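The `assert!` to `prop_assert!` changes above matter because this is a property-based test: `prop_assert!` reports a failure back to the proptest runner, which can then shrink the failing input, instead of panicking outright. A minimal sketch of the pattern, assuming the `proptest` crate:

```rust
use proptest::prelude::*;

proptest! {
    #[test]
    fn compute_allocation_is_bounded(
        scheduler_cores in 1usize..=64,
        allocations in prop::collection::vec(0u64..=100, 0..32),
    ) {
        // Hypothetical admission rule: clamp total allocation to capacity.
        let capacity = 100 * scheduler_cores as u64;
        let total: u64 = allocations.iter().sum::<u64>().min(capacity);
        // Unlike `assert!`, a failing `prop_assert!` returns an error that
        // lets proptest minimize the inputs before reporting.
        prop_assert!(total <= capacity);
    }
}
```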
