Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Syncing from upstream patroni/patroni (master) #427

Merged
merged 1 commit into from
Jul 19, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 9 additions & 6 deletions patroni/dcs/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -1371,8 +1371,9 @@ class AbstractDCS(abc.ABC):
def __init__(self, config: Dict[str, Any], mpp: 'AbstractMPP') -> None:
"""Prepare DCS paths, MPP object, initial values for state information and processing dependencies.

:ivar config: :class:`dict`, reference to config section of selected DCS.
i.e.: ``zookeeper`` for zookeeper, ``etcd`` for etcd, etc...
:param config: :class:`dict`, reference to config section of selected DCS.
i.e.: ``zookeeper`` for zookeeper, ``etcd`` for etcd, etc...
:param mpp: an object implementing :class:`AbstractMPP` interface.
"""
self._mpp = mpp
self._name = config['name']
Expand Down Expand Up @@ -1719,20 +1720,22 @@ def _update_leader(self, leader: Leader) -> bool:
"""

def update_leader(self,
leader: Leader,
cluster: Cluster,
last_lsn: Optional[int],
slots: Optional[Dict[str, int]] = None,
failsafe: Optional[Dict[str, str]] = None) -> bool:
"""Update ``leader`` key (or session) ttl and optime/leader.
"""Update ``leader`` key (or session) ttl, ``/status``, and ``/failsafe`` keys.

:param leader: :class:`Leader` object with information about the leader.
:param cluster: :class:`Cluster` object with information about the current cluster state.
:param last_lsn: absolute WAL LSN in bytes.
:param slots: dictionary with permanent slots ``confirmed_flush_lsn``.
:param failsafe: if defined dictionary passed to :meth:`~AbstractDCS.write_failsafe`.

:returns: ``True`` if ``leader`` key (or session) has been updated successfully.
"""
ret = self._update_leader(leader)
if TYPE_CHECKING: # pragma: no cover
assert isinstance(cluster.leader, Leader)
ret = self._update_leader(cluster.leader)
if ret and last_lsn:
status: Dict[str, Any] = {self._OPTIME: last_lsn}
if slots:
Expand Down
2 changes: 1 addition & 1 deletion patroni/dcs/kubernetes.py
Original file line number Diff line number Diff line change
Expand Up @@ -1225,7 +1225,7 @@ def _retry(*args: Any, **kwargs: Any) -> Any:
return bool(_run_and_handle_exceptions(self._patch_or_create, self.leader_path, annotations,
kind_resource_version, ips=ips, retry=_retry))

