
Syncing from upstream patroni/patroni (feature/quorum-commit) #430

Merged
1 change: 0 additions & 1 deletion docs/patroni_configuration.rst
@@ -62,7 +62,6 @@ There are some other Postgres parameters controlled by Patroni:
- **port** - is set either from ``postgresql.listen`` or from ``PATRONI_POSTGRESQL_LISTEN`` environment variable
- **cluster_name** - is set either from ``scope`` or from ``PATRONI_SCOPE`` environment variable
- **hot_standby: on**
- **wal_log_hints: on** - for Postgres 9.4 and newer.

To be on the safe side parameters from the above lists are not written into ``postgresql.conf``, but passed as a list of arguments to the ``pg_ctl start`` which gives them the highest precedence, even above `ALTER SYSTEM <https://www.postgresql.org/docs/current/static/sql-altersystem.html>`__

6 changes: 4 additions & 2 deletions features/dcs_failsafe_mode.feature
@@ -76,7 +76,7 @@ Feature: dcs failsafe mode
@dcs-failsafe
Scenario: scale to three-node cluster
Given I start postgres0
And I start postgres2
And I configure and start postgres2 with a tag replicatefrom postgres0
Then "members/postgres2" key in DCS has state=running after 10 seconds
And "members/postgres0" key in DCS has state=running after 20 seconds
And Response on GET http://127.0.0.1:8008/failsafe contains postgres2 after 10 seconds
@@ -86,13 +86,14 @@ Feature: dcs failsafe mode
@dcs-failsafe
@slot-advance
Scenario: make sure permanent slots exist on replicas
Given I issue a PATCH request to http://127.0.0.1:8009/config with {"slots":{"dcs_slot_0":null,"dcs_slot_2":{"type":"logical","database":"postgres","plugin":"test_decoding"}}}
Given I issue a PATCH request to http://127.0.0.1:8009/config with {"slots":{"postgres2":0,"dcs_slot_0":null,"dcs_slot_2":{"type":"logical","database":"postgres","plugin":"test_decoding"}}}
Then logical slot dcs_slot_2 is in sync between postgres1 and postgres0 after 20 seconds
And logical slot dcs_slot_2 is in sync between postgres1 and postgres2 after 20 seconds
When I get all changes from physical slot dcs_slot_1 on postgres1
Then physical slot dcs_slot_1 is in sync between postgres1 and postgres0 after 10 seconds
And physical slot dcs_slot_1 is in sync between postgres1 and postgres2 after 10 seconds
And physical slot postgres0 is in sync between postgres1 and postgres2 after 10 seconds
And physical slot postgres2 is in sync between postgres0 and postgres1 after 10 seconds

@dcs-failsafe
Scenario: check three-node cluster is functioning while DCS is down
@@ -114,3 +115,4 @@ Feature: dcs failsafe mode
And physical slot dcs_slot_1 is in sync between postgres1 and postgres0 after 10 seconds
And physical slot dcs_slot_1 is in sync between postgres1 and postgres2 after 10 seconds
And physical slot postgres0 is in sync between postgres1 and postgres2 after 10 seconds
And physical slot postgres2 is in sync between postgres0 and postgres1 after 10 seconds
47 changes: 33 additions & 14 deletions patroni/api.py
@@ -37,13 +37,13 @@
logger = logging.getLogger(__name__)


def check_access(func: Callable[..., None]) -> Callable[..., None]:
def check_access(*args: Any, **kwargs: Any) -> Callable[..., Any]:
"""Check the source ip, authorization header, or client certificates.

.. note::
The actual logic to check access is implemented through :func:`RestApiServer.check_access`.

:param func: function to be decorated.
Optionally it is possible to skip source ip check by specifying ``allowlist_check_members=False``.

:returns: a decorator that executes *func* only if :func:`RestApiServer.check_access` returns ``True``.

@@ -60,19 +60,31 @@ def check_access(func: Callable[..., None]) -> Callable[..., None]:
... @check_access
... def do_PUT_foo(self):
... print('In do_PUT_foo')
... @check_access(allowlist_check_members=False)
... def do_POST_bar(self):
... print('In do_POST_bar')

>>> f = Foo()
>>> f.do_PUT_foo()
In FooServer: Foo
In do_PUT_foo

"""
allowlist_check_members = kwargs.get('allowlist_check_members', True)

def inner_decorator(func: Callable[..., None]) -> Callable[..., None]:
def wrapper(self: 'RestApiHandler', *args: Any, **kwargs: Any) -> None:
if self.server.check_access(self, allowlist_check_members=allowlist_check_members):
return func(self, *args, **kwargs)

def wrapper(self: 'RestApiHandler', *args: Any, **kwargs: Any) -> None:
if self.server.check_access(self):
return func(self, *args, **kwargs)
return wrapper

return wrapper
# A hacky way to have decorators that work with and without parameters.
if len(args) == 1 and callable(args[0]):
# The first parameter is a function, it means decorator is used as "@check_access"
return inner_decorator(args[0])
else:
# @check_access(allowlist_check_members=False) case
return inner_decorator
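The reworked ``check_access`` supports both bare (``@check_access``) and parameterized (``@check_access(allowlist_check_members=False)``) usage. A minimal standalone sketch of this dual-form decorator pattern, with a ``print`` standing in for the real ``RestApiServer.check_access`` call:

```python
from typing import Any, Callable


def check_access(*args: Any, **kwargs: Any) -> Callable[..., Any]:
    """Illustrative reimplementation of the dual-form decorator pattern."""
    allowlist_check_members = kwargs.get('allowlist_check_members', True)

    def inner_decorator(func: Callable[..., Any]) -> Callable[..., Any]:
        def wrapper(*fargs: Any, **fkwargs: Any) -> Any:
            # Stand-in for RestApiServer.check_access(); the real code only
            # calls func() when the access check succeeds.
            print('checking access (allowlist_check_members=%s)' % allowlist_check_members)
            return func(*fargs, **fkwargs)
        return wrapper

    # Bare "@check_access": the decorated function is the only positional argument.
    if len(args) == 1 and callable(args[0]):
        return inner_decorator(args[0])
    # Parameterized "@check_access(...)": return the actual decorator.
    return inner_decorator


@check_access
def foo() -> str:
    return 'foo'


@check_access(allowlist_check_members=False)
def bar() -> str:
    return 'bar'
```

Both ``foo()`` and ``bar()`` run their access check before executing; only the keyword argument captured by the closure differs.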


class RestApiHandler(BaseHTTPRequestHandler):
@@ -768,22 +780,24 @@ def do_GET_failsafe(self) -> None:
else:
self.send_error(502)

@check_access
@check_access(allowlist_check_members=False)
def do_POST_failsafe(self) -> None:
"""Handle a ``POST`` request to ``/failsafe`` path.

Writes a response with HTTP status ``200`` if this node is a Standby, or with HTTP status ``500`` if this is
the primary.
the primary. In addition to that it returns absolute value of received/replayed LSN in the ``lsn`` header.

.. note::
If ``failsafe_mode`` is not enabled, then write a response with HTTP status ``502``.
"""
if self.server.patroni.ha.is_failsafe_mode():
request = self._read_json_content()
if request:
message = self.server.patroni.ha.update_failsafe(request) or 'Accepted'
ret = self.server.patroni.ha.update_failsafe(request)
headers = {'lsn': str(ret)} if isinstance(ret, int) else {}
message = ret if isinstance(ret, str) else 'Accepted'
code = 200 if message == 'Accepted' else 500
self.write_response(code, message)
self.write_response(code, message, headers=headers)
else:
self.send_error(502)
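The updated ``do_POST_failsafe`` maps the return value of ``update_failsafe()`` onto the HTTP response: an ``int`` (the replica's received/replayed LSN) is echoed back in the ``lsn`` header, while a ``str`` becomes a 500 error message. A self-contained sketch of that mapping (``build_failsafe_response`` is a hypothetical helper, not part of Patroni):

```python
from typing import Dict, Optional, Tuple, Union


def build_failsafe_response(ret: Union[int, str, None]) -> Tuple[int, str, Dict[str, str]]:
    """Translate update_failsafe()'s return value to (status, body, headers)."""
    # An integer return value is the standby's absolute LSN, exposed as a header.
    headers = {'lsn': str(ret)} if isinstance(ret, int) else {}
    # A string return value is an error message (e.g. this node is the primary).
    message = ret if isinstance(ret, str) else 'Accepted'
    code = 200 if message == 'Accepted' else 500
    return code, message, headers


build_failsafe_response(67108864)         # standby reporting its LSN
build_failsafe_response('I am the leader')  # rejected on the primary
```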

@@ -1522,7 +1536,7 @@ def __members_ips(self) -> Iterator[Union[IPv4Network, IPv6Network]]:
except Exception as e:
logger.debug('Failed to parse url %s: %r', member.api_url, e)

def check_access(self, rh: RestApiHandler) -> Optional[bool]:
def check_access(self, rh: RestApiHandler, allowlist_check_members: bool = True) -> Optional[bool]:
"""Ensure client has enough privileges to perform a given request.

Write a response back to the client if any issue is observed, and the HTTP status may be:
@@ -1535,12 +1549,17 @@ def check_access(self, rh: RestApiHandler) -> Optional[bool]:
* a client certificate is expected by the server, but is missing in the request.

:param rh: the request which access should be checked.
:param allowlist_check_members: whether we should check the source ip against existing cluster members.

:returns: ``True`` if client access verification succeeded, otherwise ``None``.
"""
if self.__allowlist or self.__allowlist_include_members:
allowlist_check_members = allowlist_check_members and bool(self.__allowlist_include_members)
if self.__allowlist or allowlist_check_members:
incoming_ip = ip_address(rh.client_address[0])
if not any(incoming_ip in net for net in self.__allowlist + tuple(self.__members_ips())):

members_ips = tuple(self.__members_ips()) if allowlist_check_members else ()

if not any(incoming_ip in net for net in self.__allowlist + members_ips):
return rh.write_response(403, 'Access is denied')

if not hasattr(rh.request, 'getpeercert') or not rh.request.getpeercert(): # valid client cert isn't present
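The reworked allowlist logic reduces to membership tests with the stdlib ``ipaddress`` module, where member networks are only consulted when ``allowlist_check_members`` is true. A simplified sketch (``is_allowed`` is a hypothetical helper; the real method also validates client certificates and writes the 403 response itself):

```python
from ipaddress import IPv4Network, ip_address, ip_network
from typing import Tuple


def is_allowed(client_ip: str,
               allowlist: Tuple[IPv4Network, ...],
               member_networks: Tuple[IPv4Network, ...] = (),
               allowlist_check_members: bool = True) -> bool:
    """Return True if client_ip falls into any allowed network."""
    # Member networks are skipped entirely when the caller opted out,
    # e.g. for the /failsafe endpoint decorated with allowlist_check_members=False.
    networks = allowlist + (member_networks if allowlist_check_members else ())
    incoming = ip_address(client_ip)
    return any(incoming in net for net in networks)


allowlist = (ip_network('10.0.0.0/24'),)
members = (ip_network('192.168.1.0/24'),)

is_allowed('192.168.1.5', allowlist, members)                                 # allowed via members
is_allowed('192.168.1.5', allowlist, members, allowlist_check_members=False)  # denied
```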
42 changes: 22 additions & 20 deletions patroni/dcs/__init__.py
@@ -27,7 +27,6 @@
from ..postgresql import Postgresql
from ..postgresql.mpp import AbstractMPP

SLOT_ADVANCE_AVAILABLE_VERSION = 110000
slot_name_re = re.compile('^[a-z0-9_]{1,63}$')
logger = logging.getLogger(__name__)

@@ -960,9 +959,9 @@ def __permanent_slots(self) -> Dict[str, Union[Dict[str, Any], Any]]:
if not value:
value = ret[name] = {}
if isinstance(value, dict):
# for permanent physical slots we want to get MAX LSN from the `Cluster.slots` and from the
# member with the matching name. It is necessary because we may have the replication slot on
# the primary that is streaming from the other standby node using the `replicatefrom` tag.
# For permanent physical slots we want to get MAX LSN from the `Cluster.slots` and from the
# member that does cascading replication with the matching name (see `replicatefrom` tag).
# It is necessary because we may have the permanent replication slot on the primary for this node.
lsn = max(members.get(name, 0) if self.is_physical_slot(value) else 0, slots.get(name, 0))
if lsn:
value['lsn'] = lsn
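The MAX-LSN merge described in the comment above can be illustrated in isolation (``permanent_physical_slot_lsn`` is a hypothetical helper name, not Patroni's API):

```python
from typing import Dict


def permanent_physical_slot_lsn(name: str,
                                cluster_slots: Dict[str, int],
                                member_slots: Dict[str, int]) -> int:
    # Take the MAX of the LSN stored in Cluster.slots and the LSN reported
    # by the member with the matching name (the cascading standby).
    return max(cluster_slots.get(name, 0), member_slots.get(name, 0))


# A standby streaming via `replicatefrom` may report a more recent LSN
# than the copy of the slot maintained on the primary.
permanent_physical_slot_lsn('postgres2', {'postgres2': 100}, {'postgres2': 250})
```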
@@ -1003,7 +1002,7 @@ def get_replication_slots(self, postgresql: 'Postgresql', member: Tags, *,
permanent_slots: Dict[str, Any] = self._get_permanent_slots(postgresql, member, role)

disabled_permanent_logical_slots: List[str] = self._merge_permanent_slots(
slots, permanent_slots, name, postgresql.major_version)
slots, permanent_slots, name, postgresql.can_advance_slots)

if disabled_permanent_logical_slots and show_error:
logger.error("Permanent logical replication slots supported by Patroni only starting from PostgreSQL 11. "
@@ -1012,7 +1011,7 @@
return slots

def _merge_permanent_slots(self, slots: Dict[str, Dict[str, str]], permanent_slots: Dict[str, Any], name: str,
major_version: int) -> List[str]:
can_advance_slots: bool) -> List[str]:
"""Merge replication *slots* for members with *permanent_slots*.

Perform validation of configured permanent slot name, skipping invalid names.
@@ -1023,7 +1022,8 @@ def _merge_permanent_slots(self, slots: Dict[str, Dict[str, str]], permanent_slo
:param slots: Slot names with existing attributes if known.
:param name: name of this node.
:param permanent_slots: dictionary containing slot name key and slot information values.
:param major_version: postgresql major version.
:param can_advance_slots: ``True`` if ``pg_replication_slot_advance()`` function is available,
``False`` otherwise.

:returns: List of disabled permanent, logical slot names, if postgresql version < 11.
"""
@@ -1047,7 +1047,7 @@ def _merge_permanent_slots(self, slots: Dict[str, Dict[str, str]], permanent_slo
continue

if self.is_logical_slot(value):
if major_version < SLOT_ADVANCE_AVAILABLE_VERSION:
if not can_advance_slots:
disabled_permanent_logical_slots.append(slot_name)
elif slot_name in slots:
logger.error("Permanent logical replication slot {'%s': %s} is conflicting with"
@@ -1083,11 +1083,10 @@ def _get_permanent_slots(self, postgresql: 'Postgresql', tags: Tags, role: str)
return {}

if global_config.is_standby_cluster or self.get_slot_name_on_primary(postgresql.name, tags) is None:
return self.__permanent_physical_slots \
if postgresql.major_version >= SLOT_ADVANCE_AVAILABLE_VERSION or role == 'standby_leader' else {}
return self.__permanent_physical_slots if postgresql.can_advance_slots or role == 'standby_leader' else {}

return self.__permanent_slots if postgresql.major_version >= SLOT_ADVANCE_AVAILABLE_VERSION\
or role in ('master', 'primary') else self.__permanent_logical_slots
return self.__permanent_slots if postgresql.can_advance_slots or role in ('master', 'primary') \
else self.__permanent_logical_slots

def _get_members_slots(self, name: str, role: str) -> Dict[str, Dict[str, str]]:
"""Get physical replication slots configuration for members that sourcing from this node.
@@ -1150,7 +1149,7 @@ def has_permanent_slots(self, postgresql: 'Postgresql', member: Tags) -> bool:
members_slots: Dict[str, Dict[str, str]] = self._get_members_slots(postgresql.name, role)
permanent_slots: Dict[str, Any] = self._get_permanent_slots(postgresql, member, role)
slots = deepcopy(members_slots)
self._merge_permanent_slots(slots, permanent_slots, postgresql.name, postgresql.major_version)
self._merge_permanent_slots(slots, permanent_slots, postgresql.name, postgresql.can_advance_slots)
return len(slots) > len(members_slots) or any(self.is_physical_slot(v) for v in permanent_slots.values())

def filter_permanent_slots(self, postgresql: 'Postgresql', slots: Dict[str, int]) -> Dict[str, int]:
@@ -1161,7 +1160,7 @@ def filter_permanent_slots(self, postgresql: 'Postgresql', slots: Dict[str, int]

:returns: a :class:`dict` object that contains only slots that are known to be permanent.
"""
if postgresql.major_version < SLOT_ADVANCE_AVAILABLE_VERSION:
if not postgresql.can_advance_slots:
return {} # for legacy PostgreSQL we don't support permanent slots on standby nodes

permanent_slots: Dict[str, Any] = self._get_permanent_slots(postgresql, RemoteMember('', {}), 'replica')
@@ -1385,8 +1384,9 @@ class AbstractDCS(abc.ABC):
def __init__(self, config: Dict[str, Any], mpp: 'AbstractMPP') -> None:
"""Prepare DCS paths, MPP object, initial values for state information and processing dependencies.

:ivar config: :class:`dict`, reference to config section of selected DCS.
i.e.: ``zookeeper`` for zookeeper, ``etcd`` for etcd, etc...
:param config: :class:`dict`, reference to config section of selected DCS.
i.e.: ``zookeeper`` for zookeeper, ``etcd`` for etcd, etc...
:param mpp: an object implementing :class:`AbstractMPP` interface.
"""
self._mpp = mpp
self._name = config['name']
@@ -1733,20 +1733,22 @@ def _update_leader(self, leader: Leader) -> bool:
"""

def update_leader(self,
leader: Leader,
cluster: Cluster,
last_lsn: Optional[int],
slots: Optional[Dict[str, int]] = None,
failsafe: Optional[Dict[str, str]] = None) -> bool:
"""Update ``leader`` key (or session) ttl and optime/leader.
"""Update ``leader`` key (or session) ttl, ``/status``, and ``/failsafe`` keys.

:param leader: :class:`Leader` object with information about the leader.
:param cluster: :class:`Cluster` object with information about the current cluster state.
:param last_lsn: absolute WAL LSN in bytes.
:param slots: dictionary with permanent slots ``confirmed_flush_lsn``.
:param failsafe: if defined dictionary passed to :meth:`~AbstractDCS.write_failsafe`.

:returns: ``True`` if ``leader`` key (or session) has been updated successfully.
"""
ret = self._update_leader(leader)
if TYPE_CHECKING: # pragma: no cover
assert isinstance(cluster.leader, Leader)
ret = self._update_leader(cluster.leader)
if ret and last_lsn:
status: Dict[str, Any] = {self._OPTIME: last_lsn}
if slots:
2 changes: 1 addition & 1 deletion patroni/dcs/kubernetes.py
@@ -1225,7 +1225,7 @@ def _retry(*args: Any, **kwargs: Any) -> Any:
return bool(_run_and_handle_exceptions(self._patch_or_create, self.leader_path, annotations,
kind_resource_version, ips=ips, retry=_retry))

def update_leader(self, leader: Leader, last_lsn: Optional[int],
def update_leader(self, cluster: Cluster, last_lsn: Optional[int],
slots: Optional[Dict[str, int]] = None, failsafe: Optional[Dict[str, str]] = None) -> bool:
kind = self._kinds.get(self.leader_path)
kind_annotations = kind and kind.metadata.annotations or EMPTY_DICT