diff --git a/pyproject.toml b/pyproject.toml index ecddda0..fc2e25b 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -4,7 +4,7 @@ build-backend = "hatchling.build" [project] name = "cosl" -version = "0.0.48" +version = "0.0.49" authors = [ { name = "sed-i", email = "82407168+sed-i@users.noreply.github.com" }, ] diff --git a/src/cosl/rules.py b/src/cosl/rules.py index 820aac0..b12adc9 100644 --- a/src/cosl/rules.py +++ b/src/cosl/rules.py @@ -75,8 +75,8 @@ - `juju_application` """ # noqa: W505 +import hashlib import logging -import os import re from abc import ABC, abstractmethod from pathlib import Path @@ -86,11 +86,9 @@ from . import CosTool, JujuTopology from .types import ( - OfficialRuleFileFormat, OfficialRuleFileItem, QueryType, RuleType, - SingleRuleFormat, ) logger = logging.getLogger(__name__) @@ -149,7 +147,7 @@ def __init__(self, query_type: QueryType, topology: Optional[JujuTopology] = Non self.query_type = query_type self.topology = topology self.tool = CosTool(default_query_type=query_type) - self.groups = [] # type: List[Dict[str, Any]] + self.groups: List[OfficialRuleFileItem] = [] @property @abstractmethod @@ -160,7 +158,7 @@ def rule_type(self) -> RuleType: # --- HELPER METHODS FOR READING FILES, SHOULD BE STATIC --- # @staticmethod - def _is_official_rule_format(rules_dict: OfficialRuleFileFormat) -> bool: + def _is_official_rule_format(rules_dict: Dict[str, Any]) -> bool: """Are rules in the upstream format as supported by Prometheus or Loki. Rules in dictionary format are in "official" form if they @@ -176,7 +174,7 @@ def _is_official_rule_format(rules_dict: OfficialRuleFileFormat) -> bool: return "groups" in rules_dict @staticmethod - def _is_single_rule_format(rules_dict: SingleRuleFormat, rule_type: RuleType) -> bool: + def _is_single_rule_format(rules_dict: Dict[str, Any], rule_type: RuleType) -> bool: """Are alert rules in single rule format. This library supports reading of rules in a custom format that @@ -212,7 +210,7 @@ def _multi_suffix_glob( all_files_in_dir = dir_path.glob("**/*" if recursive else "*") return list(filter(lambda f: f.is_file() and f.suffix in suffixes, all_files_in_dir)) - def _from_dir(self, dir_path: Path, recursive: bool) -> List[Dict[str, Any]]: + def _from_dir(self, dir_path: Path, recursive: bool) -> List[OfficialRuleFileItem]: """Read all rule files in a directory. All rules from files for the same directory are loaded into a single @@ -227,7 +225,7 @@ def _from_dir(self, dir_path: Path, recursive: bool) -> List[Dict[str, Any]]: a list of dictionaries representing prometheus rule groups, each dictionary representing a group (structure determined by `yaml.safe_load`). """ - groups = [] # type: List[Dict[str, Any]] + groups: List[OfficialRuleFileItem] = [] # Gather all records into a list of groups for file_path in Rules._multi_suffix_glob( @@ -243,7 +241,7 @@ def _from_dir(self, dir_path: Path, recursive: bool) -> List[Dict[str, Any]]: def _from_file( # noqa: C901 self, root_path: Path, file_path: Path ) -> List[OfficialRuleFileItem]: - """Read a rules file from path, injecting juju topology. + """Read a rules file from path. Args: root_path: full path to the root rules folder (used only for generating group name) @@ -262,82 +260,96 @@ def _from_file( # noqa: C901 logger.error("Failed to read rules from %s: %s", file_path.name, e) return [] - if not rule_file: - logger.warning("Empty rules file: %s", file_path.name) - return [] - if not isinstance(rule_file, dict): - logger.error("Invalid rules file (must be a dict): %s", file_path.name) - return [] + # Generate group name prefix + # - name, from juju topology + # - suffix, from the relative path of the rule file; + rel_path = file_path.parent.relative_to(root_path) + rel_path = "" if rel_path == Path(".") else str(rel_path) + group_name_parts = [self.topology.identifier] if self.topology else [] + group_name_parts.append(rel_path) + group_name_prefix = "_".join(filter(None, group_name_parts)) - if self._is_official_rule_format(cast(OfficialRuleFileFormat, rule_file)): - rule_file = cast(OfficialRuleFileFormat, rule_file) - groups = rule_file["groups"] - elif self._is_single_rule_format(cast(SingleRuleFormat, rule_file), self.rule_type): - # convert to list of groups - # group name is made up from the file name - rule_file = cast(SingleRuleFormat, rule_file) - groups = [{"name": file_path.stem, "rules": [rule_file]}] - else: - # invalid/unsupported - logger.error("Invalid rules file: %s", file_path.name) + try: + groups = self._from_dict( + rule_file, group_name=file_path.stem, group_name_prefix=group_name_prefix + ) + except ValueError as e: + logger.error("Invalid rules file: %s (%s)", file_path.name, e) return [] - # update rules with additional metadata - groups = cast(List[OfficialRuleFileItem], groups) - for group in groups: - if not self._is_already_modified(group["name"]): - # update group name with topology and sub-path - group["name"] = self._group_name(str(root_path), str(file_path), group["name"]) - - # add "juju_" topology labels - for rule in group["rules"]: - if "labels" not in rule: - rule["labels"] = {} - - if self.topology: - # only insert labels that do not already exist - for label, val in self.topology.label_matcher_dict.items(): - if label not in rule["labels"]: - rule["labels"][label] = val - - # insert juju topology filters into a prometheus rule - repl = r'job=~".+"' if self.query_type == "logql" else "" - rule["expr"] = self.tool.inject_label_matchers( # type: ignore - expression=re.sub(r"%%juju_topology%%,?", repl, rule["expr"]), - topology={ - k: rule["labels"][k] - for k in ("juju_model", "juju_model_uuid", "juju_application") - if rule["labels"].get(k) is not None - }, - query_type=self.query_type, - ) - return groups - def _group_name(self, root_path: str, file_path: str, group_name: str) -> str: - """Generate group name from path and topology. - - The group name is made up of the relative path between the root dir_path, the file path, - and topology identifier. + def _from_dict( + self, + rule_dict: Dict[str, Any], + *, + group_name: Optional[str] = None, + group_name_prefix: Optional[str] = None, + ) -> List[OfficialRuleFileItem]: + """Process rules from dict, injecting juju topology. If a single-rule format is provided, a hash of the yaml file is injected into the group name to ensure uniqueness. Args: - root_path: path to the root rules dir. - file_path: path to rule file. - group_name: original group name to keep as part of the new augmented group name + rule_dict: rules content in single-rule or official-rule format as a YAML dict + group_name: a custom identifier for the rule name to include in the group name + group_name_prefix: a custom group identifier to prefix the resulting group name, likely Juju topology and relative path context - Returns: - New group name, augmented by juju topology and relative path. + Raises: + ValueError, when invalid rule format given. """ - rel_path = os.path.relpath(os.path.dirname(file_path), root_path) - rel_path = "" if rel_path == "." else rel_path.replace(os.path.sep, "_") + if not rule_dict: + raise ValueError("Empty") + + if self._is_official_rule_format(rule_dict): + groups = rule_dict["groups"] + elif self._is_single_rule_format(rule_dict, self.rule_type): + if not group_name: + # Note: the caller of this function should ensure this never happens: + # Either we use the standard format, or we'd pass a group_name. + # If/when we drop support for the single-rule-per-file format, this won't + # be needed anymore. + group_name = hashlib.shake_256(str(rule_dict).encode("utf-8")).hexdigest(10) + + # convert to list of groups to match official rule format + groups = [{"name": group_name, "rules": [rule_dict]}] + else: + # invalid/unsupported + raise ValueError("Invalid rule format") + + # update rules with additional metadata + groups = cast(List[OfficialRuleFileItem], groups) + for group in groups: + if not self._is_already_modified(group["name"]): + # update group name with topology and sub-path + group["name"] = "_".join( + filter(None, [group_name_prefix, group["name"], f"{self.rule_type}s"]) + ) + # after sanitizing we should not modify group["name"] anymore + group["name"] = self._sanitize_metric_name(group["name"]) + + # add "juju_" topology labels + for rule in group["rules"]: + if "labels" not in rule: + rule["labels"] = {} + + if self.topology: + # only insert labels that do not already exist + for label, val in self.topology.label_matcher_dict.items(): + if label not in rule["labels"]: + rule["labels"][label] = val + + # insert juju topology filters into a prometheus rule + repl = r'job=~".+"' if self.query_type == "logql" else "" + rule["expr"] = self.tool.inject_label_matchers( # type: ignore + expression=re.sub(r"%%juju_topology%%,?", repl, rule["expr"]), + topology={ + k: rule["labels"][k] + for k in ("juju_model", "juju_model_uuid", "juju_application") + if rule["labels"].get(k) is not None + }, + query_type=self.query_type, + ) - # Generate group name: - # - name, from juju topology - # - suffix, from the relative path of the rule file; - group_name_parts = [self.topology.identifier] if self.topology else [] - group_name_parts.extend([rel_path, group_name, f"{self.rule_type}s"]) - # filter to remove empty strings - return "_".join(filter(None, group_name_parts)) + return groups def _is_already_modified(self, name: str) -> bool: """Detect whether a group name has already been modified with juju topology.""" @@ -346,8 +358,29 @@ def _is_already_modified(self, name: str) -> bool: return False return True + def _sanitize_metric_name(self, metric_name: str) -> str: + """Sanitize a metric name according to https://prometheus.io/docs/concepts/data_model/#metric-names-and-labels.""" + return "".join(char if re.match(r"[a-zA-Z0-9_:]", char) else "_" for char in metric_name) + # ---- END STATIC HELPER METHODS --- # + def add( + self, + rule_dict: Dict[str, Any], + group_name: Optional[str] = None, + group_name_prefix: Optional[str] = None, + ) -> None: + """Add rules from dict to the existing ruleset. + + Args: + rule_dict: a single-rule or official-rule YAML dict + group_name: a custom group name, used only if the new rule is of single-rule format + group_name_prefix: a custom group name prefix, used only if the new rule is of single-rule format + """ + self.groups.extend( + self._from_dict(rule_dict, group_name=group_name, group_name_prefix=group_name_prefix) + ) + def add_path(self, dir_path: Union[str, Path], *, recursive: bool = False) -> None: """Add rules from a dir path. @@ -357,9 +390,6 @@ def add_path(self, dir_path: Union[str, Path], *, recursive: bool = False) -> No Args: dir_path: either a rules file or a dir of rules files. recursive: whether to read files recursively or not (no impact if `path` is a file). - - Returns: - True if path was added else False. """ path = Path(dir_path) if isinstance(dir_path, str) else dir_path if path.is_dir(): diff --git a/tests/test_rules_promql.py b/tests/test_rules_promql.py index afef8b8..38662e4 100644 --- a/tests/test_rules_promql.py +++ b/tests/test_rules_promql.py @@ -40,6 +40,7 @@ def test_each_alert_rule_is_topology_labeled(self): self.assertIn("juju_application", labels) self.assertIn("juju_model_uuid", labels) # alerts should not have unit information if not already present + # https://discourse.charmhub.io/t/juju-topology-labels/8874 self.assertNotIn("juju_unit", rule["labels"]) self.assertNotIn("juju_unit=", rule["expr"]) else: @@ -76,6 +77,186 @@ def test_each_alert_expression_is_topology_labeled(self): self.assertIn("juju_application", labels) +class TestAddRuleFromStr(unittest.TestCase): + def setUp(self): + self.rule = { + "alert": "SomeRuleName", + "expr": "up < 1", + "labels": {"severity": "critical"}, + } + + def test_official_rule_add_alerts_from_dict(self): + official_rule = { + "groups": [ + { + "name": "SomeGroupName", + "rules": [ + { + "alert": "FooRule", + "expr": "up < 1", + "labels": {"severity": "critical"}, + }, + { + "alert": "BarRule", + "expr": "absent(up)", + "labels": {"severity": "critical"}, + }, + ], + } + ] + } + expected_rules = { + "groups": [ + { + "name": "initial_alerts", + "rules": [self.rule], + }, + {"name": "SomeGroupName_alerts", "rules": official_rule["groups"][0]["rules"]}, + ] + } + # GIVEN an alert rule + rules = AlertRules(query_type="promql") + rules.groups = rules._from_dict(self.rule, group_name="initial") + # WHEN a rule is added from string in official-rule format + rules.add(official_rule) + rules_dict = rules.as_dict() + # THEN the new rule is a combination of all + self.assertEqual({}, DeepDiff(expected_rules, rules_dict)) + + def test_single_rule_add_alerts_from_dict(self): + single_rule = { + "alert": "BarRule", + "expr": "up < 1", + "labels": {"severity": "critical"}, + } + expected_rules = { + "groups": [ + { + "name": "initial_alerts", + "rules": [self.rule], + }, + {"name": "some_new_alerts", "rules": [single_rule]}, + ] + } + # GIVEN an alert rule + rules = AlertRules(query_type="promql") + rules.groups = rules._from_dict(self.rule, group_name="initial") + # WHEN a rule is added from string in single-rule format with a custom group name and prefix + rules.add( + single_rule, + group_name="new", + group_name_prefix="some", + ) + # THEN the new rule is a combination of all + rules_dict = rules.as_dict() + # AND the added rule group name has the custom group name and prefix + self.assertEqual({}, DeepDiff(expected_rules, rules_dict)) + + +class TestFromDictGroupName(unittest.TestCase): + def setUp(self): + self.official_rule = { + "groups": [ + { + "name": "SomeGroupName", + "rules": [ + { + "alert": "SomeRuleName", + "expr": "up < 1", + "labels": {"severity": "critical"}, + } + ], + } + ] + } + self.single_rule = { + "alert": "SomeRuleName", + "expr": "up < 1", + "labels": {"severity": "critical"}, + } + + def test_single_rule_from_dict_group_sanitized(self): + # GIVEN an alert rule in single-rule format + rules = AlertRules(query_type="promql") + # WHEN processed as string and provided an illegal custom group name + groups = rules._from_dict( + self.single_rule, group_name="Foo$123/Hello:World(go_od)bye!@#^&*()[]{}|;:,.<>?`~_" + ) + for group in groups: + # THEN the group name only contains characters in [a-zA-Z0-9_:] + # AND special characters are replaced with "_" + self.assertEqual( + group["name"], "Foo_123_Hello:World_go_od_bye______________:_________alerts" + ) + + def test_single_rule_from_dict(self): + # GIVEN an alert rule in single-rule format + rules = AlertRules(query_type="promql") + # WHEN processed as string + groups = rules._from_dict(self.single_rule) + for group in groups: + # THEN group name contains hash + self.assertNotEqual(get_hash(group["name"]), "") + + def test_single_rule_from_dict_custom_group(self): + # GIVEN an alert rule in single-rule format + rules = AlertRules(query_type="promql") + # WHEN processed as string and provided custom group name + groups = rules._from_dict(self.single_rule, group_name="foo") + for group in groups: + # THEN group name does not contain hash + self.assertEqual(get_hash(group["name"]), "") + # AND group name contains the custom group name + self.assertIn("foo", group["name"]) + + def test_single_rule_from_dict_custom_group_prefix(self): + # GIVEN an alert rule in single-rule format + rules = AlertRules(query_type="promql") + # WHEN processed as string and provided custom group name prefix + groups = rules._from_dict(self.single_rule, group_name_prefix="foo") + for group in groups: + # THEN group name does not include the original group name + self.assertNotIn("SomeGroupName", group["name"]) + # AND group name is prefixed with custom value + self.assertTrue(group["name"].startswith("foo")) + + def test_official_rule_from_dict(self): + # GIVEN an alert rule in official-rule format + rules = AlertRules(query_type="promql") + # WHEN processed as dict + groups = rules._from_dict(self.official_rule) + for group in groups: + # THEN group name matches the group name in the alert + self.assertIn("SomeGroupName", group["name"]) + + def test_official_rule_from_dict_custom_group_prefix(self): + # GIVEN an alert rule in official-rule format + rules = AlertRules(query_type="promql") + # WHEN processed as string and provided custom group name prefix + groups = rules._from_dict(self.official_rule, group_name_prefix="foo") + for group in groups: + # THEN group name includes the original group name + self.assertIn("SomeGroupName", group["name"]) + # AND group name is prefixed with custom value + self.assertTrue(group["name"].startswith("foo")) + + def test_raises_value_error_empty_dict(self): + rules = AlertRules(query_type="promql") + with self.assertRaises(ValueError) as ctx: + rules._from_dict("") + self.assertEqual(str(ctx.exception), "Empty") + + def test_raises_value_error_invalid_rule_format(self): + invalid_rule = { + "expr": "up < 1", + "labels": {"severity": "critical"}, + } + rules = AlertRules(query_type="promql") + with self.assertRaises(ValueError) as ctx: + rules._from_dict(invalid_rule) + self.assertEqual(str(ctx.exception), "Invalid rule format") + + def sorted_matchers(matchers) -> str: parts = [m.strip() for m in matchers.split(",")] return ",".join(sorted(parts)) @@ -99,6 +280,27 @@ def expression_labels(expr): yield labels +def get_hash(group_name: str) -> str: + """Extract the hash of the group name when a group name was not provided to _from_dict method in the Rules class. + + This occurs when the single-rule format is provided rather than official-rule format. + + Args: + group_name: a string representing the group name of the rules. + + Returns: + the matching hexdigest of the group name if a match is found, otherwise an empty string. + """ + import re + + pattern = r"([a-f0-9]{20})_" + match = re.search(pattern, group_name) + if match: + return match.group(1) + else: + return "" + + class TestAlertRulesWithOneRulePerFile(unittest.TestCase): def setUp(self) -> None: free_standing_rule = {