def update_leader(self, leader: Leader, last_lsn: Optional[int],
def update_leader(self, cluster: Cluster, last_lsn: Optional[int],
slots: Optional[Dict[str, int]] = None, failsafe: Optional[Dict[str, str]] = None) -> bool:
kind = self._kinds.get(self.leader_path)
kind_annotations = kind and kind.metadata.annotations or EMPTY_DICT
Expand Down
4 changes: 1 addition & 3 deletions patroni/ha.py
Original file line number Diff line number Diff line change
Expand Up @@ -377,10 +377,8 @@ def update_lock(self, update_status: bool = False) -> bool:
{**self.state_handler.slots(), slot_name_from_member_name(self.state_handler.name): last_lsn})
except Exception:
logger.exception('Exception when called state_handler.last_operation()')
if TYPE_CHECKING: # pragma: no cover
assert self.cluster.leader is not None
try:
ret = self.dcs.update_leader(self.cluster.leader, last_lsn, slots, self._failsafe_config())
ret = self.dcs.update_leader(self.cluster, last_lsn, slots, self._failsafe_config())
except DCSError:
raise
except Exception:
Expand Down
16 changes: 8 additions & 8 deletions tests/test_consul.py
Original file line number Diff line number Diff line change
Expand Up @@ -180,24 +180,24 @@ def test_write_leader_optime(self):
@patch.object(consul.Consul.Session, 'renew')
@patch.object(consul.Consul.KV, 'put', Mock(side_effect=ConsulException))
def test_update_leader(self, mock_renew):
leader = self.c.get_cluster().leader
cluster = self.c.get_cluster()
self.c._session = 'fd4f44fe-2cac-bba5-a60b-304b51ff39b8'
with patch.object(consul.Consul.KV, 'delete', Mock(return_value=True)):
with patch.object(consul.Consul.KV, 'put', Mock(return_value=True)):
self.assertTrue(self.c.update_leader(leader, 12345, failsafe={'foo': 'bar'}))
self.assertTrue(self.c.update_leader(cluster, 12345, failsafe={'foo': 'bar'}))
with patch.object(consul.Consul.KV, 'put', Mock(side_effect=ConsulException)):
self.assertFalse(self.c.update_leader(leader, 12345))
self.assertFalse(self.c.update_leader(cluster, 12345))
with patch('time.time', Mock(side_effect=[0, 0, 0, 0, 100, 200, 300])):
self.assertRaises(ConsulError, self.c.update_leader, leader, 12345)
self.assertRaises(ConsulError, self.c.update_leader, cluster, 12345)
with patch('time.time', Mock(side_effect=[0, 100, 200, 300])):
self.assertRaises(ConsulError, self.c.update_leader, leader, 12345)
self.assertRaises(ConsulError, self.c.update_leader, cluster, 12345)
with patch.object(consul.Consul.KV, 'delete', Mock(side_effect=ConsulException)):
self.assertFalse(self.c.update_leader(leader, 12347))
self.assertFalse(self.c.update_leader(cluster, 12347))
mock_renew.side_effect = RetryFailedError('')
self.c._last_session_refresh = 0
self.assertRaises(ConsulError, self.c.update_leader, leader, 12346)
self.assertRaises(ConsulError, self.c.update_leader, cluster, 12346)
mock_renew.side_effect = ConsulException
self.assertFalse(self.c.update_leader(leader, 12347))
self.assertFalse(self.c.update_leader(cluster, 12347))

@patch.object(consul.Consul.KV, 'delete', Mock(return_value=True))
def test_delete_leader(self):
Expand Down
12 changes: 6 additions & 6 deletions tests/test_etcd.py
Original file line number Diff line number Diff line change
Expand Up @@ -301,15 +301,15 @@ def test_write_leader_optime(self):
self.etcd.write_leader_optime('0')

def test_update_leader(self):
leader = self.etcd.get_cluster().leader
self.assertTrue(self.etcd.update_leader(leader, None, failsafe={'foo': 'bar'}))
cluster = self.etcd.get_cluster()
self.assertTrue(self.etcd.update_leader(cluster, None, failsafe={'foo': 'bar'}))
with patch.object(etcd.Client, 'write',
Mock(side_effect=[etcd.EtcdConnectionFailed, etcd.EtcdClusterIdChanged, Exception])):
self.assertRaises(EtcdError, self.etcd.update_leader, leader, None)
self.assertFalse(self.etcd.update_leader(leader, None))
self.assertRaises(EtcdError, self.etcd.update_leader, leader, None)
self.assertRaises(EtcdError, self.etcd.update_leader, cluster, None)
self.assertFalse(self.etcd.update_leader(cluster, None))
self.assertRaises(EtcdError, self.etcd.update_leader, cluster, None)
with patch.object(etcd.Client, 'write', Mock(side_effect=etcd.EtcdKeyNotFound)):
self.assertFalse(self.etcd.update_leader(leader, None))
self.assertFalse(self.etcd.update_leader(cluster, None))

def test_initialize(self):
self.assertFalse(self.etcd.initialize())
Expand Down
14 changes: 7 additions & 7 deletions tests/test_etcd3.py
Original file line number Diff line number Diff line change
Expand Up @@ -253,20 +253,20 @@ def test_touch_member(self):
self.etcd3.touch_member({})

