diff --git a/features/backup_create.py b/features/backup_create.py index 65ce52a61..8e8ef1f85 100755 --- a/features/backup_create.py +++ b/features/backup_create.py @@ -3,12 +3,18 @@ import subprocess import sys +from time import sleep + if __name__ == "__main__": parser = argparse.ArgumentParser() parser.add_argument("--datadir", required=True) parser.add_argument("--dbname", required=True) parser.add_argument("--walmethod", required=True, choices=("fetch", "stream", "none")) + parser.add_argument("--sleep", required=False, type=int) args, _ = parser.parse_known_args() + if args.sleep: + sleep(args.sleep) + walmethod = ["-X", args.walmethod] if args.walmethod != "none" else [] sys.exit(subprocess.call(["pg_basebackup", "-D", args.datadir, "-c", "fast", "-d", args.dbname] + walmethod)) diff --git a/features/backup_restore.py b/features/backup_restore.py index 9b926b54f..36a236897 100755 --- a/features/backup_restore.py +++ b/features/backup_restore.py @@ -2,11 +2,17 @@ import argparse import shutil +from time import sleep + if __name__ == "__main__": parser = argparse.ArgumentParser() parser.add_argument("--datadir", required=True) parser.add_argument("--sourcedir", required=True) parser.add_argument("--test-argument", required=True) + parser.add_argument("--sleep", required=False, type=int) args, _ = parser.parse_known_args() + if args.sleep: + sleep(args.sleep) + shutil.copytree(args.sourcedir, args.datadir) diff --git a/features/bootstrap_labels.feature b/features/bootstrap_labels.feature new file mode 100644 index 000000000..26f2c763e --- /dev/null +++ b/features/bootstrap_labels.feature @@ -0,0 +1,22 @@ +Feature: bootstrap labels + Check that user-configurable bootstrap labels are set and removed with state change + +Scenario: check label for cluster bootstrap protection + When I start postgres-0 + Then postgres-0 is a leader after 10 seconds + When I start postgres-1 in a cluster batman1 as a long-running clone of postgres-0 + Then "members/postgres-1" key in DCS has state=running custom bootstrap script after 20 seconds + And postgres-1 is labeled with "foo" + And postgres-1 is a leader of batman1 after 20 seconds + +Scenario: check label for replica bootstrap protection + When I do a backup of postgres-1 + And I start postgres-2 in cluster batman1 using long-running backup_restore + Then "members/postgres-2" key in DCS has state=creating replica after 20 seconds + And postgres-2 is labeled with "foo" + +Scenario: check label is removed + Given "members/postgres-1" key in DCS has state=running after 2 seconds + And "members/postgres-2" key in DCS has state=running after 20 seconds + Then postgres-1 is not labeled with "foo" + And postgres-2 is not labeled with "foo" diff --git a/features/environment.py b/features/environment.py index 9f9adfd2a..ccf8d69f9 100644 --- a/features/environment.py +++ b/features/environment.py @@ -54,6 +54,8 @@ def start(self, max_wait_limit=5): self._log = open(os.path.join(self._output_dir, self._name + '.log'), 'a') self._handle = self._start() + if max_wait_limit < 0: + return max_wait_limit *= self._context.timeout_multiplier for _ in range(max_wait_limit): assert self._has_started(), "Process {0} is not running after being started".format(self._name) @@ -218,6 +220,8 @@ def _make_patroni_test_config(self, name, custom_config): 'host replication replicator all md5', 'host all all all md5' ] + if isinstance(self._context.dcs_ctl, KubernetesController): + config['kubernetes'] = {'bootstrap_labels': {'foo': 'bar'}} if self._context.postgres_supports_ssl and self._context.certfile: config['postgresql']['parameters'].update({ @@ -657,6 +661,11 @@ def delete_pod(self, name): except Exception: break + def pod_labels(self, name): + pod = self._api.read_namespaced_pod(name, self._namespace) + print(pod.metadata.labels) + return pod.metadata.labels or {} + def query(self, key, scope='batman', group=None): if key.startswith('members/'): pod = self._api.read_namespaced_pod(key[8:], self._namespace) @@ -870,15 +879,17 @@ def create_and_set_output_directory(self, feature_name): os.makedirs(feature_dir) self._output_dir = feature_dir - def clone(self, from_name, cluster_name, to_name): + def clone(self, from_name, cluster_name, to_name, long_running=False): f = self._processes[from_name] + max_wait_limit = -1 if long_running else 10 + custom_config = { 'scope': cluster_name, 'bootstrap': { 'method': 'pg_basebackup', 'pg_basebackup': { 'command': " ".join(self.BACKUP_SCRIPT - + ['--walmethod=stream', '--dbname="{0}"'.format(f.backup_source)]) + + ['--walmethod=stream', f'--dbname="{f.backup_source}"', f'--sleep {5 if long_running else 0}']) }, 'dcs': { 'postgresql': { @@ -901,12 +912,13 @@ def clone(self, from_name, cluster_name, to_name): } } } - self.start(to_name, custom_config=custom_config) + self.start(to_name, custom_config=custom_config, max_wait_limit=max_wait_limit) - def backup_restore_config(self, params=None): + def backup_restore_config(self, params=None, long_running=False): return { 'command': (self.BACKUP_RESTORE_SCRIPT - + ' --sourcedir=' + os.path.join(self.patroni_path, 'data', 'basebackup')).replace('\\', '/'), + + ' --sourcedir=' + os.path.join(self.patroni_path, 'data', 'basebackup') + + f' --sleep {5 if long_running else 0}').replace('\\', '/'), 'test-argument': 'test-value', # test config mapping approach on custom bootstrap/replica creation **(params or {}), } @@ -1162,3 +1174,5 @@ def before_scenario(context, scenario): scenario.skip('it is not possible to control state of {0} from tests'.format(context.dcs_ctl.name())) if 'reject-duplicate-name' in scenario.effective_tags and context.dcs_ctl.name() == 'raft': scenario.skip('Flaky test with Raft') + if scenario.filename.endswith('bootstrap_labels.feature') and not isinstance(context.dcs_ctl, KubernetesController): + scenario.skip() diff --git a/features/steps/bootstrap_labels.py b/features/steps/bootstrap_labels.py new file mode 100644 index 000000000..fb2cb782e --- /dev/null +++ b/features/steps/bootstrap_labels.py @@ -0,0 +1,29 @@ +import time + +from behave import step, then + +@step('I start {name:name} in a cluster {cluster_name:w} as a long-running clone of {name2:name}') +def start_cluster_clone(context, name, cluster_name, name2): + context.pctl.clone(name2, cluster_name, name, True) + +@step('I start {name:name} in cluster {cluster_name:w} using long-running backup_restore') +def start_patroni(context, name, cluster_name): + return context.pctl.start(name, custom_config={ + "scope": cluster_name, + "postgresql": { + 'create_replica_methods': ['backup_restore'], + "backup_restore": context.pctl.backup_restore_config(long_running=True), + 'authentication': { + 'superuser': {'password': 'patroni1'}, + 'replication': {'password': 'rep-pass1'} + } + } + }, max_wait_limit=-1) + +@then('{name:name} is labeled with "{label:w}"') +def pod_labeled(context, name, label): + assert label in context.dcs_ctl.pod_labels(name), f'pod {name} is not labeled with {label}' + +@then('{name:name} is not labeled with "{label:w}"') +def pod_not_labeled(context, name, label): + assert label not in context.dcs_ctl.pod_labels(name), f'pod {name} is still labeled with {label}' diff --git a/patroni/config.py b/patroni/config.py index 05ed4ad55..cf6f3f2e7 100644 --- a/patroni/config.py +++ b/patroni/config.py @@ -657,7 +657,7 @@ def _get_auth(name: str, params: Collection[str] = _AUTH_ALLOWED_PARAMETERS[:2]) 'SERVICE_TAGS', 'NAMESPACE', 'CONTEXT', 'USE_ENDPOINTS', 'SCOPE_LABEL', 'ROLE_LABEL', 'POD_IP', 'PORTS', 'LABELS', 'BYPASS_API_SERVICE', 'RETRIABLE_HTTP_CODES', 'KEY_PASSWORD', 'USE_SSL', 'SET_ACLS', 'GROUP', 'DATABASE', 'LEADER_LABEL_VALUE', 'FOLLOWER_LABEL_VALUE', - 'STANDBY_LEADER_LABEL_VALUE', 'TMP_ROLE_LABEL', 'AUTH_DATA') and name: + 'STANDBY_LEADER_LABEL_VALUE', 'TMP_ROLE_LABEL', 'AUTH_DATA', 'BOOTSTRAP_LABELS') and name: value = os.environ.pop(param) if name == 'CITUS': if suffix == 'GROUP': @@ -668,7 +668,7 @@ def _get_auth(name: str, params: Collection[str] = _AUTH_ALLOWED_PARAMETERS[:2]) value = value and parse_int(value) elif suffix in ('HOSTS', 'PORTS', 'CHECKS', 'SERVICE_TAGS', 'RETRIABLE_HTTP_CODES'): value = value and _parse_list(value) - elif suffix in ('LABELS', 'SET_ACLS', 'AUTH_DATA'): + elif suffix in ('LABELS', 'SET_ACLS', 'AUTH_DATA', 'BOOTSTRAP_LABELS'): value = _parse_dict(value) elif suffix in ('USE_PROXIES', 'REGISTER_SERVICE', 'USE_ENDPOINTS', 'BYPASS_API_SERVICE', 'VERIFY'): value = parse_bool(value) diff --git a/patroni/dcs/kubernetes.py b/patroni/dcs/kubernetes.py index 4b9cedefd..7320a708a 100644 --- a/patroni/dcs/kubernetes.py +++ b/patroni/dcs/kubernetes.py @@ -760,6 +760,7 @@ def __init__(self, config: Dict[str, Any], mpp: AbstractMPP) -> None: self._follower_label_value = config.get('follower_label_value', 'replica') self._standby_leader_label_value = config.get('standby_leader_label_value', 'primary') self._tmp_role_label = config.get('tmp_role_label') + self._bootstrap_labels: Dict[str, str] = {str(k): str(v) for k, v in (config.get('bootstrap_labels') or EMPTY_DICT).items()} self._ca_certs = os.environ.get('PATRONI_KUBERNETES_CACERT', config.get('cacert')) or SERVICE_CERT_FILENAME super(Kubernetes, self).__init__({**config, 'namespace': ''}, mpp) if self._mpp.is_enabled(): @@ -1336,8 +1337,13 @@ def touch_member(self, data: Dict[str, Any]) -> bool: and deep_compare(data, member.data) if not ret: - metadata = {'namespace': self._namespace, 'name': self._name, 'labels': role_labels, - 'annotations': {'status': json.dumps(data, separators=(',', ':'))}} + metadata: Dict[str, Any] = {'namespace': self._namespace, 'name': self._name, 'labels': role_labels, + 'annotations': {'status': json.dumps(data, separators=(',', ':'))}} + if self._bootstrap_labels: + if data['state'] in ('initializing new cluster', 'running custom bootstrap script', 'creating replica'): + metadata['labels'].update(self._bootstrap_labels) + else: + metadata['labels'].update({k: None for k, _ in self._bootstrap_labels.items()}) body = k8s_client.V1Pod(metadata=k8s_client.V1ObjectMeta(**metadata)) ret = self._api.patch_namespaced_pod(self._name, self._namespace, body) if ret: diff --git a/patroni/validator.py b/patroni/validator.py index cb7b64522..05d859e61 100644 --- a/patroni/validator.py +++ b/patroni/validator.py @@ -1135,6 +1135,7 @@ def validate_watchdog_mode(value: Any) -> None: Optional("ports"): [{"name": str, "port": IntValidator(max=65535, expected_type=int, raise_assert=True)}], Optional("cacert"): str, Optional("retriable_http_codes"): Or(int, [int]), + Optional("bootstrap_labels"): dict, }, }), Optional("citus"): { diff --git a/tests/test_kubernetes.py b/tests/test_kubernetes.py index dc4a13653..7e0c9cd4b 100644 --- a/tests/test_kubernetes.py +++ b/tests/test_kubernetes.py @@ -235,7 +235,8 @@ class BaseTestKubernetes(unittest.TestCase): @patch.object(k8s_client.CoreV1Api, 'list_namespaced_config_map', mock_list_namespaced_config_map, create=True) def setUp(self, config=None): config = {'ttl': 30, 'scope': 'test', 'name': 'p-0', 'loop_wait': 10, 'retry_timeout': 10, - 'kubernetes': {'labels': {'f': 'b'}, 'bypass_api_service': True, **(config or {})}, + 'kubernetes': {'labels': {'f': 'b'}, 'bypass_api_service': True, **(config or {}), + 'bootstrap_labels': {'foo': 'bar'}}, 'citus': {'group': 0, 'database': 'postgres'}} self.k = get_dcs(config) self.assertIsInstance(self.k, Kubernetes) @@ -317,31 +318,43 @@ def test_set_config_value(self): @patch.object(k8s_client.CoreV1Api, 'patch_namespaced_pod', create=True) def test_touch_member(self, mock_patch_namespaced_pod): mock_patch_namespaced_pod.return_value.metadata.resource_version = '10' - self.k.touch_member({'role': 'replica'}) + self.k._name = 'p-1' + self.k.touch_member({'role': 'replica', 'state': 'initializing new cluster'}) + self.assertEqual(mock_patch_namespaced_pod.call_args[0][2].metadata.labels['foo'], 'bar') + self.k.touch_member({'state': 'running', 'role': 'replica'}) + self.assertEqual(mock_patch_namespaced_pod.call_args[0][2].metadata.labels['foo'], None) + + self.k.touch_member({'role': 'replica', 'state': 'running custom bootstrap script'}) + self.assertEqual(mock_patch_namespaced_pod.call_args[0][2].metadata.labels['foo'], 'bar') + self.k.touch_member({'state': 'stopped', 'role': 'primary'}) + self.assertEqual(mock_patch_namespaced_pod.call_args[0][2].metadata.labels['foo'], None) self.k._role_label = 'isMaster' self.k._leader_label_value = 'true' self.k._follower_label_value = 'false' self.k._standby_leader_label_value = 'false' self.k._tmp_role_label = 'tmp_role' + + self.k.touch_member({'state': 'creating replica', 'role': 'replica'}) + self.assertEqual(mock_patch_namespaced_pod.call_args[0][2].metadata.labels['foo'], 'bar') self.k.touch_member({'state': 'running', 'role': 'replica'}) - mock_patch_namespaced_pod.assert_called() + self.assertEqual(mock_patch_namespaced_pod.call_args[0][2].metadata.labels['foo'], None) self.assertEqual(mock_patch_namespaced_pod.call_args[0][2].metadata.labels['isMaster'], 'false') self.assertEqual(mock_patch_namespaced_pod.call_args[0][2].metadata.labels['tmp_role'], 'replica') mock_patch_namespaced_pod.rest_mock() self.k._name = 'p-0' - self.k.touch_member({'role': 'standby_leader'}) + self.k.touch_member({'state': 'running', 'role': 'standby_leader'}) mock_patch_namespaced_pod.assert_called() self.assertEqual(mock_patch_namespaced_pod.call_args[0][2].metadata.labels['isMaster'], 'false') self.assertEqual(mock_patch_namespaced_pod.call_args[0][2].metadata.labels['tmp_role'], 'primary') mock_patch_namespaced_pod.rest_mock() - self.k.touch_member({'role': 'primary'}) + self.k.touch_member({'state': 'running', 'role': 'primary'}) mock_patch_namespaced_pod.assert_called() self.assertEqual(mock_patch_namespaced_pod.call_args[0][2].metadata.labels['isMaster'], 'true') self.assertEqual(mock_patch_namespaced_pod.call_args[0][2].metadata.labels['tmp_role'], 'primary') diff --git a/tests/test_validator.py b/tests/test_validator.py index 193aae108..8ee18815f 100644 --- a/tests/test_validator.py +++ b/tests/test_validator.py @@ -70,6 +70,7 @@ "kubernetes": { "namespace": "string", "labels": {}, + 'bootstrap_labels': {'foo': 'bar'}, "scope_label": "string", "role_label": "string", "use_endpoints": False,