diff --git a/docs/dynamic_configuration.rst b/docs/dynamic_configuration.rst index 1990c9404..403cd0aa7 100644 --- a/docs/dynamic_configuration.rst +++ b/docs/dynamic_configuration.rst @@ -56,6 +56,7 @@ In order to change the dynamic configuration you can use either :ref:`patronictl - **archive\_cleanup\_command**: cleanup command for standby leader - **recovery\_min\_apply\_delay**: how long to wait before actually apply WAL records on a standby leader +- **member_slots_ttl**: retention time of physical replication slots for replicas when they are shut down. Default value: `30min`. Set it to `0` if you want to keep the old behavior (when the member key expires from DCS, the slot is immediately removed). The feature works only starting from PostgreSQL 11. - **slots**: define permanent replication slots. These slots will be preserved during switchover/failover. Permanent slots that don't exist will be created by Patroni. With PostgreSQL 11 onwards permanent physical slots are created on all nodes and their position is advanced every **loop_wait** seconds. For PostgreSQL versions older than 11 permanent physical replication slots are maintained only on the current primary. The logical slots are copied from the primary to a standby with restart, and after that their position advanced every **loop_wait** seconds (if necessary). Copying logical slot files performed via ``libpq`` connection and using either rewind or superuser credentials (see **postgresql.authentication** section). There is always a chance that the logical slot position on the replica is a bit behind the former primary, therefore application should be prepared that some messages could be received the second time after the failover. The easiest way of doing so - tracking ``confirmed_flush_lsn``. Enabling permanent replication slots requires **postgresql.use_slots** to be set to ``true``. If there are permanent logical replication slots defined Patroni will automatically enable the ``hot_standby_feedback``. Since the failover of logical replication slots is unsafe on PostgreSQL 9.6 and older and PostgreSQL version 10 is missing some important functions, the feature only works with PostgreSQL 11+. - **my\_slot\_name**: the name of the permanent replication slot. If the permanent slot name matches with the name of the current node it will not be created on this node. If you add a permanent physical replication slot which name matches the name of a Patroni member, Patroni will ensure that the slot that was created is not removed even if the corresponding member becomes unresponsive, situation which would normally result in the slot's removal by Patroni. Although this can be useful in some situations, such as when you want replication slots used by members to persist during temporary failures or when importing existing members to a new Patroni cluster (see :ref:`Convert a Standalone to a Patroni Cluster ` for details), caution should be exercised by the operator that these clashes in names are not persisted in the DCS, when the slot is no longer required, due to its effect on normal functioning of Patroni. @@ -92,7 +93,7 @@ Note: **slots** is a hashmap while **ignore_slots** is an array. For example: type: physical ... 
-Note: if cluster topology is static (fixed number of nodes that never change their names) you can configure permanent physical replication slots with names corresponding to names of nodes to avoid recycling of WAL files while replica is temporary down: +Note: When running PostgreSQL v11 or newer, Patroni maintains physical replication slots on all nodes that could potentially become a leader, so that replica nodes keep WAL segments reserved if they are potentially required by other nodes. If a node is absent and its member key in DCS has expired, the corresponding replication slot is dropped after ``member_slots_ttl`` (default value is `30min`). You can increase or decrease retention based on your needs. Alternatively, if your cluster topology is static (fixed number of nodes that never change their names) you can configure permanent physical replication slots with names corresponding to the names of the nodes to avoid slot removal and recycling of WAL files while a replica is temporarily down: .. code:: YAML @@ -108,7 +109,7 @@ Note: if cluster topology is static (fixed number of nodes that never change the .. warning:: Permanent replication slots are synchronized only from the ``primary``/``standby_leader`` to replica nodes. That means, applications are supposed to be using them only from the leader node. Using them on replica nodes will cause indefinite growth of ``pg_wal`` on all other nodes in the cluster. - An exception to that rule are permanent physical slots that match the Patroni member names, if you happen to configure any. Those will be synchronized among all nodes as they are used for replication among them. + An exception to that rule are physical slots that match the Patroni member names (created and maintained by Patroni). Those will be synchronized among all nodes as they are used for replication among them. .. warning:: diff --git a/docs/faq.rst b/docs/faq.rst index c56bf0ce3..6345e569c 100644 --- a/docs/faq.rst +++ b/docs/faq.rst @@ -181,7 +181,10 @@ What is the difference between ``etcd`` and ``etcd3`` in Patroni configuration? * API version 2 will be completely removed on Etcd v3.6. I have ``use_slots`` enabled in my Patroni configuration, but when a cluster member goes offline for some time, the replication slot used by that member is dropped on the upstream node. What can I do to avoid that issue? - You can configure a permanent physical replication slot for the members. + There are two options: + + 1. You can tune ``member_slots_ttl`` (default value ``30min``, available since Patroni ``4.0.0`` and PostgreSQL 11 onwards) and replication slots for absent members will not be removed when the member's downtime is shorter than the configured threshold. + 2. You can configure permanent physical replication slots for the members. Since Patroni ``3.2.0`` it is possible to have member slots as permanent slots managed by Patroni. 
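As a quick illustration of the retention behavior described in the docs above, here is a minimal, standalone sketch of the bookkeeping (the names ``MEMBER_SLOTS_TTL``, ``last_seen`` and ``update_retention`` are hypothetical helpers for this example, not Patroni's API):

.. code:: python

    import time

    MEMBER_SLOTS_TTL = 1800  # seconds, i.e. the default of ``30min``
    last_seen = {}           # slot name -> timestamp of the last observation

    def update_retention(present_members, now=None):
        """Refresh slots of present members; expire slots unseen longer than the TTL."""
        now = now if now is not None else time.time()
        for slot in present_members:  # members currently known in DCS
            last_seen[slot] = now
        for slot, seen in list(last_seen.items()):
            if now - seen >= MEMBER_SLOTS_TTL:
                del last_seen[slot]  # the slot may now be dropped on the nodes
        return sorted(last_seen)

Setting the TTL to ``0`` in this sketch reproduces the old behavior: a slot expires as soon as its member is no longer present.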
diff --git a/features/dcs_failsafe_mode.feature b/features/dcs_failsafe_mode.feature index c5405ae6e..21feb6554 100644 --- a/features/dcs_failsafe_mode.feature +++ b/features/dcs_failsafe_mode.feature @@ -86,7 +86,7 @@ Feature: dcs failsafe mode @dcs-failsafe @slot-advance Scenario: make sure permanent slots exist on replicas - Given I issue a PATCH request to http://127.0.0.1:8009/config with {"slots":{"postgres2":0,"dcs_slot_0":null,"dcs_slot_2":{"type":"logical","database":"postgres","plugin":"test_decoding"}}} + Given I issue a PATCH request to http://127.0.0.1:8009/config with {"slots":{"dcs_slot_0":null,"dcs_slot_2":{"type":"logical","database":"postgres","plugin":"test_decoding"}}} Then logical slot dcs_slot_2 is in sync between postgres1 and postgres0 after 20 seconds And logical slot dcs_slot_2 is in sync between postgres1 and postgres2 after 20 seconds When I get all changes from physical slot dcs_slot_1 on postgres1 diff --git a/features/nostream_node.feature b/features/nostream_node.feature index e049a706d..d9d2b87ba 100644 --- a/features/nostream_node.feature +++ b/features/nostream_node.feature @@ -16,3 +16,10 @@ Scenario: check permanent logical replication slots are not copied Then "members/postgres2" key in DCS has replication_state=streaming after 10 seconds And postgres1 does not have a replication slot named test_logical And postgres2 does not have a replication slot named test_logical + +@slot-advance +Scenario: check that slots are written to the /status key + Given "status" key in DCS has postgres0 in slots + And "status" key in DCS has postgres2 in slots + And "status" key in DCS has test_logical in slots + And "status" key in DCS does not have postgres1 in slots diff --git a/features/permanent_slots.feature b/features/permanent_slots.feature index 2928e8298..e52efbdf4 100644 --- a/features/permanent_slots.feature +++ b/features/permanent_slots.feature @@ -3,7 +3,7 @@ Feature: permanent slots Given I start postgres0 Then postgres0 is a leader after 10 seconds And there is a non empty initialize key in DCS after 15 seconds - When I issue a PATCH request to http://127.0.0.1:8008/config with {"slots":{"test_physical":0,"postgres0":0,"postgres1":0,"postgres3":0},"postgresql":{"parameters":{"wal_level":"logical"}}} + When I issue a PATCH request to http://127.0.0.1:8008/config with {"slots":{"test_physical":0,"postgres3":0},"postgresql":{"parameters":{"wal_level":"logical"}}} Then I receive a response code 200 And Response on GET http://127.0.0.1:8008/config contains slots after 10 seconds When I start postgres1 @@ -34,12 +34,14 @@ Feature: permanent slots Scenario: check permanent physical slots that match with member names Given postgres0 has a physical replication slot named postgres3 after 2 seconds And postgres1 has a physical replication slot named postgres0 after 2 seconds + And postgres1 has a physical replication slot named postgres2 after 2 seconds And postgres1 has a physical replication slot named postgres3 after 2 seconds And postgres2 has a physical replication slot named postgres0 after 2 seconds And postgres2 has a physical replication slot named postgres3 after 2 seconds And postgres2 has a physical replication slot named postgres1 after 2 seconds - And postgres1 does not have a replication slot named postgres2 - And postgres3 does not have a replication slot named postgres2 + And postgres3 has a physical replication slot named postgres0 after 2 seconds + And postgres3 has a physical replication slot 
named postgres1 after 2 seconds + And postgres3 has a physical replication slot named postgres2 after 2 seconds @slot-advance Scenario: check that permanent slots are advanced on replicas @@ -53,19 +55,25 @@ Feature: permanent slots And Logical slot test_logical is in sync between postgres0 and postgres3 after 10 seconds And Physical slot test_physical is in sync between postgres0 and postgres3 after 10 seconds And Physical slot postgres1 is in sync between postgres0 and postgres2 after 10 seconds + And Physical slot postgres1 is in sync between postgres0 and postgres3 after 10 seconds And Physical slot postgres3 is in sync between postgres2 and postgres0 after 20 seconds And Physical slot postgres3 is in sync between postgres2 and postgres1 after 10 seconds - And postgres1 does not have a replication slot named postgres2 - And postgres3 does not have a replication slot named postgres2 @slot-advance - Scenario: check that only permanent slots are written to the /status key + Scenario: check that permanent slots and member slots are written to the /status key Given "status" key in DCS has test_physical in slots And "status" key in DCS has postgres0 in slots And "status" key in DCS has postgres1 in slots - And "status" key in DCS does not have postgres2 in slots + And "status" key in DCS has postgres2 in slots And "status" key in DCS has postgres3 in slots + @slot-advance + Scenario: check that only non-permanent member slots are written to the retain_slots in /status key + Given "status" key in DCS has postgres0 in retain_slots + And "status" key in DCS has postgres1 in retain_slots + And "status" key in DCS has postgres2 in retain_slots + And "status" key in DCS does not have postgres3 in retain_slots + Scenario: check permanent physical replication slot after failover Given I shut down postgres3 And I shut down postgres2 diff --git a/patroni/dcs/__init__.py b/patroni/dcs/__init__.py index 3fd71fcec..87b7c7df5 100644 --- a/patroni/dcs/__init__.py +++ b/patroni/dcs/__init__.py @@ -751,9 +751,11 @@ class Status(NamedTuple): :ivar last_lsn: :class:`int` object containing position of last known leader LSN. :ivar slots: state of permanent replication slots on the primary in the format: ``{"slot_name": int}``. + :ivar retain_slots: list of physical replication slots for members that exist in the cluster. """ last_lsn: int slots: Optional[Dict[str, int]] + retain_slots: List[str] @staticmethod def empty() -> 'Status': @@ -761,20 +763,20 @@ def empty() -> 'Status': :returns: empty :class:`Status` object. """ - return Status(0, None) + return Status(0, None, []) def is_empty(self): """Validate definition of all attributes of this :class:`Status` instance. :returns: ``True`` if all attributes of the current :class:`Status` are unpopulated. """ - return self.last_lsn == 0 and self.slots is None + return self.last_lsn == 0 and self.slots is None and not self.retain_slots @staticmethod def from_node(value: Union[str, Dict[str, Any], None]) -> 'Status': """Factory method to parse *value* as :class:`Status` object. - :param value: JSON serialized string + :param value: JSON serialized string or :class:`dict` object. :returns: constructed :class:`Status` object. 
""" @@ -785,7 +787,7 @@ def from_node(value: Union[str, Dict[str, Any], None]) -> 'Status': return Status.empty() if isinstance(value, int): # legacy - return Status(value, None) + return Status(value, None, []) if not isinstance(value, dict): return Status.empty() @@ -804,7 +806,16 @@ def from_node(value: Union[str, Dict[str, Any], None]) -> 'Status': if not isinstance(slots, dict): slots = None - return Status(last_lsn, slots) + retain_slots: Union[str, List[str], None] = value.get('retain_slots') + if isinstance(retain_slots, str): + try: + retain_slots = json.loads(retain_slots) + except Exception: + retain_slots = [] + if not isinstance(retain_slots, list): + retain_slots = [] + + return Status(last_lsn, slots, retain_slots) class Cluster(NamedTuple('Cluster', @@ -880,7 +891,8 @@ def __len__(self) -> int: >>> assert bool(cluster) is False - >>> cluster = Cluster(None, None, None, Status(0, None), [1, 2, 3], None, SyncState.empty(), None, None, {}) + >>> status = Status(0, None, []) + >>> cluster = Cluster(None, None, None, status, [1, 2, 3], None, SyncState.empty(), None, None, {}) >>> len(cluster) 1 @@ -984,7 +996,7 @@ def __permanent_slots(self) -> Dict[str, Union[Dict[str, Any], Any]]: return ret @property - def __permanent_physical_slots(self) -> Dict[str, Any]: + def permanent_physical_slots(self) -> Dict[str, Any]: """Dictionary of permanent ``physical`` replication slots.""" return {name: value for name, value in self.__permanent_slots.items() if self.is_physical_slot(value)} @@ -1011,7 +1023,8 @@ def get_replication_slots(self, postgresql: 'Postgresql', member: Tags, *, name = member.name if isinstance(member, Member) else postgresql.name role = role or postgresql.role - slots: Dict[str, Dict[str, str]] = self._get_members_slots(name, role) + slots: Dict[str, Dict[str, Any]] = self._get_members_slots(name, role, + member.nofailover, postgresql.can_advance_slots) permanent_slots: Dict[str, Any] = self._get_permanent_slots(postgresql, member, role) disabled_permanent_logical_slots: List[str] = self._merge_permanent_slots( @@ -1096,12 +1109,13 @@ def _get_permanent_slots(self, postgresql: 'Postgresql', tags: Tags, role: str) return {} if global_config.is_standby_cluster or self.get_slot_name_on_primary(postgresql.name, tags) is None: - return self.__permanent_physical_slots if postgresql.can_advance_slots or role == 'standby_leader' else {} + return self.permanent_physical_slots if postgresql.can_advance_slots or role == 'standby_leader' else {} return self.__permanent_slots if postgresql.can_advance_slots or role in ('master', 'primary') \ else self.__permanent_logical_slots - def _get_members_slots(self, name: str, role: str) -> Dict[str, Dict[str, str]]: + def _get_members_slots(self, name: str, role: str, nofailover: bool, + can_advance_slots: bool) -> Dict[str, Dict[str, Any]]: """Get physical replication slots configuration for members that sourcing from this node. If the ``replicatefrom`` tag is set on the member - we should not create the replication slot for it on @@ -1118,9 +1132,10 @@ def _get_members_slots(self, name: str, role: str) -> Dict[str, Dict[str, str]]: * Conflicting slot names between members are found :param name: name of this node. - :param role: role of this node, if this is a ``primary`` or ``standby_leader`` return list of members - replicating from this node. If not then return a list of members replicating as cascaded - replicas from this node. + :param role: role of this node, ``primary``, ``standby_leader``, or ``replica``. 
+ :param nofailover: ``True`` if this node is tagged to not be a failover candidate, ``False`` otherwise. + :param can_advance_slots: ``True`` if ``pg_replication_slot_advance()`` function is available, + ``False`` otherwise. :returns: dictionary of physical replication slots that should exist on a given node. """ @@ -1131,15 +1146,34 @@ def _get_members_slots(self, name: str, role: str) -> Dict[str, Dict[str, str]]: # also exlude members with disabled WAL streaming members = filter(lambda m: m.name != name and not m.nostream, self.members) - if role in ('master', 'primary', 'standby_leader'): + if can_advance_slots and global_config.member_slots_ttl > 0: + # if the node only does cascading replication and can't become the leader, we + # want to have slots only for members that could connect to it. + members = [m for m in members if not nofailover or m.replicatefrom == name] + elif role in ('master', 'primary', 'standby_leader'): # PostgreSQL is older than 11 + # on the leader we want to have slots only for the nodes that are supposed to be replicating from it. members = [m for m in members if m.replicatefrom is None or m.replicatefrom == name or not self.has_member(m.replicatefrom)] else: # only manage slots for replicas that replicate from this one, except for the leader among them members = [m for m in members if m.replicatefrom == name and m.name != self.leader_name] - slots = {slot_name_from_member_name(m.name): {'type': 'physical'} for m in members} - if len(slots) < len(members): + slots: Dict[str, int] = self.slots + ret: Dict[str, Dict[str, Any]] = {} + for member in members: + slot_name = slot_name_from_member_name(member.name) + lsn = slots.get(slot_name, 0) + if member.replicatefrom: + # The `/status` key is maintained by the leader, but `member` may be connected to some other node. + # In that case, the slot on the leader is inactive and doesn't advance, so we use the LSN + # reported by the member to advance the replication slot LSN. + # `max` is only a fallback: we take the LSN from the slot when there is no feedback from the member. + lsn = max(member.lsn or 0, lsn) + ret[slot_name] = {'type': 'physical', 'lsn': lsn} + ret.update({slot: {'type': 'physical'} for slot in self.status.retain_slots + if slot not in ret and slot != name}) + + if len(ret) < len(members): # Find which names are conflicting for a nicer error message slot_conflicts: Dict[str, List[str]] = defaultdict(list) for member in members: @@ -1147,7 +1181,7 @@ def _get_members_slots(self, name: str, role: str) -> Dict[str, Dict[str, str]]: logger.error("Following cluster members share a replication slot name: %s", "; ".join(f"{', '.join(v)} map to {k}" for k, v in slot_conflicts.items() if len(v) > 1)) - return slots + return ret def has_permanent_slots(self, postgresql: 'Postgresql', member: Tags) -> bool: """Check if our node has permanent replication slots configured. :param postgresql: reference to :class:`Postgresql` object. :param member: reference to an object implementing :class:`Tags` interface for the node that we are checking permanent logical replication slots for. - :returns: ``True`` if there are permanent replication slots configured, otherwise ``False``. 
""" role = 'replica' - members_slots: Dict[str, Dict[str, str]] = self._get_members_slots(postgresql.name, role) + members_slots: Dict[str, Dict[str, str]] = self._get_members_slots(postgresql.name, role, + member.nofailover, + postgresql.can_advance_slots) permanent_slots: Dict[str, Any] = self._get_permanent_slots(postgresql, member, role) slots = deepcopy(members_slots) self._merge_permanent_slots(slots, permanent_slots, postgresql.name, postgresql.can_advance_slots) return len(slots) > len(members_slots) or any(self.is_physical_slot(v) for v in permanent_slots.values()) - def filter_permanent_slots(self, postgresql: 'Postgresql', slots: Dict[str, int]) -> Dict[str, int]: + def maybe_filter_permanent_slots(self, postgresql: 'Postgresql', slots: Dict[str, int]) -> Dict[str, int]: """Filter out all non-permanent slots from provided *slots* dict. + .. note:: + In case if retention of replication slots for members is enabled we will not do + any filtering, because we need to publish LSN values for members replication slots, + so that other nodes can use them to advance LSN, like they do it for permanent slots. + :param postgresql: reference to :class:`Postgresql` object. :param slots: slot names with LSN values. - :returns: a :class:`dict` object that contains only slots that are known to be permanent. """ - if not postgresql.can_advance_slots: - return {} # for legacy PostgreSQL we don't support permanent slots on standby nodes + if global_config.member_slots_ttl > 0: + return slots permanent_slots: Dict[str, Any] = self._get_permanent_slots(postgresql, RemoteMember('', {}), 'replica') members_slots = {slot_name_from_member_name(m.name) for m in self.members} @@ -1412,7 +1451,8 @@ def __init__(self, config: Dict[str, Any], mpp: 'AbstractMPP') -> None: self._cluster_thread_lock = Lock() self._last_lsn: int = 0 self._last_seen: int = 0 - self._last_status: Dict[str, Any] = {} + self._last_status: Dict[str, Any] = {'retain_slots': []} + self._last_retain_slots: Dict[str, float] = {} self._last_failsafe: Optional[Dict[str, str]] = {} self.event = Event() @@ -1638,7 +1678,7 @@ def get_cluster(self) -> Cluster: self._cluster_valid_till = time.time() + self.ttl self._last_seen = int(time.time()) - self._last_status = {self._OPTIME: cluster.status.last_lsn} + self._last_status = {self._OPTIME: cluster.status.last_lsn, 'retain_slots': cluster.status.retain_slots} if cluster.status.slots: self._last_status['slots'] = cluster.status.slots self._last_failsafe = cluster.failsafe @@ -1698,6 +1738,14 @@ def write_status(self, value: Dict[str, Any]) -> None: :param value: JSON serializable dictionary with current WAL LSN and ``confirmed_flush_lsn`` of permanent slots. """ + # This method is always called with ``optime`` key, rest of the keys are optional. + # In case if we know old values (stored in self._last_status), we will copy them over. + for name in ('slots', 'retain_slots'): + if name not in value and self._last_status.get(name): + value[name] = self._last_status[name] + # if the key is present, but the value is None, we will not write such pair. 
+ value = {k: v for k, v in value.items() if v is not None} + if not deep_compare(self._last_status, value) and self._write_status(json.dumps(value, separators=(',', ':'))): self._last_status = value cluster = self.cluster @@ -1729,6 +1777,53 @@ def failsafe(self) -> Optional[Dict[str, str]]: """Stored value of :attr:`~AbstractDCS._last_failsafe`.""" return self._last_failsafe + def _build_retain_slots(self, cluster: Cluster, slots: Optional[Dict[str, int]]) -> Optional[List[str]]: + """Handle the retention policy of physical replication slots for cluster members. + + When the member key is missing we want to keep its replication slot for a while, so that WAL segments + are not already gone when the member comes back online. This is solved by storing the list of + replication slots representing members in the ``retain_slots`` field of the ``/status`` key. + + This method handles the retention policy by keeping the list of such replication slots in memory + and removing names that were last observed more than ``member_slots_ttl`` seconds ago. + + :param cluster: :class:`Cluster` object with information about the current cluster state. + :param slots: slot names with LSN values that exist on the leader node and consist + of slots for cluster members and permanent replication slots. + + :returns: the list of replication slots to be written to the ``/status`` key, or ``None``. + """ + timestamp = time.time() + + # DCS is the source of truth, therefore we take missing values from there + self._last_retain_slots.update({name: timestamp for name in self._last_status['retain_slots'] + if (not slots or name not in slots) and name not in self._last_retain_slots}) + + if slots: # if slots is not empty it implies we are running v11+ + members: Set[str] = set() + found_self = False + for member in cluster.members: + found_self = found_self or member.name == self._name + if not member.nostream: + members.add(slot_name_from_member_name(member.name)) + if not found_self: + # It could be that the member key for our node is not in DCS and we can't check tags.nostream. + # In this case our name will falsely appear in `retain_slots`, but only temporarily. + members.add(slot_name_from_member_name(self._name)) + + permanent_slots = cluster.permanent_physical_slots + # we want to have in ``retain_slots`` only non-permanent member slots + self._last_retain_slots.update({name: timestamp for name in slots + if name in members and name not in permanent_slots}) + # retention + for name, value in list(self._last_retain_slots.items()): + if value + global_config.member_slots_ttl <= timestamp: + logger.info("Replication slot '%s' for absent cluster member expired after %d sec.", + name, global_config.member_slots_ttl) + del self._last_retain_slots[name] + + return sorted(self._last_retain_slots.keys()) or None + @abc.abstractmethod def _update_leader(self, leader: Leader) -> bool: """Update ``leader`` key (or session) ttl. 
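To make the new ``/status`` layout concrete before moving on to the DCS backends, here is a small self-contained sketch of how a payload carrying ``retain_slots`` is parsed. It mirrors, in simplified form, the tolerant rules of ``Status.from_node`` shown earlier; both payload values are made up for illustration:

.. code:: python

    import json

    LEGACY = '2164261704'  # very old Patroni versions stored a bare LSN
    CURRENT = '{"optime":2164261704,"slots":{"ls":12345},"retain_slots":["postgres1"]}'

    def parse_status(raw):
        """Simplified sketch of the parsing done by Status.from_node."""
        try:
            value = json.loads(raw)
        except Exception:
            return 0, None, []
        if isinstance(value, int):  # legacy format: the whole key was the LSN
            return value, None, []
        if not isinstance(value, dict):
            return 0, None, []
        retain_slots = value.get('retain_slots')
        if not isinstance(retain_slots, list):
            retain_slots = []
        return int(value.get('optime', 0)), value.get('slots'), retain_slots

    print(parse_status(LEGACY))   # (2164261704, None, [])
    print(parse_status(CURRENT))  # (2164261704, {'ls': 12345}, ['postgres1'])

Unknown or malformed fields degrade to empty defaults rather than raising, which is why older and newer Patroni versions can share the same ``/status`` key.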
@@ -1763,9 +1858,8 @@ def update_leader(self, assert isinstance(cluster.leader, Leader) ret = self._update_leader(cluster.leader) if ret and last_lsn: - status: Dict[str, Any] = {self._OPTIME: last_lsn} - if slots: - status['slots'] = slots + status: Dict[str, Any] = {self._OPTIME: last_lsn, 'slots': slots or None, + 'retain_slots': self._build_retain_slots(cluster, slots)} self.write_status(status) if ret and failsafe is not None: diff --git a/patroni/dcs/kubernetes.py b/patroni/dcs/kubernetes.py index d41d9006f..7343b20ab 100644 --- a/patroni/dcs/kubernetes.py +++ b/patroni/dcs/kubernetes.py @@ -1244,6 +1244,8 @@ def update_leader(self, cluster: Cluster, last_lsn: Optional[int], if last_lsn: annotations[self._OPTIME] = str(last_lsn) annotations['slots'] = json.dumps(slots, separators=(',', ':')) if slots else None + retain_slots = self._build_retain_slots(cluster, slots) + annotations['retain_slots'] = json.dumps(retain_slots) if retain_slots else None if failsafe is not None: annotations[self._FAILSAFE] = json.dumps(failsafe, separators=(',', ':')) if failsafe else None diff --git a/patroni/global_config.py b/patroni/global_config.py index 403b29af2..ffa165733 100644 --- a/patroni/global_config.py +++ b/patroni/global_config.py @@ -135,16 +135,18 @@ def is_standby_cluster(self) -> bool: return isinstance(config, dict) and\ bool(config.get('host') or config.get('port') or config.get('restore_command')) - def get_int(self, name: str, default: int = 0) -> int: + def get_int(self, name: str, default: int = 0, base_unit: Optional[str] = None) -> int: """Gets current value of *name* from the global configuration and try to return it as :class:`int`. :param name: name of the parameter. :param default: default value if *name* is not in the configuration or invalid. + :param base_unit: an optional base unit to convert value of *name* parameter to. + Not used if the value does not contain a unit. :returns: currently configured value of *name* from the global configuration or *default* if it is not set or invalid. """ - ret = parse_int(self.get(name)) + ret = parse_int(self.get(name), base_unit) return default if ret is None else ret @property @@ -231,5 +233,13 @@ def permanent_slots(self) -> Dict[str, Any]: or self.get('slots') or EMPTY_DICT.copy()) + @property + def member_slots_ttl(self) -> int: + """Currently configured value of ``member_slots_ttl`` from the global configuration converted to seconds. + + Assume ``1800`` if it is not set or invalid. 
+ """ + return self.get_int('member_slots_ttl', 1800, base_unit='s') + sys.modules[__name__] = GlobalConfig() diff --git a/patroni/ha.py b/patroni/ha.py index f3d465096..ff169df18 100644 --- a/patroni/ha.py +++ b/patroni/ha.py @@ -14,7 +14,7 @@ from .__main__ import Patroni from .async_executor import AsyncExecutor, CriticalTask from .collections import CaseInsensitiveSet -from .dcs import AbstractDCS, Cluster, Leader, Member, RemoteMember, slot_name_from_member_name, Status, SyncState +from .dcs import AbstractDCS, Cluster, Leader, Member, RemoteMember, Status, SyncState from .exceptions import DCSError, PatroniFatalException, PostgresConnectionException from .postgresql.callback_executor import CallbackAction from .postgresql.misc import postgres_version_to_int @@ -173,7 +173,7 @@ def update_cluster(self, cluster: Cluster) -> Cluster: leader = self.leader if leader: # We rely on the strict order of fields in the namedtuple - status = Status(cluster.status.last_lsn, leader.member.data['slots']) + status = Status(cluster.status[0], leader.member.data['slots'], *cluster.status[2:]) cluster = Cluster(*cluster[0:2], leader, status, *cluster[4:]) # To advance LSN of replication slots on the primary for nodes that are doing cascading # replication from other nodes we need to update `xlog_location` on respective members. @@ -383,9 +383,7 @@ def update_lock(self, update_status: bool = False) -> bool: if update_status: try: last_lsn = self._last_wal_lsn = self.state_handler.last_operation() - slots = self.cluster.filter_permanent_slots( - self.state_handler, - {**self.state_handler.slots(), slot_name_from_member_name(self.state_handler.name): last_lsn}) + slots = self.cluster.maybe_filter_permanent_slots(self.state_handler, self.state_handler.slots()) except Exception: logger.exception('Exception when called state_handler.last_operation()') try: @@ -1221,12 +1219,10 @@ def check_failsafe_topology(self) -> bool: 'api_url': self.patroni.api.connection_string, } try: - data['slots'] = { - **self.state_handler.slots(), - slot_name_from_member_name(self.state_handler.name): self._last_wal_lsn - } + data['slots'] = self.state_handler.slots() except Exception: logger.exception('Exception when called state_handler.slots()') + members = [RemoteMember(name, {'api_url': url}) for name, url in failsafe.items() if name != self.state_handler.name] if not members: # A sinlge node cluster diff --git a/patroni/postgresql/__init__.py b/patroni/postgresql/__init__.py index 2876951cb..3b3b4e776 100644 --- a/patroni/postgresql/__init__.py +++ b/patroni/postgresql/__init__.py @@ -17,8 +17,8 @@ from .. 
import global_config, psycopg from ..async_executor import CriticalTask -from ..collections import CaseInsensitiveDict, CaseInsensitiveSet, EMPTY_DICT -from ..dcs import Cluster, Leader, Member +from ..collections import CaseInsensitiveDict, EMPTY_DICT +from ..dcs import Cluster, Leader, Member, slot_name_from_member_name from ..exceptions import PostgresConnectionException from ..tags import Tags from ..utils import data_directory_is_empty, parse_int, polling_loop, Retry, RetryFailedError @@ -82,10 +82,10 @@ def __init__(self, config: Dict[str, Any], mpp: AbstractMPP) -> None: self.connection_pool = ConnectionPool() self._connection = self.connection_pool.get('heartbeat') self.mpp_handler = mpp.get_handler_impl(self) + self._bin_dir = config.get('bin_dir') or '' self.config = ConfigHandler(self, config) self.config.check_directories() - self._bin_dir = config.get('bin_dir') or '' self.bootstrap = Bootstrap(self) self.bootstrapping = False self.__thread_ident = current_thread().ident @@ -112,15 +112,13 @@ def __init__(self, config: Dict[str, Any], mpp: AbstractMPP) -> None: self._state_entry_timestamp = 0 self._cluster_info_state = {} - self._has_permanent_slots = True + self._should_query_slots = True self._enforce_hot_standby_feedback = False self._cached_replica_timeline = None # Last known running process self._postmaster_proc = None - self._available_gucs = None - if self.is_running(): # If we found postmaster process we need to figure out whether postgres is accepting connections self.set_state('starting') @@ -233,7 +231,7 @@ def cluster_info_query(self) -> str: "plugin, catalog_xmin, pg_catalog.pg_wal_lsn_diff(confirmed_flush_lsn, '0/0')::bigint" " AS confirmed_flush_lsn, pg_catalog.pg_wal_lsn_diff(restart_lsn, '0/0')::bigint" " AS restart_lsn FROM pg_catalog.pg_get_replication_slots()) AS s)" - if self._has_permanent_slots and self.can_advance_slots else "NULL") + extra + if self._should_query_slots and self.can_advance_slots else "NULL") + extra extra = (", CASE WHEN latest_end_lsn IS NULL THEN NULL ELSE received_tli END," " slot_name, conninfo, status, {0} FROM pg_catalog.pg_stat_get_wal_receiver()").format(extra) if self.role == 'standby_leader': @@ -245,13 +243,6 @@ def cluster_info_query(self) -> str: return ("SELECT " + self.TL_LSN + ", {3}").format(self.wal_name, self.lsn_name, self.wal_flush, extra) - @property - def available_gucs(self) -> CaseInsensitiveSet: - """GUCs available in this Postgres server.""" - if not self._available_gucs: - self._available_gucs = self._get_gucs() - return self._available_gucs - def _version_file_exists(self) -> bool: return not self.data_directory_empty() and os.path.isfile(self._version_file) @@ -469,7 +460,7 @@ def reset_cluster_info_state(self, cluster: Optional[Cluster], tags: Optional[Ta # to have a logical slot or in case if it is the cascading replica. 
self.set_enforce_hot_standby_feedback(not global_config.is_standby_cluster and self.can_advance_slots and cluster.should_enforce_hot_standby_feedback(self, tags)) - self._has_permanent_slots = cluster.has_permanent_slots(self, tags) + self._should_query_slots = global_config.member_slots_ttl > 0 or cluster.has_permanent_slots(self, tags) def _cluster_info_state_get(self, name: str) -> Optional[Any]: if not self._cluster_info_state: @@ -480,7 +471,7 @@ 'received_tli', 'slot_name', 'conninfo', 'receiver_state', 'restore_command', 'slots', 'synchronous_commit', 'synchronous_standby_names', 'pg_stat_replication'], result)) - if self._has_permanent_slots and self.can_advance_slots: + if self._should_query_slots and self.can_advance_slots: cluster_info_state['slots'] =\ self.slots_handler.process_permanent_slots(cluster_info_state['slots']) self._cluster_info_state = cluster_info_state @@ -501,7 +492,19 @@ def received_location(self) -> Optional[int]: return self._cluster_info_state_get('received_location') def slots(self) -> Dict[str, int]: - return self._cluster_info_state_get('slots') or {} + """Get replication slots state. + + .. note:: + Since this method is supposed to be used only by the leader and only to publish the state of + replication slots to DCS so that other nodes can advance LSNs on the respective replication slots, + we also add our own name to the list. All slots that shouldn't be published to DCS + later will be filtered out by the :meth:`~Cluster.maybe_filter_permanent_slots` method. + + :returns: A :class:`dict` object with replication slot names and LSNs as absolute values. + """ + return {**(self._cluster_info_state_get('slots') or {}), + slot_name_from_member_name(self.name): self.last_operation()} \ + if self.can_advance_slots else {} def primary_slot_name(self) -> Optional[str]: return self._cluster_info_state_get('slot_name') @@ -1363,13 +1366,3 @@ def schedule_sanity_checks_after_pause(self) -> None: self.slots_handler.schedule() self.mpp_handler.schedule_cache_rebuild() self._sysid = '' - - def _get_gucs(self) -> CaseInsensitiveSet: - """Get all available GUCs based on ``postgres --describe-config`` output. - - :returns: all available GUCs in the local Postgres server. 
- """ - cmd = [self.pgcommand('postgres'), '--describe-config'] - return CaseInsensitiveSet({ - line.split('\t')[0] for line in subprocess.check_output(cmd).decode('utf-8').strip().split('\n') - }) diff --git a/patroni/postgresql/available_parameters/0_postgres.yml b/patroni/postgresql/available_parameters/0_postgres.yml index 741a8e48c..bfc5dfabb 100644 --- a/patroni/postgresql/available_parameters/0_postgres.yml +++ b/patroni/postgresql/available_parameters/0_postgres.yml @@ -4,7 +4,22 @@ parameters: version_from: 170000 allow_in_place_tablespaces: - type: Bool - version_from: 100000 + version_from: 150000 + - type: Bool + version_from: 140005 + version_till: 140099 + - type: Bool + version_from: 130008 + version_till: 130099 + - type: Bool + version_from: 120012 + version_till: 120099 + - type: Bool + version_from: 110017 + version_till: 110099 + - type: Bool + version_from: 100022 + version_till: 100099 allow_system_table_mods: - type: Bool version_from: 90300 @@ -1222,6 +1237,24 @@ parameters: restart_after_crash: - type: Bool version_from: 90300 + restrict_nonsystem_relation_kind: + - type: String + version_from: 170000 + - type: String + version_from: 160004 + version_till: 160099 + - type: String + version_from: 150008 + version_till: 150099 + - type: String + version_from: 140013 + version_till: 140099 + - type: String + version_from: 130016 + version_till: 130099 + - type: String + version_from: 120020 + version_till: 120099 row_security: - type: Bool version_from: 90500 diff --git a/patroni/postgresql/config.py b/patroni/postgresql/config.py index f857bbdfc..bcab56a0a 100644 --- a/patroni/postgresql/config.py +++ b/patroni/postgresql/config.py @@ -16,8 +16,9 @@ from ..dcs import Leader, Member, RemoteMember, slot_name_from_member_name from ..exceptions import PatroniFatalException, PostgresConnectionException from ..file_perm import pg_perm -from ..utils import compare_values, is_subpath, maybe_convert_from_base_unit, \ - parse_bool, parse_int, split_host_port, uri, validate_directory +from ..postgresql.misc import get_major_from_minor_version, postgres_version_to_int +from ..utils import compare_values, get_postgres_version, is_subpath, \ + maybe_convert_from_base_unit, parse_bool, parse_int, split_host_port, uri, validate_directory from ..validator import EnumValidator, IntValidator from .validator import recovery_parameters, transform_postgresql_parameter_value, transform_recovery_parameter_value @@ -403,6 +404,24 @@ def check_directories(self) -> None: def config_dir(self) -> str: return self._config_dir + @property + def pg_version(self) -> int: + """Current full postgres version if instance is running, major version otherwise. + + We can only use ``postgres --version`` output if major version there equals to the one + in data directory. If it is not the case, we should use major version from the ``PG_VERSION`` + file. 
+ """ + if self._postgresql.state == 'running': + try: + return self._postgresql.server_version + except AttributeError: + pass + bin_minor = postgres_version_to_int(get_postgres_version(bin_name=self._postgresql.pgcommand('postgres'))) + bin_major = get_major_from_minor_version(bin_minor) + datadir_major = self._postgresql.major_version + return datadir_major if bin_major != datadir_major else bin_minor + @property def _configuration_to_save(self) -> List[str]: configuration = [os.path.basename(self._postgresql_conf)] @@ -486,9 +505,9 @@ def write_postgresql_conf(self, configuration: Optional[CaseInsensitiveDict] = N with self.config_writer(self._postgresql_conf) as f: include = self._config.get('custom_conf') or self._postgresql_base_conf_name f.writeline("include '{0}'\n".format(ConfigWriter.escape(include))) + version = self.pg_version for name, value in sorted((configuration).items()): - value = transform_postgresql_parameter_value(self._postgresql.major_version, name, value, - self._postgresql.available_gucs) + value = transform_postgresql_parameter_value(version, name, value) if value is not None and\ (name != 'hba_file' or not self._postgresql.bootstrap.running_custom_bootstrap): f.write_param(name, value) @@ -609,8 +628,7 @@ def _write_recovery_params(self, fd: ConfigWriter, recovery_params: CaseInsensit self._passfile_mtime = mtime(self._pgpass) value = self.format_dsn(value) else: - value = transform_recovery_parameter_value(self._postgresql.major_version, name, value, - self._postgresql.available_gucs) + value = transform_recovery_parameter_value(self._postgresql.major_version, name, value) if value is None: continue fd.write_param(name, value) diff --git a/patroni/postgresql/misc.py b/patroni/postgresql/misc.py index a8a2c2960..089e0eb3e 100644 --- a/patroni/postgresql/misc.py +++ b/patroni/postgresql/misc.py @@ -57,6 +57,24 @@ def postgres_major_version_to_int(pg_version: str) -> int: return postgres_version_to_int(pg_version + '.0') +def get_major_from_minor_version(version: int) -> int: + """Extract major PostgreSQL version from the provided full version. + + :param version: integer representation of PostgreSQL full version (major + minor). + + :returns: integer representation of the PostgreSQL major version. + + :Example: + + >>> get_major_from_minor_version(100012) + 100000 + + >>> get_major_from_minor_version(90313) + 90300 + """ + return version // 100 * 100 + + def parse_lsn(lsn: str) -> int: t = lsn.split('/') return int(t[0], 16) * 0x100000000 + int(t[1], 16) diff --git a/patroni/postgresql/validator.py b/patroni/postgresql/validator.py index 66c5ada3e..5effccfd5 100644 --- a/patroni/postgresql/validator.py +++ b/patroni/postgresql/validator.py @@ -6,7 +6,7 @@ import yaml -from ..collections import CaseInsensitiveDict, CaseInsensitiveSet +from ..collections import CaseInsensitiveDict from ..exceptions import PatroniException from ..utils import parse_bool, parse_int, parse_real from .available_parameters import get_validator_files, PathLikeObj @@ -412,9 +412,8 @@ class in this module. def _transform_parameter_value(validators: MutableMapping[str, Tuple[_Transformable, ...]], - version: int, name: str, value: Any, - available_gucs: CaseInsensitiveSet) -> Optional[Any]: - """Validate *value* of GUC *name* for Postgres *version* using defined *validators* and *available_gucs*. + version: int, name: str, value: Any) -> Optional[Any]: + """Validate *value* of GUC *name* for Postgres *version* using defined *validators*. 
:param validators: a dictionary of all GUCs across all Postgres versions. Each key is the name of a Postgres GUC, and the corresponding value is a variable length tuple of :class:`_Transformable`. Each item is a validation @@ -423,8 +422,6 @@ def _transform_parameter_value(validators: MutableMapping[str, Tuple[_Transforma :param version: Postgres version to validate the GUC against. :param name: name of the Postgres GUC. :param value: value of the Postgres GUC. - :param available_gucs: a set of all GUCs available in Postgres *version*. Each item is the name of a Postgres - GUC. Used for a couple purposes: * Disallow writing GUCs to ``postgresql.conf`` (or ``recovery.conf``) that does not exist in Postgres *version*; * Avoid ignoring GUC *name* if it does not have a validator in *validators*, but is a valid GUC in Postgres @@ -432,35 +429,23 @@ def _transform_parameter_value(validators: MutableMapping[str, Tuple[_Transforma :returns: the return value may be one among: - * *value* transformed to the expected format for GUC *name* in Postgres *version*, if *name* is present - in *available_gucs* and has a validator in *validators* for the corresponding Postgres *version*; or - * The own *value* if *name* is present in *available_gucs* but not in *validators*; or - * ``None`` if *name* is not present in *available_gucs*. + * *value* transformed to the expected format for GUC *name* in Postgres *version*, if *name* has a validator + in *validators* for the corresponding Postgres *version*; or + * ``None`` if *name* does not have a validator in *validators*. """ - if name in available_gucs: - for validator in validators.get(name, ()) or (): - if version >= validator.version_from and\ - (validator.version_till is None or version < validator.version_till): - return validator.transform(name, value) - # Ideally we should have a validator in *validators*. However, if none is available, we will not discard a - # setting that exists in Postgres *version*, but rather allow the value with no validation. - return value + for validator in validators.get(name, ()) or (): + if version >= validator.version_from and\ + (validator.version_till is None or version < validator.version_till): + return validator.transform(name, value) logger.warning('Removing unexpected parameter=%s value=%s from the config', name, value) -def transform_postgresql_parameter_value(version: int, name: str, value: Any, - available_gucs: CaseInsensitiveSet) -> Optional[Any]: - """Validate *value* of GUC *name* for Postgres *version* using ``parameters`` and *available_gucs*. +def transform_postgresql_parameter_value(version: int, name: str, value: Any) -> Optional[Any]: + """Validate *value* of GUC *name* for Postgres *version* using ``parameters``. :param version: Postgres version to validate the GUC against. :param name: name of the Postgres GUC. :param value: value of the Postgres GUC. - :param available_gucs: a set of all GUCs available in Postgres *version*. Each item is the name of a Postgres - GUC. Used for a couple purposes: - - * Disallow writing GUCs to ``postgresql.conf`` that does not exist in Postgres *version*; - * Avoid ignoring GUC *name* if it does not have a validator in ``parameters``, but is a valid GUC in - Postgres *version*. 
:returns: The return value may be one among: @@ -475,32 +460,17 @@ def transform_postgresql_parameter_value(version: int, name: str, value: Any, return value if name in recovery_parameters: return None - return _transform_parameter_value(parameters, version, name, value, available_gucs) + return _transform_parameter_value(parameters, version, name, value) -def transform_recovery_parameter_value(version: int, name: str, value: Any, - available_gucs: CaseInsensitiveSet) -> Optional[Any]: - """Validate *value* of GUC *name* for Postgres *version* using ``recovery_parameters`` and *available_gucs*. +def transform_recovery_parameter_value(version: int, name: str, value: Any) -> Optional[Any]: + """Validate *value* of GUC *name* for Postgres *version* using ``recovery_parameters``. :param version: Postgres version to validate the recovery GUC against. :param name: name of the Postgres recovery GUC. :param value: value of the Postgres recovery GUC. - :param available_gucs: a set of all GUCs available in Postgres *version*. Each item is the name of a Postgres - GUC. Used for a couple purposes: - - * Disallow writing GUCs to ``recovery.conf`` (or ``postgresql.conf`` depending on *version*), that does not - exist in Postgres *version*; - * Avoid ignoring recovery GUC *name* if it does not have a validator in ``recovery_parameters``, but is a - valid GUC in Postgres *version*. :returns: *value* transformed to the expected format for recovery GUC *name* in Postgres *version* using validators defined in ``recovery_parameters``. It can also return ``None``. See :func:`_transform_parameter_value`. """ - # Recovery settings are not present in ``postgres --describe-config`` output of Postgres <= 11. In that case we - # just pass down the list of settings defined in Patroni validators so :func:`_transform_parameter_value` will not - # discard the recovery GUCs when running Postgres <= 11. - # NOTE: At the moment this change was done Postgres 11 was almost EOL, and had been likely extensively used with - # Patroni, so we should be able to rely solely on Patroni validators as the source of truth. - return _transform_parameter_value( - recovery_parameters, version, name, value, - available_gucs if version >= 120000 else CaseInsensitiveSet(recovery_parameters.keys())) + return _transform_parameter_value(recovery_parameters, version, name, value) diff --git a/patroni/utils.py b/patroni/utils.py index 021fa416a..1ef086265 100644 --- a/patroni/utils.py +++ b/patroni/utils.py @@ -1180,8 +1180,8 @@ def unquote(string: str) -> str: return ret -def get_major_version(bin_dir: Optional[str] = None, bin_name: str = 'postgres') -> str: - """Get the major version of PostgreSQL. +def get_postgres_version(bin_dir: Optional[str] = None, bin_name: str = 'postgres') -> str: + """Get full PostgreSQL version. It is based on the output of ``postgres --version``. @@ -1189,15 +1189,15 @@ def get_major_version(bin_dir: Optional[str] = None, bin_name: str = 'postgres') *bin_name* binary that is found by the subprocess in the ``PATH``. :param bin_name: name of the postgres binary to call (``postgres`` by default) - :returns: the PostgreSQL major version. + :returns: the PostgreSQL version. :raises: :exc:`~patroni.exceptions.PatroniException`: if the postgres binary call failed due to :exc:`OSError`. 
:Example: - * Returns `9.6` for PostgreSQL 9.6.24 - * Returns `15` for PostgreSQL 15.2 + * Returns `9.6.24` for PostgreSQL 9.6.24 + * Returns `15.2` for PostgreSQL 15.2 """ if not bin_dir: binary = bin_name @@ -1207,7 +1207,35 @@ version = subprocess.check_output([binary, '--version']).decode() except OSError as e: raise PatroniException(f'Failed to get postgres version: {e}') - version = re.match(r'^[^\s]+ [^\s]+ (\d+)(\.(\d+))?', version) + version = re.match(r'^[^\s]+ [^\s]+ ((\d+)(\.\d+)*)', version) if TYPE_CHECKING: # pragma: no cover assert version is not None - return '.'.join([version.group(1), version.group(3)]) if int(version.group(1)) < 10 else version.group(1) + version = version.groups() # e.g., ('15.2', '15', '.2') + major_version = int(version[1]) + dot_count = version[0].count('.') + if major_version < 10 and dot_count < 2 or major_version >= 10 and dot_count < 1: + return '.'.join((version[0], '0')) + return version[0] + + +def get_major_version(bin_dir: Optional[str] = None, bin_name: str = 'postgres') -> str: + """Get the major version of PostgreSQL. + + Like :func:`get_postgres_version`, but without the minor version. + + :param bin_dir: path to the PostgreSQL binaries directory. If ``None`` or an empty string, it will use the first + *bin_name* binary that is found by the subprocess in the ``PATH``. + :param bin_name: name of the postgres binary to call (``postgres`` by default) + + :returns: the PostgreSQL major version. + + :raises: + :exc:`~patroni.exceptions.PatroniException`: if the postgres binary call failed due to :exc:`OSError`. + + :Example: + + * Returns `9.6` for PostgreSQL 9.6.24 + * Returns `15` for PostgreSQL 15.2 + """ + full_version = get_postgres_version(bin_dir, bin_name) + return re.sub(r'\.\d+$', '', full_version) diff --git a/patroni/validator.py b/patroni/validator.py index c1438d38c..5df8b1cd4 100644 --- a/patroni/validator.py +++ b/patroni/validator.py @@ -1028,6 +1028,7 @@ def validate_watchdog_mode(value: Any) -> None: Optional("retry_timeout"): IntValidator(min=3, raise_assert=True), Optional("maximum_lag_on_failover"): IntValidator(min=0, raise_assert=True), Optional("maximum_lag_on_syncnode"): IntValidator(min=-1, raise_assert=True), + Optional('member_slots_ttl'): IntValidator(min=0, base_unit='s', raise_assert=True), Optional("postgresql"): { Optional("parameters"): { Optional("max_connections"): IntValidator(1, 262143, raise_assert=True), diff --git a/tests/__init__.py b/tests/__init__.py index 66659deee..c3bfc8d1c 100644 --- a/tests/__init__.py +++ b/tests/__init__.py @@ -3,7 +3,7 @@ import shutil import unittest -from unittest.mock import Mock, patch, PropertyMock +from unittest.mock import Mock, patch import urllib3 @@ -20,15 +20,6 @@ class SleepException(Exception): pass -mock_available_gucs = PropertyMock(return_value={ - 'cluster_name', 'constraint_exclusion', 'force_parallel_mode', 'hot_standby', 'listen_addresses', 'max_connections', - 'max_locks_per_transaction', 'max_prepared_transactions', 'max_replication_slots', 'max_stack_depth', - 'max_wal_senders', 'max_worker_processes', 'port', 'search_path', 'shared_preload_libraries', - 'stats_temp_directory', 'synchronous_standby_names', 'track_commit_timestamp', 'unix_socket_directories', - 'vacuum_cost_delay', 'vacuum_cost_limit', 'wal_keep_size', 'wal_level', 'wal_log_hints', 'zero_damaged_pages', - 'autovacuum', 'wal_segment_size', 'wal_block_size', 'shared_buffers', 'wal_buffers', -}) - 
 GET_PG_SETTINGS_RESULT = [
     ('wal_segment_size', '2048', '8kB', 'integer', 'internal'),
     ('wal_block_size', '8192', None, 'integer', 'internal'),
diff --git a/tests/test_bootstrap.py b/tests/test_bootstrap.py
index 6371f6b1c..792924d9b 100644
--- a/tests/test_bootstrap.py
+++ b/tests/test_bootstrap.py
@@ -10,13 +10,13 @@ from patroni.postgresql.cancellable import CancellableSubprocess
 from patroni.postgresql.config import ConfigHandler, get_param_diff

-from . import BaseTestPostgresql, mock_available_gucs, psycopg_connect
+from . import BaseTestPostgresql, psycopg_connect


 @patch('subprocess.call', Mock(return_value=0))
+@patch('subprocess.check_output', Mock(return_value=b"postgres (PostgreSQL) 12.1"))
 @patch('patroni.psycopg.connect', psycopg_connect)
 @patch('os.rename', Mock())
-@patch.object(Postgresql, 'available_gucs', mock_available_gucs)
 class TestBootstrap(BaseTestPostgresql):

     @patch('patroni.postgresql.CallbackExecutor', Mock())
diff --git a/tests/test_consul.py b/tests/test_consul.py
index d7288c52f..67750cfc9 100644
--- a/tests/test_consul.py
+++ b/tests/test_consul.py
@@ -46,8 +46,8 @@ def kv_get(self, key, **kwargs):
                       'ModifyIndex': 6429, 'Value': b'{"leader": "leader", "sync_standby": null}'},
                      {'CreateIndex': 1085, 'Flags': 0, 'Key': key + 'failsafe', 'LockIndex': 0,
                       'ModifyIndex': 6429, 'Value': b'{'},
-                     {'CreateIndex': 1085, 'Flags': 0, 'Key': key + 'status', 'LockIndex': 0,
-                      'ModifyIndex': 6429, 'Value': b'{"optime":4496294792, "slots":{"ls":12345}}'}])
+                     {'CreateIndex': 1085, 'Flags': 0, 'Key': key + 'status', 'LockIndex': 0, 'ModifyIndex': 6429,
+                      'Value': b'{"optime":4496294792,"slots":{"ls":12345},"retain_slots":["postgresql0","postgresql1"]}'}])
     if key == 'service/good/':
         return good_cls
     if key == 'service/broken/':
diff --git a/tests/test_etcd.py b/tests/test_etcd.py
index 1fe48a8ec..eb88b6867 100644
--- a/tests/test_etcd.py
+++ b/tests/test_etcd.py
@@ -77,7 +77,8 @@ def etcd_read(self, key, **kwargs):
                               "modifiedIndex": 20730, "createdIndex": 20730}],
                   "modifiedIndex": 1581, "createdIndex": 1581},
                  {"key": "/service/batman5/failsafe", "value": '{', "modifiedIndex": 1582, "createdIndex": 1582},
-                 {"key": "/service/batman5/status", "value": '{"optime":2164261704,"slots":{"ls":12345}}',
+                 {"key": "/service/batman5/status",
+                  "value": '{"optime":2164261704,"slots":{"ls":12345},"retain_slots":["postgresql0","postgresql1"]}',
                   "modifiedIndex": 1582, "createdIndex": 1582}],
                  "modifiedIndex": 1581, "createdIndex": 1581}}
     if key == '/service/legacy/':
         response['node']['nodes'].pop()
diff --git a/tests/test_etcd3.py b/tests/test_etcd3.py
index 54941130d..ee4a446f6 100644
--- a/tests/test_etcd3.py
+++ b/tests/test_etcd3.py
@@ -225,7 +225,8 @@ def test_get_cluster(self):
             "header": {"revision": "1"},
             "kvs": [
                 {"key": base64_encode('/patroni/test/status'),
-                 "value": base64_encode('{"optime":1234567,"slots":{"ls":12345}}'), "mod_revision": '1'}
+                 "value": base64_encode('{"optime":1234567,"slots":{"ls":12345},"retain_slots": ["foo"]}'),
+                 "mod_revision": '1'}
             ]
         })
         self.assertIsInstance(self.etcd3.get_cluster(), Cluster)
diff --git a/tests/test_ha.py b/tests/test_ha.py
index 0d218c386..87316984e 100644
--- a/tests/test_ha.py
+++ b/tests/test_ha.py
@@ -42,8 +42,8 @@ def get_cluster(initialize, leader, members, failover, sync, cluster_config=None
     t = datetime.datetime.now().isoformat()
     history = TimelineHistory(1, '[[1,67197376,"no recovery target specified","' + t + '","foo"]]',
                               [(1, 67197376, 'no recovery target specified', t, 'foo')])
-    cluster_config = cluster_config or ClusterConfig(1, {'check_timeline': True}, 1)
-    return Cluster(initialize, cluster_config, leader, Status(10, None), members, failover, sync, history, failsafe)
+    cluster_config = cluster_config or ClusterConfig(1, {'check_timeline': True, 'member_slots_ttl': 0}, 1)
+    return Cluster(initialize, cluster_config, leader, Status(10, None, []), members, failover, sync, history, failsafe)


 def get_cluster_not_initialized_without_leader(cluster_config=None):
diff --git a/tests/test_kubernetes.py b/tests/test_kubernetes.py
index 432158de9..41d80f812 100644
--- a/tests/test_kubernetes.py
+++ b/tests/test_kubernetes.py
@@ -26,14 +26,16 @@ def mock_list_namespaced_config_map(*args, **kwargs):
                 'annotations': {'initialize': '123', 'config': '{}'}}
     items = [k8s_client.V1ConfigMap(metadata=k8s_client.V1ObjectMeta(**metadata))]
     metadata.update({'name': 'test-leader',
-                     'annotations': {'optime': '1234x', 'leader': 'p-0', 'ttl': '30s', 'slots': '{', 'failsafe': '{'}})
+                     'annotations': {'optime': '1234x', 'leader': 'p-0', 'ttl': '30s',
+                                     'slots': '{', 'retain_slots': '{', 'failsafe': '{'}})
     items.append(k8s_client.V1ConfigMap(metadata=k8s_client.V1ObjectMeta(**metadata)))
     metadata.update({'name': 'test-failover', 'annotations': {'leader': 'p-0'}})
     items.append(k8s_client.V1ConfigMap(metadata=k8s_client.V1ObjectMeta(**metadata)))
     metadata.update({'name': 'test-sync', 'annotations': {'leader': 'p-0'}})
     items.append(k8s_client.V1ConfigMap(metadata=k8s_client.V1ObjectMeta(**metadata)))
     metadata.update({'name': 'test-0-leader', 'labels': {k8s_group_label: '0'},
-                     'annotations': {'optime': '1234x', 'leader': 'p-0', 'ttl': '30s', 'slots': '{', 'failsafe': '{'}})
+                     'annotations': {'optime': '1234x', 'leader': 'p-0', 'ttl': '30s',
+                                     'slots': '{', 'retain_slots': '{', 'failsafe': '{'}})
     items.append(k8s_client.V1ConfigMap(metadata=k8s_client.V1ObjectMeta(**metadata)))
     metadata.update({'name': 'test-0-config', 'labels': {k8s_group_label: '0'},
                      'annotations': {'initialize': '123', 'config': '{}'}})
@@ -421,7 +423,7 @@ def test__update_leader_with_retry(self, mock_patch, mock_read):
         mock_patch.side_effect = RetryFailedError('')
         self.assertRaises(KubernetesError, self.k.update_leader, cluster, '123')
         mock_patch.side_effect = k8s_client.rest.ApiException(409, '')
-        with patch('time.time', Mock(side_effect=[0, 100, 200, 0, 0, 0, 0, 100, 200])):
+        with patch('time.time', Mock(side_effect=[0, 0, 100, 200, 0, 0, 0, 0, 0, 100, 200])):
             self.assertFalse(self.k.update_leader(cluster, '123'))
             self.assertFalse(self.k.update_leader(cluster, '123'))
             self.assertFalse(self.k.update_leader(cluster, '123'))
diff --git a/tests/test_patroni.py b/tests/test_patroni.py
index e65b1870a..abf86f45f 100644
--- a/tests/test_patroni.py
+++ b/tests/test_patroni.py
@@ -78,7 +78,6 @@ def test_validate_config(self):
     @patch.object(etcd.Client, 'read', etcd_read)
     @patch.object(Thread, 'start', Mock())
     @patch.object(AbstractEtcdClientWithFailover, '_get_machines_list', Mock(return_value=['http://remotehost:2379']))
-    @patch.object(Postgresql, '_get_gucs', Mock(return_value={'foo': True, 'bar': True}))
     def setUp(self):
         self._handlers = logging.getLogger().handlers[:]
         RestApiServer._BaseServer__is_shut_down = Mock()
@@ -102,7 +101,6 @@ def test_load_dynamic_configuration(self):
     @patch.object(etcd.Client, 'delete', Mock())
     @patch.object(AbstractEtcdClientWithFailover, '_get_machines_list', Mock(return_value=['http://remotehost:2379']))
     @patch.object(Thread, 'join', Mock())
-    @patch.object(Postgresql, '_get_gucs', Mock(return_value={'foo': True, 'bar': True}))
     def test_patroni_patroni_main(self):
         with patch('subprocess.call', Mock(return_value=1)):
             with patch.object(Patroni, 'run', Mock(side_effect=SleepException)):
diff --git a/tests/test_postgresql.py b/tests/test_postgresql.py
index c249bdbc7..76b51aeb4 100644
--- a/tests/test_postgresql.py
+++ b/tests/test_postgresql.py
@@ -15,7 +15,7 @@ from patroni import global_config
 from patroni.async_executor import CriticalTask
-from patroni.collections import CaseInsensitiveDict, CaseInsensitiveSet
+from patroni.collections import CaseInsensitiveDict
 from patroni.dcs import RemoteMember
 from patroni.exceptions import PatroniException, PostgresConnectionException
 from patroni.postgresql import Postgresql, STATE_NO_RESPONSE, STATE_REJECT
@@ -24,12 +24,12 @@ from patroni.postgresql.config import _false_validator, get_param_diff
 from patroni.postgresql.postmaster import PostmasterProcess
 from patroni.postgresql.validator import _get_postgres_guc_validators, _load_postgres_gucs_validators, \
-    _read_postgres_gucs_validators_file, Bool, Enum, EnumBool, Integer, InvalidGucValidatorsFile, Real, String, \
-    ValidatorFactory, ValidatorFactoryInvalidSpec, ValidatorFactoryInvalidType, ValidatorFactoryNoType
+    _read_postgres_gucs_validators_file, Bool, Enum, EnumBool, Integer, InvalidGucValidatorsFile, \
+    Real, String, transform_postgresql_parameter_value, ValidatorFactory, ValidatorFactoryInvalidSpec, \
+    ValidatorFactoryInvalidType, ValidatorFactoryNoType
 from patroni.utils import RetryFailedError

-from . import BaseTestPostgresql, GET_PG_SETTINGS_RESULT, \
-    mock_available_gucs, MockCursor, MockPostmaster, psycopg_connect
+from . import BaseTestPostgresql, GET_PG_SETTINGS_RESULT, MockCursor, MockPostmaster, psycopg_connect


 mtime_ret = {}
@@ -98,8 +98,8 @@ def pg_controldata_string(*args, **kwargs):

 @patch('subprocess.call', Mock(return_value=0))
+@patch('subprocess.check_output', Mock(return_value=b"postgres (PostgreSQL) 12.1"))
 @patch('patroni.psycopg.connect', psycopg_connect)
-@patch.object(Postgresql, 'available_gucs', mock_available_gucs)
 class TestPostgresql(BaseTestPostgresql):

     @patch('subprocess.call', Mock(return_value=0))
@@ -107,7 +107,6 @@ class TestPostgresql(BaseTestPostgresql):
     @patch('patroni.postgresql.CallbackExecutor', Mock())
     @patch.object(Postgresql, 'get_major_version', Mock(return_value=140000))
     @patch.object(Postgresql, 'is_running', Mock(return_value=True))
-    @patch.object(Postgresql, 'available_gucs', mock_available_gucs)
     def setUp(self):
         super(TestPostgresql, self).setUp()
         self.p.config.write_postgresql_conf()
@@ -530,9 +529,16 @@ def test_controldata(self):
         self.assertEqual(self.p.controldata(), {})

     @patch('patroni.postgresql.Postgresql._version_file_exists', Mock(return_value=True))
-    @patch('subprocess.check_output', MagicMock(return_value=0, side_effect=pg_controldata_string))
     def test_sysid(self):
-        self.assertEqual(self.p.sysid, "6200971513092291716")
+        with patch('subprocess.check_output', Mock(return_value=0, side_effect=pg_controldata_string)):
+            self.assertEqual(self.p.sysid, "6200971513092291716")
+
+    def test_pg_version(self):
+        self.assertEqual(self.p.config.pg_version, 99999)  # server_version
+        with patch.object(Postgresql, 'server_version', PropertyMock(side_effect=AttributeError)):
+            self.assertEqual(self.p.config.pg_version, 140000)  # PG_VERSION==14, postgres --version == 12.1
+        with patch('subprocess.check_output', Mock(return_value=b"postgres (PostgreSQL) 14.1")):
+            self.assertEqual(self.p.config.pg_version, 140001)

     @patch('os.path.isfile', Mock(return_value=True))
     @patch('shutil.copy', Mock(side_effect=IOError))
@@ -896,6 +902,32 @@ def test_set_enforce_hot_standby_feedback(self):
     def test_handle_parameter_change(self):
         self.p.handle_parameter_change()

+    @patch('patroni.postgresql.validator.logger.warning')
+    def test_transform_postgresql_parameter_value(self, mock_warning):
+        not_none_values = (
+            ('foo.bar', 'foo', 160003),  # name, value, version
+            ("allow_in_place_tablespaces", 'true', 130008),
+            ("restrict_nonsystem_relation_kind", 'view', 160005)
+        )
+        for i in not_none_values:
+            self.assertIsNotNone(
+                transform_postgresql_parameter_value(i[2], i[0], i[1])
+            )
+
+        none_values = (
+            ("archive_cleanup_command", 'foo', 160003, False),  # name, value, version, unexpected param
+            ("allow_in_place_tablespaces", 'true', 130005, True),
+            ("restrict_nonsystem_relation_kind", 'view', 160001, True),
+        )
+        for i in none_values:
+            self.assertIsNone(
+                transform_postgresql_parameter_value(i[2], i[0], i[1])
+            )
+            if i[3]:
+                mock_warning.assert_called_once_with(
+                    'Removing unexpected parameter=%s value=%s from the config', i[0], i[1])
+                mock_warning.reset_mock()
+
     def test_validator_factory(self):
         # validator with no type
         validator = {
@@ -1115,12 +1147,6 @@ class TestPostgresql2(BaseTestPostgresql):
     def setUp(self):
         super(TestPostgresql2, self).setUp()

-    @patch('subprocess.check_output', Mock(return_value='\n'.join(mock_available_gucs.return_value).encode('utf-8')))
-    def test_available_gucs(self):
-        gucs = self.p.available_gucs
-        self.assertIsInstance(gucs, CaseInsensitiveSet)
-        self.assertEqual(gucs, mock_available_gucs.return_value)
-
     def test_cluster_info_query(self):
         self.assertIn('diff(pg_catalog.pg_current_wal_flush_lsn(', self.p.cluster_info_query)
         self.p._major_version = 90600
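The suites above now mock ``subprocess.check_output`` with a ``postgres --version`` banner (``b"postgres (PostgreSQL) 12.1"``) instead of patching ``available_gucs``. Judging by the assertions in ``test_pg_version()``, the version lookup prefers a running server's ``server_version``, and otherwise combines the major version from the data directory's ``PG_VERSION`` file with the minor version from the binary banner, falling back to ``.0`` when the majors disagree (``12.1`` with ``PG_VERSION==14`` yields ``140000``, while ``14.1`` yields ``140001``). The ``test_get_postgres_version()`` cases in tests/test_utils.py further down additionally pin how the banner itself is normalized: ``beta``/``rc`` tags and other trailing text collapse to a ``.0`` minor release, and a bare major version gains ``.0``. Below is a minimal standalone sketch that reproduces those normalization expectations; ``parse_postgres_version`` is a hypothetical name used for illustration, not the actual implementation of ``patroni.utils.get_postgres_version`` (which invokes the binary itself):

.. code:: python

    import re


    def parse_postgres_version(banner: str) -> str:
        """Normalize a 'postgres --version' banner the way test_get_postgres_version() expects."""
        # Take the token right after "(PostgreSQL) ", e.g. "9.6.24", "17beta3" or "10wow,".
        token = banner.split('(PostgreSQL)')[1].strip().split(' ')[0]
        match = re.match(r'\d+(\.\d+)*', token)
        if not match:
            raise ValueError('unexpected version banner: {0!r}'.format(banner))
        version = match.group(0)
        parts = version.split('.')
        # Full versions have three components before PostgreSQL 10 and two from 10 onwards.
        expected = 3 if int(parts[0]) < 10 else 2
        if token != version or len(parts) != expected:
            # Pre-releases (beta/rc), trailing garbage or a missing minor become the ".0" release.
            version = '.'.join(parts[:expected - 1] + ['0'])
        return version


    assert parse_postgres_version('postgres (PostgreSQL) 9.6.24\n') == '9.6.24'
    assert parse_postgres_version('postgres (PostgreSQL) 10.23 (Ubuntu 10.23-4.pgdg22.04+1)\n') == '10.23'
    assert parse_postgres_version('postgres (PostgreSQL) 17beta3 (Ubuntu 17~beta3-1.pgdg22.04+1)\n') == '17.0'
    assert parse_postgres_version('postgres (PostgreSQL) 9.6rc2\n') == '9.6.0'
    assert parse_postgres_version('postgres (PostgreSQL) 10\n') == '10.0'
    assert parse_postgres_version('postgres (PostgreSQL) 10wow, something new\n') == '10.0'

Under the same assumptions, the major version asserted in ``test_get_major_version()`` (``'9.6'``, ``'10'``, ``'17'``) is simply ``'.'.join(parts[:expected - 1])``.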
diff --git a/tests/test_raft.py b/tests/test_raft.py
index 121019001..8660ff40e 100644
--- a/tests/test_raft.py
+++ b/tests/test_raft.py
@@ -152,7 +152,8 @@ def test_raft(self):
         self.assertIsInstance(cluster, Cluster)
         self.assertIsInstance(cluster.workers[1], Cluster)
         self.assertTrue(raft.delete_leader(cluster.leader))
-        self.assertTrue(raft._sync_obj.set(raft.status_path, '{"optime":1234567,"slots":{"ls":12345}}'))
+        self.assertTrue(raft._sync_obj.set(raft.status_path,
+                                           '{"optime":1234567,"slots":{"ls":12345},"retain_slots":["postgresql0"]}'))
         raft.get_cluster()
         self.assertTrue(raft.update_leader(cluster, '1', failsafe={'foo': 'bat'}))
         self.assertTrue(raft._sync_obj.set(raft.failsafe_path, '{"foo"}'))
diff --git a/tests/test_slots.py b/tests/test_slots.py
index 3739dc5c0..2ff2c4d50 100644
--- a/tests/test_slots.py
+++ b/tests/test_slots.py
@@ -38,7 +38,7 @@ def setUp(self):
         self.s = self.p.slots_handler
         self.p.start()
         config = ClusterConfig(1, {'slots': {'ls': {'database': 'a', 'plugin': 'b'}, 'ls2': None}}, 1)
-        self.cluster = Cluster(True, config, self.leader, Status(0, {'ls': 12345, 'ls2': 12345}),
+        self.cluster = Cluster(True, config, self.leader, Status(0, {'ls': 12345, 'ls2': 12345}, []),
                                [self.me, self.other, self.leadermem], None, SyncState.empty(), None, None)
         global_config.update(self.cluster)
         self.tags = TestTags()
@@ -47,7 +47,7 @@ def test_sync_replication_slots(self):
         config = ClusterConfig(1, {'slots': {'test_3': {'database': 'a', 'plugin': 'b'},
                                              'A': 0, 'ls': 0, 'b': {'type': 'logical', 'plugin': '1'}},
                                    'ignore_slots': [{'name': 'blabla'}]}, 1)
-        cluster = Cluster(True, config, self.leader, Status(0, {'test_3': 10}),
+        cluster = Cluster(True, config, self.leader, Status(0, {'test_3': 10}, []),
                           [self.me, self.other, self.leadermem], None, SyncState.empty(), None, None)
         global_config.update(cluster)
         with mock.patch('patroni.postgresql.Postgresql._query', Mock(side_effect=psycopg.OperationalError)):
@@ -89,7 +89,7 @@ def test_cascading_replica_sync_replication_slots(self):
             'state': 'running', 'conn_url': 'postgres://replicator:rep-pass@127.0.0.1:5436/postgres',
             'tags': {'replicatefrom': 'postgresql0'}
         })
-        cluster = Cluster(True, config, self.leader, Status(0, {'ls': 10}),
+        cluster = Cluster(True, config, self.leader, Status(0, {'ls': 10}, []),
                           [self.me, self.other, self.leadermem, cascading_replica], None, SyncState.empty(),
                           None, None)
         self.p.set_role('replica')
         with patch.object(Postgresql, '_query') as mock_query, \
@@ -114,30 +114,33 @@ def test_process_permanent_slots(self):
                               "confirmed_flush_lsn": 12345, "catalog_xmin": 105, "restart_lsn": 12344},
                              {"slot_name": "blabla", "type": "physical", "datoid": None, "plugin": None,
                               "confirmed_flush_lsn": None, "catalog_xmin": 105, "restart_lsn": 12344}])]
-        self.assertEqual(self.p.slots(), {'ls': 12345, 'blabla': 12344})
+        self.assertEqual(self.p.slots(), {'ls': 12345, 'blabla': 12344, 'postgresql0': 0})

         self.p.reset_cluster_info_state(None)
         mock_query.return_value = [(
             1, 0, 0, 0, 0, 0, 0, 0, 0, None, None,
             [{"slot_name": "ls", "type": "logical", "datoid": 6, "plugin": "b",
               "confirmed_flush_lsn": 12345, "catalog_xmin": 105}])]
-        self.assertEqual(self.p.slots(), {})
+        self.assertEqual(self.p.slots(), {'postgresql0': 0})

     def test_nostream_slot_processing(self):
         config = ClusterConfig(
             1, {'slots': {'foo': {'type': 'logical', 'database': 'a', 'plugin': 'b'}, 'bar': {'type': 'physical'}}}, 1)
         nostream_node = Member(0, 'test-2', 28, {
             'state': 'running', 'conn_url': 'postgres://replicator:rep-pass@127.0.0.1:5436/postgres',
-            'tags': {'nostream': 'True'}
+            'tags': {'nostream': 'True'},
+            'xlog_location': 10,
         })
         cascade_node = Member(0, 'test-3', 28, {
             'state': 'running', 'conn_url': 'postgres://replicator:rep-pass@127.0.0.1:5436/postgres',
-            'tags': {'replicatefrom': 'test-2'}
+            'tags': {'replicatefrom': 'test-2'},
+            'xlog_location': 98
         })
         stream_node = Member(0, 'test-4', 28, {
-            'state': 'running', 'conn_url': 'postgres://replicator:rep-pass@127.0.0.1:5436/postgres'})
+            'state': 'running', 'conn_url': 'postgres://replicator:rep-pass@127.0.0.1:5436/postgres',
+            'xlog_location': 99})
         cluster = Cluster(
-            True, config, self.leader, Status.empty(),
+            True, config, self.leader, Status(100, {'leader': 99, 'test_2': 98, 'test_3': 97, 'test_4': 98}, []),
             [self.leadermem, nostream_node, cascade_node, stream_node], None, SyncState.empty(), None, None)
         global_config.update(cluster)
@@ -147,8 +150,8 @@ def test_nostream_slot_processing(self):
             cluster._get_permanent_slots(self.p, self.leadermem, 'primary'),
             {'foo': {'type': 'logical', 'database': 'a', 'plugin': 'b'}, 'bar': {'type': 'physical'}})
         self.assertEqual(
-            cluster._get_members_slots(self.p.name, 'primary'),
-            {'test_4': {'type': 'physical'}})
+            cluster._get_members_slots(self.p.name, 'primary', False, True),
+            {'test_3': {'type': 'physical', 'lsn': 98}, 'test_4': {'type': 'physical', 'lsn': 98}})

         # nostream node must not have slot on primary
         self.p.name = nostream_node.name
@@ -162,8 +165,10 @@ def test_nostream_slot_processing(self):

         # check cascade member-slot existence on nostream node
         self.assertEqual(
-            cluster._get_members_slots(nostream_node.name, 'replica'),
-            {'test_3': {'type': 'physical'}})
+            cluster._get_members_slots(nostream_node.name, 'replica', False, True),
+            {'leader': {'type': 'physical', 'lsn': 99},
+             'test_3': {'type': 'physical', 'lsn': 98},
+             'test_4': {'type': 'physical', 'lsn': 98}})

         # cascade also does not entitled to have logical slot on itself ...
         self.p.name = cascade_node.name
@@ -291,7 +296,7 @@ def test_slots_advance_thread(self):
     @patch.object(Postgresql, 'is_primary', Mock(return_value=False))
     def test_advance_physical_slots(self):
         config = ClusterConfig(1, {'slots': {'blabla': {'type': 'physical'}, 'leader': None}}, 1)
-        cluster = Cluster(True, config, self.leader, Status(0, {'blabla': 12346}),
+        cluster = Cluster(True, config, self.leader, Status(0, {'blabla': 12346}, []),
                           [self.me, self.other, self.leadermem], None, SyncState.empty(), None, None)
         global_config.update(cluster)
         self.s.sync_replication_slots(cluster, self.tags)
diff --git a/tests/test_sync.py b/tests/test_sync.py
index 3dfa99de1..c7093c932 100644
--- a/tests/test_sync.py
+++ b/tests/test_sync.py
@@ -7,12 +7,11 @@ from patroni.dcs import Cluster, ClusterConfig, Status, SyncState
 from patroni.postgresql import Postgresql

-from . import BaseTestPostgresql, mock_available_gucs, psycopg_connect
+from . import BaseTestPostgresql, psycopg_connect


 @patch('subprocess.call', Mock(return_value=0))
 @patch('patroni.psycopg.connect', psycopg_connect)
-@patch.object(Postgresql, 'available_gucs', mock_available_gucs)
 class TestSync(BaseTestPostgresql):

     @patch('subprocess.call', Mock(return_value=0))
@@ -20,7 +19,6 @@ class TestSync(BaseTestPostgresql):
     @patch('patroni.postgresql.CallbackExecutor', Mock())
     @patch.object(Postgresql, 'get_major_version', Mock(return_value=140000))
     @patch.object(Postgresql, 'is_running', Mock(return_value=True))
-    @patch.object(Postgresql, 'available_gucs', mock_available_gucs)
     def setUp(self):
         super(TestSync, self).setUp()
         self.p.config.write_postgresql_conf()
diff --git a/tests/test_utils.py b/tests/test_utils.py
index dc7d3cce5..126516c61 100644
--- a/tests/test_utils.py
+++ b/tests/test_utils.py
@@ -3,7 +3,8 @@ from unittest.mock import Mock, patch

 from patroni.exceptions import PatroniException
-from patroni.utils import enable_keepalive, polling_loop, Retry, RetryFailedError, unquote, validate_directory
+from patroni.utils import enable_keepalive, get_major_version, get_postgres_version, \
+    polling_loop, Retry, RetryFailedError, unquote, validate_directory


 class TestUtils(unittest.TestCase):
@@ -67,6 +68,47 @@ def test_unquote(self):
                          '\'value with a \'"\'"\' single quote\''), 'value with a \' single quote')

+    def test_get_postgres_version(self):
+        with patch('subprocess.check_output', Mock(return_value=b'postgres (PostgreSQL) 9.6.24\n')):
+            self.assertEqual(get_postgres_version(), '9.6.24')
+        with patch('subprocess.check_output',
+                   Mock(return_value=b'postgres (PostgreSQL) 10.23 (Ubuntu 10.23-4.pgdg22.04+1)\n')):
+            self.assertEqual(get_postgres_version(), '10.23')
+        with patch('subprocess.check_output',
+                   Mock(return_value=b'postgres (PostgreSQL) 17beta3 (Ubuntu 17~beta3-1.pgdg22.04+1)\n')):
+            self.assertEqual(get_postgres_version(), '17.0')
+        with patch('subprocess.check_output',
+                   Mock(return_value=b'postgres (PostgreSQL) 9.6beta3\n')):
+            self.assertEqual(get_postgres_version(), '9.6.0')
+        with patch('subprocess.check_output', Mock(return_value=b'postgres (PostgreSQL) 9.6rc2\n')):
+            self.assertEqual(get_postgres_version(), '9.6.0')
+        # because why not: a bare major version without any minor part
+        with patch('subprocess.check_output', Mock(return_value=b'postgres (PostgreSQL) 10\n')):
+            self.assertEqual(get_postgres_version(), '10.0')
+        with patch('subprocess.check_output', Mock(return_value=b'postgres (PostgreSQL) 10wow, something new\n')):
+            self.assertEqual(get_postgres_version(), '10.0')
+        with patch('subprocess.check_output', Mock(side_effect=OSError)):
+            self.assertRaises(PatroniException, get_postgres_version, 'postgres')
+
+    def test_get_major_version(self):
+        with patch('subprocess.check_output', Mock(return_value=b'postgres (PostgreSQL) 9.6.24\n')):
+            self.assertEqual(get_major_version(), '9.6')
+        with patch('subprocess.check_output',
+                   Mock(return_value=b'postgres (PostgreSQL) 10.23 (Ubuntu 10.23-4.pgdg22.04+1)\n')):
+            self.assertEqual(get_major_version(), '10')
+        with patch('subprocess.check_output',
+                   Mock(return_value=b'postgres (PostgreSQL) 17beta3 (Ubuntu 17~beta3-1.pgdg22.04+1)\n')):
+            self.assertEqual(get_major_version(), '17')
+        with patch('subprocess.check_output',
+                   Mock(return_value=b'postgres (PostgreSQL) 9.6beta3\n')):
+            self.assertEqual(get_major_version(), '9.6')
+        with patch('subprocess.check_output', Mock(return_value=b'postgres (PostgreSQL) 9.6rc2\n')):
+            self.assertEqual(get_major_version(), '9.6')
+        with patch('subprocess.check_output', Mock(return_value=b'postgres (PostgreSQL) 10\n')):
+            self.assertEqual(get_major_version(), '10')
+        with patch('subprocess.check_output', Mock(side_effect=OSError)):
+            self.assertRaises(PatroniException, get_major_version, 'postgres')
+

 @patch('time.sleep', Mock())
 class TestRetrySleeper(unittest.TestCase):
diff --git a/tests/test_zookeeper.py b/tests/test_zookeeper.py
index 7a4c3538e..f774b3ed3 100644
--- a/tests/test_zookeeper.py
+++ b/tests/test_zookeeper.py
@@ -54,7 +54,8 @@ def get(self, path, watch=None):
         elif path.endswith('/initialize'):
             return (b'foo', ZnodeStat(0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0))
         elif path.endswith('/status'):
-            return (b'{"optime":500,"slots":{"ls":1234567}}', ZnodeStat(0, 0, 0, 0, 0, 0, 0, -1, 0, 0, 0))
+            return (b'{"optime":500,"slots":{"ls":1234567},"retain_slots":["postgresql0"]}',
+                    ZnodeStat(0, 0, 0, 0, 0, 0, 0, -1, 0, 0, 0))
         elif path.endswith('/failsafe'):
             return (b'{a}', ZnodeStat(0, 0, 0, 0, 0, 0, 0, -1, 0, 0, 0))
         return (b'', ZnodeStat(0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0))
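With these fixtures every DCS backend (Consul, etcd v2 and v3, Raft, ZooKeeper, plus the ``retain_slots`` annotation on Kubernetes) now publishes a third field in the cluster ``/status`` payload next to ``optime`` and ``slots``, and the ``Status`` tuple used throughout the tests grows a matching third positional argument (``[]`` or ``["postgresql0", ...]``). Note also that ``get_cluster()`` in tests/test_ha.py pins ``member_slots_ttl: 0`` so the pre-existing HA scenarios keep exercising immediate slot removal rather than the new retention logic. Below is a rough, self-contained sketch of how such a payload maps onto that three-field shape; ``parse_status`` and this simplified ``Status`` are illustrative assumptions, not the actual parsing code in ``patroni.dcs``:

.. code:: python

    import json
    from typing import Dict, List, NamedTuple, Optional


    class Status(NamedTuple):
        """Simplified three-field shape mirroring Status(optime, slots, retain_slots) in the tests."""
        last_lsn: int                     # "optime": WAL position reported by the leader
        slots: Optional[Dict[str, int]]   # permanent slot name -> confirmed LSN
        retain_slots: List[str]           # member slots to keep while the member key is absent


    def parse_status(value: str) -> Status:
        """Lenient decoder: a malformed payload falls back to empty defaults."""
        try:
            data = json.loads(value)
        except ValueError:
            data = {}
        if not isinstance(data, dict):
            data = {}
        return Status(int(data.get('optime') or 0), data.get('slots'),
                      list(data.get('retain_slots') or []))


    # The ZooKeeper fixture above decodes to:
    assert parse_status('{"optime":500,"slots":{"ls":1234567},"retain_slots":["postgresql0"]}') \
        == Status(500, {'ls': 1234567}, ['postgresql0'])

The deliberately broken ``b'{'`` fixtures fed to the ``/failsafe`` and ``slots``/``retain_slots`` annotations suggest why such parsing has to be lenient: ``get_cluster()`` must survive garbage in any of these keys.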