Skip to content

Commit

Permalink
Merge pull request #427 from patroni/master
Browse files Browse the repository at this point in the history
Syncing from upstream patroni/patroni (master)
  • Loading branch information
bt-admin authored Jul 19, 2024
2 parents 17b5a87 + c633923 commit c15372b
Show file tree
Hide file tree
Showing 9 changed files with 53 additions and 54 deletions.
15 changes: 9 additions & 6 deletions patroni/dcs/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -1371,8 +1371,9 @@ class AbstractDCS(abc.ABC):
def __init__(self, config: Dict[str, Any], mpp: 'AbstractMPP') -> None:
"""Prepare DCS paths, MPP object, initial values for state information and processing dependencies.
:ivar config: :class:`dict`, reference to config section of selected DCS.
i.e.: ``zookeeper`` for zookeeper, ``etcd`` for etcd, etc...
:param config: :class:`dict`, reference to config section of selected DCS.
i.e.: ``zookeeper`` for zookeeper, ``etcd`` for etcd, etc...
:param mpp: an object implementing :class:`AbstractMPP` interface.
"""
self._mpp = mpp
self._name = config['name']
Expand Down Expand Up @@ -1719,20 +1720,22 @@ def _update_leader(self, leader: Leader) -> bool:
"""

def update_leader(self,
leader: Leader,
cluster: Cluster,
last_lsn: Optional[int],
slots: Optional[Dict[str, int]] = None,
failsafe: Optional[Dict[str, str]] = None) -> bool:
"""Update ``leader`` key (or session) ttl and optime/leader.
"""Update ``leader`` key (or session) ttl, ``/status``, and ``/failsafe`` keys.
:param leader: :class:`Leader` object with information about the leader.
:param cluster: :class:`Cluster` object with information about the current cluster state.
:param last_lsn: absolute WAL LSN in bytes.
:param slots: dictionary with permanent slots ``confirmed_flush_lsn``.
:param failsafe: if defined dictionary passed to :meth:`~AbstractDCS.write_failsafe`.
:returns: ``True`` if ``leader`` key (or session) has been updated successfully.
"""
ret = self._update_leader(leader)
if TYPE_CHECKING: # pragma: no cover
assert isinstance(cluster.leader, Leader)
ret = self._update_leader(cluster.leader)
if ret and last_lsn:
status: Dict[str, Any] = {self._OPTIME: last_lsn}
if slots:
Expand Down
2 changes: 1 addition & 1 deletion patroni/dcs/kubernetes.py
Original file line number Diff line number Diff line change
Expand Up @@ -1225,7 +1225,7 @@ def _retry(*args: Any, **kwargs: Any) -> Any:
return bool(_run_and_handle_exceptions(self._patch_or_create, self.leader_path, annotations,
kind_resource_version, ips=ips, retry=_retry))

