Skip to content

Commit

Permalink
i#7067: Add per-workload simultaneous output limits (#7090)
Browse files Browse the repository at this point in the history
Adds a new input_workload_t field: output_limit. This is a cap on the
number of output streams on which the inputs of that workload may be
live simultaneously.

To implement, atomics are used, with a new workload_info_t array
tracking per-workload fields. A new stat
memtrace_stream_t::SCHED_STAT_HIT_OUTPUT_LIMIT is added. An
initially-idle output attempts to steal immediately, as the initial
round-robin layout does not take into account output limits.

Adds a unit test.

Fixes #7067
  • Loading branch information
derekbruening authored Nov 21, 2024
1 parent f23c543 commit 337095e
Show file tree
Hide file tree
Showing 19 changed files with 222 additions and 37 deletions.
6 changes: 4 additions & 2 deletions clients/drcachesim/analyzer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -230,7 +230,7 @@ template <typename RecordType, typename ReaderType>
bool
analyzer_tmpl_t<RecordType, ReaderType>::init_scheduler(
const std::string &trace_path, const std::set<memref_tid_t> &only_threads,
const std::set<int> &only_shards, int verbosity,
const std::set<int> &only_shards, int output_limit, int verbosity,
typename sched_type_t::scheduler_options_t options)
{
verbosity_ = verbosity;
Expand All @@ -249,6 +249,7 @@ analyzer_tmpl_t<RecordType, ReaderType>::init_scheduler(
typename sched_type_t::input_workload_t workload(trace_path, regions);
workload.only_threads = only_threads;
workload.only_shards = only_shards;
workload.output_limit = output_limit;
if (regions.empty() && skip_to_timestamp_ > 0) {
workload.times_of_interest.emplace_back(skip_to_timestamp_, 0);
}
Expand Down Expand Up @@ -383,7 +384,8 @@ analyzer_tmpl_t<RecordType, ReaderType>::analyzer_tmpl_t(
// The scheduler will call reader_t::init() for each input file. We assume
// that won't block (analyzer_multi_t separates out IPC readers).
typename sched_type_t::scheduler_options_t sched_ops;
if (!init_scheduler(trace_path, {}, {}, verbosity, std::move(sched_ops))) {
if (!init_scheduler(trace_path, {}, {}, /*output_limit=*/0, verbosity,
std::move(sched_ops))) {
success_ = false;
error_string_ = "Failed to create scheduler";
return;
Expand Down
2 changes: 1 addition & 1 deletion clients/drcachesim/analyzer.h
Original file line number Diff line number Diff line change
Expand Up @@ -221,7 +221,7 @@ template <typename RecordType, typename ReaderType> class analyzer_tmpl_t {
init_scheduler(const std::string &trace_path,
// To include all threads/shards, use empty sets.
const std::set<memref_tid_t> &only_threads,
const std::set<int> &only_shards, int verbosity,
const std::set<int> &only_shards, int output_limit, int verbosity,
typename sched_type_t::scheduler_options_t options);

// For core-sharded, worker_count_ must be set prior to calling this; for parallel
Expand Down
6 changes: 4 additions & 2 deletions clients/drcachesim/analyzer_multi.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -549,7 +549,8 @@ analyzer_multi_tmpl_t<RecordType, ReaderType>::analyzer_multi_tmpl_t()
return;
}
if (!this->init_scheduler(tracedir, only_threads, only_shards,
op_verbose.get_value(), std::move(sched_ops))) {
op_sched_max_cores.get_value(), op_verbose.get_value(),
std::move(sched_ops))) {
this->success_ = false;
return;
}
Expand All @@ -573,7 +574,8 @@ analyzer_multi_tmpl_t<RecordType, ReaderType>::analyzer_multi_tmpl_t()
}
} else {
// Legacy file.
if (!this->init_scheduler(op_infile.get_value(), {}, {}, op_verbose.get_value(),
if (!this->init_scheduler(op_infile.get_value(), {}, {},
op_sched_max_cores.get_value(), op_verbose.get_value(),
std::move(sched_ops))) {
this->success_ = false;
return;
Expand Down
5 changes: 5 additions & 0 deletions clients/drcachesim/common/memtrace_stream.h
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,11 @@ class memtrace_stream_t {
* Counts the number of output runqueue rebalances triggered by this output.
*/
SCHED_STAT_RUNQUEUE_REBALANCES,
/**
* Counts the instances where a workload's output limit prevented one of its
* inputs from being scheduled onto an output.
*/
SCHED_STAT_HIT_OUTPUT_LIMIT,
/** Count of statistic types. */
SCHED_STAT_TYPE_COUNT,
};
Expand Down
6 changes: 6 additions & 0 deletions clients/drcachesim/common/options.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1061,6 +1061,12 @@ droption_t<double> op_sched_exit_if_fraction_inputs_left(
"count is not considered (as it is not available), use discretion when raising "
"this value on uneven inputs.");

// Caps how many cores may be live at once during scheduling; the default of 0
// leaves the count unlimited. analyzer_multi passes this value to
// init_scheduler() as the output_limit of the workload it creates.
droption_t<int> op_sched_max_cores(
DROPTION_SCOPE_ALL, "sched_max_cores", 0,
"Limit scheduling to this many peak live cores",
"If non-zero, only this many live cores can be scheduled at any one time. "
"Other cores will remain idle.");

// Schedule_stats options.
droption_t<uint64_t>
op_schedule_stats_print_every(DROPTION_SCOPE_ALL, "schedule_stats_print_every",
Expand Down
1 change: 1 addition & 0 deletions clients/drcachesim/common/options.h
Original file line number Diff line number Diff line change
Expand Up @@ -222,6 +222,7 @@ extern dynamorio::droption::droption_t<uint64_t> op_sched_migration_threshold_us
extern dynamorio::droption::droption_t<uint64_t> op_sched_rebalance_period_us;
extern dynamorio::droption::droption_t<double> op_sched_time_units_per_us;
extern dynamorio::droption::droption_t<double> op_sched_exit_if_fraction_inputs_left;
extern dynamorio::droption::droption_t<int> op_sched_max_cores;
extern dynamorio::droption::droption_t<uint64_t> op_schedule_stats_print_every;
extern dynamorio::droption::droption_t<std::string> op_syscall_template_file;
extern dynamorio::droption::droption_t<uint64_t> op_filter_stop_timestamp;
Expand Down
8 changes: 8 additions & 0 deletions clients/drcachesim/scheduler/scheduler.h
Original file line number Diff line number Diff line change
Expand Up @@ -393,6 +393,14 @@ template <typename RecordType, typename ReaderType> class scheduler_tmpl_t {
*/
std::set<input_ordinal_t> only_shards;

/**
* If greater than zero, imposes a maximum number of outputs that the inputs
* comprising this workload can execute upon simultaneously. If an input would
* be executed next but would exceed this cap, a different input is selected
* instead or the output goes idle if none are found.
*/
int output_limit = 0;

// Work around a known Visual Studio issue where it complains about deleted copy
// constructors for unique_ptr by deleting our copies and defaulting our moves.
input_workload_t(const input_workload_t &) = delete;
Expand Down
55 changes: 40 additions & 15 deletions clients/drcachesim/scheduler/scheduler_dynamic.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
#include <cstdint>
#include <limits>
#include <mutex>
#include <sstream>
#include <thread>
#include <unordered_map>
#include <vector>
Expand Down Expand Up @@ -70,28 +71,27 @@ scheduler_dynamic_tmpl_t<RecordType, ReaderType>::~scheduler_dynamic_tmpl_t()

template <typename RecordType, typename ReaderType>
typename scheduler_tmpl_t<RecordType, ReaderType>::scheduler_status_t
scheduler_dynamic_tmpl_t<RecordType, ReaderType>::set_initial_schedule(
std::unordered_map<int, std::vector<int>> &workload2inputs)
scheduler_dynamic_tmpl_t<RecordType, ReaderType>::set_initial_schedule()
{
if (options_.mapping != sched_type_t::MAP_TO_ANY_OUTPUT)
return sched_type_t::STATUS_ERROR_INVALID_PARAMETER;
// Assign initial inputs.
if (options_.deps == sched_type_t::DEPENDENCY_TIMESTAMPS) {
// Compute the min timestamp (==base_timestamp) per workload and sort
// all inputs by relative time from the base.
for (int workload_idx = 0;
workload_idx < static_cast<int>(workload2inputs.size()); ++workload_idx) {
for (int workload_idx = 0; workload_idx < static_cast<int>(workloads_.size());
++workload_idx) {
uint64_t min_time = std::numeric_limits<uint64_t>::max();
input_ordinal_t min_input = -1;
for (int input_idx : workload2inputs[workload_idx]) {
for (int input_idx : workloads_[workload_idx].inputs) {
if (inputs_[input_idx].next_timestamp < min_time) {
min_time = inputs_[input_idx].next_timestamp;
min_input = input_idx;
}
}
if (min_input < 0)
return sched_type_t::STATUS_ERROR_INVALID_PARAMETER;
for (int input_idx : workload2inputs[workload_idx]) {
for (int input_idx : workloads_[workload_idx].inputs) {
VPRINT(this, 4,
"workload %d: setting input %d base_timestamp to %" PRIu64
" vs next_timestamp %zu\n",
Expand All @@ -113,7 +113,9 @@ scheduler_dynamic_tmpl_t<RecordType, ReaderType>::set_initial_schedule(
}
// Now assign round-robin to the outputs. We have to obey bindings here: we
// just take the first. This isn't guaranteed to be perfect if there are
// many bindings, but we run a rebalancing afterward.
// many bindings (or output limits), but we run a rebalancing afterward
// (to construct it up front would take similar code to the rebalance so we
// leverage that code).
output_ordinal_t output = 0;
while (!allq.empty()) {
input_info_t *input = allq.top();
Expand All @@ -137,9 +139,13 @@ scheduler_dynamic_tmpl_t<RecordType, ReaderType>::set_initial_schedule(
#endif
pop_from_ready_queue(i, i, queue_next);
assert(status == sched_type_t::STATUS_OK || status == sched_type_t::STATUS_IDLE);
if (queue_next == nullptr)
set_cur_input(i, sched_type_t::INVALID_INPUT_ORDINAL);
else
if (queue_next == nullptr) {
// Try to steal, as the initial round-robin layout and rebalancing ignore
// output_limit and other factors.
status = eof_or_idle_for_mode(i, sched_type_t::INVALID_INPUT_ORDINAL);
if (status != sched_type_t::STATUS_STOLE)
set_cur_input(i, sched_type_t::INVALID_INPUT_ORDINAL);
} else
set_cur_input(i, queue_next->index);
}
VPRINT(this, 2, "Initial queues:\n");
Expand All @@ -159,18 +165,21 @@ scheduler_dynamic_tmpl_t<RecordType, ReaderType>::swap_out_input(
if (input == sched_type_t::INVALID_INPUT_ORDINAL)
return sched_type_t::STATUS_OK;
bool at_eof = false;
int workload = -1;
{
std::lock_guard<mutex_dbg_owned> lock(*inputs_[input].lock);
at_eof = inputs_[input].at_eof;
assert(inputs_[input].cur_output == sched_type_t::INVALID_OUTPUT_ORDINAL);
workload = inputs_[input].workload;
}
// Now that the caller has updated the outgoing input's fields (we assert that
// cur_output was changed above), add it to the ready queue (once on the queue others
// can see it and pop it off).
if (!at_eof) {
add_to_ready_queue(output, &inputs_[input]);
}
// TODO i#7067: Track peak live core usage per workload here.
if (workloads_[workload].output_limit > 0)
workloads_[workload].live_output_count->fetch_add(-1, std::memory_order_release);
return sched_type_t::STATUS_OK;
}

Expand All @@ -179,7 +188,11 @@ typename scheduler_tmpl_t<RecordType, ReaderType>::stream_status_t
scheduler_dynamic_tmpl_t<RecordType, ReaderType>::swap_in_input(output_ordinal_t output,
input_ordinal_t input)
{
// TODO i#7067: Track peak live core usage per workload here.
if (input == sched_type_t::INVALID_INPUT_ORDINAL)
return sched_type_t::STATUS_OK;
workload_info_t &workload = workloads_[inputs_[input].workload];
if (workload.output_limit > 0)
workload.live_output_count->fetch_add(1, std::memory_order_release);
return sched_type_t::STATUS_OK;
}

Expand Down Expand Up @@ -915,6 +928,9 @@ scheduler_dynamic_tmpl_t<RecordType, ReaderType>::eof_or_idle_for_mode(
output_ordinal_t target = (output + i) % outputs_.size();
assert(target != output); // Sanity check (we won't reach "output").
input_info_t *queue_next = nullptr;
VPRINT(this, 4,
"eof_or_idle: output %d trying to steal from %d's ready_queue\n",
output, target);
stream_status_t status = pop_from_ready_queue(target, output, queue_next);
if (status == sched_type_t::STATUS_OK && queue_next != nullptr) {
set_cur_input(output, queue_next->index);
Expand Down Expand Up @@ -1091,11 +1107,20 @@ scheduler_dynamic_tmpl_t<RecordType, ReaderType>::pop_from_ready_queue_hold_lock
res->unscheduled = false;
VPRINT(this, 4, "pop queue: %d @ %" PRIu64 " no longer blocked\n",
res->index, cur_time);
// We've found a candidate. One final check if this is a migration.
bool found_candidate = false;
if (from_output == for_output)
// We've found a potential candidate. Is it under its output limit?
workload_info_t &workload = workloads_[res->workload];
if (workload.output_limit > 0 &&
workload.live_output_count->load(std::memory_order_acquire) >=
workload.output_limit) {
VPRINT(this, 2, "output[%d]: not running input %d: at output limit\n",
for_output, res->index);
++outputs_[from_output]
.stats[memtrace_stream_t::SCHED_STAT_HIT_OUTPUT_LIMIT];
} else if (from_output == for_output) {
found_candidate = true;
else {
} else {
// One final check if this is a migration.
assert(cur_time > 0 || res->last_run_time == 0);
if (res->last_run_time == 0) {
// For never-executed inputs we consider their last execution
Expand Down
3 changes: 1 addition & 2 deletions clients/drcachesim/scheduler/scheduler_fixed.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -54,8 +54,7 @@ namespace drmemtrace {

template <typename RecordType, typename ReaderType>
typename scheduler_tmpl_t<RecordType, ReaderType>::scheduler_status_t
scheduler_fixed_tmpl_t<RecordType, ReaderType>::set_initial_schedule(
std::unordered_map<int, std::vector<int>> &workload2inputs)
scheduler_fixed_tmpl_t<RecordType, ReaderType>::set_initial_schedule()
{
if (options_.mapping == sched_type_t::MAP_TO_CONSISTENT_OUTPUT) {
// Assign the inputs up front to avoid locks once we're in parallel mode.
Expand Down
14 changes: 10 additions & 4 deletions clients/drcachesim/scheduler/scheduler_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -656,6 +656,8 @@ scheduler_impl_tmpl_t<RecordType, ReaderType>::~scheduler_impl_tmpl_t()
outputs_[i].stats[memtrace_stream_t::SCHED_STAT_RUNQUEUE_STEALS]);
VPRINT(this, 1, " %-35s: %9" PRId64 "\n", "Runqueue rebalances",
outputs_[i].stats[memtrace_stream_t::SCHED_STAT_RUNQUEUE_REBALANCES]);
// Per-output count of the times a workload's output limit kept one of its
// inputs from being scheduled (label typo "Ouput" corrected).
VPRINT(this, 1, " %-35s: %9" PRId64 "\n", "Output limits hit",
outputs_[i].stats[memtrace_stream_t::SCHED_STAT_HIT_OUTPUT_LIMIT]);
#ifndef NDEBUG
VPRINT(this, 1, " %-35s: %9" PRId64 "\n", "Runqueue lock acquired",
outputs_[i].ready_queue.lock->get_count_acquired());
Expand Down Expand Up @@ -704,14 +706,14 @@ scheduler_impl_tmpl_t<RecordType, ReaderType>::init(
options_ = std::move(options);
verbosity_ = options_.verbosity;
// workload_inputs is not const so we can std::move readers out of it.
std::unordered_map<int, std::vector<int>> workload2inputs(workload_inputs.size());
for (int workload_idx = 0; workload_idx < static_cast<int>(workload_inputs.size());
++workload_idx) {
auto &workload = workload_inputs[workload_idx];
if (workload.struct_size != sizeof(input_workload_t))
return sched_type_t::STATUS_ERROR_INVALID_PARAMETER;
if (!workload.only_threads.empty() && !workload.only_shards.empty())
return sched_type_t::STATUS_ERROR_INVALID_PARAMETER;
std::vector<input_ordinal_t> inputs_in_workload;
input_reader_info_t reader_info;
reader_info.only_threads = workload.only_threads;
reader_info.only_shards = workload.only_shards;
Expand All @@ -735,7 +737,7 @@ scheduler_impl_tmpl_t<RecordType, ReaderType>::init(
input_info_t &input = inputs_.back();
input.index = index;
input.workload = workload_idx;
workload2inputs[workload_idx].push_back(index);
inputs_in_workload.push_back(index);
input.tid = reader.tid;
input.reader = std::move(reader.reader);
input.reader_end = std::move(reader.end);
Expand All @@ -751,10 +753,14 @@ scheduler_impl_tmpl_t<RecordType, ReaderType>::init(
return res;
for (const auto &it : reader_info.tid2input) {
inputs_[it.second].workload = workload_idx;
workload2inputs[workload_idx].push_back(it.second);
inputs_in_workload.push_back(it.second);
tid2input_[workload_tid_t(workload_idx, it.first)] = it.second;
}
}
// Only read output_limit when the caller's struct is large enough to hold it.
// The comparison must use the layout of the caller-facing input_workload_t
// (the type whose struct_size is being examined), not the scheduler-internal
// workload_info_t, whose field offsets are unrelated.
int output_limit = 0;
if (workload.struct_size > offsetof(input_workload_t, output_limit))
output_limit = workload.output_limit;
// Record the per-workload state: its limit and the inputs that belong to it.
workloads_.emplace_back(output_limit, std::move(inputs_in_workload));
if (!check_valid_input_limits(workload, reader_info))
return sched_type_t::STATUS_ERROR_INVALID_PARAMETER;
if (!workload.times_of_interest.empty()) {
Expand Down Expand Up @@ -894,7 +900,7 @@ scheduler_impl_tmpl_t<RecordType, ReaderType>::init(
}
}

return set_initial_schedule(workload2inputs);
return set_initial_schedule();
}

template <typename RecordType, typename ReaderType>
Expand Down
Loading

0 comments on commit 337095e

Please sign in to comment.