Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Syncing from upstream patroni/patroni (feature/bootstrap-labels) #486

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions features/backup_create.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,18 @@
import subprocess
import sys

from time import sleep

if __name__ == "__main__":
parser = argparse.ArgumentParser()
parser.add_argument("--datadir", required=True)
parser.add_argument("--dbname", required=True)
parser.add_argument("--walmethod", required=True, choices=("fetch", "stream", "none"))
parser.add_argument("--sleep", required=False, type=int)
args, _ = parser.parse_known_args()

if args.sleep:
sleep(args.sleep)

walmethod = ["-X", args.walmethod] if args.walmethod != "none" else []
sys.exit(subprocess.call(["pg_basebackup", "-D", args.datadir, "-c", "fast", "-d", args.dbname] + walmethod))
6 changes: 6 additions & 0 deletions features/backup_restore.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,17 @@
import argparse
import shutil

from time import sleep

if __name__ == "__main__":
parser = argparse.ArgumentParser()
parser.add_argument("--datadir", required=True)
parser.add_argument("--sourcedir", required=True)
parser.add_argument("--test-argument", required=True)
parser.add_argument("--sleep", required=False, type=int)
args, _ = parser.parse_known_args()

if args.sleep:
sleep(args.sleep)

shutil.copytree(args.sourcedir, args.datadir)
22 changes: 22 additions & 0 deletions features/bootstrap_labels.feature
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
Feature: bootstrap labels
Check that user-configurable bootstrap labels are set and removed with state change

Scenario: check label for cluster bootstrap protection
When I start postgres-0
Then postgres-0 is a leader after 10 seconds
When I start postgres-1 in a cluster batman1 as a long-running clone of postgres-0
Then "members/postgres-1" key in DCS has state=running custom bootstrap script after 20 seconds
And postgres-1 is labeled with "foo"
And postgres-1 is a leader of batman1 after 20 seconds

Scenario: check label for replica bootstrap protection
When I do a backup of postgres-1
And I start postgres-2 in cluster batman1 using long-running backup_restore
Then "members/postgres-2" key in DCS has state=creating replica after 20 seconds
And postgres-2 is labeled with "foo"

Scenario: check label is removed
Given "members/postgres-1" key in DCS has state=running after 2 seconds
And "members/postgres-2" key in DCS has state=running after 20 seconds
Then postgres-1 is not labeled with "foo"
And postgres-2 is not labeled with "foo"
24 changes: 19 additions & 5 deletions features/environment.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,8 @@ def start(self, max_wait_limit=5):
self._log = open(os.path.join(self._output_dir, self._name + '.log'), 'a')
self._handle = self._start()

if max_wait_limit < 0:
return
max_wait_limit *= self._context.timeout_multiplier
for _ in range(max_wait_limit):
assert self._has_started(), "Process {0} is not running after being started".format(self._name)
Expand Down Expand Up @@ -218,6 +220,8 @@ def _make_patroni_test_config(self, name, custom_config):
'host replication replicator all md5',
'host all all all md5'
]
if isinstance(self._context.dcs_ctl, KubernetesController):
config['kubernetes'] = {'bootstrap_labels': {'foo': 'bar'}}

