Skip to content

Commit

Permalink
Merge pull request #425 from patroni/master
Browse files Browse the repository at this point in the history
Syncing from upstream patroni/patroni (master)
  • Loading branch information
bt-admin authored Jul 18, 2024
2 parents 96734b3 + b1d442e commit 17b5a87
Show file tree
Hide file tree
Showing 7 changed files with 167 additions and 51 deletions.
6 changes: 4 additions & 2 deletions features/dcs_failsafe_mode.feature
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ Feature: dcs failsafe mode
@dcs-failsafe
Scenario: scale to three-node cluster
Given I start postgres0
And I start postgres2
And I configure and start postgres2 with a tag replicatefrom postgres0
Then "members/postgres2" key in DCS has state=running after 10 seconds
And "members/postgres0" key in DCS has state=running after 20 seconds
And Response on GET http://127.0.0.1:8008/failsafe contains postgres2 after 10 seconds
Expand All @@ -86,13 +86,14 @@ Feature: dcs failsafe mode
@dcs-failsafe
@slot-advance
Scenario: make sure permanent slots exist on replicas
Given I issue a PATCH request to http://127.0.0.1:8009/config with {"slots":{"dcs_slot_0":null,"dcs_slot_2":{"type":"logical","database":"postgres","plugin":"test_decoding"}}}
Given I issue a PATCH request to http://127.0.0.1:8009/config with {"slots":{"postgres2":0,"dcs_slot_0":null,"dcs_slot_2":{"type":"logical","database":"postgres","plugin":"test_decoding"}}}
Then logical slot dcs_slot_2 is in sync between postgres1 and postgres0 after 20 seconds
And logical slot dcs_slot_2 is in sync between postgres1 and postgres2 after 20 seconds
When I get all changes from physical slot dcs_slot_1 on postgres1
Then physical slot dcs_slot_1 is in sync between postgres1 and postgres0 after 10 seconds
And physical slot dcs_slot_1 is in sync between postgres1 and postgres2 after 10 seconds
And physical slot postgres0 is in sync between postgres1 and postgres2 after 10 seconds
And physical slot postgres2 is in sync between postgres0 and postgres1 after 10 seconds

@dcs-failsafe
Scenario: check three-node cluster is functioning while DCS is down
Expand All @@ -114,3 +115,4 @@ Feature: dcs failsafe mode
And physical slot dcs_slot_1 is in sync between postgres1 and postgres0 after 10 seconds
And physical slot dcs_slot_1 is in sync between postgres1 and postgres2 after 10 seconds
And physical slot postgres0 is in sync between postgres1 and postgres2 after 10 seconds
And physical slot postgres2 is in sync between postgres0 and postgres1 after 10 seconds
8 changes: 5 additions & 3 deletions patroni/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -752,17 +752,19 @@ def do_POST_failsafe(self) -> None:
"""Handle a ``POST`` request to ``/failsafe`` path.
Writes a response with HTTP status ``200`` if this node is a Standby, or with HTTP status ``500`` if this is
the primary.
the primary. In addition to that it returns absolute value of received/replayed LSN in the ``lsn`` header.
.. note::
If ``failsafe_mode`` is not enabled, then write a response with HTTP status ``502``.
"""
if self.server.patroni.ha.is_failsafe_mode():
request = self._read_json_content()
if request:
message = self.server.patroni.ha.update_failsafe(request) or 'Accepted'
ret = self.server.patroni.ha.update_failsafe(request)
headers = {'lsn': str(ret)} if isinstance(ret, int) else {}
message = ret if isinstance(ret, str) else 'Accepted'
code = 200 if message == 'Accepted' else 500
self.write_response(code, message)
self.write_response(code, message, headers=headers)
else:
self.send_error(502)

Expand Down
27 changes: 13 additions & 14 deletions patroni/dcs/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@
from ..postgresql import Postgresql
from ..postgresql.mpp import AbstractMPP

SLOT_ADVANCE_AVAILABLE_VERSION = 110000
slot_name_re = re.compile('^[a-z0-9_]{1,63}$')
logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -947,9 +946,9 @@ def __permanent_slots(self) -> Dict[str, Union[Dict[str, Any], Any]]:
if not value:
value = ret[name] = {}
if isinstance(value, dict):
# for permanent physical slots we want to get MAX LSN from the `Cluster.slots` and from the
# member with the matching name. It is necessary because we may have the replication slot on
# the primary that is streaming from the other standby node using the `replicatefrom` tag.
# For permanent physical slots we want to get MAX LSN from the `Cluster.slots` and from the
# member that does cascading replication with the matching name (see `replicatefrom` tag).
# It is necessary because we may have the permanent replication slot on the primary for this node.
lsn = max(members.get(name, 0) if self.is_physical_slot(value) else 0, slots.get(name, 0))
if lsn:
value['lsn'] = lsn
Expand Down Expand Up @@ -990,7 +989,7 @@ def get_replication_slots(self, postgresql: 'Postgresql', member: Tags, *,
permanent_slots: Dict[str, Any] = self._get_permanent_slots(postgresql, member, role)

disabled_permanent_logical_slots: List[str] = self._merge_permanent_slots(
slots, permanent_slots, name, postgresql.major_version)
slots, permanent_slots, name, postgresql.can_advance_slots)

