Syncing from upstream patroni/patroni (feature/quorum-commit) #423

Merged
2 changes: 1 addition & 1 deletion .github/workflows/tests.yaml
@@ -186,7 +186,7 @@ jobs:

       - uses: jakebailey/pyright-action@v2
         with:
-          version: 1.1.367
+          version: 1.1.371
 
   docs:
     runs-on: ubuntu-latest
2 changes: 1 addition & 1 deletion docs/dynamic_configuration.rst
@@ -33,7 +33,7 @@ In order to change the dynamic configuration you can use either :ref:`patronictl
 - **use\_pg\_rewind**: whether or not to use pg_rewind. Defaults to `false`. Note that either the cluster must be initialized with ``data page checksums`` (``--data-checksums`` option for ``initdb``) and/or ``wal_log_hints`` must be set to ``on``, or ``pg_rewind`` will not work.
 - **use\_slots**: whether or not to use replication slots. Defaults to `true` on PostgreSQL 9.4+.
 - **recovery\_conf**: additional configuration settings written to recovery.conf when configuring follower. There is no recovery.conf anymore in PostgreSQL 12, but you may continue using this section, because Patroni handles it transparently.
-- **parameters**: list of configuration settings for Postgres.
+- **parameters**: configuration parameters (GUCs) for Postgres in format ``{max_connections: 100, wal_level: "replica", max_wal_senders: 10, wal_log_hints: "on"}``. Many of these are required for replication to work.
 
 - **pg\_hba**: list of lines that Patroni will use to generate ``pg_hba.conf``. Patroni ignores this parameter if ``hba_file`` PostgreSQL parameter is set to a non-default value.
 
20 changes: 20 additions & 0 deletions docs/releases.rst
@@ -3,6 +3,26 @@
Release notes
=============

Version 3.3.2
-------------

Released 2024-07-11

**Bugfixes**

- Fix plain Postgres synchronous replication mode (Israel Barth Rubio)

  Ever since ``synchronous_mode`` was introduced to Patroni, plain Postgres synchronous replication had not been working. With this bugfix, when ``synchronous_mode`` is disabled, Patroni sets ``synchronous_standby_names`` to the value configured by the user, if one is provided.
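
The gist of the fix, as a minimal sketch (the helper name and signature here are illustrative, not Patroni's actual internals):

```python
from typing import Optional


def effective_synchronous_standby_names(synchronous_mode: bool,
                                        managed_value: Optional[str],
                                        user_value: Optional[str]) -> Optional[str]:
    """Value Patroni should write for ``synchronous_standby_names``.

    :param synchronous_mode: whether Patroni's ``synchronous_mode`` is enabled
    :param managed_value: the value Patroni computed while managing synchronous replication
    :param user_value: the value the user configured under ``postgresql.parameters``
    """
    if synchronous_mode:
        # Patroni owns the parameter and overrides any user-provided value.
        return managed_value
    # Before the fix, the user-configured value was effectively dropped here;
    # now it is passed through to Postgres.
    return user_value
```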

- Handle logical slots invalidation on a standby (Polina Bungina)

  Since PostgreSQL 16, logical replication slots on a standby can be invalidated when the xmin horizon moves past them; from now on, Patroni forces a copy (i.e., recreation) of invalidated slots.
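
Conceptually, the fix treats one more SQLSTATE as "recreate this slot". A condensed sketch of the check (the two SQLSTATE values come from the ``patroni/postgresql/slots.py`` hunk further down; the helper function itself is illustrative):

```python
# SQLSTATEs that make Patroni copy a logical slot instead of advancing it.
WAL_FILE_GONE = '58P01'      # undefined_file: a required WAL segment was removed
SLOT_INVALIDATED = '55000'   # object_not_in_prerequisite_state: slot was invalidated


def needs_copy(exc: Exception) -> bool:
    """Return ``True`` when a failed pg_replication_slot_advance() call means
    the slot must be recreated from the primary rather than advanced."""
    sqlstate = getattr(getattr(exc, 'diag', None), 'sqlstate', None)
    return sqlstate in (WAL_FILE_GONE, SLOT_INVALIDATED)
```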

- Fix race condition with logical slot advance and copy (Alexander Kukushkin)

  Due to this bug, an invalidated logical replication slot could be copied (together with the accompanying PostgreSQL restart) more than once.
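
In essence, once a slot is queued for copying, further advance requests for it are ignored until the copy has happened. A toy, self-contained version of the guard added to ``schedule()`` (simplified from the ``slots.py`` hunk further down):

```python
from collections import defaultdict
from typing import Dict, List

scheduled: Dict[str, Dict[str, int]] = defaultdict(dict)
copy_slots: List[str] = ['ls']  # a slot already queued for recreation


def schedule(advance_slots: Dict[str, Dict[str, int]]) -> None:
    for database, values in advance_slots.items():
        for name, value in values.items():
            # don't schedule an advance for a slot that is about to be copied
            if name not in copy_slots:
                scheduled[database][name] = value


schedule({'db1': {'ls': 100, 'other': 200}})
assert dict(scheduled) == {'db1': {'other': 200}}
```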


Version 3.3.1
-------------

2 changes: 1 addition & 1 deletion docs/yaml_configuration.rst
@@ -300,7 +300,7 @@ PostgreSQL
 - **pgpass**: path to the `.pgpass <https://www.postgresql.org/docs/current/static/libpq-pgpass.html>`__ password file. Patroni creates this file before executing pg\_basebackup, the post_init script and under some other circumstances. The location must be writable by Patroni.
 - **recovery\_conf**: additional configuration settings written to recovery.conf when configuring follower.
 - **custom\_conf** : path to an optional custom ``postgresql.conf`` file, that will be used in place of ``postgresql.base.conf``. The file must exist on all cluster nodes, be readable by PostgreSQL and will be included from its location on the real ``postgresql.conf``. Note that Patroni will not monitor this file for changes, nor backup it. However, its settings can still be overridden by Patroni's own configuration facilities - see :ref:`dynamic configuration <patroni_configuration>` for details.
-- **parameters**: list of configuration settings for Postgres. Many of these are required for replication to work.
+- **parameters**: configuration parameters (GUCs) for Postgres in format ``{ssl: "on", ssl_cert_file: "cert_file"}``.
 - **pg\_hba**: list of lines that Patroni will use to generate ``pg_hba.conf``. Patroni ignores this parameter if ``hba_file`` PostgreSQL parameter is set to a non-default value. Together with :ref:`dynamic configuration <dynamic_configuration>` this parameter simplifies management of ``pg_hba.conf``.
 
 - **- host all all 0.0.0.0/0 md5**
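Both ``parameters`` bullets above show YAML flow mappings; once parsed, they are plain dictionaries. A quick sanity check (requires PyYAML, which Patroni itself depends on):

```python
import yaml

doc = 'parameters: {max_connections: 100, wal_level: "replica", wal_log_hints: "on"}'
print(yaml.safe_load(doc))
# -> {'parameters': {'max_connections': 100, 'wal_level': 'replica', 'wal_log_hints': 'on'}}
```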
2 changes: 1 addition & 1 deletion patroni/api.py
@@ -303,7 +303,7 @@ def do_GET(self, write_status_code_only: bool = False) -> None:
         cluster = patroni.dcs.cluster
         config = global_config.from_cluster(cluster)
 
-        leader_optime = cluster and cluster.last_lsn or 0
+        leader_optime = cluster and cluster.status.last_lsn
         replayed_location = response.get('xlog', {}).get('replayed_location', 0)
         max_replica_lag = parse_int(self.path_query.get('lag', [sys.maxsize])[0], 'B')
         if max_replica_lag is None:
42 changes: 25 additions & 17 deletions patroni/dcs/__init__.py
@@ -751,6 +751,13 @@ def empty() -> 'Status':
"""
return Status(0, None)

def is_empty(self):
"""Validate definition of all attributes of this :class:`Status` instance.

:returns: ``True`` if all attributes of the current :class:`Status` are unpopulated.
"""
return self.last_lsn == 0 and self.slots is None

@staticmethod
def from_node(value: Union[str, Dict[str, Any], None]) -> 'Status':
"""Factory method to parse *value* as :class:`Status` object.
@@ -827,14 +834,13 @@ def __new__(cls, *args: Any, **kwargs: Any):
         return super(Cluster, cls).__new__(cls, *args, **kwargs)
 
     @property
-    def last_lsn(self) -> int:
-        """Last known leader LSN."""
-        return self.status.last_lsn
-
-    @property
-    def slots(self) -> Optional[Dict[str, int]]:
-        """State of permanent replication slots on the primary in the format: ``{"slot_name": int}``."""
-        return self.status.slots
+    def slots(self) -> Dict[str, int]:
+        """State of permanent replication slots on the primary in the format: ``{"slot_name": int}``.
+
+        .. note::
+            We are trying to be foolproof here and for values that can't be parsed to :class:`int` will return ``0``.
+        """
+        return {k: parse_int(v) or 0 for k, v in (self.status.slots or {}).items()}
 
     @staticmethod
     def empty() -> 'Cluster':
@@ -846,9 +852,9 @@ def is_empty(self):

         :returns: ``True`` if all attributes of the current :class:`Cluster` are unpopulated.
         """
-        return all((self.initialize is None, self.config is None, self.leader is None, self.last_lsn == 0,
+        return all((self.initialize is None, self.config is None, self.leader is None, self.status.is_empty(),
                     self.members == [], self.failover is None, self.sync.version is None,
-                    self.history is None, self.slots is None, self.failsafe is None, self.workers == {}))
+                    self.history is None, self.failsafe is None, self.workers == {}))
 
     def __len__(self) -> int:
         """Implement ``len`` function capability.
@@ -947,8 +953,9 @@ def __permanent_slots(self) -> Dict[str, Union[Dict[str, Any], Any]]:
"""Dictionary of permanent replication slots with their known LSN."""
ret: Dict[str, Union[Dict[str, Any], Any]] = global_config.permanent_slots

members: Dict[str, int] = {slot_name_from_member_name(m.name): m.lsn or 0 for m in self.members}
slots: Dict[str, int] = {k: parse_int(v) or 0 for k, v in (self.slots or {}).items()}
members: Dict[str, int] = {slot_name_from_member_name(m.name): m.lsn or 0
for m in self.members if m.replicatefrom}
slots: Dict[str, int] = self.slots
for name, value in list(ret.items()):
if not value:
value = ret[name] = {}
@@ -1613,15 +1620,16 @@ def get_cluster(self) -> Cluster:
             self.reset_cluster()
             raise
 
-        self._last_seen = int(time.time())
-        self._last_status = {self._OPTIME: cluster.last_lsn}
-        if cluster.slots:
-            self._last_status['slots'] = cluster.slots
-        self._last_failsafe = cluster.failsafe
-
         with self._cluster_thread_lock:
             self._cluster = cluster
             self._cluster_valid_till = time.time() + self.ttl
 
+            self._last_seen = int(time.time())
+            self._last_status = {self._OPTIME: cluster.status.last_lsn}
+            if cluster.status.slots:
+                self._last_status['slots'] = cluster.status.slots
+            self._last_failsafe = cluster.failsafe
+
         return cluster
 
     @property
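One behavioural detail of the new ``Cluster.slots`` property above: slot values that cannot be parsed as integers silently degrade to ``0`` (the test changes below exercise this with the value ``'a'``). A standalone illustration, assuming ``patroni.utils.parse_int`` is importable:

```python
from patroni.utils import parse_int  # the same helper the property uses

raw_slots = {'good_slot': '500', 'bad_slot': 'a'}
print({k: parse_int(v) or 0 for k, v in raw_slots.items()})
# -> {'good_slot': 500, 'bad_slot': 0}
```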
2 changes: 1 addition & 1 deletion patroni/ha.py
@@ -1143,7 +1143,7 @@ def is_lagging(self, wal_position: int) -> bool:

         :returns: ``True`` when node is lagging
         """
-        lag = (self.cluster.last_lsn or 0) - wal_position
+        lag = self.cluster.status.last_lsn - wal_position
         return lag > global_config.maximum_lag_on_failover
 
     def _is_healthiest_node(self, members: Collection[Member], check_replication_lag: bool = True) -> bool:
18 changes: 12 additions & 6 deletions patroni/postgresql/slots.py
@@ -86,16 +86,17 @@ def sync_slot(self, cur: Union['cursor', 'Cursor[Any]'], database: str, slot: st
         except Exception as e:
             logger.error("Failed to advance logical replication slot '%s': %r", slot, e)
             failed = True
-            copy = isinstance(e, OperationalError) and e.diag.sqlstate == '58P01'  # WAL file is gone
+            # WAL file is gone or slot is invalidated
+            copy = isinstance(e, OperationalError) and e.diag.sqlstate in ('58P01', '55000')
         with self._condition:
             if self._scheduled and failed:
                 if copy and slot not in self._copy_slots:
                     self._copy_slots.append(slot)
                 self._failed = True
 
             new_lsn = self._scheduled.get(database, {}).get(slot, 0)
-            # remove slot from the self._scheduled structure only if it wasn't changed
-            if new_lsn == lsn and database in self._scheduled:
+            # remove slot from the self._scheduled structure if it is to be copied or if it wasn't changed
+            if copy or (new_lsn == lsn and database in self._scheduled):
                 self._scheduled[database].pop(slot)
                 if not self._scheduled[database]:
                     self._scheduled.pop(database)
@@ -151,15 +152,18 @@ def schedule(self, advance_slots: Dict[str, Dict[str, int]]) -> Tuple[bool, List
"""
with self._condition:
for database, values in advance_slots.items():
self._scheduled[database].update(values)
for name, value in values.items():
# Don't schedule sync for slots that just failed to be advanced and scheduled to be copied
if name not in self._copy_slots:
self._scheduled[database][name] = value
ret = (self._failed, self._copy_slots)
self._copy_slots = []
self._failed = False
self._condition.notify()

return ret

def on_promote(self) -> None:
def clean(self) -> None:
"""Reset state of the daemon."""
with self._condition:
self._scheduled.clear()
@@ -674,6 +678,8 @@ def copy_logical_slots(self, cluster: Cluster, tags: Tags, create_slots: List[st
logger.error("Failed to copy logical slots from the %s via postgresql connection: %r", leader.name, e)

if copy_slots and self._postgresql.stop():
if self._advance:
self._advance.clean()
pg_perm.set_permissions_from_data_directory(self._postgresql.data_dir)
for name, value in copy_slots.items():
slot_dir = os.path.join(self.pg_replslot_dir, name)
@@ -717,7 +723,7 @@ def on_promote(self) -> None:

"""
if self._advance:
self._advance.on_promote()
self._advance.clean()

if self._logical_slots_processing_queue:
logger.warning('Logical replication slots that might be unsafe to use after promote: %s',
2 changes: 1 addition & 1 deletion patroni/utils.py
@@ -946,7 +946,7 @@ def cluster_as_json(cluster: 'Cluster') -> Dict[str, Any]:

     config = global_config.from_cluster(cluster)
     leader_name = cluster.leader.name if cluster.leader else None
-    cluster_lsn = cluster.last_lsn or 0
+    cluster_lsn = cluster.status.last_lsn
 
     ret: Dict[str, Any] = {'members': []}
     sync_role = 'quorum_standby' if config.is_quorum_commit_mode else 'sync_standby'
2 changes: 1 addition & 1 deletion patroni/version.py
@@ -2,4 +2,4 @@

 :var __version__: the current Patroni version.
 """
-__version__ = '3.3.1'
+__version__ = '3.3.2'
2 changes: 1 addition & 1 deletion tests/test_api.py
@@ -204,7 +204,7 @@ class TestRestApiHandler(unittest.TestCase):

     def test_do_GET(self):
         MockPostgresql.pending_restart_reason = {'max_connections': get_param_diff('200', '100')}
-        MockPatroni.dcs.cluster.last_lsn = 20
+        MockPatroni.dcs.cluster.status.last_lsn = 20
         with patch.object(global_config.__class__, 'is_synchronous_mode', PropertyMock(return_value=True)):
             MockRestApiServer(RestApiHandler, 'GET /replica')
             MockRestApiServer(RestApiHandler, 'GET /replica?lag=1M')
18 changes: 12 additions & 6 deletions tests/test_slots.py
@@ -213,7 +213,7 @@ def test_should_enforce_hot_standby_feedback(self):
     @patch.object(Postgresql, 'is_primary', Mock(return_value=False))
     def test__ensure_logical_slots_replica(self):
         self.p.set_role('replica')
-        self.cluster.slots['ls'] = 12346
+        self.cluster.status.slots['ls'] = 12346
         with patch.object(SlotsHandler, 'check_logical_slots_readiness', Mock(return_value=False)):
             self.assertEqual(self.s.sync_replication_slots(self.cluster, self.tags), [])
         with patch.object(SlotsHandler, '_query', Mock(return_value=[('ls', 'logical', 499, 'b', 'a', 5, 100, 500)])), \
@@ -222,10 +222,10 @@ def test__ensure_logical_slots_replica(self):
                 patch.object(psycopg.OperationalError, 'diag') as mock_diag:
             type(mock_diag).sqlstate = PropertyMock(return_value='58P01')
             self.assertEqual(self.s.sync_replication_slots(self.cluster, self.tags), ['ls'])
-            self.cluster.slots['ls'] = 'a'
+            self.cluster.status.slots['ls'] = 'a'
             self.assertEqual(self.s.sync_replication_slots(self.cluster, self.tags), [])
             self.cluster.config.data['slots']['ls']['database'] = 'b'
-            self.cluster.slots['ls'] = '500'
+            self.cluster.status.slots['ls'] = '500'
             with patch.object(MockCursor, 'rowcount', PropertyMock(return_value=1), create=True):
                 self.assertEqual(self.s.sync_replication_slots(self.cluster, self.tags), ['ls'])

@@ -269,9 +269,15 @@ def test_fsync_dir(self):
     def test_slots_advance_thread(self):
         with patch.object(MockCursor, 'execute', Mock(side_effect=psycopg.OperationalError)), \
                 patch.object(psycopg.OperationalError, 'diag') as mock_diag:
-            type(mock_diag).sqlstate = PropertyMock(return_value='58P01')
-            self.s.schedule_advance_slots({'foo': {'bar': 100}})
-            self.s._advance.sync_slots()
+            for err in ('58P01', '55000'):
+                type(mock_diag).sqlstate = PropertyMock(return_value=err)
+                self.s.schedule_advance_slots({'foo': {'bar': 100}})
+                self.s._advance.sync_slots()
+                self.assertEqual(self.s._advance._copy_slots, ["bar"])
+                # we don't want to make attempts to advance slots that are to be copied
+                self.s.schedule_advance_slots({'foo': {'bar': 101}})
+                self.assertEqual(self.s._advance._scheduled, {})
+                self.s._advance.clean()
 
         with patch.object(SlotsAdvanceThread, 'sync_slots', Mock(side_effect=Exception)):
             self.s._advance._condition.wait = Mock()
2 changes: 1 addition & 1 deletion tests/test_zookeeper.py
@@ -176,7 +176,7 @@ def test__cluster_loader(self):

     def test_get_cluster(self):
         cluster = self.zk.get_cluster()
-        self.assertEqual(cluster.last_lsn, 500)
+        self.assertEqual(cluster.status.last_lsn, 500)
 
     def test__get_citus_cluster(self):
         self.zk._mpp = get_mpp({'citus': {'group': 0, 'database': 'postgres'}})