def update_leader(self, leader: Leader, last_lsn: Optional[int],
def update_leader(self, cluster: Cluster, last_lsn: Optional[int],
slots: Optional[Dict[str, int]] = None, failsafe: Optional[Dict[str, str]] = None) -> bool:
kind = self._kinds.get(self.leader_path)
kind_annotations = kind and kind.metadata.annotations or EMPTY_DICT
Expand Down
4 changes: 1 addition & 3 deletions patroni/ha.py
Original file line number Diff line number Diff line change
Expand Up @@ -377,10 +377,8 @@ def update_lock(self, update_status: bool = False) -> bool:
{**self.state_handler.slots(), slot_name_from_member_name(self.state_handler.name): last_lsn})
except Exception:
logger.exception('Exception when called state_handler.last_operation()')
if TYPE_CHECKING: # pragma: no cover
assert self.cluster.leader is not None
try:
ret = self.dcs.update_leader(self.cluster.leader, last_lsn, slots, self._failsafe_config())
ret = self.dcs.update_leader(self.cluster, last_lsn, slots, self._failsafe_config())
except DCSError:
raise
except Exception:
Expand Down
16 changes: 8 additions & 8 deletions tests/test_consul.py
Original file line number Diff line number Diff line change
Expand Up @@ -180,24 +180,24 @@ def test_write_leader_optime(self):
@patch.object(consul.Consul.Session, 'renew')
@patch.object(consul.Consul.KV, 'put', Mock(side_effect=ConsulException))
def test_update_leader(self, mock_renew):
leader = self.c.get_cluster().leader
cluster = self.c.get_cluster()
self.c._session = 'fd4f44fe-2cac-bba5-a60b-304b51ff39b8'
with patch.object(consul.Consul.KV, 'delete', Mock(return_value=True)):
with patch.object(consul.Consul.KV, 'put', Mock(return_value=True)):
self.assertTrue(self.c.update_leader(leader, 12345, failsafe={'foo': 'bar'}))
self.assertTrue(self.c.update_leader(cluster, 12345, failsafe={'foo': 'bar'}))
with patch.object(consul.Consul.KV, 'put', Mock(side_effect=ConsulException)):
self.assertFalse(self.c.update_leader(leader, 12345))
self.assertFalse(self.c.update_leader(cluster, 12345))
with patch('time.time', Mock(side_effect=[0, 0, 0, 0, 100, 200, 300])):
self.assertRaises(ConsulError, self.c.update_leader, leader, 12345)
self.assertRaises(ConsulError, self.c.update_leader, cluster, 12345)
with patch('time.time', Mock(side_effect=[0, 100, 200, 300])):
self.assertRaises(ConsulError, self.c.update_leader, leader, 12345)
self.assertRaises(ConsulError, self.c.update_leader, cluster, 12345)
with patch.object(consul.Consul.KV, 'delete', Mock(side_effect=ConsulException)):
self.assertFalse(self.c.update_leader(leader, 12347))
self.assertFalse(self.c.update_leader(cluster, 12347))
mock_renew.side_effect = RetryFailedError('')
self.c._last_session_refresh = 0
self.assertRaises(ConsulError, self.c.update_leader, leader, 12346)
self.assertRaises(ConsulError, self.c.update_leader, cluster, 12346)
mock_renew.side_effect = ConsulException
self.assertFalse(self.c.update_leader(leader, 12347))
self.assertFalse(self.c.update_leader(cluster, 12347))

@patch.object(consul.Consul.KV, 'delete', Mock(return_value=True))
def test_delete_leader(self):
Expand Down
12 changes: 6 additions & 6 deletions tests/test_etcd.py
Original file line number Diff line number Diff line change
Expand Up @@ -301,15 +301,15 @@ def test_write_leader_optime(self):
self.etcd.write_leader_optime('0')

def test_update_leader(self):
leader = self.etcd.get_cluster().leader
self.assertTrue(self.etcd.update_leader(leader, None, failsafe={'foo': 'bar'}))
cluster = self.etcd.get_cluster()
self.assertTrue(self.etcd.update_leader(cluster, None, failsafe={'foo': 'bar'}))
with patch.object(etcd.Client, 'write',
Mock(side_effect=[etcd.EtcdConnectionFailed, etcd.EtcdClusterIdChanged, Exception])):
self.assertRaises(EtcdError, self.etcd.update_leader, leader, None)
self.assertFalse(self.etcd.update_leader(leader, None))
self.assertRaises(EtcdError, self.etcd.update_leader, leader, None)
self.assertRaises(EtcdError, self.etcd.update_leader, cluster, None)
self.assertFalse(self.etcd.update_leader(cluster, None))
self.assertRaises(EtcdError, self.etcd.update_leader, cluster, None)
with patch.object(etcd.Client, 'write', Mock(side_effect=etcd.EtcdKeyNotFound)):
self.assertFalse(self.etcd.update_leader(leader, None))
self.assertFalse(self.etcd.update_leader(cluster, None))

def test_initialize(self):
self.assertFalse(self.etcd.initialize())
Expand Down
14 changes: 7 additions & 7 deletions tests/test_etcd3.py
Original file line number Diff line number Diff line change
Expand Up @@ -253,20 +253,20 @@ def test_touch_member(self):
self.etcd3.touch_member({})

