Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Syncing from upstream patroni/patroni (master) #424

Merged
merged 1 commit into from
Jul 17, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion patroni/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -295,7 +295,7 @@ def do_GET(self, write_status_code_only: bool = False) -> None:
cluster = patroni.dcs.cluster
config = global_config.from_cluster(cluster)

leader_optime = cluster and cluster.last_lsn or 0
leader_optime = cluster and cluster.status.last_lsn
replayed_location = response.get('xlog', {}).get('replayed_location', 0)
max_replica_lag = parse_int(self.path_query.get('lag', [sys.maxsize])[0], 'B')
if max_replica_lag is None:
Expand Down
42 changes: 25 additions & 17 deletions patroni/dcs/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -738,6 +738,13 @@ def empty() -> 'Status':
"""
return Status(0, None)

def is_empty(self):
"""Validate definition of all attributes of this :class:`Status` instance.

:returns: ``True`` if all attributes of the current :class:`Status` are unpopulated.
"""
return self.last_lsn == 0 and self.slots is None

@staticmethod
def from_node(value: Union[str, Dict[str, Any], None]) -> 'Status':
"""Factory method to parse *value* as :class:`Status` object.
Expand Down Expand Up @@ -814,14 +821,13 @@ def __new__(cls, *args: Any, **kwargs: Any):
return super(Cluster, cls).__new__(cls, *args, **kwargs)

@property
def last_lsn(self) -> int:
"""Last known leader LSN."""
return self.status.last_lsn
def slots(self) -> Dict[str, int]:
"""State of permanent replication slots on the primary in the format: ``{"slot_name": int}``.

@property
def slots(self) -> Optional[Dict[str, int]]:
"""State of permanent replication slots on the primary in the format: ``{"slot_name": int}``."""
return self.status.slots
.. note::
We are trying to be foolproof here and for values that can't be parsed to :class:`int` will return ``0``.
"""
return {k: parse_int(v) or 0 for k, v in (self.status.slots or {}).items()}

@staticmethod
def empty() -> 'Cluster':
Expand All @@ -833,9 +839,9 @@ def is_empty(self):

:returns: ``True`` if all attributes of the current :class:`Cluster` are unpopulated.
"""
return all((self.initialize is None, self.config is None, self.leader is None, self.last_lsn == 0,
return all((self.initialize is None, self.config is None, self.leader is None, self.status.is_empty(),
self.members == [], self.failover is None, self.sync.version is None,
self.history is None, self.slots is None, self.failsafe is None, self.workers == {}))
self.history is None, self.failsafe is None, self.workers == {}))

def __len__(self) -> int:
"""Implement ``len`` function capability.
Expand Down Expand Up @@ -934,8 +940,9 @@ def __permanent_slots(self) -> Dict[str, Union[Dict[str, Any], Any]]:
"""Dictionary of permanent replication slots with their known LSN."""
ret: Dict[str, Union[Dict[str, Any], Any]] = global_config.permanent_slots

members: Dict[str, int] = {slot_name_from_member_name(m.name): m.lsn or 0 for m in self.members}
slots: Dict[str, int] = {k: parse_int(v) or 0 for k, v in (self.slots or {}).items()}
members: Dict[str, int] = {slot_name_from_member_name(m.name): m.lsn or 0
for m in self.members if m.replicatefrom}
slots: Dict[str, int] = self.slots
for name, value in list(ret.items()):
if not value:
value = ret[name] = {}
Expand Down Expand Up @@ -1600,15 +1607,16 @@ def get_cluster(self) -> Cluster:
self.reset_cluster()
raise

self._last_seen = int(time.time())
self._last_status = {self._OPTIME: cluster.last_lsn}
if cluster.slots:
self._last_status['slots'] = cluster.slots
self._last_failsafe = cluster.failsafe

with self._cluster_thread_lock:
self._cluster = cluster
self._cluster_valid_till = time.time() + self.ttl

