Skip to content
This repository has been archived by the owner on Jan 7, 2025. It is now read-only.

Commit

Permalink
feat(core): add structured trace (#271)
Browse files Browse the repository at this point in the history
(See memo_dump.planner.sql for the example)

Compared with the standard log-based trace, the structured trace keeps all
information related to one group together, which helps developers
understand what is going on in the system. Trace entries are appended
during apply_rule and optimize_inputs, each tagged with a stage and a step
number that indicate the order of the operations.

Signed-off-by: Alex Chi Z <[email protected]>
  • Loading branch information
skyzh authored Jan 5, 2025
1 parent d1e27a5 commit c7f727f
Show file tree
Hide file tree
Showing 7 changed files with 317 additions and 51 deletions.
11 changes: 7 additions & 4 deletions optd-core/src/cascades/memo.rs
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,9 @@ pub trait Memo<T: NodeType>: 'static + Send + Sync {
/// The group id is volatile, depending on whether the groups are merged.
fn get_group_id(&self, expr_id: ExprId) -> GroupId;

/// Reduce a (possibly stale) group ID to the ID of the group it has been merged into.
///
/// Group IDs are volatile because groups can be merged; callers holding an ID that was
/// recorded earlier can use this to obtain the merged group's ID.
// NOTE(review): presumably an ID that was never merged reduces to itself — confirm against
// each `Memo` implementation.
fn reduce_group(&self, group_id: GroupId) -> GroupId;

/// Get the memoized representation of a node.
fn get_expr_memoed(&self, expr_id: ExprId) -> ArcMemoPlanNode<T>;

Expand Down Expand Up @@ -315,6 +318,10 @@ impl<T: NodeType> Memo<T> for NaiveMemo<T> {
fn estimated_plan_space(&self) -> usize {
self.expr_id_to_expr_node.len()
}

/// Look up `group_id` in `merged_group_mapping` and return the group ID stored for it.
// NOTE(review): the lookup uses `[]` indexing, so it presumably panics when `group_id` has
// no entry in the mapping (true for the std map types) — confirm the mapping always
// contains every group ID that can be passed in.
fn reduce_group(&self, group_id: GroupId) -> GroupId {
self.merged_group_mapping[&group_id]
}
}

impl<T: NodeType> NaiveMemo<T> {
Expand Down Expand Up @@ -396,10 +403,6 @@ impl<T: NodeType> NaiveMemo<T> {
}
}

/// Look up `group_id` in `merged_group_mapping` and return the group ID stored for it.
// NOTE(review): the lookup uses `[]` indexing, so it presumably panics when `group_id` has
// no entry in the mapping (true for the std map types) — confirm.
fn reduce_group(&self, group_id: GroupId) -> GroupId {
self.merged_group_mapping[&group_id]
}

fn merge_group_inner(&mut self, merge_into: GroupId, merge_from: GroupId) {
if merge_into == merge_from {
return;
Expand Down
124 changes: 106 additions & 18 deletions optd-core/src/cascades/optimizer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,10 @@ use std::pin::Pin;
use std::sync::Arc;

use anyhow::Result;
use itertools::Itertools;
use tracing::trace;

use super::memo::{ArcMemoPlanNode, GroupInfo, Memo};
use super::memo::{ArcMemoPlanNode, GroupInfo, Memo, WinnerInfo};
use super::NaiveMemo;
use crate::cascades::memo::Winner;
use crate::cascades::tasks2::{TaskContext, TaskDesc};
Expand All @@ -38,16 +39,92 @@ pub struct OptimizerContext {

/// Switches and limits that configure how the optimizer runs.
///
/// The type derives `Default`, so a default-constructed value has every flag set to `false`
/// and every limit set to `None`.
#[derive(Default, Clone, Debug)]
pub struct OptimizerProperties {
/// Panic the optimizer if the budget is reached, used in planner tests.
pub panic_on_budget: bool,
/// If the number of rules applied exceeds this number, we stop applying logical rules.
// NOTE(review): `None` presumably means "no limit" — confirm where this field is read.
pub partial_explore_iter: Option<usize>,
/// Plan space can be expanded by this number of times before we stop applying logical rules.
// NOTE(review): `None` presumably means "no limit" — confirm where this field is read.
pub partial_explore_space: Option<usize>,
/// Disable pruning during optimization (`true` turns pruning off).
pub disable_pruning: bool,
/// Enable tracing during optimization (`true` turns tracing on).
pub enable_tracing: bool,
}

#[derive(Debug, Default)]
/// A structured trace record produced by the optimizer.
///
/// Every variant carries the stage and step number at which it was recorded, together with
/// the group ID it relates to.
#[derive(Clone)]
pub enum OptimizerTrace {
/// A winner decision is made
DecideWinner {
/// The stage number when the trace is recorded
stage: usize,
/// The step number when the trace is recorded
step: usize,
/// The group ID when a trace is recorded
group_id: GroupId,
/// The proposed winner
proposed_winner_info: WinnerInfo,
/// The winner expressions of the children
children_winner: Vec<ExprId>,
},
/// A rule is applied to an expression and produces an expression
ApplyRule {
/// The stage number when the trace is recorded
stage: usize,
/// The step number when the trace is recorded
step: usize,
/// The group ID when a trace is recorded
group_id: GroupId,
/// The expression being applied
applied_expr_id: ExprId,
/// The expression being produced
produced_expr_id: ExprId,
/// The ID of the rule that was applied
rule_id: usize,
},
}

impl OptimizerTrace {
pub fn stage_step(&self) -> (usize, usize) {
match self {
OptimizerTrace::DecideWinner { stage, step, .. } => (*stage, *step),
OptimizerTrace::ApplyRule { stage, step, .. } => (*stage, *step),
}
}
}

impl std::fmt::Display for OptimizerTrace {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
OptimizerTrace::DecideWinner {
stage,
step,
group_id,
proposed_winner_info,
children_winner,
} => {
write!(
f,
"step={}/{} decide_winner group_id={} proposed_winner_expr={} children_winner_exprs=[{}] total_weighted_cost={}",
stage, step, group_id, proposed_winner_info.expr_id, children_winner.iter().join(","), proposed_winner_info.total_weighted_cost
)
}
OptimizerTrace::ApplyRule {
stage,
step,
group_id,
applied_expr_id,
produced_expr_id,
rule_id,
} => {
write!(
f,
"step={}/{} apply_rule group_id={} applied_expr_id={} produced_expr_id={} rule_id={}",
stage, step, group_id, applied_expr_id, produced_expr_id, rule_id
)
}
}
}
}

#[derive(Default)]
pub struct CascadesStats {
pub rule_match_count: HashMap<usize, usize>,
pub rule_total_bindings: HashMap<usize, usize>,
Expand All @@ -56,6 +133,7 @@ pub struct CascadesStats {
pub optimize_expr_count: usize,
pub apply_rule_count: usize,
pub optimize_input_count: usize,
pub trace: HashMap<GroupId, Vec<OptimizerTrace>>,
}

pub struct CascadesOptimizer<T: NodeType, M: Memo<T> = NaiveMemo<T>> {
Expand All @@ -70,6 +148,7 @@ pub struct CascadesOptimizer<T: NodeType, M: Memo<T> = NaiveMemo<T>> {
logical_property_builders: Arc<[Box<dyn LogicalPropertyBuilderAny<T>>]>,
pub ctx: OptimizerContext,
pub prop: OptimizerProperties,
stage: usize,
}

/// `RelNode` only contains the representation of the plan nodes. Sometimes, we need more context,
Expand Down Expand Up @@ -137,6 +216,7 @@ impl<T: NodeType> CascadesOptimizer<T, NaiveMemo<T>> {
prop,
stats: CascadesStats::default(),
disabled_rules: HashSet::new(),
stage: 0,
}
}

Expand All @@ -163,14 +243,6 @@ impl<T: NodeType> CascadesOptimizer<T, NaiveMemo<T>> {
}

impl<T: NodeType, M: Memo<T>> CascadesOptimizer<T, M> {
/// Set the `panic_on_budget` property of this optimizer.
///
/// Passing `true` configures the optimizer to panic once the budget is reached; passing
/// `false` clears the flag.
pub fn panic_on_explore_limit(&mut self, enabled: bool) {
self.prop.panic_on_budget = enabled;
}

/// Set the `disable_pruning` property of this optimizer.
///
/// Note the polarity: `enabled == true` means pruning is *disabled*, because the argument
/// is stored directly into `prop.disable_pruning`.
pub fn disable_pruning(&mut self, enabled: bool) {
self.prop.disable_pruning = enabled;
}

/// Returns a shared handle to the cost model held by this optimizer.
///
/// Only the `Arc` is cloned, so the returned handle points at the same cost model
/// instance as the optimizer's own.
pub fn cost(&self) -> Arc<dyn CostModel<T, M>> {
    Arc::clone(&self.cost)
}
Expand Down Expand Up @@ -217,7 +289,7 @@ impl<T: NodeType, M: Memo<T>> CascadesOptimizer<T, M> {
self.disabled_rules.contains(&rule_id)
}

pub fn dump(&self) {
pub fn dump(&self, mut f: impl std::fmt::Write) -> std::fmt::Result {
for group_id in self.memo.get_all_group_ids() {
let winner_str = match &self.memo.get_group_info(group_id).winner {
Winner::Impossible => "winner=<impossible>".to_string(),
Expand All @@ -234,28 +306,41 @@ impl<T: NodeType, M: Memo<T>> CascadesOptimizer<T, M> {
)
}
};
println!("group_id={} {}", group_id, winner_str);
writeln!(f, "group_id={} {}", group_id, winner_str)?;
let group = self.memo.get_group(group_id);
for (id, property) in self.logical_property_builders.iter().enumerate() {
println!(
writeln!(
f,
" {}={}",
property.property_name(),
group.properties[id].as_ref()
)
)?;
}
let mut all_predicates = BTreeSet::new();
for expr_id in self.memo.get_all_exprs_in_group(group_id) {
let memo_node = self.memo.get_expr_memoed(expr_id);
for pred in &memo_node.predicates {
all_predicates.insert(*pred);
}
println!(" expr_id={} | {}", expr_id, memo_node);
writeln!(f, " expr_id={} | {}", expr_id, memo_node)?;
}
for pred in all_predicates {
println!(" {}={}", pred, self.memo.get_pred(pred));
writeln!(f, " {}={}", pred, self.memo.get_pred(pred))?;
}
let mut traces = Vec::new();
for (that_group_id, trace) in &self.stats.trace {
if self.memo.reduce_group(*that_group_id) == group_id {
traces.extend(trace.iter());
}
}
traces.sort_by_key(|x| x.stage_step());
for t in traces {
writeln!(f, " {}", t)?;
}
}
Ok(())
}

/// Optimize a `RelNode`.
pub fn step_optimize_rel(&mut self, root_rel: ArcPlanNode<T>) -> Result<GroupId> {
trace!(event = "step_optimize_rel", rel = %root_rel);
Expand Down Expand Up @@ -287,15 +372,18 @@ impl<T: NodeType, M: Memo<T>> CascadesOptimizer<T, M> {
}
});
if res.is_err() && cfg!(debug_assertions) {
self.dump();
let mut buf = String::new();
self.dump(&mut buf).unwrap();
eprintln!("{}", buf);
}
res
}

pub fn fire_optimize_tasks(&mut self, group_id: GroupId) -> Result<()> {
use pollster::FutureExt as _;
trace!(event = "fire_optimize_tasks", root_group_id = %group_id);
let mut task = TaskContext::new(self);
self.stage += 1;
let mut task = TaskContext::new(self, self.stage);
// 32MB stack for the optimization process, TODO: reduce memory footprint
stacker::grow(32 * 1024 * 1024, || {
let fut: Pin<Box<dyn Future<Output = ()>>> = Box::pin(task.fire_optimize(group_id));
Expand Down
Loading

0 comments on commit c7f727f

Please sign in to comment.