def test__update_leader(self):
leader = self.etcd3.get_cluster().leader
cluster = self.etcd3.get_cluster()
self.etcd3._lease = None
with patch.object(Etcd3Client, 'txn', Mock(return_value={'succeeded': True})):
self.etcd3.update_leader(leader, '123', failsafe={'foo': 'bar'})
self.etcd3.update_leader(cluster, '123', failsafe={'foo': 'bar'})
self.etcd3._last_lease_refresh = 0
self.etcd3.update_leader(leader, '124')
self.etcd3.update_leader(cluster, '124')
with patch.object(PatroniEtcd3Client, 'lease_keepalive', Mock(return_value=True)), \
patch('time.time', Mock(side_effect=[0, 100, 200, 300])):
self.assertRaises(Etcd3Error, self.etcd3.update_leader, leader, '126')
self.etcd3._lease = leader.session
self.etcd3.update_leader(leader, '124')
self.assertRaises(Etcd3Error, self.etcd3.update_leader, cluster, '126')
self.etcd3._lease = cluster.leader.session
self.etcd3.update_leader(cluster, '124')
self.etcd3._last_lease_refresh = 0
with patch.object(PatroniEtcd3Client, 'lease_keepalive', Mock(side_effect=Unknown)):
self.assertFalse(self.etcd3.update_leader(leader, '125'))
self.assertFalse(self.etcd3.update_leader(cluster, '125'))

def test_take_leader(self):
self.assertFalse(self.etcd3.take_leader())
Expand Down
27 changes: 13 additions & 14 deletions tests/test_kubernetes.py
Original file line number Diff line number Diff line change
Expand Up @@ -383,8 +383,7 @@ def setUp(self, config=None):

@patch.object(k8s_client.CoreV1Api, 'patch_namespaced_endpoints', create=True)
def test_update_leader(self, mock_patch_namespaced_endpoints):
leader = self.k.get_cluster().leader
self.assertIsNotNone(self.k.update_leader(leader, '123', failsafe={'foo': 'bar'}))
self.assertIsNotNone(self.k.update_leader(self.k.get_cluster(), '123', failsafe={'foo': 'bar'}))
args = mock_patch_namespaced_endpoints.call_args[0]
self.assertEqual(args[2].subsets[0].addresses[0].target_ref.resource_version, '1')
self.assertEqual(args[2].subsets[0].addresses[0].ip, '10.0.0.1')
Expand All @@ -399,38 +398,38 @@ def setUp(self, config=None):

@patch.object(k8s_client.CoreV1Api, 'patch_namespaced_endpoints', create=True)
def test_update_leader(self, mock_patch_namespaced_endpoints):
leader = self.k.get_cluster().leader
self.assertIsNotNone(self.k.update_leader(leader, '123', failsafe={'foo': 'bar'}))
cluster = self.k.get_cluster()
self.assertIsNotNone(self.k.update_leader(cluster, '123', failsafe={'foo': 'bar'}))
args = mock_patch_namespaced_endpoints.call_args[0]
self.assertEqual(args[2].subsets[0].addresses[0].target_ref.resource_version, '10')
self.assertEqual(args[2].subsets[0].addresses[0].ip, '10.0.0.0')
self.k._kinds._object_cache['test'].subsets[:] = []
self.assertIsNotNone(self.k.update_leader(leader, '123'))
self.assertIsNotNone(self.k.update_leader(cluster, '123'))
self.k._kinds._object_cache['test'].metadata.annotations['leader'] = 'p-1'
self.assertFalse(self.k.update_leader(leader, '123'))
self.assertFalse(self.k.update_leader(cluster, '123'))

