Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Syncing from upstream patroni/patroni (master) #421

Merged
merged 3 commits into from
Jul 11, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion docs/dynamic_configuration.rst
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ In order to change the dynamic configuration you can use either :ref:`patronictl
- **use\_pg\_rewind**: whether or not to use pg_rewind. Defaults to `false`. Note that either the cluster must be initialized with ``data page checksums`` (``--data-checksums`` option for ``initdb``) and/or ``wal_log_hints`` must be set to ``on``, or ``pg_rewind`` will not work.
- **use\_slots**: whether or not to use replication slots. Defaults to `true` on PostgreSQL 9.4+.
- **recovery\_conf**: additional configuration settings written to recovery.conf when configuring follower. There is no recovery.conf anymore in PostgreSQL 12, but you may continue using this section, because Patroni handles it transparently.
- **parameters**: list of configuration settings for Postgres.
- **parameters**: configuration parameters (GUCs) for Postgres in format ``{max_connections: 100, wal_level: "replica", max_wal_senders: 10, wal_log_hints: "on"}``. Many of these are required for replication to work.

- **pg\_hba**: list of lines that Patroni will use to generate ``pg_hba.conf``. Patroni ignores this parameter if ``hba_file`` PostgreSQL parameter is set to a non-default value.

Expand Down
2 changes: 1 addition & 1 deletion docs/yaml_configuration.rst
Original file line number Diff line number Diff line change
Expand Up @@ -300,7 +300,7 @@ PostgreSQL
- **pgpass**: path to the `.pgpass <https://www.postgresql.org/docs/current/static/libpq-pgpass.html>`__ password file. Patroni creates this file before executing pg\_basebackup, the post_init script and under some other circumstances. The location must be writable by Patroni.
- **recovery\_conf**: additional configuration settings written to recovery.conf when configuring follower.
- **custom\_conf** : path to an optional custom ``postgresql.conf`` file, that will be used in place of ``postgresql.base.conf``. The file must exist on all cluster nodes, be readable by PostgreSQL and will be included from its location on the real ``postgresql.conf``. Note that Patroni will not monitor this file for changes, nor backup it. However, its settings can still be overridden by Patroni's own configuration facilities - see :ref:`dynamic configuration <patroni_configuration>` for details.
- **parameters**: list of configuration settings for Postgres. Many of these are required for replication to work.
- **parameters**: configuration parameters (GUCs) for Postgres in format ``{ssl: "on", ssl_cert_file: "cert_file"}``.
- **pg\_hba**: list of lines that Patroni will use to generate ``pg_hba.conf``. Patroni ignores this parameter if ``hba_file`` PostgreSQL parameter is set to a non-default value. Together with :ref:`dynamic configuration <dynamic_configuration>` this parameter simplifies management of ``pg_hba.conf``.

- **- host all all 0.0.0.0/0 md5**
Expand Down
18 changes: 12 additions & 6 deletions patroni/postgresql/slots.py
Original file line number Diff line number Diff line change
Expand Up @@ -86,16 +86,17 @@ def sync_slot(self, cur: Union['cursor', 'Cursor[Any]'], database: str, slot: st
except Exception as e:
logger.error("Failed to advance logical replication slot '%s': %r", slot, e)
failed = True
copy = isinstance(e, OperationalError) and e.diag.sqlstate == '58P01' # WAL file is gone
# WAL file is gone or slot is invalidated
copy = isinstance(e, OperationalError) and e.diag.sqlstate in ('58P01', '55000')
with self._condition:
if self._scheduled and failed:
if copy and slot not in self._copy_slots:
self._copy_slots.append(slot)
self._failed = True

new_lsn = self._scheduled.get(database, {}).get(slot, 0)
# remove slot from the self._scheduled structure only if it wasn't changed
if new_lsn == lsn and database in self._scheduled:
# remove slot from the self._scheduled structure if it is to be copied or if it wasn't changed
if copy or (new_lsn == lsn and database in self._scheduled):
self._scheduled[database].pop(slot)
if not self._scheduled[database]:
self._scheduled.pop(database)
Expand Down Expand Up @@ -151,15 +152,18 @@ def schedule(self, advance_slots: Dict[str, Dict[str, int]]) -> Tuple[bool, List
"""
with self._condition:
for database, values in advance_slots.items():
self._scheduled[database].update(values)
for name, value in values.items():
# Don't schedule sync for slots that just failed to be advanced and scheduled to be copied
if name not in self._copy_slots:
self._scheduled[database][name] = value
ret = (self._failed, self._copy_slots)
self._copy_slots = []
self._failed = False
self._condition.notify()

return ret

def on_promote(self) -> None:
def clean(self) -> None:
"""Reset state of the daemon."""
with self._condition:
self._scheduled.clear()
Expand Down Expand Up @@ -674,6 +678,8 @@ def copy_logical_slots(self, cluster: Cluster, tags: Tags, create_slots: List[st
logger.error("Failed to copy logical slots from the %s via postgresql connection: %r", leader.name, e)

if copy_slots and self._postgresql.stop():
if self._advance:
self._advance.clean()
pg_perm.set_permissions_from_data_directory(self._postgresql.data_dir)
for name, value in copy_slots.items():
slot_dir = os.path.join(self.pg_replslot_dir, name)
Expand Down Expand Up @@ -717,7 +723,7 @@ def on_promote(self) -> None:

"""
if self._advance:
self._advance.on_promote()
self._advance.clean()

if self._logical_slots_processing_queue:
logger.warning('Logical replication slots that might be unsafe to use after promote: %s',
Expand Down
12 changes: 9 additions & 3 deletions tests/test_slots.py
Original file line number Diff line number Diff line change
Expand Up @@ -269,9 +269,15 @@ def test_fsync_dir(self):
def test_slots_advance_thread(self):
with patch.object(MockCursor, 'execute', Mock(side_effect=psycopg.OperationalError)), \
patch.object(psycopg.OperationalError, 'diag') as mock_diag:
type(mock_diag).sqlstate = PropertyMock(return_value='58P01')
self.s.schedule_advance_slots({'foo': {'bar': 100}})
self.s._advance.sync_slots()
for err in ('58P01', '55000'):
type(mock_diag).sqlstate = PropertyMock(return_value=err)
self.s.schedule_advance_slots({'foo': {'bar': 100}})
self.s._advance.sync_slots()
self.assertEqual(self.s._advance._copy_slots, ["bar"])
# we don't want to make attempts to advance slots that are to be copied
self.s.schedule_advance_slots({'foo': {'bar': 101}})
self.assertEqual(self.s._advance._scheduled, {})
self.s._advance.clean()

with patch.object(SlotsAdvanceThread, 'sync_slots', Mock(side_effect=Exception)):
self.s._advance._condition.wait = Mock()
Expand Down
Loading