Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Syncing from upstream patroni/patroni (master) #420

Merged
merged 1 commit into from
Jul 3, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 10 additions & 1 deletion patroni/ha.py
Original file line number Diff line number Diff line change
Expand Up @@ -709,6 +709,10 @@ def process_sync_replication(self) -> None:
then that node must have synchronous_standby set to that value. Or more simple, first set in postgresql.conf
and then in DCS. When removing, first remove in DCS, then in postgresql.conf. This is so we only consider
promoting standbys that were guaranteed to be replicating synchronously.

.. note::
If ``synchronous_mode`` is disabled, we fall back to using the value configured by the user for
``synchronous_standby_names``, if any.
"""
if self.is_synchronous_mode():
sync = self.cluster.sync
Expand Down Expand Up @@ -760,9 +764,14 @@ def process_sync_replication(self) -> None:
return logger.info("Synchronous replication key updated by someone else")
logger.info("Synchronous standby status assigned to %s", list(allow_promote))
else:
# If synchronous_mode was turned off, we need to update synchronous_standby_names in Postgres
if not self.cluster.sync.is_empty and self.dcs.delete_sync_state(version=self.cluster.sync.version):
logger.info("Disabled synchronous replication")
self.state_handler.sync_handler.set_synchronous_standby_names(CaseInsensitiveSet())
self.state_handler.sync_handler.set_synchronous_standby_names(CaseInsensitiveSet())

# As synchronous_mode is off, check if the user configured Postgres synchronous replication instead
ssn = self.state_handler.config.synchronous_standby_names
self.state_handler.config.set_synchronous_standby_names(ssn)

def is_sync_standby(self, cluster: Cluster) -> bool:
""":returns: `True` if the current node is a synchronous standby."""
Expand Down
9 changes: 9 additions & 0 deletions patroni/postgresql/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -1344,3 +1344,12 @@ def get(self, key: str, default: Optional[Any] = None) -> Optional[Any]:

def restore_command(self) -> Optional[str]:
return (self.get('recovery_conf') or EMPTY_DICT).get('restore_command')

@property
def synchronous_standby_names(self) -> Optional[str]:
"""Get ``synchronous_standby_names`` value configured by the user.

:returns: value of ``synchronous_standby_names`` in the Patroni configuration,
if any, otherwise ``None``.
"""
return (self.get('parameters') or EMPTY_DICT).get('synchronous_standby_names')
26 changes: 25 additions & 1 deletion tests/test_ha.py
Original file line number Diff line number Diff line change
Expand Up @@ -1306,6 +1306,7 @@ def test_demote_immediate(self, follow):
def test_process_sync_replication(self):
self.ha.has_lock = true
mock_set_sync = self.p.sync_handler.set_synchronous_standby_names = Mock()
mock_cfg_set_sync = self.p.config.set_synchronous_standby_names = Mock()
self.p.name = 'leader'

# Test sync key removed when sync mode disabled
Expand All @@ -1314,16 +1315,20 @@ def test_process_sync_replication(self):
self.ha.run_cycle()
mock_delete_sync.assert_called_once()
mock_set_sync.assert_called_once_with(CaseInsensitiveSet())
mock_cfg_set_sync.assert_called_once()

mock_set_sync.reset_mock()
mock_cfg_set_sync.reset_mock()
# Test sync key not touched when not there
self.ha.cluster = get_cluster_initialized_with_leader()
with patch.object(self.ha.dcs, 'delete_sync_state') as mock_delete_sync:
self.ha.run_cycle()
mock_delete_sync.assert_not_called()
mock_set_sync.assert_called_once_with(CaseInsensitiveSet())
mock_set_sync.assert_not_called()
mock_cfg_set_sync.assert_called_once()

mock_set_sync.reset_mock()
mock_cfg_set_sync.reset_mock()

self.ha.is_synchronous_mode = true

Expand All @@ -1335,12 +1340,14 @@ def test_process_sync_replication(self):
mock_set_sync.assert_not_called()

mock_set_sync.reset_mock()
mock_cfg_set_sync.reset_mock()

# Test sync standby is replaced when switching standbys
self.p.sync_handler.current_state = Mock(return_value=(CaseInsensitiveSet(['other2']), CaseInsensitiveSet()))
self.ha.dcs.write_sync_state = Mock(return_value=SyncState.empty())
self.ha.run_cycle()
mock_set_sync.assert_called_once_with(CaseInsensitiveSet(['other2']))
mock_cfg_set_sync.assert_not_called()

# Test sync standby is replaced when new standby is joined
self.p.sync_handler.current_state = Mock(return_value=(CaseInsensitiveSet(['other2', 'other3']),
Expand All @@ -1349,14 +1356,18 @@ def test_process_sync_replication(self):
self.ha.run_cycle()
self.assertEqual(mock_set_sync.call_args_list[0][0], (CaseInsensitiveSet(['other2']),))
self.assertEqual(mock_set_sync.call_args_list[1][0], (CaseInsensitiveSet(['other2', 'other3']),))
mock_cfg_set_sync.assert_not_called()

mock_set_sync.reset_mock()
mock_cfg_set_sync.reset_mock()
# Test sync standby is not disabled when updating dcs fails
self.ha.dcs.write_sync_state = Mock(return_value=None)
self.ha.run_cycle()
mock_set_sync.assert_not_called()
mock_cfg_set_sync.assert_not_called()

mock_set_sync.reset_mock()
mock_cfg_set_sync.reset_mock()
# Test changing sync standby
self.ha.dcs.write_sync_state = Mock(return_value=SyncState.empty())
self.ha.dcs.get_cluster = Mock(return_value=get_cluster_initialized_with_leader(sync=('leader', 'other')))
Expand Down Expand Up @@ -1384,10 +1395,23 @@ def test_process_sync_replication(self):

# Test sync set to '*' when synchronous_mode_strict is enabled
mock_set_sync.reset_mock()
mock_cfg_set_sync.reset_mock()
self.p.sync_handler.current_state = Mock(return_value=(CaseInsensitiveSet(), CaseInsensitiveSet()))
self.ha.cluster.config.data['synchronous_mode_strict'] = True
self.ha.run_cycle()
mock_set_sync.assert_called_once_with(CaseInsensitiveSet('*'))
mock_cfg_set_sync.assert_not_called()

# Test the value configured by the user for synchronous_standby_names is used when synchronous mode is disabled
self.ha.is_synchronous_mode = false

mock_set_sync.reset_mock()
mock_cfg_set_sync.reset_mock()
ssn_mock = PropertyMock(return_value="SOME_SSN")
with patch('patroni.postgresql.config.ConfigHandler.synchronous_standby_names', ssn_mock):
self.ha.run_cycle()
mock_set_sync.assert_not_called()
mock_cfg_set_sync.assert_called_once_with("SOME_SSN")

def test_sync_replication_become_primary(self):
self.ha.is_synchronous_mode = true
Expand Down
Loading