if self._context.postgres_supports_ssl and self._context.certfile:
config['postgresql']['parameters'].update({
Expand Down Expand Up @@ -657,6 +661,11 @@ def delete_pod(self, name):
except Exception:
break

def pod_labels(self, name):
pod = self._api.read_namespaced_pod(name, self._namespace)
print(pod.metadata.labels)
return pod.metadata.labels or {}

def query(self, key, scope='batman', group=None):
if key.startswith('members/'):
pod = self._api.read_namespaced_pod(key[8:], self._namespace)
Expand Down Expand Up @@ -870,15 +879,17 @@ def create_and_set_output_directory(self, feature_name):
os.makedirs(feature_dir)
self._output_dir = feature_dir

def clone(self, from_name, cluster_name, to_name):
def clone(self, from_name, cluster_name, to_name, long_running=False):
f = self._processes[from_name]
max_wait_limit = -1 if long_running else 10

custom_config = {
'scope': cluster_name,
'bootstrap': {
'method': 'pg_basebackup',
'pg_basebackup': {
'command': " ".join(self.BACKUP_SCRIPT
+ ['--walmethod=stream', '--dbname="{0}"'.format(f.backup_source)])
+ ['--walmethod=stream', f'--dbname="{f.backup_source}"', f'--sleep {5 if long_running else 0}'])
},
'dcs': {
'postgresql': {
Expand All @@ -901,12 +912,13 @@ def clone(self, from_name, cluster_name, to_name):
}
}
}
self.start(to_name, custom_config=custom_config)
self.start(to_name, custom_config=custom_config, max_wait_limit=max_wait_limit)

def backup_restore_config(self, params=None):
def backup_restore_config(self, params=None, long_running=False):
return {
'command': (self.BACKUP_RESTORE_SCRIPT
+ ' --sourcedir=' + os.path.join(self.patroni_path, 'data', 'basebackup')).replace('\\', '/'),
+ ' --sourcedir=' + os.path.join(self.patroni_path, 'data', 'basebackup')
+ f' --sleep {5 if long_running else 0}').replace('\\', '/'),
'test-argument': 'test-value', # test config mapping approach on custom bootstrap/replica creation
**(params or {}),
}
Expand Down Expand Up @@ -1162,3 +1174,5 @@ def before_scenario(context, scenario):
scenario.skip('it is not possible to control state of {0} from tests'.format(context.dcs_ctl.name()))
if 'reject-duplicate-name' in scenario.effective_tags and context.dcs_ctl.name() == 'raft':
scenario.skip('Flaky test with Raft')
if scenario.filename.endswith('bootstrap_labels.feature') and not isinstance(context.dcs_ctl, KubernetesController):
scenario.skip()
29 changes: 29 additions & 0 deletions features/steps/bootstrap_labels.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
import time

Check failure on line 1 in features/steps/bootstrap_labels.py

View workflow job for this annotation

GitHub Actions / isort

Imports are incorrectly sorted and/or formatted.

from behave import step, then

@step('I start {name:name} in a cluster {cluster_name:w} as a long-running clone of {name2:name}')
def start_cluster_clone(context, name, cluster_name, name2):
context.pctl.clone(name2, cluster_name, name, True)

@step('I start {name:name} in cluster {cluster_name:w} using long-running backup_restore')
def start_patroni(context, name, cluster_name):
return context.pctl.start(name, custom_config={
"scope": cluster_name,
"postgresql": {
'create_replica_methods': ['backup_restore'],
"backup_restore": context.pctl.backup_restore_config(long_running=True),
'authentication': {
'superuser': {'password': 'patroni1'},
'replication': {'password': 'rep-pass1'}
}
}
}, max_wait_limit=-1)

@then('{name:name} is labeled with "{label:w}"')
def pod_labeled(context, name, label):
assert label in context.dcs_ctl.pod_labels(name), f'pod {name} is not labeled with {label}'

@then('{name:name} is not labeled with "{label:w}"')
def pod_not_labeled(context, name, label):
assert label not in context.dcs_ctl.pod_labels(name), f'pod {name} is still labeled with {label}'
4 changes: 2 additions & 2 deletions patroni/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -657,7 +657,7 @@ def _get_auth(name: str, params: Collection[str] = _AUTH_ALLOWED_PARAMETERS[:2])
'SERVICE_TAGS', 'NAMESPACE', 'CONTEXT', 'USE_ENDPOINTS', 'SCOPE_LABEL', 'ROLE_LABEL',
'POD_IP', 'PORTS', 'LABELS', 'BYPASS_API_SERVICE', 'RETRIABLE_HTTP_CODES', 'KEY_PASSWORD',
'USE_SSL', 'SET_ACLS', 'GROUP', 'DATABASE', 'LEADER_LABEL_VALUE', 'FOLLOWER_LABEL_VALUE',
'STANDBY_LEADER_LABEL_VALUE', 'TMP_ROLE_LABEL', 'AUTH_DATA') and name:
'STANDBY_LEADER_LABEL_VALUE', 'TMP_ROLE_LABEL', 'AUTH_DATA', 'BOOTSTRAP_LABELS') and name:
value = os.environ.pop(param)
if name == 'CITUS':
if suffix == 'GROUP':
Expand All @@ -668,7 +668,7 @@ def _get_auth(name: str, params: Collection[str] = _AUTH_ALLOWED_PARAMETERS[:2])
value = value and parse_int(value)
elif suffix in ('HOSTS', 'PORTS', 'CHECKS', 'SERVICE_TAGS', 'RETRIABLE_HTTP_CODES'):
value = value and _parse_list(value)
elif suffix in ('LABELS', 'SET_ACLS', 'AUTH_DATA'):
elif suffix in ('LABELS', 'SET_ACLS', 'AUTH_DATA', 'BOOTSTRAP_LABELS'):
value = _parse_dict(value)
elif suffix in ('USE_PROXIES', 'REGISTER_SERVICE', 'USE_ENDPOINTS', 'BYPASS_API_SERVICE', 'VERIFY'):
value = parse_bool(value)
Expand Down
10 changes: 8 additions & 2 deletions patroni/dcs/kubernetes.py
Original file line number Diff line number Diff line change
Expand Up @@ -760,6 +760,7 @@ def __init__(self, config: Dict[str, Any], mpp: AbstractMPP) -> None:
self._follower_label_value = config.get('follower_label_value', 'replica')
self._standby_leader_label_value = config.get('standby_leader_label_value', 'primary')
self._tmp_role_label = config.get('tmp_role_label')
self._bootstrap_labels: Dict[str, str] = {str(k): str(v) for k, v in (config.get('bootstrap_labels') or EMPTY_DICT).items()}
self._ca_certs = os.environ.get('PATRONI_KUBERNETES_CACERT', config.get('cacert')) or SERVICE_CERT_FILENAME
super(Kubernetes, self).__init__({**config, 'namespace': ''}, mpp)
if self._mpp.is_enabled():
Expand Down Expand Up @@ -1336,8 +1337,13 @@ def touch_member(self, data: Dict[str, Any]) -> bool:
and deep_compare(data, member.data)