if disabled_permanent_logical_slots and show_error:
logger.error("Permanent logical replication slots supported by Patroni only starting from PostgreSQL 11. "
Expand All @@ -999,7 +998,7 @@ def get_replication_slots(self, postgresql: 'Postgresql', member: Tags, *,
return slots

def _merge_permanent_slots(self, slots: Dict[str, Dict[str, str]], permanent_slots: Dict[str, Any], name: str,
major_version: int) -> List[str]:
can_advance_slots: bool) -> List[str]:
"""Merge replication *slots* for members with *permanent_slots*.
Perform validation of configured permanent slot name, skipping invalid names.
Expand All @@ -1010,7 +1009,8 @@ def _merge_permanent_slots(self, slots: Dict[str, Dict[str, str]], permanent_slo
:param slots: Slot names with existing attributes if known.
:param name: name of this node.
:param permanent_slots: dictionary containing slot name key and slot information values.
:param major_version: postgresql major version.
:param can_advance_slots: ``True`` if ``pg_replication_slot_advance()`` function is available,
``False`` otherwise.
:returns: List of disabled permanent, logical slot names, if postgresql version < 11.
"""
Expand All @@ -1034,7 +1034,7 @@ def _merge_permanent_slots(self, slots: Dict[str, Dict[str, str]], permanent_slo
continue

if self.is_logical_slot(value):
if major_version < SLOT_ADVANCE_AVAILABLE_VERSION:
if not can_advance_slots:
disabled_permanent_logical_slots.append(slot_name)
elif slot_name in slots:
logger.error("Permanent logical replication slot {'%s': %s} is conflicting with"
Expand Down Expand Up @@ -1070,11 +1070,10 @@ def _get_permanent_slots(self, postgresql: 'Postgresql', tags: Tags, role: str)
return {}

if global_config.is_standby_cluster or self.get_slot_name_on_primary(postgresql.name, tags) is None:
return self.__permanent_physical_slots \
if postgresql.major_version >= SLOT_ADVANCE_AVAILABLE_VERSION or role == 'standby_leader' else {}
return self.__permanent_physical_slots if postgresql.can_advance_slots or role == 'standby_leader' else {}

return self.__permanent_slots if postgresql.major_version >= SLOT_ADVANCE_AVAILABLE_VERSION\
or role in ('master', 'primary') else self.__permanent_logical_slots
return self.__permanent_slots if postgresql.can_advance_slots or role in ('master', 'primary') \
else self.__permanent_logical_slots

def _get_members_slots(self, name: str, role: str) -> Dict[str, Dict[str, str]]:
"""Get physical replication slots configuration for members that sourcing from this node.
Expand Down Expand Up @@ -1137,7 +1136,7 @@ def has_permanent_slots(self, postgresql: 'Postgresql', member: Tags) -> bool:
members_slots: Dict[str, Dict[str, str]] = self._get_members_slots(postgresql.name, role)
permanent_slots: Dict[str, Any] = self._get_permanent_slots(postgresql, member, role)
slots = deepcopy(members_slots)
self._merge_permanent_slots(slots, permanent_slots, postgresql.name, postgresql.major_version)
self._merge_permanent_slots(slots, permanent_slots, postgresql.name, postgresql.can_advance_slots)
return len(slots) > len(members_slots) or any(self.is_physical_slot(v) for v in permanent_slots.values())

def filter_permanent_slots(self, postgresql: 'Postgresql', slots: Dict[str, int]) -> Dict[str, int]:
Expand All @@ -1148,7 +1147,7 @@ def filter_permanent_slots(self, postgresql: 'Postgresql', slots: Dict[str, int]
:returns: a :class:`dict` object that contains only slots that are known to be permanent.
"""
if postgresql.major_version < SLOT_ADVANCE_AVAILABLE_VERSION:
if not postgresql.can_advance_slots:
return {} # for legacy PostgreSQL we don't support permanent slots on standby nodes

permanent_slots: Dict[str, Any] = self._get_permanent_slots(postgresql, RemoteMember('', {}), 'replica')
Expand Down
Loading

0 comments on commit 17b5a87

Please sign in to comment.