self._last_seen = int(time.time())
self._last_status = {self._OPTIME: cluster.status.last_lsn}
if cluster.status.slots:
self._last_status['slots'] = cluster.status.slots
self._last_failsafe = cluster.failsafe

return cluster

@property
Expand Down
2 changes: 1 addition & 1 deletion patroni/ha.py
Original file line number Diff line number Diff line change
Expand Up @@ -993,7 +993,7 @@ def is_lagging(self, wal_position: int) -> bool:

:returns True when node is lagging
"""
lag = (self.cluster.last_lsn or 0) - wal_position
lag = self.cluster.status.last_lsn - wal_position
return lag > global_config.maximum_lag_on_failover

def _is_healthiest_node(self, members: Collection[Member], check_replication_lag: bool = True) -> bool:
Expand Down
2 changes: 1 addition & 1 deletion patroni/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -946,7 +946,7 @@ def cluster_as_json(cluster: 'Cluster') -> Dict[str, Any]:

config = global_config.from_cluster(cluster)
leader_name = cluster.leader.name if cluster.leader else None
cluster_lsn = cluster.last_lsn or 0
cluster_lsn = cluster.status.last_lsn

ret: Dict[str, Any] = {'members': []}
for m in cluster.members:
Expand Down
2 changes: 1 addition & 1 deletion tests/test_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -204,7 +204,7 @@ class TestRestApiHandler(unittest.TestCase):

def test_do_GET(self):
MockPostgresql.pending_restart_reason = {'max_connections': get_param_diff('200', '100')}
MockPatroni.dcs.cluster.last_lsn = 20
MockPatroni.dcs.cluster.status.last_lsn = 20
MockPatroni.dcs.cluster.sync.members = [MockPostgresql.name]
with patch.object(global_config.__class__, 'is_synchronous_mode', PropertyMock(return_value=True)):
MockRestApiServer(RestApiHandler, 'GET /replica')
Expand Down
6 changes: 3 additions & 3 deletions tests/test_slots.py
Original file line number Diff line number Diff line change
Expand Up @@ -213,7 +213,7 @@ def test_should_enforce_hot_standby_feedback(self):
@patch.object(Postgresql, 'is_primary', Mock(return_value=False))
def test__ensure_logical_slots_replica(self):
self.p.set_role('replica')
self.cluster.slots['ls'] = 12346
self.cluster.status.slots['ls'] = 12346
with patch.object(SlotsHandler, 'check_logical_slots_readiness', Mock(return_value=False)):
self.assertEqual(self.s.sync_replication_slots(self.cluster, self.tags), [])
with patch.object(SlotsHandler, '_query', Mock(return_value=[('ls', 'logical', 499, 'b', 'a', 5, 100, 500)])), \
Expand All @@ -222,10 +222,10 @@ def test__ensure_logical_slots_replica(self):
patch.object(psycopg.OperationalError, 'diag') as mock_diag:
type(mock_diag).sqlstate = PropertyMock(return_value='58P01')
self.assertEqual(self.s.sync_replication_slots(self.cluster, self.tags), ['ls'])
self.cluster.slots['ls'] = 'a'
self.cluster.status.slots['ls'] = 'a'
self.assertEqual(self.s.sync_replication_slots(self.cluster, self.tags), [])
self.cluster.config.data['slots']['ls']['database'] = 'b'
self.cluster.slots['ls'] = '500'
self.cluster.status.slots['ls'] = '500'
with patch.object(MockCursor, 'rowcount', PropertyMock(return_value=1), create=True):
self.assertEqual(self.s.sync_replication_slots(self.cluster, self.tags), ['ls'])

Expand Down
2 changes: 1 addition & 1 deletion tests/test_zookeeper.py
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,7 @@ def test__cluster_loader(self):

def test_get_cluster(self):
cluster = self.zk.get_cluster()
self.assertEqual(cluster.last_lsn, 500)
self.assertEqual(cluster.status.last_lsn, 500)

def test__get_citus_cluster(self):
self.zk._mpp = get_mpp({'citus': {'group': 0, 'database': 'postgres'}})
Expand Down
Loading