diff --git a/rs/execution_environment/src/scheduler/round_schedule.rs b/rs/execution_environment/src/scheduler/round_schedule.rs
index ac658f4dd34..e7f196ae8c9 100644
--- a/rs/execution_environment/src/scheduler/round_schedule.rs
+++ b/rs/execution_environment/src/scheduler/round_schedule.rs
@@ -412,12 +412,15 @@ impl RoundSchedule {
         // of canisters and their compute allocations.
         let is_reset_round = (current_round.get() % accumulated_priority_reset_interval.get()) == 0;

-        // Compute the priority of the canisters for this round.
+        // Collect the priority of the canisters for this round.
         let mut accumulated_priority_invariant = AccumulatedPriority::default();
         let mut accumulated_priority_deviation = 0;
         for (&canister_id, canister) in canister_states.iter_mut() {
             if is_reset_round {
-                canister.scheduler_state.accumulated_priority = Default::default();
+                // By default, each canister's accumulated priority is set to its compute allocation.
+                canister.scheduler_state.accumulated_priority =
+                    (canister.scheduler_state.compute_allocation.as_percent() as i64 * multiplier)
+                        .into();
                 canister.scheduler_state.priority_credit = Default::default();
             }

@@ -438,6 +441,12 @@ impl RoundSchedule {
             accumulated_priority_invariant += accumulated_priority;
             accumulated_priority_deviation +=
                 accumulated_priority.get() * accumulated_priority.get();
+            if !canister.has_input() {
+                canister
+                    .system_state
+                    .canister_metrics
+                    .skipped_round_due_to_no_messages += 1;
+            }
         }

         // Assert there is at least `1%` of free capacity to distribute across canisters.
         // It's guaranteed by `validate_compute_allocation()`
@@ -468,15 +477,13 @@ impl RoundSchedule {
             .saturating_sub(total_compute_allocation_percent)
             * scheduler_cores as i64;
-        // Fully divide the free allocation across all canisters.
+        // Compute `long_executions_compute_allocation`.
         let mut long_executions_compute_allocation = 0;
         let mut number_of_long_executions = 0;
         for rs in round_states.iter_mut() {
             // De-facto compute allocation includes bonus allocation
             let factual = rs.compute_allocation.as_percent() as i64 * multiplier + free_capacity_per_canister;
-            // Increase accumulated priority by de-facto compute allocation.
-            rs.accumulated_priority += factual.into();
             // Count long executions and sum up their compute allocation.
             if rs.has_aborted_or_paused_execution {
                 // Note: factual compute allocation is multiplied by `multiplier`
             }
         }
@@ -485,30 +492,10 @@ impl RoundSchedule {

-        // Optimization that makes use of accessing a canister state without an extra canister id lookup.
-        // IMPORTANT! Optimization relies on the fact that elements in `canister_states` and `round_states` follow in the same order.
-        for ((&_, canister), rs) in canister_states.iter_mut().zip(round_states.iter()) {
-            debug_assert!(
-                canister.canister_id() == rs.canister_id,
-                "Elements in canister_states and round_states must follow in the same order",
-            );
-            // Update canister state with a new accumulated_priority.
-            canister.scheduler_state.accumulated_priority = rs.accumulated_priority;
-
-            // Record a canister metric.
-            if !canister.has_input() {
-                canister
-                    .system_state
-                    .canister_metrics
-                    .skipped_round_due_to_no_messages += 1;
-            }
-        }
-
-        // Count long execution cores by dividing `long_execution_compute_allocation`
-        // by `100%` and rounding up (as one scheduler core is reserved to guarantee
-        // long executions progress).
-        // Note, the `long_execution_compute_allocation` is in percent multiplied
-        // by the `multiplier`.
+        // Compute the number of long execution cores by dividing
+        // `long_executions_compute_allocation` by `100%` and rounding up
+        // (as one scheduler core is reserved to guarantee long executions progress).
+        // The `long_executions_compute_allocation` is in percent multiplied by the `multiplier`.

         let long_execution_cores = ((long_executions_compute_allocation + 100 * multiplier - 1)
             / (100 * multiplier)) as usize;

         // If there are long executions, the `long_execution_cores` must be non-zero.
@@ -550,34 +537,15 @@ impl RoundSchedule {
                 .collect(),
         );

+        for canister_id in round_schedule
+            .ordered_long_execution_canister_ids
+            .iter()
+            .take(long_execution_cores)
         {
-            let scheduling_order = round_schedule.scheduling_order();
-            let scheduling_order = scheduling_order
-                .prioritized_long_canister_ids
-                .chain(scheduling_order.new_canister_ids)
-                .chain(scheduling_order.opportunistic_long_canister_ids);
-            // The number of active scheduler cores is limited by the number
-            // of canisters to schedule.
-            let active_cores = scheduler_cores.min(number_of_canisters);
-            for (i, canister_id) in scheduling_order.take(active_cores).enumerate() {
-                let canister_state = canister_states.get_mut(canister_id).unwrap();
-                // As top `scheduler_cores` canisters are guaranteed to be scheduled
-                // this round, their accumulated priorities must be decreased here
-                // by `capacity * multiplier / scheduler_cores`. But instead this
-                // value is accumulated in the `priority_credit`, and applied later:
-                // * For short executions, the `priority_credit` is deducted from
-                //   the `accumulated_priority` at the end of the round.
-                // * For long executions, the `priority_credit` is accumulated
-                //   for a few rounds, and deducted from the `accumulated_priority`
-                //   at the end of the long execution.
-                canister_state.scheduler_state.priority_credit +=
-                    (compute_capacity_percent * multiplier / active_cores as i64).into();
-                if i < round_schedule.long_execution_cores {
-                    canister_state.scheduler_state.long_execution_mode =
-                        LongExecutionMode::Prioritized;
-                }
-            }
+            let canister = canister_states.get_mut(canister_id).unwrap();
+            canister.scheduler_state.long_execution_mode = LongExecutionMode::Prioritized;
         }

+        round_schedule
     }

diff --git a/rs/execution_environment/src/scheduler/tests.rs b/rs/execution_environment/src/scheduler/tests.rs
index 7d2f30d5ed6..4aacc327b92 100644
--- a/rs/execution_environment/src/scheduler/tests.rs
+++ b/rs/execution_environment/src/scheduler/tests.rs
@@ -1884,7 +1884,11 @@ fn scheduler_long_execution_progress_across_checkpoints() {
     // Penalize canister for a long execution.
     let message_id = test.send_ingress(penalized_long_id, ingress(message_instructions));
     assert_eq!(test.ingress_status(&message_id), IngressStatus::Unknown);
-    for _ in 0..message_instructions / slice_instructions {
+    for i in 0..message_instructions / slice_instructions {
+        // Without short executions, all idle canisters would be executed equally.
+        if let Some(canister_id) = canister_ids.get(i as usize % num_canisters) {
+            test.send_ingress(*canister_id, ingress(slice_instructions));
+        }
         test.execute_round(ExecutionRoundType::OrdinaryRound);
     }
     assert_matches!(
@@ -4543,7 +4547,7 @@ fn scheduler_respects_compute_allocation(
     let replicated_state = test.state();
     let number_of_canisters = replicated_state.canister_states.len();
     let total_compute_allocation = replicated_state.total_compute_allocation();
-    assert!(total_compute_allocation <= 100 * scheduler_cores as u64);
+    prop_assert!(total_compute_allocation <= 100 * scheduler_cores as u64);

     // Count, for each canister, how many times it is the first canister
     // to be executed by a thread.
@@ -4556,7 +4560,8 @@

     let canister_ids: Vec<_> = test.state().canister_states.iter().map(|x| *x.0).collect();

-    for _ in 0..number_of_rounds {
+    // Add one more round, as the accumulated priorities are now updated at the end of the round.
+    for _ in 0..=number_of_rounds {
         for canister_id in canister_ids.iter() {
             test.expect_heartbeat(*canister_id, instructions(B as u64));
         }
@@ -4584,7 +4589,7 @@
             number_of_rounds / 100 * compute_allocation + 1
         };

-        assert!(
+        prop_assert!(
             *count >= expected_count,
             "Canister {} (allocation {}) should have been scheduled \
             {} out of {} rounds, was scheduled only {} rounds instead.",
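
Note (not part of the patch): below is a standalone sketch of the scheduling arithmetic introduced above, namely the reset-round initialization of `accumulated_priority` and the rounded-up division that derives the number of long execution cores. The `CanisterInfo` struct and the toy values are illustrative only, not scheduler types, and the `free_capacity_per_canister` bonus is left out for brevity.

    #[derive(Clone, Copy, PartialEq, Debug)]
    enum LongExecutionMode {
        Opportunistic,
        Prioritized,
    }

    struct CanisterInfo {
        compute_allocation_percent: i64,
        accumulated_priority: i64,
        has_long_execution: bool,
        long_execution_mode: LongExecutionMode,
    }

    // Rounded-up division: every full `100%` (in multiplied percent) of
    // long-execution allocation needs a core, and any remainder still
    // reserves one so long executions keep making progress.
    fn long_execution_cores(long_executions_compute_allocation: i64, multiplier: i64) -> usize {
        ((long_executions_compute_allocation + 100 * multiplier - 1) / (100 * multiplier)) as usize
    }

    fn main() {
        let scheduler_cores: usize = 4;
        let mut canisters = vec![
            CanisterInfo {
                compute_allocation_percent: 50,
                accumulated_priority: 0,
                has_long_execution: true,
                long_execution_mode: LongExecutionMode::Opportunistic,
            },
            CanisterInfo {
                compute_allocation_percent: 0,
                accumulated_priority: 0,
                has_long_execution: false,
                long_execution_mode: LongExecutionMode::Opportunistic,
            },
        ];
        // As in the scheduler: `multiplier = number_of_canisters * scheduler_cores`.
        let multiplier = canisters.len() as i64 * scheduler_cores as i64;

        // Reset round: start from the compute allocation (in multiplied percent)
        // instead of zero, so allocation-heavy canisters keep their advantage.
        for c in canisters.iter_mut() {
            c.accumulated_priority = c.compute_allocation_percent * multiplier;
        }
        assert_eq!(canisters[0].accumulated_priority, 400);

        // Sum the (multiplied) allocation of canisters with long executions,
        // ignoring the free-capacity bonus for brevity.
        let long_allocation: i64 = canisters
            .iter()
            .filter(|c| c.has_long_execution)
            .map(|c| c.compute_allocation_percent * multiplier)
            .sum();
        let cores = long_execution_cores(long_allocation, multiplier);
        assert_eq!(cores, 1); // 50% of one canister still reserves a full core.

        // The first `cores` long-execution canisters become prioritized.
        for c in canisters
            .iter_mut()
            .filter(|c| c.has_long_execution)
            .take(cores)
        {
            c.long_execution_mode = LongExecutionMode::Prioritized;
        }
        assert_eq!(
            canisters[0].long_execution_mode,
            LongExecutionMode::Prioritized
        );
    }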
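
Note (also not part of the patch): the `assert!` to `prop_assert!` change in `tests.rs` matters because these assertions run inside a `proptest!` block. `prop_assert!` reports a failure to the proptest runner as a `TestCaseError` instead of panicking, so the runner can shrink the failing input and report a minimal counterexample. A minimal sketch, with a made-up strategy and bound standing in for the scheduler test's setup:

    use proptest::prelude::*;

    proptest! {
        #[test]
        fn total_allocation_is_bounded(scheduler_cores in 1u64..=64) {
            // Toy stand-in for the state built by the scheduler test.
            let total_compute_allocation = scheduler_cores * 50;
            // On failure this returns a `TestCaseError` instead of panicking,
            // which lets proptest shrink `scheduler_cores` to a minimal case.
            prop_assert!(total_compute_allocation <= 100 * scheduler_cores);
        }
    }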