def test__update_leader(self):
leader = self.etcd3.get_cluster().leader
cluster = self.etcd3.get_cluster()
self.etcd3._lease = None
with patch.object(Etcd3Client, 'txn', Mock(return_value={'succeeded': True})):
self.etcd3.update_leader(leader, '123', failsafe={'foo': 'bar'})
self.etcd3.update_leader(cluster, '123', failsafe={'foo': 'bar'})
self.etcd3._last_lease_refresh = 0
self.etcd3.update_leader(leader, '124')
self.etcd3.update_leader(cluster, '124')
with patch.object(PatroniEtcd3Client, 'lease_keepalive', Mock(return_value=True)), \
patch('time.time', Mock(side_effect=[0, 100, 200, 300])):
self.assertRaises(Etcd3Error, self.etcd3.update_leader, leader, '126')
self.etcd3._lease = leader.session
self.etcd3.update_leader(leader, '124')
self.assertRaises(Etcd3Error, self.etcd3.update_leader, cluster, '126')
self.etcd3._lease = cluster.leader.session
self.etcd3.update_leader(cluster, '124')
self.etcd3._last_lease_refresh = 0
with patch.object(PatroniEtcd3Client, 'lease_keepalive', Mock(side_effect=Unknown)):
self.assertFalse(self.etcd3.update_leader(leader, '125'))
self.assertFalse(self.etcd3.update_leader(cluster, '125'))

def test_take_leader(self):
self.assertFalse(self.etcd3.take_leader())
Expand Down
27 changes: 13 additions & 14 deletions tests/test_kubernetes.py
Original file line number Diff line number Diff line change
Expand Up @@ -383,8 +383,7 @@ def setUp(self, config=None):

@patch.object(k8s_client.CoreV1Api, 'patch_namespaced_endpoints', create=True)
def test_update_leader(self, mock_patch_namespaced_endpoints):
leader = self.k.get_cluster().leader
self.assertIsNotNone(self.k.update_leader(leader, '123', failsafe={'foo': 'bar'}))
self.assertIsNotNone(self.k.update_leader(self.k.get_cluster(), '123', failsafe={'foo': 'bar'}))
args = mock_patch_namespaced_endpoints.call_args[0]
self.assertEqual(args[2].subsets[0].addresses[0].target_ref.resource_version, '1')
self.assertEqual(args[2].subsets[0].addresses[0].ip, '10.0.0.1')
Expand All @@ -399,38 +398,38 @@ def setUp(self, config=None):

@patch.object(k8s_client.CoreV1Api, 'patch_namespaced_endpoints', create=True)
def test_update_leader(self, mock_patch_namespaced_endpoints):
leader = self.k.get_cluster().leader
self.assertIsNotNone(self.k.update_leader(leader, '123', failsafe={'foo': 'bar'}))
cluster = self.k.get_cluster()
self.assertIsNotNone(self.k.update_leader(cluster, '123', failsafe={'foo': 'bar'}))
args = mock_patch_namespaced_endpoints.call_args[0]
self.assertEqual(args[2].subsets[0].addresses[0].target_ref.resource_version, '10')
self.assertEqual(args[2].subsets[0].addresses[0].ip, '10.0.0.0')
self.k._kinds._object_cache['test'].subsets[:] = []
self.assertIsNotNone(self.k.update_leader(leader, '123'))
self.assertIsNotNone(self.k.update_leader(cluster, '123'))
self.k._kinds._object_cache['test'].metadata.annotations['leader'] = 'p-1'
self.assertFalse(self.k.update_leader(leader, '123'))
self.assertFalse(self.k.update_leader(cluster, '123'))