@patch.object(k8s_client.CoreV1Api, 'read_namespaced_endpoints', create=True)
@patch.object(k8s_client.CoreV1Api, 'patch_namespaced_endpoints', create=True)
def test__update_leader_with_retry(self, mock_patch, mock_read):
leader = self.k.get_cluster().leader
cluster = self.k.get_cluster()
mock_read.return_value = mock_read_namespaced_endpoints()
mock_patch.side_effect = k8s_client.rest.ApiException(502, '')
self.assertFalse(self.k.update_leader(leader, '123'))
self.assertFalse(self.k.update_leader(cluster, '123'))
mock_patch.side_effect = RetryFailedError('')
self.assertRaises(KubernetesError, self.k.update_leader, leader, '123')
self.assertRaises(KubernetesError, self.k.update_leader, cluster, '123')
mock_patch.side_effect = k8s_client.rest.ApiException(409, '')
with patch('time.time', Mock(side_effect=[0, 100, 200, 0, 0, 0, 0, 100, 200])):
self.assertFalse(self.k.update_leader(leader, '123'))
self.assertFalse(self.k.update_leader(leader, '123'))
self.assertFalse(self.k.update_leader(leader, '123'))
self.assertFalse(self.k.update_leader(cluster, '123'))
self.assertFalse(self.k.update_leader(cluster, '123'))
self.assertFalse(self.k.update_leader(cluster, '123'))
mock_patch.side_effect = [k8s_client.rest.ApiException(409, ''), mock_namespaced_kind()]
mock_read.return_value.metadata.resource_version = '2'
self.assertIsNotNone(self.k._update_leader_with_retry({}, '1', []))
mock_patch.side_effect = k8s_client.rest.ApiException(409, '')
mock_read.side_effect = RetryFailedError('')
self.assertRaises(KubernetesError, self.k.update_leader, leader, '123')
self.assertRaises(KubernetesError, self.k.update_leader, cluster, '123')
mock_read.side_effect = Exception
self.assertFalse(self.k.update_leader(leader, '123'))
self.assertFalse(self.k.update_leader(cluster, '123'))

@patch.object(k8s_client.CoreV1Api, 'patch_namespaced_endpoints',
Mock(side_effect=[k8s_client.rest.ApiException(500, ''),
Expand Down
5 changes: 2 additions & 3 deletions tests/test_raft.py
Original file line number Diff line number Diff line change
Expand Up @@ -149,11 +149,10 @@ def test_raft(self):
cluster = raft.get_cluster()
self.assertIsInstance(cluster, Cluster)
self.assertIsInstance(cluster.workers[1], Cluster)
leader = cluster.leader
self.assertTrue(raft.delete_leader(leader))
self.assertTrue(raft.delete_leader(cluster.leader))
self.assertTrue(raft._sync_obj.set(raft.status_path, '{"optime":1234567,"slots":{"ls":12345}}'))
raft.get_cluster()
self.assertTrue(raft.update_leader(leader, '1', failsafe={'foo': 'bat'}))
self.assertTrue(raft.update_leader(cluster, '1', failsafe={'foo': 'bat'}))
self.assertTrue(raft._sync_obj.set(raft.failsafe_path, '{"foo"}'))
self.assertTrue(raft._sync_obj.set(raft.status_path, '{'))
raft.get_mpp_coordinator()
Expand Down
12 changes: 6 additions & 6 deletions tests/test_zookeeper.py
Original file line number Diff line number Diff line change
Expand Up @@ -257,15 +257,15 @@ def test_take_leader(self):
self.zk.take_leader()

def test_update_leader(self):
leader = self.zk.get_cluster().leader
self.assertFalse(self.zk.update_leader(leader, 12345))
cluster = self.zk.get_cluster()
self.assertFalse(self.zk.update_leader(cluster, 12345))
with patch.object(MockKazooClient, 'delete', Mock(side_effect=RetryFailedError)):
self.assertRaises(ZooKeeperError, self.zk.update_leader, leader, 12345)
self.assertRaises(ZooKeeperError, self.zk.update_leader, cluster, 12345)
with patch.object(MockKazooClient, 'delete', Mock(side_effect=NoNodeError)):
self.assertTrue(self.zk.update_leader(leader, 12345, failsafe={'foo': 'bar'}))
self.assertTrue(self.zk.update_leader(cluster, 12345, failsafe={'foo': 'bar'}))
with patch.object(MockKazooClient, 'create', Mock(side_effect=[RetryFailedError, Exception])):
self.assertRaises(ZooKeeperError, self.zk.update_leader, leader, 12345)
self.assertFalse(self.zk.update_leader(leader, 12345))
self.assertRaises(ZooKeeperError, self.zk.update_leader, cluster, 12345)
self.assertFalse(self.zk.update_leader(cluster, 12345))

@patch.object(Cluster, 'min_version', PropertyMock(return_value=(2, 0)))
def test_write_leader_optime(self):
Expand Down

0 comments on commit c15372b

Please sign in to comment.