Skip to content

Commit

Permalink
Merge pull request #413 from zalando/master
Browse files Browse the repository at this point in the history
Syncing from upstream zalando/patroni (master)
  • Loading branch information
bt-admin authored Jun 13, 2024
2 parents 1f5553c + 14a44e1 commit 6577853
Show file tree
Hide file tree
Showing 42 changed files with 214 additions and 76 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/install_deps.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ def install_requirements(what):
from setup import EXTRAS_REQUIRE, read
finally:
sys.path = old_path
requirements = ['mock>=2.0.0', 'flake8', 'pytest', 'pytest-cov'] if what == 'all' else ['behave']
requirements = ['flake8', 'pytest', 'pytest-cov'] if what == 'all' else ['behave']
requirements += ['coverage']
# try to split tests between psycopg2 and psycopg3
requirements += ['psycopg[binary]'] if sys.version_info >= (3, 8, 0) and\
Expand Down
2 changes: 1 addition & 1 deletion docs/yaml_configuration.rst
Original file line number Diff line number Diff line change
Expand Up @@ -395,7 +395,7 @@ Tags
----
- **clonefrom**: ``true`` or ``false``. If set to ``true`` other nodes might prefer to use this node for bootstrap (take ``pg_basebackup`` from). If there are several nodes with ``clonefrom`` tag set to ``true`` the node to bootstrap from will be chosen randomly. The default value is ``false``.
- **noloadbalance**: ``true`` or ``false``. If set to ``true`` the node will return HTTP Status Code 503 for the ``GET /replica`` REST API health-check and therefore will be excluded from the load-balancing. Defaults to ``false``.
- **replicatefrom**: The IP address/hostname of another replica. Used to support cascading replication.
- **replicatefrom**: The name of another replica to replicate from. Used to support cascading replication.
- **nosync**: ``true`` or ``false``. If set to ``true`` the node will never be selected as a synchronous replica.
- **nofailover**: ``true`` or ``false``, controls whether this node is allowed to participate in the leader race and become a leader. Defaults to ``false``, meaning this node _can_ participate in leader races.
- **failover_priority**: integer, controls the priority that this node should have during failover. Nodes with higher priority will be preferred over lower priority nodes if they received/replayed the same amount of WAL. However, nodes with higher values of receive/replay LSN are preferred regardless of their priority. If the ``failover_priority`` is 0 or negative - such node is not allowed to participate in the leader race and to become a leader (similar to ``nofailover: true``).
Expand Down
2 changes: 0 additions & 2 deletions features/environment.py
Original file line number Diff line number Diff line change
Expand Up @@ -1076,8 +1076,6 @@ def before_all(context):
context.keyfile = os.path.join(context.pctl.output_dir, 'patroni.key')
context.certfile = os.path.join(context.pctl.output_dir, 'patroni.crt')
try:
if sys.platform == 'darwin' and 'GITHUB_ACTIONS' in os.environ:
raise Exception
with open(os.devnull, 'w') as null:
ret = subprocess.call(['openssl', 'req', '-nodes', '-new', '-x509', '-subj', '/CN=batman.patroni',
'-addext', 'subjectAltName=IP:127.0.0.1', '-keyout', context.keyfile,
Expand Down
27 changes: 20 additions & 7 deletions patroni/dcs/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
from random import randint
from threading import Event, Lock
from typing import Any, Callable, Collection, Dict, Iterator, List, \
NamedTuple, Optional, Tuple, Type, TYPE_CHECKING, Union
NamedTuple, Optional, Set, Tuple, Type, TYPE_CHECKING, Union
from urllib.parse import urlparse, urlunparse, parse_qsl

import dateutil.parser
Expand Down Expand Up @@ -913,7 +913,9 @@ def is_physical_slot(value: Union[Any, Dict[str, Any]]) -> bool:
:returns: ``True`` if *value* is a physical replication slot, otherwise ``False``.
"""
return not value or isinstance(value, dict) and value.get('type', 'physical') == 'physical'
return not value \
or (isinstance(value, dict) and not Cluster.is_logical_slot(value)
and value.get('type', 'physical') == 'physical')

@staticmethod
def is_logical_slot(value: Union[Any, Dict[str, Any]]) -> bool:
Expand Down Expand Up @@ -1179,6 +1181,10 @@ def should_enforce_hot_standby_feedback(self, postgresql: 'Postgresql', member:

if global_config.use_slots:
name = member.name if isinstance(member, Member) else postgresql.name

if not self.get_slot_name_on_primary(name, member):
return False

members = [m for m in self.members if m.replicatefrom == name and m.name != self.leader_name]
return any(self.should_enforce_hot_standby_feedback(postgresql, m) for m in members)
return False
Expand All @@ -1197,11 +1203,18 @@ def get_slot_name_on_primary(self, name: str, tags: Tags) -> Optional[str]:
:returns: the slot name on the primary that is in use for physical replication on this node.
"""
if tags.nostream:
return None
replicatefrom = self.get_member(tags.replicatefrom, False) if tags.replicatefrom else None
return self.get_slot_name_on_primary(replicatefrom.name, replicatefrom) \
if isinstance(replicatefrom, Member) else slot_name_from_member_name(name)
seen_nodes: Set[str] = set()
while True:
seen_nodes.add(name)
if tags.nostream:
return None
replicatefrom = self.get_member(tags.replicatefrom, False) \
if tags.replicatefrom and tags.replicatefrom != name else None
if not isinstance(replicatefrom, Member):
return slot_name_from_member_name(name)
if replicatefrom.name in seen_nodes:
return None
name, tags = replicatefrom.name, replicatefrom

@property
def timeline(self) -> int:
Expand Down
96 changes: 96 additions & 0 deletions patroni/postgresql/available_parameters/0_postgres.yml
Original file line number Diff line number Diff line change
@@ -1,4 +1,7 @@
parameters:
allow_alter_system:
- type: Bool
version_from: 170000
allow_in_place_tablespaces:
- type: Bool
version_from: 150000
Expand Down Expand Up @@ -245,6 +248,12 @@ parameters:
version_from: 90300
min_val: 0
max_val: 1000
commit_timestamp_buffers:
- type: Integer
version_from: 170000
min_val: 0
max_val: 131072
unit: 8kB
compute_query_id:
- type: EnumBool
version_from: 140000
Expand Down Expand Up @@ -299,6 +308,7 @@ parameters:
db_user_namespace:
- type: Bool
version_from: 90300
version_till: 170000
deadlock_timeout:
- type: Integer
version_from: 90300
Expand All @@ -313,6 +323,12 @@ parameters:
debug_io_direct:
- type: String
version_from: 160000
debug_logical_replication_streaming:
- type: Enum
version_from: 170000
possible_values:
- buffered
- immediate
debug_parallel_query:
- type: EnumBool
version_from: 160000
Expand Down Expand Up @@ -406,6 +422,9 @@ parameters:
enable_gathermerge:
- type: Bool
version_from: 100000
enable_group_by_reordering:
- type: Bool
version_from: 170000
enable_hashagg:
- type: Bool
version_from: 90300
Expand Down Expand Up @@ -466,6 +485,9 @@ parameters:
event_source:
- type: String
version_from: 90300
event_triggers:
- type: Bool
version_from: 170000
exit_on_error:
- type: Bool
version_from: 90300
Expand Down Expand Up @@ -607,6 +629,12 @@ parameters:
ignore_system_indexes:
- type: Bool
version_from: 90300
io_combine_limit:
- type: Integer
version_from: 170000
min_val: 1
max_val: 32
unit: 8kB
IntervalStyle:
- type: Enum
version_from: 90300
Expand Down Expand Up @@ -875,6 +903,7 @@ parameters:
logical_replication_mode:
- type: Enum
version_from: 160000
version_till: 170000
possible_values:
- buffered
- immediate
Expand Down Expand Up @@ -919,6 +948,11 @@ parameters:
version_from: 100000
min_val: 0
max_val: 262143
max_notify_queue_pages:
- type: Integer
version_from: 170000
min_val: 64
max_val: 2147483647
max_parallel_apply_workers_per_subscription:
- type: Integer
version_from: 160000
Expand Down Expand Up @@ -1072,9 +1106,28 @@ parameters:
min_val: 2
max_val: 2147483647
unit: MB
multixact_member_buffers:
- type: Integer
version_from: 170000
min_val: 16
max_val: 131072
unit: 8kB
multixact_offset_buffers:
- type: Integer
version_from: 170000
min_val: 16
max_val: 131072
unit: 8kB
notify_buffers:
- type: Integer
version_from: 170000
min_val: 16
max_val: 131072
unit: 8kB
old_snapshot_threshold:
- type: Integer
version_from: 90600
version_till: 170000
min_val: -1
max_val: 86400
unit: min
Expand Down Expand Up @@ -1191,6 +1244,12 @@ parameters:
version_from: 90300
min_val: 0
max_val: 1.79769e+308
serializable_buffers:
- type: Integer
version_from: 170000
min_val: 16
max_val: 131072
unit: 8kB
session_preload_libraries:
- type: String
version_from: 90400
Expand Down Expand Up @@ -1283,6 +1342,9 @@ parameters:
standard_conforming_strings:
- type: Bool
version_from: 90300
standby_slot_names:
- type: String
version_from: 170000
statement_timeout:
- type: Integer
version_from: 90300
Expand All @@ -1300,6 +1362,15 @@ parameters:
- type: String
version_from: 90300
version_till: 150000
subtransaction_buffers:
- type: Integer
version_from: 170000
min_val: 0
max_val: 131072
unit: 8kB
summarize_wal:
- type: Bool
version_from: 170000
superuser_reserved_connections:
- type: Integer
version_from: 90300
Expand All @@ -1310,6 +1381,9 @@ parameters:
version_from: 90600
min_val: 0
max_val: 262143
sync_replication_slots:
- type: Bool
version_from: 170000
synchronize_seqscans:
- type: Bool
version_from: 90300
Expand Down Expand Up @@ -1394,12 +1468,16 @@ parameters:
timezone_abbreviations:
- type: String
version_from: 90300
trace_connection_negotiation:
- type: Bool
version_from: 170000
trace_notify:
- type: Bool
version_from: 90300
trace_recovery_messages:
- type: Enum
version_from: 90300
version_till: 170000
possible_values:
- debug5
- debug4
Expand Down Expand Up @@ -1452,6 +1530,12 @@ parameters:
track_wal_io_timing:
- type: Bool
version_from: 140000
transaction_buffers:
- type: Integer
version_from: 170000
min_val: 0
max_val: 131072
unit: 8kB
transaction_deferrable:
- type: Bool
version_from: 90300
Expand All @@ -1466,6 +1550,12 @@ parameters:
transaction_read_only:
- type: Bool
version_from: 90300
transaction_timeout:
- type: Integer
version_from: 170000
min_val: 0
max_val: 2147483647
unit: ms
transform_null_equals:
- type: Bool
version_from: 90300
Expand Down Expand Up @@ -1664,6 +1754,12 @@ parameters:
min_val: 0
max_val: 2147483647
unit: kB
wal_summary_keep_time:
- type: Integer
version_from: 170000
min_val: 0
max_val: 35791394
unit: min
wal_sync_method:
- type: Enum
version_from: 90300
Expand Down
26 changes: 17 additions & 9 deletions patroni/postgresql/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -114,16 +114,17 @@ def parse_dsn(value: str) -> Optional[Dict[str, str]]:
"""
Very simple equivalent of `psycopg2.extensions.parse_dsn` introduced in 2.7.0.
We are not using psycopg2 function in order to remain compatible with 2.5.4+.
There is one minor difference though, this function removes `dbname` from the result
and sets the `sslmode`, 'gssencmode', and `channel_binding` to `prefer` if it is not present in
the connection string. This is necessary to simplify comparison of the old and the new values.
There are a few minor differences though, this function sets the `sslmode`, 'gssencmode',
and `channel_binding` to `prefer` if they are not present in the connection string.
This is necessary to simplify comparison of the old and the new values.
>>> r = parse_dsn('postgresql://u%2Fse:pass@:%2f123,[::1]/db%2Fsdf?application_name=mya%2Fpp&ssl=true')
>>> r == {'application_name': 'mya/pp', 'host': ',::1', 'sslmode': 'require',\
>>> r == {'application_name': 'mya/pp', 'dbname': 'db/sdf', 'host': ',::1', 'sslmode': 'require',\
'password': 'pass', 'port': '/123,', 'user': 'u/se', 'gssencmode': 'prefer', 'channel_binding': 'prefer'}
True
>>> r = parse_dsn(" host = 'host' dbname = db\\\\ name requiressl=1 ")
>>> r == {'host': 'host', 'sslmode': 'require', 'gssencmode': 'prefer', 'channel_binding': 'prefer'}
>>> r == {'dbname': 'db name', 'host': 'host', 'sslmode': 'require',\
'gssencmode': 'prefer', 'channel_binding': 'prefer'}
True
>>> parse_dsn('requiressl = 0\\\\') == {'sslmode': 'prefer', 'gssencmode': 'prefer', 'channel_binding': 'prefer'}
True
Expand All @@ -147,8 +148,6 @@ def parse_dsn(value: str) -> Optional[Dict[str, str]]:
elif requiressl is not None:
ret['sslmode'] = 'prefer'
ret.setdefault('sslmode', 'prefer')
if 'dbname' in ret:
del ret['dbname']
ret.setdefault('gssencmode', 'prefer')
ret.setdefault('channel_binding', 'prefer')
return ret
Expand Down Expand Up @@ -570,8 +569,8 @@ def primary_conninfo_params(self, member: Union[Leader, Member, None]) -> Option
ret.setdefault('channel_binding', 'prefer')
if self._krbsrvname:
ret['krbsrvname'] = self._krbsrvname
if 'dbname' in ret:
del ret['dbname']
if not ret.get('dbname'):
ret['dbname'] = self._postgresql.database
return ret

def format_dsn(self, params: Dict[str, Any]) -> str:
Expand Down Expand Up @@ -771,12 +770,21 @@ def _check_primary_conninfo(self, primary_conninfo: Dict[str, Any],
elif not primary_conninfo:
return False

if self._postgresql.major_version < 170000:
# we want to compare dbname in primary_conninfo only for v17 onwards
wanted_primary_conninfo.pop('dbname', None)

if not self._postgresql.is_starting():
wal_receiver_primary_conninfo = self._postgresql.primary_conninfo()
if wal_receiver_primary_conninfo:
wal_receiver_primary_conninfo = parse_dsn(wal_receiver_primary_conninfo)
# when wal receiver is alive use primary_conninfo from pg_stat_wal_receiver for comparison
if wal_receiver_primary_conninfo:
# dbname in pg_stat_wal_receiver is always `replication`, we need to use a "real" one
wal_receiver_primary_conninfo.pop('dbname', None)
dbname = primary_conninfo.get('dbname')
if dbname:
wal_receiver_primary_conninfo['dbname'] = dbname
primary_conninfo = wal_receiver_primary_conninfo
# There could be no password in the primary_conninfo or it is masked.
# Just copy the "desired" value in order to make comparison succeed.
Expand Down
8 changes: 6 additions & 2 deletions patroni/postgresql/postmaster.py
Original file line number Diff line number Diff line change
Expand Up @@ -176,11 +176,15 @@ def pg_ctl_kill(self, mode: str, pg_ctl: str) -> Optional[bool]:
return not self.is_running()

def wait_for_user_backends_to_close(self, stop_timeout: Optional[float]) -> None:
# These regexps are cross checked against versions PostgreSQL 9.1 .. 16
# These regexps are cross checked against versions PostgreSQL 9.1 .. 17
aux_proc_re = re.compile("(?:postgres:)( .*:)? (?:(?:archiver|startup|autovacuum launcher|autovacuum worker|"
"checkpointer|logger|stats collector|wal receiver|wal writer|writer)(?: process )?|"
"walreceiver|wal sender process|walsender|walwriter|background writer|"
"logical replication launcher|logical replication worker for|bgworker:) ")
"logical replication launcher|logical replication worker for subscription|"
"logical replication tablesync worker for subscription|"
"logical replication parallel apply worker for subscription|"
"logical replication apply worker for subscription|"
"slotsync worker|walsummarizer|bgworker:) ")

try:
children = self.children()
Expand Down
Loading

0 comments on commit 6577853

Please sign in to comment.