if not ret:
metadata = {'namespace': self._namespace, 'name': self._name, 'labels': role_labels,
'annotations': {'status': json.dumps(data, separators=(',', ':'))}}
metadata: Dict[str, Any] = {'namespace': self._namespace, 'name': self._name, 'labels': role_labels,
'annotations': {'status': json.dumps(data, separators=(',', ':'))}}
if self._bootstrap_labels:
if data['state'] in ('initializing new cluster', 'running custom bootstrap script', 'creating replica'):
metadata['labels'].update(self._bootstrap_labels)
else:
metadata['labels'].update({k: None for k, _ in self._bootstrap_labels.items()})
body = k8s_client.V1Pod(metadata=k8s_client.V1ObjectMeta(**metadata))
ret = self._api.patch_namespaced_pod(self._name, self._namespace, body)
if ret:
Expand Down
1 change: 1 addition & 0 deletions patroni/validator.py
Original file line number Diff line number Diff line change
Expand Up @@ -1135,6 +1135,7 @@ def validate_watchdog_mode(value: Any) -> None:
Optional("ports"): [{"name": str, "port": IntValidator(max=65535, expected_type=int, raise_assert=True)}],
Optional("cacert"): str,
Optional("retriable_http_codes"): Or(int, [int]),
Optional("bootstrap_labels"): dict,
},
}),
Optional("citus"): {
Expand Down
23 changes: 18 additions & 5 deletions tests/test_kubernetes.py
Original file line number Diff line number Diff line change
Expand Up @@ -235,7 +235,8 @@ class BaseTestKubernetes(unittest.TestCase):
@patch.object(k8s_client.CoreV1Api, 'list_namespaced_config_map', mock_list_namespaced_config_map, create=True)
def setUp(self, config=None):
config = {'ttl': 30, 'scope': 'test', 'name': 'p-0', 'loop_wait': 10, 'retry_timeout': 10,
'kubernetes': {'labels': {'f': 'b'}, 'bypass_api_service': True, **(config or {})},
'kubernetes': {'labels': {'f': 'b'}, 'bypass_api_service': True, **(config or {}),
'bootstrap_labels': {'foo': 'bar'}},
'citus': {'group': 0, 'database': 'postgres'}}
self.k = get_dcs(config)
self.assertIsInstance(self.k, Kubernetes)
Expand Down Expand Up @@ -317,31 +318,43 @@ def test_set_config_value(self):
@patch.object(k8s_client.CoreV1Api, 'patch_namespaced_pod', create=True)
def test_touch_member(self, mock_patch_namespaced_pod):
mock_patch_namespaced_pod.return_value.metadata.resource_version = '10'
self.k.touch_member({'role': 'replica'})

self.k._name = 'p-1'
self.k.touch_member({'role': 'replica', 'state': 'initializing new cluster'})
self.assertEqual(mock_patch_namespaced_pod.call_args[0][2].metadata.labels['foo'], 'bar')

self.k.touch_member({'state': 'running', 'role': 'replica'})
self.assertEqual(mock_patch_namespaced_pod.call_args[0][2].metadata.labels['foo'], None)

self.k.touch_member({'role': 'replica', 'state': 'running custom bootstrap script'})
self.assertEqual(mock_patch_namespaced_pod.call_args[0][2].metadata.labels['foo'], 'bar')

self.k.touch_member({'state': 'stopped', 'role': 'primary'})
self.assertEqual(mock_patch_namespaced_pod.call_args[0][2].metadata.labels['foo'], None)

self.k._role_label = 'isMaster'
self.k._leader_label_value = 'true'
self.k._follower_label_value = 'false'
self.k._standby_leader_label_value = 'false'
self.k._tmp_role_label = 'tmp_role'

self.k.touch_member({'state': 'creating replica', 'role': 'replica'})
self.assertEqual(mock_patch_namespaced_pod.call_args[0][2].metadata.labels['foo'], 'bar')

self.k.touch_member({'state': 'running', 'role': 'replica'})
mock_patch_namespaced_pod.assert_called()
self.assertEqual(mock_patch_namespaced_pod.call_args[0][2].metadata.labels['foo'], None)
self.assertEqual(mock_patch_namespaced_pod.call_args[0][2].metadata.labels['isMaster'], 'false')
self.assertEqual(mock_patch_namespaced_pod.call_args[0][2].metadata.labels['tmp_role'], 'replica')
mock_patch_namespaced_pod.rest_mock()

self.k._name = 'p-0'
self.k.touch_member({'role': 'standby_leader'})
self.k.touch_member({'state': 'running', 'role': 'standby_leader'})
mock_patch_namespaced_pod.assert_called()
self.assertEqual(mock_patch_namespaced_pod.call_args[0][2].metadata.labels['isMaster'], 'false')
self.assertEqual(mock_patch_namespaced_pod.call_args[0][2].metadata.labels['tmp_role'], 'primary')
mock_patch_namespaced_pod.rest_mock()

self.k.touch_member({'role': 'primary'})
self.k.touch_member({'state': 'running', 'role': 'primary'})
mock_patch_namespaced_pod.assert_called()
self.assertEqual(mock_patch_namespaced_pod.call_args[0][2].metadata.labels['isMaster'], 'true')
self.assertEqual(mock_patch_namespaced_pod.call_args[0][2].metadata.labels['tmp_role'], 'primary')
Expand Down
1 change: 1 addition & 0 deletions tests/test_validator.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@
"kubernetes": {
"namespace": "string",
"labels": {},
'bootstrap_labels': {'foo': 'bar'},
"scope_label": "string",
"role_label": "string",
"use_endpoints": False,
Expand Down
Loading