Skip to content

Commit

Permalink
(Values/Filter)-push-down optimization
Browse files Browse the repository at this point in the history
  • Loading branch information
JulienDavat committed Jan 19, 2022
1 parent 72cde10 commit b5faae6
Show file tree
Hide file tree
Showing 22 changed files with 467 additions and 237 deletions.
2 changes: 1 addition & 1 deletion sage/cli/debug.py
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ def sage_query_debug(config_file, default_graph_uri, query, file, limit):
exit(1)

logical_plan = Parser.parse(query)
iterator, cardinalities = Optimizer.get_default().optimize(
iterator, cardinalities = Optimizer.get_default(dataset).optimize(
logical_plan, dataset, default_graph_uri
)
# iterator, cards = parse_query(query, dataset, default_graph_uri)
Expand Down
7 changes: 6 additions & 1 deletion sage/cli/explain.py
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ def explain(
print(pprintAlgebra(tq))

logical_plan = Parser.parse(query)
iterator, cardinalities = Optimizer.get_default().optimize(
iterator, cardinalities = Optimizer.get_default(dataset).optimize(
logical_plan, dataset, graph_uri
)
# iterator, cards = parse_query(query, dataset, graph_uri)
Expand All @@ -132,6 +132,11 @@ def explain(
with open(output, 'w') as outfile:
outfile.write(QueryPlanStringifier().visit(iterator))

print("-----------------")
print("Optimized query")
print("-----------------")
print(QueryPlanStringifier().visit(iterator))

print("-----------------")
print("Cardinalities")
print("-----------------")
Expand Down
14 changes: 13 additions & 1 deletion sage/database/core/dataset.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,9 @@ def __init__(
public_url: Optional[str] = None,
default_query: Optional[str] = None,
analytics=None,
stateless=True
stateless=True,
filter_push_down=True,
values_push_down=True
):
super(Dataset, self).__init__()
self._name = name
Expand All @@ -33,6 +35,8 @@ def __init__(
self._default_query = default_query
self._analytics = analytics
self._stateless = stateless
self._filter_push_down = filter_push_down
self._values_push_down = values_push_down
self._force_order = False

@property
Expand All @@ -43,6 +47,14 @@ def name(self) -> str:
def is_stateless(self) -> bool:
return self._stateless

@property
def do_filter_push_down(self) -> bool:
return self._filter_push_down

@property
def do_values_push_down(self) -> bool:
return self._values_push_down

@property
def force_order(self) -> bool:
return self._force_order
Expand Down
23 changes: 14 additions & 9 deletions sage/database/core/yaml_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,13 +49,6 @@ def load_config(config_file: str) -> Dataset:
else:
stateless = True

# # if statefull, load the saved plan storage backend to use
# statefull_manager = None
# if not is_stateless:
# # TODO allow use of custom backend for saved plans
# # same kind of usage than custom DB backends
# statefull_manager = HashMapManager()

# get default time quantum & maximum number of results per page
if 'quota' in config:
if config['quota'] == 'inf':
Expand All @@ -71,7 +64,17 @@ def load_config(config_file: str) -> Dataset:
logging.warning("You are using SaGe without limitations on the number of results sent per page. This is fine, but be carefull as very large page of results can have unexpected serialization time.")
max_results = inf

# build all RDF graphs found in the configuration file
# load debug parameters
if 'filter_push_down' in config:
filter_push_down = config['filter_push_down']
else:
filter_push_down = True
if 'values_push_down' in config:
values_push_down = config['values_push_down']
else:
values_push_down = True

# build all RDF graphs found in the configuration file
graphs = dict()
if "graphs" not in config:
raise SyntaxError("Np RDF graphs found in the configuration file. Please refers to the documentation to see how to declare RDF graphs in a SaGe YAML configuration file.")
Expand Down Expand Up @@ -102,5 +105,7 @@ def load_config(config_file: str) -> Dataset:
public_url=public_url,
default_query=default_query,
analytics=analytics,
stateless=stateless
stateless=stateless,
filter_push_down=filter_push_down,
values_push_down=values_push_down
)
16 changes: 9 additions & 7 deletions sage/http_server/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
from sage.query_engine.optimizer.parser import Parser
from sage.query_engine.optimizer.optimizer import Optimizer
from sage.query_engine.optimizer.physical.visitors.query_plan_stringifier import QueryPlanStringifier
# from sage.query_engine.optimizer.query_parser import parse_query
from sage.database.saved_plan.saved_plan_manager import SavedPlanManager
from sage.database.saved_plan.stateless_manager import StatelessManager
from sage.database.saved_plan.statefull_manager import StatefullManager
Expand Down Expand Up @@ -83,6 +82,8 @@ async def execute_query(
raise HTTPException(status_code=404, detail=f"RDF Graph {default_graph_uri} not found on the server.")
graph = dataset.get_graph(default_graph_uri)

optimizer = Optimizer.get_default(dataset)

# decode next_link or build query execution plan
cardinalities = dict()
loadin_start = time()
Expand All @@ -92,7 +93,7 @@ async def execute_query(
else:
start_timestamp = datetime.now()
logical_plan = Parser.parse(query)
plan, cardinalities = Optimizer.get_default().optimize(
plan, cardinalities = optimizer.optimize(
logical_plan, dataset, default_graph_uri, as_of=start_timestamp
)
# plan, cardinalities = parse_query(query, dataset, default_graph_uri)
Expand Down Expand Up @@ -131,8 +132,8 @@ async def execute_query(
"metrics": {
"progression": coverage_after,
"coverage": coverage_after - coverage_before,
"cost": Optimizer.get_default().cost(plan),
"cardinality": Optimizer.get_default().cardinality(plan)
"cost": optimizer.cost(plan),
"cardinality": optimizer.cardinality(plan)
}
}
print(stats['metrics'])
Expand All @@ -149,17 +150,18 @@ async def explain_query(
query: str, default_graph_uri: str, next_link: Optional[str],
dataset: Dataset
) -> str:
optimizer = Optimizer.get_default(dataset)
if next_link is not None:
plan = StatelessManager().get_plan(next_link, dataset)
else:
logical_plan = Parser.parse(query)
plan, cardinalities = Optimizer.get_default().optimize(
plan, cardinalities = optimizer.optimize(
logical_plan, dataset, default_graph_uri
)
return JSONResponse({
"query": QueryPlanStringifier().visit(plan),
"cost": Optimizer.get_default().cost(plan),
"cardinality": Optimizer.get_default().cardinality(plan)
"cost": optimizer.cost(plan),
"cardinality": optimizer.cardinality(plan)
})


Expand Down
10 changes: 7 additions & 3 deletions sage/query_engine/iterators/filter.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
# filter.py
# Author: Thomas MINIER - MIT License 2017-2020
from typing import Dict, Optional, Union, Set, Any
from typing import Dict, Optional, Union, Set, Any, List
from rdflib.term import Literal, URIRef, Variable
from rdflib.plugins.sparql.parserutils import Expr
from rdflib.plugins.sparql.sparql import Bindings, QueryContext
Expand All @@ -9,6 +9,7 @@
from sage.query_engine.iterators.preemptable_iterator import PreemptableIterator
from sage.query_engine.protobuf.iterators_pb2 import SavedFilterIterator
from sage.query_engine.protobuf.utils import pyDict_to_protoDict
from sage.query_engine.optimizer.logical.visitors.filter_variables_extractor import FilterVariablesExtractor


def to_rdflib_term(value: str) -> Union[Literal, URIRef, Variable]:
Expand Down Expand Up @@ -63,8 +64,11 @@ def explain(self, height: int = 0, step: int = 3) -> None:
print(f'{prefix}FilterIterator <{str(self._expression.vars)}>')
self._source.explain(height=(height + step), step=step)

def variables(self) -> Set[str]:
return self._source.variables()
def constrained_variables(self) -> List[str]:
return FilterVariablesExtractor().visit(self._expression)

def variables(self, include_values: bool = False) -> Set[str]:
return self._source.variables(include_values=include_values)

def __evaluate__(self, mappings: Dict[str, str]) -> bool:
"""Evaluate the FILTER expression with a set mappings.
Expand Down
6 changes: 4 additions & 2 deletions sage/query_engine/iterators/nlj.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,10 @@ def explain(self, height: int = 0, step: int = 3) -> None:
self._left.explain(height=(height + step), step=step)
self._right.explain(height=(height + step), step=step)

def variables(self) -> Set[str]:
return self._left.variables().union(self._right.variables())
def variables(self, include_values: bool = False) -> Set[str]:
return self._left.variables(include_values=include_values).union(
self._right.variables(include_values=include_values)
)

def next_stage(self, mappings: Dict[str, str]):
"""Propagate mappings to the bottom of the pipeline in order to compute nested loop joins"""
Expand Down
2 changes: 1 addition & 1 deletion sage/query_engine/iterators/preemptable_iterator.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ def explain(self, height: int = 0, step: int = 3) -> None:
pass

@abstractmethod
def variables(self) -> Set[str]:
def variables(self, include_values: bool = False) -> Set[str]:
"""Return the domain of the iterator"""
pass

Expand Down
7 changes: 5 additions & 2 deletions sage/query_engine/iterators/projection.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,11 @@ def explain(self, height: int = 0, step: int = 3) -> None:
print(f'{prefix}ProjectionIterator SELECT {self._projection}')
self._source.explain(height=(height + step), step=step)

def variables(self) -> Set[str]:
return set(self._projection)
def variables(self, include_values: bool = False) -> Set[str]:
if self._projection is None:
return self._source.variables(include_values=include_values)
else:
return set(self._projection)

def next_stage(self, mappings: Dict[str, str]):
"""Propagate mappings to the bottom of the pipeline in order to compute nested loop joins"""
Expand Down
2 changes: 1 addition & 1 deletion sage/query_engine/iterators/scan.py
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ def explain(self, height: int = 0, step: int = 3) -> None:
object = self._pattern['object']
print(f'{prefix}ScanIterator <({subject} {predicate} {object})>')

def variables(self) -> Set[str]:
def variables(self, include_values: bool = False) -> Set[str]:
vars = set()
if self._pattern['subject'].startswith('?'):
vars.add(self._pattern['subject'])
Expand Down
6 changes: 4 additions & 2 deletions sage/query_engine/iterators/union.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,10 @@ def explain(self, height: int = 0, step: int = 3) -> None:
self._left.explain(height=(height + step), step=step)
self._right.explain(height=(height + step), step=step)

def variables(self) -> Set[str]:
return self._left.variables().union(self._right.variables())
def variables(self, include_values: bool = False) -> Set[str]:
return self._left.variables(include_values=include_values).union(
self._right.variables(include_values=include_values)
)

def next_stage(self, mappings: Dict[str, str]):
"""Propagate mappings to the bottom of the pipeline in order to compute nested loop joins"""
Expand Down
9 changes: 4 additions & 5 deletions sage/query_engine/iterators/values.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ def __len__(self) -> int:
return len(self._values)

def __repr__(self) -> str:
return f"<ValuesIterator ({self._values})>"
return f"<ValuesIterator ({self.variables()})>"

def serialized_name(self):
"""Get the name of the iterator, as used in the plan serialization protocol"""
Expand All @@ -33,10 +33,10 @@ def explain(self, height: int = 0, step: int = 3) -> None:
if height > step:
prefix = ('|' + (' ' * (step - 1))) * (int(height / step) - 1)
prefix += ('|' + ('-' * (step - 1)))
print(f'{prefix}ValuesIterator <{self._values}>')
print(f'{prefix}ValuesIterator <{self.variables()}>')

def variables(self) -> Set[str]:
return set(self._values[0].keys())
def variables(self, include_values: bool = True) -> Set[str]:
return set(self._values[0].keys()) if include_values else set()

def next_stage(self, mappings: Dict[str, str]):
self._current_mappings = mappings
Expand All @@ -55,7 +55,6 @@ async def next(self, context: Dict[str, Any] = {}) -> Optional[Dict[str, str]]:
mappings = {**self._current_mappings, **mu}
else:
mappings = mu
print(mappings)
return mappings

def save(self) -> SavedValuesIterator:
Expand Down
3 changes: 2 additions & 1 deletion sage/query_engine/optimizer/logical/optimizer.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
from __future__ import annotations

from sage.database.core.dataset import Dataset
from sage.query_engine.optimizer.logical.plan_visitor import LogicalPlanVisitor, Node
from sage.query_engine.optimizer.logical.visitors.filter_splitter import FilterSplitter

Expand All @@ -10,7 +11,7 @@ def __init__(self):
self._visitors = []

@staticmethod
def get_default() -> LogicalPlanOptimizer:
def get_default(dataset: Dataset) -> LogicalPlanOptimizer:
optimizer = LogicalPlanOptimizer()
optimizer.add_visitor(FilterSplitter())
return optimizer
Expand Down
5 changes: 5 additions & 0 deletions sage/query_engine/optimizer/logical/plan_visitor.py
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,8 @@ def visit_expression(self, node: Expr) -> Any:
return self.visit_conditional_or_expression(node)
elif node.name == 'RelationalExpression':
return self.visit_relational_expression(node)
elif node.name == 'AdditiveExpression':
return self.visit_additive_expression(node)
elif node.name == 'Builtin_REGEX':
return self.visit_regex_expression(node)
elif node.name == 'Builtin_NOTEXISTS':
Expand Down Expand Up @@ -136,6 +138,9 @@ def visit_conditional_or_expression(self, node: Expr) -> Any:
def visit_relational_expression(self, node: Expr) -> Any:
raise UnsupportedSPARQL(f'The {node.name} expressions are not implemented')

def visit_additive_expression(self, node: Expr) -> Any:
raise UnsupportedSPARQL(f'The {node.name} expressions are not implemented')

def visit_regex_expression(self, node: Expr) -> Any:
raise UnsupportedSPARQL(f'The {node.name} expressions are not implemented')

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
from typing import Set
from rdflib.term import Variable
from rdflib.plugins.sparql.parserutils import Expr

from sage.query_engine.optimizer.logical.plan_visitor import LogicalPlanVisitor, RDFTerm


class FilterVariablesExtractor(LogicalPlanVisitor):

def visit_rdfterm(self, node: RDFTerm) -> Set[str]:
if isinstance(node, Variable):
return set([node.n3()])
else:
return set()

def visit_conditional_and_expression(self, node: Expr) -> Set[str]:
variables = self.visit(node.expr)
for other in node.other:
variables.update(self.visit(other))
return variables

def visit_conditional_or_expression(self, node: Expr) -> Set[str]:
variables = self.visit(node.expr)
for other in node.other:
variables.update(self.visit(other))
return variables

def visit_relational_expression(self, node: Expr) -> Set[str]:
return self.visit(node.expr)

def visit_additive_expression(self, node: Expr) -> Set[str]:
variables = self.visit(node.expr)
for other in node.other:
variables.update(self.visit(other))
return variables

def visit_regex_expression(self, node: Expr) -> Set[str]:
return self.visit(node.text)

def visit_not_exists_expression(self, node: Expr) -> Set[str]:
return self.visit(node.expr)

def visit_str_expression(self, node: Expr) -> Set[str]:
return self.visit(node.arg)

def visit_unary_not_expression(self, node: Expr) -> Set[str]:
return self.visit(node.expr)
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,12 @@ def visit_unary_not_expression(self, node: Expr) -> str:
def visit_str_expression(self, node: Expr) -> str:
return f'str({self.visit(node.arg)})'

def visit_additive_expression(self, node: Expr) -> str:
expression = self.visit(node.expr)
for index, operator in enumerate(node.op):
expression += f' {operator} {self.visit(node.other[index])}'
return f'({expression})'


class PipelineBuilder(LogicalPlanVisitor):

Expand Down
6 changes: 3 additions & 3 deletions sage/query_engine/optimizer/optimizer.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,10 @@ def __init__(self):
self._physical_optimizer = None

@staticmethod
def get_default() -> Optimizer:
def get_default(dataset: Dataset) -> Optimizer:
optimizer = Optimizer()
optimizer.set_logical_optimizer(LogicalPlanOptimizer.get_default())
optimizer.set_physical_optimizer(PhysicalPlanOptimizer.get_default())
optimizer.set_logical_optimizer(LogicalPlanOptimizer.get_default(dataset))
optimizer.set_physical_optimizer(PhysicalPlanOptimizer.get_default(dataset))
return optimizer

def set_logical_optimizer(self, optimizer: LogicalPlanOptimizer) -> None:
Expand Down
Loading

0 comments on commit b5faae6

Please sign in to comment.