diff --git a/patroni/dcs/__init__.py b/patroni/dcs/__init__.py index bdf97b712..7857bd83b 100644 --- a/patroni/dcs/__init__.py +++ b/patroni/dcs/__init__.py @@ -1371,8 +1371,9 @@ class AbstractDCS(abc.ABC): def __init__(self, config: Dict[str, Any], mpp: 'AbstractMPP') -> None: """Prepare DCS paths, MPP object, initial values for state information and processing dependencies. - :ivar config: :class:`dict`, reference to config section of selected DCS. - i.e.: ``zookeeper`` for zookeeper, ``etcd`` for etcd, etc... + :param config: :class:`dict`, reference to config section of selected DCS. + i.e.: ``zookeeper`` for zookeeper, ``etcd`` for etcd, etc... + :param mpp: an object implementing :class:`AbstractMPP` interface. """ self._mpp = mpp self._name = config['name'] @@ -1719,20 +1720,22 @@ def _update_leader(self, leader: Leader) -> bool: """ def update_leader(self, - leader: Leader, + cluster: Cluster, last_lsn: Optional[int], slots: Optional[Dict[str, int]] = None, failsafe: Optional[Dict[str, str]] = None) -> bool: - """Update ``leader`` key (or session) ttl and optime/leader. + """Update ``leader`` key (or session) ttl, ``/status``, and ``/failsafe`` keys. - :param leader: :class:`Leader` object with information about the leader. + :param cluster: :class:`Cluster` object with information about the current cluster state. :param last_lsn: absolute WAL LSN in bytes. :param slots: dictionary with permanent slots ``confirmed_flush_lsn``. :param failsafe: if defined dictionary passed to :meth:`~AbstractDCS.write_failsafe`. :returns: ``True`` if ``leader`` key (or session) has been updated successfully. """ - ret = self._update_leader(leader) + if TYPE_CHECKING: # pragma: no cover + assert isinstance(cluster.leader, Leader) + ret = self._update_leader(cluster.leader) if ret and last_lsn: status: Dict[str, Any] = {self._OPTIME: last_lsn} if slots: diff --git a/patroni/dcs/kubernetes.py b/patroni/dcs/kubernetes.py index 9590bb98f..8ed7041f1 100644 --- a/patroni/dcs/kubernetes.py +++ b/patroni/dcs/kubernetes.py @@ -1225,7 +1225,7 @@ def _retry(*args: Any, **kwargs: Any) -> Any: return bool(_run_and_handle_exceptions(self._patch_or_create, self.leader_path, annotations, kind_resource_version, ips=ips, retry=_retry)) - def update_leader(self, leader: Leader, last_lsn: Optional[int], + def update_leader(self, cluster: Cluster, last_lsn: Optional[int], slots: Optional[Dict[str, int]] = None, failsafe: Optional[Dict[str, str]] = None) -> bool: kind = self._kinds.get(self.leader_path) kind_annotations = kind and kind.metadata.annotations or EMPTY_DICT diff --git a/patroni/ha.py b/patroni/ha.py index 8444fb115..8fc892614 100644 --- a/patroni/ha.py +++ b/patroni/ha.py @@ -377,10 +377,8 @@ def update_lock(self, update_status: bool = False) -> bool: {**self.state_handler.slots(), slot_name_from_member_name(self.state_handler.name): last_lsn}) except Exception: logger.exception('Exception when called state_handler.last_operation()') - if TYPE_CHECKING: # pragma: no cover - assert self.cluster.leader is not None try: - ret = self.dcs.update_leader(self.cluster.leader, last_lsn, slots, self._failsafe_config()) + ret = self.dcs.update_leader(self.cluster, last_lsn, slots, self._failsafe_config()) except DCSError: raise except Exception: diff --git a/tests/test_consul.py b/tests/test_consul.py index d18913e04..49ec3118a 100644 --- a/tests/test_consul.py +++ b/tests/test_consul.py @@ -180,24 +180,24 @@ def test_write_leader_optime(self): @patch.object(consul.Consul.Session, 'renew') @patch.object(consul.Consul.KV, 'put', Mock(side_effect=ConsulException)) def test_update_leader(self, mock_renew): - leader = self.c.get_cluster().leader + cluster = self.c.get_cluster() self.c._session = 'fd4f44fe-2cac-bba5-a60b-304b51ff39b8' with patch.object(consul.Consul.KV, 'delete', Mock(return_value=True)): with patch.object(consul.Consul.KV, 'put', Mock(return_value=True)): - self.assertTrue(self.c.update_leader(leader, 12345, failsafe={'foo': 'bar'})) + self.assertTrue(self.c.update_leader(cluster, 12345, failsafe={'foo': 'bar'})) with patch.object(consul.Consul.KV, 'put', Mock(side_effect=ConsulException)): - self.assertFalse(self.c.update_leader(leader, 12345)) + self.assertFalse(self.c.update_leader(cluster, 12345)) with patch('time.time', Mock(side_effect=[0, 0, 0, 0, 100, 200, 300])): - self.assertRaises(ConsulError, self.c.update_leader, leader, 12345) + self.assertRaises(ConsulError, self.c.update_leader, cluster, 12345) with patch('time.time', Mock(side_effect=[0, 100, 200, 300])): - self.assertRaises(ConsulError, self.c.update_leader, leader, 12345) + self.assertRaises(ConsulError, self.c.update_leader, cluster, 12345) with patch.object(consul.Consul.KV, 'delete', Mock(side_effect=ConsulException)): - self.assertFalse(self.c.update_leader(leader, 12347)) + self.assertFalse(self.c.update_leader(cluster, 12347)) mock_renew.side_effect = RetryFailedError('') self.c._last_session_refresh = 0 - self.assertRaises(ConsulError, self.c.update_leader, leader, 12346) + self.assertRaises(ConsulError, self.c.update_leader, cluster, 12346) mock_renew.side_effect = ConsulException - self.assertFalse(self.c.update_leader(leader, 12347)) + self.assertFalse(self.c.update_leader(cluster, 12347)) @patch.object(consul.Consul.KV, 'delete', Mock(return_value=True)) def test_delete_leader(self): diff --git a/tests/test_etcd.py b/tests/test_etcd.py index 4c56c4258..656edff3c 100644 --- a/tests/test_etcd.py +++ b/tests/test_etcd.py @@ -301,15 +301,15 @@ def test_write_leader_optime(self): self.etcd.write_leader_optime('0') def test_update_leader(self): - leader = self.etcd.get_cluster().leader - self.assertTrue(self.etcd.update_leader(leader, None, failsafe={'foo': 'bar'})) + cluster = self.etcd.get_cluster() + self.assertTrue(self.etcd.update_leader(cluster, None, failsafe={'foo': 'bar'})) with patch.object(etcd.Client, 'write', Mock(side_effect=[etcd.EtcdConnectionFailed, etcd.EtcdClusterIdChanged, Exception])): - self.assertRaises(EtcdError, self.etcd.update_leader, leader, None) - self.assertFalse(self.etcd.update_leader(leader, None)) - self.assertRaises(EtcdError, self.etcd.update_leader, leader, None) + self.assertRaises(EtcdError, self.etcd.update_leader, cluster, None) + self.assertFalse(self.etcd.update_leader(cluster, None)) + self.assertRaises(EtcdError, self.etcd.update_leader, cluster, None) with patch.object(etcd.Client, 'write', Mock(side_effect=etcd.EtcdKeyNotFound)): - self.assertFalse(self.etcd.update_leader(leader, None)) + self.assertFalse(self.etcd.update_leader(cluster, None)) def test_initialize(self): self.assertFalse(self.etcd.initialize()) diff --git a/tests/test_etcd3.py b/tests/test_etcd3.py index bf52af073..2ea3699ef 100644 --- a/tests/test_etcd3.py +++ b/tests/test_etcd3.py @@ -253,20 +253,20 @@ def test_touch_member(self): self.etcd3.touch_member({}) def test__update_leader(self): - leader = self.etcd3.get_cluster().leader + cluster = self.etcd3.get_cluster() self.etcd3._lease = None with patch.object(Etcd3Client, 'txn', Mock(return_value={'succeeded': True})): - self.etcd3.update_leader(leader, '123', failsafe={'foo': 'bar'}) + self.etcd3.update_leader(cluster, '123', failsafe={'foo': 'bar'}) self.etcd3._last_lease_refresh = 0 - self.etcd3.update_leader(leader, '124') + self.etcd3.update_leader(cluster, '124') with patch.object(PatroniEtcd3Client, 'lease_keepalive', Mock(return_value=True)), \ patch('time.time', Mock(side_effect=[0, 100, 200, 300])): - self.assertRaises(Etcd3Error, self.etcd3.update_leader, leader, '126') - self.etcd3._lease = leader.session - self.etcd3.update_leader(leader, '124') + self.assertRaises(Etcd3Error, self.etcd3.update_leader, cluster, '126') + self.etcd3._lease = cluster.leader.session + self.etcd3.update_leader(cluster, '124') self.etcd3._last_lease_refresh = 0 with patch.object(PatroniEtcd3Client, 'lease_keepalive', Mock(side_effect=Unknown)): - self.assertFalse(self.etcd3.update_leader(leader, '125')) + self.assertFalse(self.etcd3.update_leader(cluster, '125')) def test_take_leader(self): self.assertFalse(self.etcd3.take_leader()) diff --git a/tests/test_kubernetes.py b/tests/test_kubernetes.py index 4bb2da19d..1bfd5b120 100644 --- a/tests/test_kubernetes.py +++ b/tests/test_kubernetes.py @@ -383,8 +383,7 @@ def setUp(self, config=None): @patch.object(k8s_client.CoreV1Api, 'patch_namespaced_endpoints', create=True) def test_update_leader(self, mock_patch_namespaced_endpoints): - leader = self.k.get_cluster().leader - self.assertIsNotNone(self.k.update_leader(leader, '123', failsafe={'foo': 'bar'})) + self.assertIsNotNone(self.k.update_leader(self.k.get_cluster(), '123', failsafe={'foo': 'bar'})) args = mock_patch_namespaced_endpoints.call_args[0] self.assertEqual(args[2].subsets[0].addresses[0].target_ref.resource_version, '1') self.assertEqual(args[2].subsets[0].addresses[0].ip, '10.0.0.1') @@ -399,38 +398,38 @@ def setUp(self, config=None): @patch.object(k8s_client.CoreV1Api, 'patch_namespaced_endpoints', create=True) def test_update_leader(self, mock_patch_namespaced_endpoints): - leader = self.k.get_cluster().leader - self.assertIsNotNone(self.k.update_leader(leader, '123', failsafe={'foo': 'bar'})) + cluster = self.k.get_cluster() + self.assertIsNotNone(self.k.update_leader(cluster, '123', failsafe={'foo': 'bar'})) args = mock_patch_namespaced_endpoints.call_args[0] self.assertEqual(args[2].subsets[0].addresses[0].target_ref.resource_version, '10') self.assertEqual(args[2].subsets[0].addresses[0].ip, '10.0.0.0') self.k._kinds._object_cache['test'].subsets[:] = [] - self.assertIsNotNone(self.k.update_leader(leader, '123')) + self.assertIsNotNone(self.k.update_leader(cluster, '123')) self.k._kinds._object_cache['test'].metadata.annotations['leader'] = 'p-1' - self.assertFalse(self.k.update_leader(leader, '123')) + self.assertFalse(self.k.update_leader(cluster, '123')) @patch.object(k8s_client.CoreV1Api, 'read_namespaced_endpoints', create=True) @patch.object(k8s_client.CoreV1Api, 'patch_namespaced_endpoints', create=True) def test__update_leader_with_retry(self, mock_patch, mock_read): - leader = self.k.get_cluster().leader + cluster = self.k.get_cluster() mock_read.return_value = mock_read_namespaced_endpoints() mock_patch.side_effect = k8s_client.rest.ApiException(502, '') - self.assertFalse(self.k.update_leader(leader, '123')) + self.assertFalse(self.k.update_leader(cluster, '123')) mock_patch.side_effect = RetryFailedError('') - self.assertRaises(KubernetesError, self.k.update_leader, leader, '123') + self.assertRaises(KubernetesError, self.k.update_leader, cluster, '123') mock_patch.side_effect = k8s_client.rest.ApiException(409, '') with patch('time.time', Mock(side_effect=[0, 100, 200, 0, 0, 0, 0, 100, 200])): - self.assertFalse(self.k.update_leader(leader, '123')) - self.assertFalse(self.k.update_leader(leader, '123')) - self.assertFalse(self.k.update_leader(leader, '123')) + self.assertFalse(self.k.update_leader(cluster, '123')) + self.assertFalse(self.k.update_leader(cluster, '123')) + self.assertFalse(self.k.update_leader(cluster, '123')) mock_patch.side_effect = [k8s_client.rest.ApiException(409, ''), mock_namespaced_kind()] mock_read.return_value.metadata.resource_version = '2' self.assertIsNotNone(self.k._update_leader_with_retry({}, '1', [])) mock_patch.side_effect = k8s_client.rest.ApiException(409, '') mock_read.side_effect = RetryFailedError('') - self.assertRaises(KubernetesError, self.k.update_leader, leader, '123') + self.assertRaises(KubernetesError, self.k.update_leader, cluster, '123') mock_read.side_effect = Exception - self.assertFalse(self.k.update_leader(leader, '123')) + self.assertFalse(self.k.update_leader(cluster, '123')) @patch.object(k8s_client.CoreV1Api, 'patch_namespaced_endpoints', Mock(side_effect=[k8s_client.rest.ApiException(500, ''), diff --git a/tests/test_raft.py b/tests/test_raft.py index b1a778c3c..09ea7e4bb 100644 --- a/tests/test_raft.py +++ b/tests/test_raft.py @@ -149,11 +149,10 @@ def test_raft(self): cluster = raft.get_cluster() self.assertIsInstance(cluster, Cluster) self.assertIsInstance(cluster.workers[1], Cluster) - leader = cluster.leader - self.assertTrue(raft.delete_leader(leader)) + self.assertTrue(raft.delete_leader(cluster.leader)) self.assertTrue(raft._sync_obj.set(raft.status_path, '{"optime":1234567,"slots":{"ls":12345}}')) raft.get_cluster() - self.assertTrue(raft.update_leader(leader, '1', failsafe={'foo': 'bat'})) + self.assertTrue(raft.update_leader(cluster, '1', failsafe={'foo': 'bat'})) self.assertTrue(raft._sync_obj.set(raft.failsafe_path, '{"foo"}')) self.assertTrue(raft._sync_obj.set(raft.status_path, '{')) raft.get_mpp_coordinator() diff --git a/tests/test_zookeeper.py b/tests/test_zookeeper.py index eebc254bb..927966242 100644 --- a/tests/test_zookeeper.py +++ b/tests/test_zookeeper.py @@ -257,15 +257,15 @@ def test_take_leader(self): self.zk.take_leader() def test_update_leader(self): - leader = self.zk.get_cluster().leader - self.assertFalse(self.zk.update_leader(leader, 12345)) + cluster = self.zk.get_cluster() + self.assertFalse(self.zk.update_leader(cluster, 12345)) with patch.object(MockKazooClient, 'delete', Mock(side_effect=RetryFailedError)): - self.assertRaises(ZooKeeperError, self.zk.update_leader, leader, 12345) + self.assertRaises(ZooKeeperError, self.zk.update_leader, cluster, 12345) with patch.object(MockKazooClient, 'delete', Mock(side_effect=NoNodeError)): - self.assertTrue(self.zk.update_leader(leader, 12345, failsafe={'foo': 'bar'})) + self.assertTrue(self.zk.update_leader(cluster, 12345, failsafe={'foo': 'bar'})) with patch.object(MockKazooClient, 'create', Mock(side_effect=[RetryFailedError, Exception])): - self.assertRaises(ZooKeeperError, self.zk.update_leader, leader, 12345) - self.assertFalse(self.zk.update_leader(leader, 12345)) + self.assertRaises(ZooKeeperError, self.zk.update_leader, cluster, 12345) + self.assertFalse(self.zk.update_leader(cluster, 12345)) @patch.object(Cluster, 'min_version', PropertyMock(return_value=(2, 0))) def test_write_leader_optime(self):