@patch.object(k8s_client.CoreV1Api, 'read_namespaced_endpoints', create=True)
@patch.object(k8s_client.CoreV1Api, 'patch_namespaced_endpoints', create=True)
def test__update_leader_with_retry(self, mock_patch, mock_read):
leader = self.k.get_cluster().leader
cluster = self.k.get_cluster()
mock_read.return_value = mock_read_namespaced_endpoints()
mock_patch.side_effect = k8s_client.rest.ApiException(502, '')
self.assertFalse(self.k.update_leader(leader, '123'))
self.assertFalse(self.k.update_leader(cluster, '123'))
mock_patch.side_effect = RetryFailedError('')
self.assertRaises(KubernetesError, self.k.update_leader, leader, '123')
self.assertRaises(KubernetesError, self.k.update_leader, cluster, '123')
mock_patch.side_effect = k8s_client.rest.ApiException(409, '')
with patch('time.time', Mock(side_effect=[0, 100, 200, 0, 0, 0, 0, 100, 200])):
self.assertFalse(self.k.update_leader(leader, '123'))
self.assertFalse(self.k.update_leader(leader, '123'))
self.assertFalse(self.k.update_leader(leader, '123'))
self.assertFalse(self.k.update_leader(cluster, '123'))
self.assertFalse(self.k.update_leader(cluster, '123'))
self.assertFalse(self.k.update_leader(cluster, '123'))
mock_patch.side_effect = [k8s_client.rest.ApiException(409, ''), mock_namespaced_kind()]
mock_read.return_value.metadata.resource_version = '2'
self.assertIsNotNone(self.k._update_leader_with_retry({}, '1', []))
mock_patch.side_effect = k8s_client.rest.ApiException(409, '')
mock_read.side_effect = RetryFailedError('')
self.assertRaises(KubernetesError, self.k.update_leader, leader, '123')
self.assertRaises(KubernetesError, self.k.update_leader, cluster, '123')
mock_read.side_effect = Exception
self.assertFalse(self.k.update_leader(leader, '123'))
self.assertFalse(self.k.update_leader(cluster, '123'))

@patch.object(k8s_client.CoreV1Api, 'patch_namespaced_endpoints',
Mock(side_effect=[k8s_client.rest.ApiException(500, ''),
Expand Down
5 changes: 2 additions & 3 deletions tests/test_raft.py
Original file line number Diff line number Diff line change
Expand Up @@ -149,11 +149,10 @@ def test_raft(self):
cluster = raft.get_cluster()
self.assertIsInstance(cluster, Cluster)
self.assertIsInstance(cluster.workers[1], Cluster)
leader = cluster.leader
self.assertTrue(raft.delete_leader(leader))
self.assertTrue(raft.delete_leader(cluster.leader))
self.assertTrue(raft._sync_obj.set(raft.status_path, '{"optime":1234567,"slots":{"ls":12345}}'))
raft.get_cluster()
self.assertTrue(raft.update_leader(leader, '1', failsafe={'foo': 'bat'}))
self.assertTrue(raft.update_leader(cluster, '1', failsafe={'foo': 'bat'}))
self.assertTrue(raft._sync_obj.set(raft.failsafe_path, '{"foo"}'))
self.assertTrue(raft._sync_obj.set(raft.status_path, '{'))
raft.get_mpp_coordinator()
Expand Down
12 changes: 6 additions & 6 deletions tests/test_zookeeper.py
Original file line number Diff line number Diff line change
Expand Up @@ -257,15 +257,15 @@ def test_take_leader(self):
self.zk.take_leader()

def test_update_leader(self):
leader = self.zk.get_cluster().leader
self.assertFalse(self.zk.update_leader(leader, 12345))
cluster = self.zk.get_cluster()
self.assertFalse(self.zk.update_leader(cluster, 12345))
with patch.object(MockKazooClient, 'delete', Mock(side_effect=RetryFailedError)):
self.assertRaises(ZooKeeperError, self.zk.update_leader, leader, 12345)
self.assertRaises(ZooKeeperError, self.zk.update_leader, cluster, 12345)
with patch.object(MockKazooClient, 'delete', Mock(side_effect=NoNodeError)):
self.assertTrue(self.zk.update_leader(leader, 12345, failsafe={'foo': 'bar'}))
self.assertTrue(self.zk.update_leader(cluster, 12345, failsafe={'foo': 'bar'}))
with patch.object(MockKazooClient, 'create', Mock(side_effect=[RetryFailedError, Exception])):
self.assertRaises(ZooKeeperError, self.zk.update_leader, leader, 12345)
self.assertFalse(self.zk.update_leader(leader, 12345))
self.assertRaises(ZooKeeperError, self.zk.update_leader, cluster, 12345)
self.assertFalse(self.zk.update_leader(cluster, 12345))

@patch.object(Cluster, 'min_version', PropertyMock(return_value=(2, 0)))
def test_write_leader_optime(self):
Expand Down
Loading