From e9fd37ef66acfa16a6fffadf1e475342ef63a24a Mon Sep 17 00:00:00 2001 From: Denis Laxalde Date: Wed, 21 Feb 2024 09:43:57 +0100 Subject: [PATCH] Upgrade type annotations syntax We use the syntax from PEP 585 and PEP 604 and rely on 'from __future__ import annotations' to modernize the code base. --- pgactivity/activities.py | 18 +++-- pgactivity/cli.py | 5 +- pgactivity/compat.py | 6 +- pgactivity/config.py | 22 +++--- pgactivity/data.py | 37 +++++----- pgactivity/handlers.py | 10 +-- pgactivity/keys.py | 12 +-- pgactivity/pg.py | 86 ++++++++++------------ pgactivity/types.py | 155 +++++++++++++++++++-------------------- pgactivity/ui.py | 14 ++-- pgactivity/utils.py | 16 ++-- pgactivity/views.py | 28 +++---- pgactivity/widgets.py | 4 +- tests/conftest.py | 10 ++- tests/test_config.py | 6 +- tests/test_data.py | 5 +- 16 files changed, 220 insertions(+), 214 deletions(-) diff --git a/pgactivity/activities.py b/pgactivity/activities.py index a24e1d53..b0662266 100644 --- a/pgactivity/activities.py +++ b/pgactivity/activities.py @@ -1,7 +1,9 @@ +from __future__ import annotations + import builtins import os import time -from typing import Dict, List, Optional, Sequence, Tuple, TypeVar +from typing import Sequence, TypeVar from warnings import catch_warnings, simplefilter import attr @@ -21,7 +23,7 @@ ) -def sys_get_proc(pid: int) -> Optional[SystemProcess]: +def sys_get_proc(pid: int) -> SystemProcess | None: """Return a SystemProcess instance matching given pid or None if access with psutil is not possible. """ @@ -53,9 +55,9 @@ def sys_get_proc(pid: int) -> Optional[SystemProcess]: def ps_complete( pg_processes: Sequence[RunningProcess], - processes: Dict[int, SystemProcess], + processes: dict[int, SystemProcess], fs_blocksize: int, -) -> Tuple[List[LocalRunningProcess], IOCounter, IOCounter]: +) -> tuple[list[LocalRunningProcess], IOCounter, IOCounter]: """Transform the sequence of 'pg_processes' (RunningProcess) as LocalRunningProcess with local system information from the 'processes' map. Return LocalRunningProcess list, as well as read and write IO counters. @@ -139,7 +141,7 @@ def ps_complete( T = TypeVar("T", RunningProcess, WaitingProcess, BlockingProcess, LocalRunningProcess) -def sorted(processes: List[T], *, key: SortKey, reverse: bool = False) -> List[T]: +def sorted(processes: list[T], *, key: SortKey, reverse: bool = False) -> list[T]: """Return processes sorted. >>> from ipaddress import IPv4Interface, ip_address @@ -314,12 +316,12 @@ def update_max_iops(max_iops: int, read_count: float, write_count: float) -> int return max(int(read_count + write_count), max_iops) -def get_load_average() -> Tuple[float, float, float]: +def get_load_average() -> tuple[float, float, float]: """Get load average""" return os.getloadavg() -def get_mem_swap() -> Tuple[MemoryInfo, SwapInfo]: +def get_mem_swap() -> tuple[MemoryInfo, SwapInfo]: """Get memory and swap usage""" with catch_warnings(): simplefilter("ignore", RuntimeWarning) @@ -335,7 +337,7 @@ def get_mem_swap() -> Tuple[MemoryInfo, SwapInfo]: ) -def mem_swap_load() -> Tuple[MemoryInfo, SwapInfo, LoadAverage]: +def mem_swap_load() -> tuple[MemoryInfo, SwapInfo, LoadAverage]: """Read memory, swap and load average from Data object.""" memory, swap = get_mem_swap() load = LoadAverage(*get_load_average()) diff --git a/pgactivity/cli.py b/pgactivity/cli.py index 48a09b9b..7aec79e5 100755 --- a/pgactivity/cli.py +++ b/pgactivity/cli.py @@ -1,3 +1,5 @@ +from __future__ import annotations + import logging import os import socket @@ -5,7 +7,6 @@ import time from argparse import ArgumentParser from io import StringIO -from typing import Optional from blessed import Terminal @@ -14,7 +15,7 @@ from .pg import OperationalError -def configure_logger(debug_file: Optional[str] = None) -> StringIO: +def configure_logger(debug_file: str | None = None) -> StringIO: logger = logging.getLogger("pgactivity") logger.setLevel(logging.DEBUG) diff --git a/pgactivity/compat.py b/pgactivity/compat.py index 4be71a25..d615664c 100644 --- a/pgactivity/compat.py +++ b/pgactivity/compat.py @@ -1,6 +1,8 @@ +from __future__ import annotations + import operator from importlib.metadata import version -from typing import Any, Dict +from typing import Any import attr import attr.validators @@ -11,7 +13,7 @@ if ATTR_VERSION < (18, 1): - def fields_dict(cls: Any) -> Dict[str, Any]: + def fields_dict(cls: Any) -> dict[str, Any]: return {a.name: a for a in cls.__attrs_attrs__} else: diff --git a/pgactivity/config.py b/pgactivity/config.py index 280b93fa..7a4ddf94 100644 --- a/pgactivity/config.py +++ b/pgactivity/config.py @@ -1,8 +1,10 @@ +from __future__ import annotations + import configparser import enum import os from pathlib import Path -from typing import IO, Any, Dict, List, Optional, Type, TypeVar +from typing import IO, Any, Type, TypeVar import attr from attr import validators @@ -70,7 +72,7 @@ class Flag(enum.Flag): PID = enum.auto() @classmethod - def names(cls) -> List[str]: + def names(cls) -> list[str]: rv = [] for f in cls: assert f.name @@ -78,14 +80,14 @@ def names(cls) -> List[str]: return rv @classmethod - def all(cls) -> "Flag": + def all(cls) -> Flag: value = cls(0) for f in cls: value |= f return value @classmethod - def from_config(cls, config: "Configuration") -> "Flag": + def from_config(cls, config: Configuration) -> Flag: value = cls(0) for f in cls: assert f.name is not None @@ -102,7 +104,7 @@ def from_config(cls, config: "Configuration") -> "Flag": @classmethod def load( cls, - config: Optional["Configuration"], + config: Configuration | None, *, is_local: bool, noappname: bool, @@ -117,7 +119,7 @@ def load( nowait: bool, nowrite: bool, **kwargs: Any, - ) -> "Flag": + ) -> Flag: """Build a Flag value from command line options.""" if config: flag = cls.from_config(config) @@ -163,13 +165,13 @@ def load( @attr.s(auto_attribs=True, frozen=True, slots=True) class UISection: hidden: bool = False - width: Optional[int] = attr.ib(default=None, validator=validators.optional(gt(0))) + width: int | None = attr.ib(default=None, validator=validators.optional(gt(0))) _T = TypeVar("_T", bound="UISection") @classmethod def from_config_section(cls: Type[_T], section: configparser.SectionProxy) -> _T: - values: Dict[str, Any] = {} + values: dict[str, Any] = {} known_options = {f.name: f for f in attr.fields(cls)} unknown_options = set(section) - set(known_options) if unknown_options: @@ -192,7 +194,7 @@ def from_config_section(cls: Type[_T], section: configparser.SectionProxy) -> _T ETC = Path("/etc") -class Configuration(Dict[str, UISection]): +class Configuration(dict[str, UISection]): _T = TypeVar("_T", bound="Configuration") @classmethod @@ -264,7 +266,7 @@ def lookup( *, user_config_home: Path = USER_CONFIG_HOME, etc: Path = ETC, - ) -> Optional[_T]: + ) -> _T | None: for base in (user_config_home, etc): fpath = base / "pg_activity.conf" if fpath.exists(): diff --git a/pgactivity/data.py b/pgactivity/data.py index 0af58676..58f6a741 100644 --- a/pgactivity/data.py +++ b/pgactivity/data.py @@ -1,9 +1,10 @@ +from __future__ import annotations + import getpass import logging import re from argparse import Namespace from functools import partial -from typing import Dict, List, Optional import attr import psutil @@ -70,7 +71,7 @@ class Data: server_encoding: bytes min_duration: float filters: Filters - dsn_parameters: Dict[str, str] + dsn_parameters: dict[str, str] failed_queries: FailedQueriesInfo @classmethod @@ -78,16 +79,16 @@ def pg_connect( cls, min_duration: float = 0.0, *, - host: Optional[str] = None, + host: str | None = None, port: int = 5432, user: str = "postgres", - password: Optional[str] = None, + password: str | None = None, database: str = "postgres", rds_mode: bool = False, dsn: str = "", hide_queries_in_logs: bool = False, filters: Filters = NO_FILTER, - ) -> "Data": + ) -> Data: """Create an instance by connecting to a PostgreSQL server.""" pg_conn = pg.connect( dsn, @@ -116,7 +117,7 @@ def pg_connect( dsn_parameters=pg.connection_parameters(pg_conn), ) - def try_reconnect(self) -> Optional["Data"]: + def try_reconnect(self) -> Data | None: try: pg_conn = pg.connect(**self.dsn_parameters) except (pg.InterfaceError, pg.OperationalError): @@ -189,7 +190,7 @@ def pg_terminate_backend(self, pid: int) -> bool: ret = pg.fetchone(self.pg_conn, query, {"pid": pid}) return ret["is_stopped"] # type: ignore[no-any-return] - def pg_get_temporary_file(self) -> Optional[TempFileInfo]: + def pg_get_temporary_file(self) -> TempFileInfo | None: """ Count the number of temporary files and get their total size """ @@ -233,7 +234,7 @@ def pg_get_temporary_file(self) -> Optional[TempFileInfo]: finally: pg.execute(self.pg_conn, queries.get("reset_statement_timeout")) - def pg_get_wal_senders(self) -> Optional[int]: + def pg_get_wal_senders(self) -> int | None: """ Count the number of wal senders """ @@ -244,7 +245,7 @@ def pg_get_wal_senders(self) -> Optional[int]: ret = pg.fetchone(self.pg_conn, query) return int(ret["wal_senders"]) - def pg_get_wal_receivers(self) -> Optional[int]: + def pg_get_wal_receivers(self) -> int | None: """ Count the number of wal receivers """ @@ -271,7 +272,7 @@ def pg_get_wal_receivers(self) -> Optional[int]: return int(ret["wal_receivers"]) - def pg_get_replication_slots(self) -> Optional[int]: + def pg_get_replication_slots(self) -> int | None: """ Count the number of replication slots """ @@ -290,7 +291,7 @@ def dbname_filter(self) -> sql.Composable: def pg_get_server_information( self, - prev_server_info: Optional[ServerInformation] = None, + prev_server_info: ServerInformation | None = None, using_rds: bool = False, skip_db_size: bool = False, skip_tempfile: bool = False, @@ -338,17 +339,17 @@ def pg_get_server_information( ) raise - temporary_file_info: Optional[TempFileInfo] = None + temporary_file_info: TempFileInfo | None = None if not skip_tempfile: temporary_file_info = self.pg_get_temporary_file() wal_senders = self.pg_get_wal_senders() - wal_receivers: Optional[int] = None + wal_receivers: int | None = None if not skip_walreceiver: wal_receivers = self.pg_get_wal_receivers() replication_slots = self.pg_get_replication_slots() - hr: Optional[Pct] = None - rr: Optional[Pct] = None + hr: Pct | None = None + rr: Pct | None = None tps, ips, ups, dps, rps = 0, 0, 0, 0, 0 size_ev = 0.0 if prev_server_info is not None: @@ -390,7 +391,7 @@ def pg_get_server_information( **ret, ) - def pg_get_activities(self, duration_mode: int = 1) -> List[RunningProcess]: + def pg_get_activities(self, duration_mode: int = 1) -> list[RunningProcess]: """ Get activity from pg_stat_activity view. """ @@ -425,7 +426,7 @@ def pg_get_activities(self, duration_mode: int = 1) -> List[RunningProcess]: text_as_bytes=True, ) - def pg_get_waiting(self, duration_mode: int = 1) -> List[WaitingProcess]: + def pg_get_waiting(self, duration_mode: int = 1) -> list[WaitingProcess]: """ Get waiting queries. """ @@ -452,7 +453,7 @@ def pg_get_waiting(self, duration_mode: int = 1) -> List[WaitingProcess]: text_as_bytes=True, ) - def pg_get_blocking(self, duration_mode: int = 1) -> List[BlockingProcess]: + def pg_get_blocking(self, duration_mode: int = 1) -> list[BlockingProcess]: """ Get blocking queries """ diff --git a/pgactivity/handlers.py b/pgactivity/handlers.py index 661bb5c7..79939053 100644 --- a/pgactivity/handlers.py +++ b/pgactivity/handlers.py @@ -1,4 +1,4 @@ -from typing import Optional +from __future__ import annotations from blessed.keyboard import Keystroke @@ -8,7 +8,7 @@ def refresh_time( - key: Optional[str], value: float, minimum: float = 0.5, maximum: float = 5 + key: str | None, value: float, minimum: float = 0.5, maximum: float = 5 ) -> float: """Return an updated refresh time interval from input key respecting bounds. @@ -66,7 +66,7 @@ def wrap_query(key: Keystroke, wrap: bool) -> bool: return wrap -def query_mode(key: Keystroke) -> Optional[QueryMode]: +def query_mode(key: Keystroke) -> QueryMode | None: """Return the query mode matching input key or None. >>> import curses @@ -86,9 +86,7 @@ def query_mode(key: Keystroke) -> Optional[QueryMode]: return keys.QUERYMODE_FROM_KEYS.get(key) -def sort_key_for( - key: Keystroke, query_mode: QueryMode, flag: Flag -) -> Optional[SortKey]: +def sort_key_for(key: Keystroke, query_mode: QueryMode, flag: Flag) -> SortKey | None: """Return the sort key matching input key or None. >>> from blessed.keyboard import Keystroke as k diff --git a/pgactivity/keys.py b/pgactivity/keys.py index 59b28c0f..1758b7b5 100644 --- a/pgactivity/keys.py +++ b/pgactivity/keys.py @@ -1,5 +1,7 @@ +from __future__ import annotations + import curses -from typing import Any, List, Optional, Tuple +from typing import Any import attr from blessed.keyboard import Keystroke @@ -11,7 +13,7 @@ class Key: value: str description: str - name: Optional[str] = None + name: str | None = None local_only: bool = False def __eq__(self, other: Any) -> bool: @@ -97,7 +99,7 @@ def is_toggle_header_worker_info(key: Keystroke) -> bool: EXIT_KEY = Key(EXIT, "quit") PAUSE_KEY = Key(SPACE, "pause/unpause", "Space") -BINDINGS: List[Key] = [ +BINDINGS: list[Key] = [ Key("Up/Down", "scroll process list"), PAUSE_KEY, Key(SORTBY_CPU, "sort by CPU% desc. (activities)", local_only=True), @@ -118,7 +120,7 @@ def is_toggle_header_worker_info(key: Keystroke) -> bool: ] -def _sequence_by_int(v: int) -> Tuple[str, str, int]: +def _sequence_by_int(v: int) -> tuple[str, str, int]: """ >>> _sequence_by_int(11) ('F11', '11', 275) @@ -137,6 +139,6 @@ def _sequence_by_int(v: int) -> Tuple[str, str, int]: } -MODES: List[Key] = [ +MODES: list[Key] = [ Key("/".join(KEYS_BY_QUERYMODE[qm][:-1]), qm.value) for qm in QueryMode ] diff --git a/pgactivity/pg.py b/pgactivity/pg.py index 8206bad4..10e45120 100644 --- a/pgactivity/pg.py +++ b/pgactivity/pg.py @@ -1,16 +1,8 @@ +from __future__ import annotations + import logging import os -from typing import ( - Any, - Callable, - Dict, - List, - Optional, - Sequence, - TypeVar, - Union, - overload, -) +from typing import Any, Callable, Sequence, TypeVar, overload Row = TypeVar("Row") @@ -35,7 +27,7 @@ __version__ = psycopg.__version__ - Connection = psycopg.Connection[Dict[str, Any]] + Connection = psycopg.Connection[dict[str, Any]] class BytesLoader(Loader): def load(self, data: Buffer) -> bytes: @@ -71,13 +63,13 @@ def connect(dsn: str = "", **kwargs: Any) -> Connection: def server_version(conn: Connection) -> int: return conn.info.server_version - def connection_parameters(conn: Connection) -> Dict[str, Any]: + def connection_parameters(conn: Connection) -> dict[str, Any]: return conn.info.get_parameters() def execute( conn: Connection, - query: Union[str, sql.Composed], - args: Union[None, Sequence[Any], Dict[str, Any]] = None, + query: str | sql.Composed, + args: None | Sequence[Any] | dict[str, Any] = None, ) -> None: conn.execute(query, args, prepare=True) @@ -92,8 +84,8 @@ def cursor( ) -> psycopg.Cursor[psycopg.rows.DictRow]: ... def cursor( - conn: Connection, mkrow: Optional[Callable[..., Row]], text_as_bytes: bool - ) -> Union[psycopg.Cursor[psycopg.rows.DictRow], psycopg.Cursor[Row]]: + conn: Connection, mkrow: Callable[..., Row] | None, text_as_bytes: bool + ) -> psycopg.Cursor[psycopg.rows.DictRow] | psycopg.Cursor[Row]: if mkrow is not None: cur = conn.cursor(row_factory=psycopg.rows.kwargs_row(mkrow)) else: @@ -105,8 +97,8 @@ def cursor( @overload def fetchone( conn: Connection, - query: Union[str, sql.Composed], - args: Union[None, Sequence[Any], Dict[str, Any]] = None, + query: str | sql.Composed, + args: None | Sequence[Any] | dict[str, Any] = None, *, mkrow: Callable[..., Row], text_as_bytes: bool = False, @@ -115,20 +107,20 @@ def fetchone( @overload def fetchone( conn: Connection, - query: Union[str, sql.Composed], - args: Union[None, Sequence[Any], Dict[str, Any]] = None, + query: str | sql.Composed, + args: None | Sequence[Any] | dict[str, Any] = None, *, text_as_bytes: bool = False, - ) -> Dict[str, Any]: ... + ) -> dict[str, Any]: ... def fetchone( conn: Connection, - query: Union[str, sql.Composed], - args: Union[None, Sequence[Any], Dict[str, Any]] = None, + query: str | sql.Composed, + args: None | Sequence[Any] | dict[str, Any] = None, *, - mkrow: Optional[Callable[..., Row]] = None, + mkrow: Callable[..., Row] | None = None, text_as_bytes: bool = False, - ) -> Union[Dict[str, Any], Row]: + ) -> dict[str, Any] | Row: with cursor(conn, mkrow, text_as_bytes) as cur: row = cur.execute(query, args, prepare=True).fetchone() assert row is not None @@ -137,30 +129,30 @@ def fetchone( @overload def fetchall( conn: Connection, - query: Union[str, sql.Composed], - args: Union[None, Sequence[Any], Dict[str, Any]] = None, + query: str | sql.Composed, + args: None | Sequence[Any] | dict[str, Any] = None, *, mkrow: Callable[..., Row], text_as_bytes: bool = False, - ) -> List[Row]: ... + ) -> list[Row]: ... @overload def fetchall( conn: Connection, - query: Union[str, sql.Composed], - args: Union[None, Sequence[Any], Dict[str, Any]] = None, + query: str | sql.Composed, + args: None | Sequence[Any] | dict[str, Any] = None, *, text_as_bytes: bool = False, - ) -> List[Dict[str, Any]]: ... + ) -> list[dict[str, Any]]: ... def fetchall( conn: Connection, - query: Union[str, sql.Composed], - args: Union[None, Sequence[Any], Dict[str, Any]] = None, + query: str | sql.Composed, + args: None | Sequence[Any] | dict[str, Any] = None, *, text_as_bytes: bool = False, - mkrow: Optional[Callable[..., Row]] = None, - ) -> Union[List[Dict[str, Any]], List[Row]]: + mkrow: Callable[..., Row] | None = None, + ) -> list[dict[str, Any]] | list[Row]: with cursor(conn, mkrow, text_as_bytes) as cur: return cur.execute(query, args, prepare=True).fetchall() @@ -212,25 +204,25 @@ def connect(dsn: str = "", **kwargs: Any) -> Connection: def server_version(conn: Connection) -> int: return conn.server_version # type: ignore[attr-defined, no-any-return] - def connection_parameters(conn: Connection) -> Dict[str, Any]: + def connection_parameters(conn: Connection) -> dict[str, Any]: return conn.info.dsn_parameters # type: ignore[attr-defined, no-any-return] def execute( conn: Connection, - query: Union[str, sql.Composed], - args: Union[None, Sequence[Any], Dict[str, Any]] = None, + query: str | sql.Composed, + args: None | Sequence[Any] | dict[str, Any] = None, ) -> None: with conn.cursor() as cur: cur.execute(query, args) def fetchone( # type: ignore[no-redef] conn: Connection, - query: Union[str, sql.Composed], - args: Union[None, Sequence[Any], Dict[str, Any]] = None, + query: str | sql.Composed, + args: None | Sequence[Any] | dict[str, Any] = None, *, - mkrow: Optional[Callable[..., Row]] = None, + mkrow: Callable[..., Row] | None = None, text_as_bytes: bool = False, - ) -> Union[Dict[str, Any], Row]: + ) -> dict[str, Any] | Row: with conn.cursor() as cur: if text_as_bytes: psycopg2.extensions.register_type(psycopg2.extensions.BYTES, cur) # type: ignore[arg-type] @@ -243,12 +235,12 @@ def fetchone( # type: ignore[no-redef] def fetchall( # type: ignore[no-redef] conn: Connection, - query: Union[str, sql.Composed], - args: Union[None, Sequence[Any], Dict[str, Any]] = None, + query: str | sql.Composed, + args: None | Sequence[Any] | dict[str, Any] = None, *, - mkrow: Optional[Callable[..., Row]] = None, + mkrow: Callable[..., Row] | None = None, text_as_bytes: bool = False, - ) -> Union[List[Dict[str, Any]], List[Row]]: + ) -> list[dict[str, Any]] | list[Row]: with conn.cursor() as cur: if text_as_bytes: psycopg2.extensions.register_type(psycopg2.extensions.BYTES, cur) # type: ignore[arg-type] diff --git a/pgactivity/types.py b/pgactivity/types.py index 4a442c1a..a4074c26 100644 --- a/pgactivity/types.py +++ b/pgactivity/types.py @@ -1,3 +1,5 @@ +from __future__ import annotations + import enum import functools from datetime import timedelta @@ -5,18 +7,13 @@ from typing import ( Any, Callable, - Dict, Iterable, Iterator, - List, Mapping, MutableSet, - Optional, Sequence, - Tuple, Type, TypeVar, - Union, overload, ) @@ -57,10 +54,10 @@ def enum_next(e: E) -> E: @attr.s(auto_attribs=True, frozen=True, slots=True) class Filters: - dbname: Optional[str] = None + dbname: str | None = None @classmethod - def from_options(cls, filters: Sequence[str]) -> "Filters": + def from_options(cls, filters: Sequence[str]) -> Filters: fields = compat.fields_dict(cls) attrs = {} for f in filters: @@ -89,7 +86,7 @@ class SortKey(enum.Enum): duration = enum.auto() @classmethod - def default(cls) -> "SortKey": + def default(cls) -> SortKey: return cls.duration @@ -100,7 +97,7 @@ class QueryMode(enum.Enum): blocking = "blocking queries" @classmethod - def default(cls) -> "QueryMode": + def default(cls) -> QueryMode: return cls.activities @@ -155,14 +152,14 @@ class Column: key: str = attr.ib(repr=False) name: str mandatory: bool = False - sort_key: Optional[SortKey] = None + sort_key: SortKey | None = None min_width: int = attr.ib(default=0, repr=False) - max_width: Optional[int] = attr.ib(default=None, repr=False) + max_width: int | None = attr.ib(default=None, repr=False) justify: str = attr.ib( "left", validator=validators.in_(["left", "center", "right"]) ) transform: Callable[[Any], str] = attr.ib(default=if_none(""), repr=False) - color_key: Union[str, Callable[[Any], str]] = attr.ib( + color_key: str | Callable[[Any], str] = attr.ib( default=_color_key_marker, repr=False ) @@ -210,7 +207,7 @@ def color(self, value: Any) -> str: class UI: """State of the UI.""" - columns_by_querymode: Mapping[QueryMode, Tuple[Column, ...]] + columns_by_querymode: Mapping[QueryMode, tuple[Column, ...]] min_duration: float = 0.0 duration_mode: DurationMode = attr.ib( default=DurationMode.query, converter=DurationMode @@ -218,9 +215,9 @@ class UI: wrap_query: bool = False sort_key: SortKey = attr.ib(default=SortKey.default(), converter=SortKey) query_mode: QueryMode = attr.ib(default=QueryMode.activities, converter=QueryMode) - refresh_time: Union[float, int] = 2 + refresh_time: float | int = 2 in_pause: bool = False - interactive_timeout: Optional[int] = None + interactive_timeout: int | None = None show_instance_info_in_header: bool = True show_system_info_in_header: bool = True show_worker_info_in_header: bool = True @@ -228,14 +225,14 @@ class UI: @classmethod def make( cls, - config: Optional[Configuration] = None, + config: Configuration | None = None, flag: Flag = Flag.all(), *, max_db_length: int = 16, filters: Filters = NO_FILTER, **kwargs: Any, - ) -> "UI": - possible_columns: Dict[str, Column] = {} + ) -> UI: + possible_columns: dict[str, Column] = {} def add_column(key: str, name: str, **kwargs: Any) -> None: if config is not None: @@ -388,7 +385,7 @@ def add_column(key: str, name: str, **kwargs: Any) -> None: transform=utils.naturalsize, ) - columns_key_by_querymode: Mapping[QueryMode, List[str]] = { + columns_key_by_querymode: Mapping[QueryMode, list[str]] = { QueryMode.activities: [ "pid", "database", @@ -604,7 +601,7 @@ def column(self, key: str) -> Column: else: raise ValueError(key) - def columns(self) -> Tuple[Column, ...]: + def columns(self) -> tuple[Column, ...]: """Return the tuple of Column for current mode. >>> flag = Flag.PID | Flag.DATABASE | Flag.APPNAME | Flag.RELATION @@ -631,18 +628,18 @@ class SwapInfo: total: int @classmethod - def default(cls) -> "SwapInfo": + def default(cls) -> SwapInfo: return cls(0, 0, 0) @property - def pct_used(self) -> Optional[Pct]: + def pct_used(self) -> Pct | None: if self.total == 0: # account for the zero swap case (#318) return None return Pct(self.used * 100 / self.total) @property - def pct_free(self) -> Optional[Pct]: + def pct_free(self) -> Pct | None: if self.total == 0: # account for the zero swap case (#318) return None @@ -657,23 +654,23 @@ class MemoryInfo: total: int @classmethod - def default(cls) -> "MemoryInfo": + def default(cls) -> MemoryInfo: return cls(0, 0, 0, 0) @property - def pct_used(self) -> Optional[Pct]: + def pct_used(self) -> Pct | None: if self.total == 0: return None return Pct(self.used * 100 / self.total) @property - def pct_free(self) -> Optional[Pct]: + def pct_free(self) -> Pct | None: if self.total == 0: return None return Pct(self.free * 100 / self.total) @property - def pct_bc(self) -> Optional[Pct]: + def pct_bc(self) -> Pct | None: if self.total == 0: return None return Pct(self.buff_cached * 100 / self.total) @@ -686,7 +683,7 @@ class LoadAverage: avg15: float @classmethod - def default(cls) -> "LoadAverage": + def default(cls) -> LoadAverage: return cls(0.0, 0.0, 0.0) @@ -696,7 +693,7 @@ class IOCounter: bytes: int @classmethod - def default(cls) -> "IOCounter": + def default(cls) -> IOCounter: return cls(0, 0) @@ -713,10 +710,10 @@ class SystemInfo: def default( cls, *, - memory: Optional[MemoryInfo] = None, - swap: Optional[SwapInfo] = None, - load: Optional[LoadAverage] = None, - ) -> "SystemInfo": + memory: MemoryInfo | None = None, + swap: SwapInfo | None = None, + load: LoadAverage | None = None, + ) -> SystemInfo: """Zero-value builder. >>> SystemInfo.default() # doctest: +NORMALIZE_WHITESPACE @@ -771,19 +768,19 @@ class ServerInformation: total: int waiting: int max_connections: int - autovacuum_workers: Optional[int] + autovacuum_workers: int | None autovacuum_max_workers: int - logical_replication_workers: Optional[int] - parallel_workers: Optional[int] - max_logical_replication_workers: Optional[int] - max_parallel_workers: Optional[int] - max_worker_processes: Optional[int] - max_wal_senders: Optional[int] - max_replication_slots: Optional[int] - wal_senders: Optional[int] - wal_receivers: Optional[int] - replication_slots: Optional[int] - temporary_file: Optional[TempFileInfo] + logical_replication_workers: int | None + parallel_workers: int | None + max_logical_replication_workers: int | None + max_parallel_workers: int | None + max_worker_processes: int | None + max_wal_senders: int | None + max_replication_slots: int | None + wal_senders: int | None + wal_receivers: int | None + replication_slots: int | None + temporary_file: TempFileInfo | None # Computed in data.pg_get_server_information() size_evolution: float tps: int @@ -791,15 +788,15 @@ class ServerInformation: update_per_second: int delete_per_second: int tuples_returned_per_second: int - cache_hit_ratio_last_snap: Optional[Pct] = attr.ib( + cache_hit_ratio_last_snap: Pct | None = attr.ib( converter=attr.converters.optional(Pct) ) - rollback_ratio_last_snap: Optional[Pct] = attr.ib( + rollback_ratio_last_snap: Pct | None = attr.ib( converter=attr.converters.optional(Pct) ) @property - def worker_processes(self) -> Optional[int]: + def worker_processes(self) -> int | None: if self.parallel_workers is None and self.logical_replication_workers is None: return None else: @@ -844,14 +841,14 @@ def locktype(value: str) -> LockType: class BaseProcess: pid: int application_name: str - database: Optional[str] + database: str | None user: str - client: Union[None, IPv4Address, IPv6Address] - duration: Optional[float] + client: None | IPv4Address | IPv6Address + duration: float | None state: str - query: Optional[str] - encoding: Optional[str] - query_leader_pid: Optional[int] + query: str | None + encoding: str | None + query_leader_pid: int | None is_parallel_worker: bool _P = TypeVar("_P", bound="BaseProcess") @@ -861,7 +858,7 @@ def from_bytes( cls: Type[_P], server_encoding: bytes, *, - encoding: Optional[Union[str, bytes]], + encoding: str | bytes | None, **kwargs: Any, ) -> _P: if encoding is None: @@ -880,8 +877,8 @@ def from_bytes( class RunningProcess(BaseProcess): """Process for a running query.""" - wait: Union[bool, None, str] - query_leader_pid: Optional[int] + wait: bool | None | str + query_leader_pid: int | None is_parallel_worker: bool @@ -896,7 +893,7 @@ class WaitingProcess(BaseProcess): relation: str # TODO: update queries to select/compute these column. - query_leader_pid: Optional[int] = attr.ib(default=None, init=False) + query_leader_pid: int | None = attr.ib(default=None, init=False) is_parallel_worker: bool = attr.ib(default=False, init=False) @@ -909,26 +906,26 @@ class BlockingProcess(BaseProcess): mode: str type: LockType = attr.ib(converter=locktype) relation: str - wait: Union[bool, None, str] + wait: bool | None | str # TODO: update queries to select/compute these column. - query_leader_pid: Optional[int] = attr.ib(default=None, init=False) + query_leader_pid: int | None = attr.ib(default=None, init=False) is_parallel_worker: bool = attr.ib(default=False, init=False) @attr.s(auto_attribs=True, frozen=True, slots=True) class SystemProcess: - meminfo: Tuple[int, ...] + meminfo: tuple[int, ...] io_read: IOCounter io_write: IOCounter io_time: float mem_percent: float cpu_percent: float - cpu_times: Tuple[float, ...] + cpu_times: tuple[float, ...] read_delta: float write_delta: float io_wait: bool - psutil_proc: Optional[psutil.Process] + psutil_proc: psutil.Process | None @attr.s(auto_attribs=True, frozen=True, slots=True) @@ -941,8 +938,8 @@ class LocalRunningProcess(RunningProcess): @classmethod def from_process( - cls, process: RunningProcess, **kwargs: Union[float, str] - ) -> "LocalRunningProcess": + cls, process: RunningProcess, **kwargs: float | str + ) -> LocalRunningProcess: return cls(**dict(attr.asdict(process), **kwargs)) @@ -1047,8 +1044,8 @@ class SelectableProcesses: """ - items: List[BaseProcess] - focused: Optional[int] = None + items: list[BaseProcess] + focused: int | None = None pinned: MutableSet[int] = attr.ib(default=attr.Factory(set)) def __len__(self) -> int: @@ -1061,15 +1058,13 @@ def __iter__(self) -> Iterator[BaseProcess]: def __getitem__(self, i: int) -> BaseProcess: ... @overload - def __getitem__(self, s: slice) -> List[BaseProcess]: ... + def __getitem__(self, s: slice) -> list[BaseProcess]: ... - def __getitem__( - self, val: Union[int, slice] - ) -> Union[BaseProcess, List[BaseProcess]]: + def __getitem__(self, val: int | slice) -> BaseProcess | list[BaseProcess]: return self.items[val] @property - def selected(self) -> List[int]: + def selected(self) -> list[int]: if self.pinned: return list(self.pinned) elif self.focused: @@ -1084,7 +1079,7 @@ def reset(self) -> None: def set_items(self, new_items: Sequence[BaseProcess]) -> None: self.items[:] = list(new_items) - def position(self) -> Optional[int]: + def position(self) -> int | None: if self.focused is None: return None for idx, proc in enumerate(self.items): @@ -1135,10 +1130,10 @@ def toggle_pin_focused(self) -> None: self.pinned.add(self.focused) -ActivityStats = Union[ - Iterable[WaitingProcess], - Iterable[RunningProcess], - Tuple[Iterable[WaitingProcess], SystemInfo], - Tuple[Iterable[BlockingProcess], SystemInfo], - Tuple[Iterable[LocalRunningProcess], SystemInfo], -] +ActivityStats = ( + Iterable[WaitingProcess] + | Iterable[RunningProcess] + | tuple[Iterable[WaitingProcess], SystemInfo] + | tuple[Iterable[BlockingProcess], SystemInfo] + | tuple[Iterable[LocalRunningProcess], SystemInfo] +) diff --git a/pgactivity/ui.py b/pgactivity/ui.py index ee6f3144..51c5ce61 100644 --- a/pgactivity/ui.py +++ b/pgactivity/ui.py @@ -1,7 +1,9 @@ +from __future__ import annotations + import time from argparse import Namespace from functools import partial -from typing import Dict, List, Optional, cast +from typing import cast import attr from blessed import Terminal @@ -13,15 +15,15 @@ def main( term: Terminal, - config: Optional[Configuration], + config: Configuration | None, data: Data, host: types.Host, options: Namespace, *, render_header: bool = True, render_footer: bool = True, - width: Optional[int] = None, - wait_on_actions: Optional[float] = None, + width: int | None = None, + wait_on_actions: float | None = None, ) -> None: fs_blocksize = options.blocksize @@ -52,7 +54,7 @@ def main( ) key, in_help = None, False - sys_procs: Dict[int, types.SystemProcess] = {} + sys_procs: dict[int, types.SystemProcess] = {} pg_procs = types.SelectableProcesses([]) activity_stats: types.ActivityStats @@ -208,7 +210,7 @@ def main( if is_local: # TODO: Use this logic in waiting and blocking cases. local_pg_procs, io_read, io_write = activities.ps_complete( - cast(List[types.RunningProcess], pg_procs.items), + cast(list[types.RunningProcess], pg_procs.items), sys_procs, fs_blocksize, ) diff --git a/pgactivity/utils.py b/pgactivity/utils.py index 888c4fb6..4495892b 100644 --- a/pgactivity/utils.py +++ b/pgactivity/utils.py @@ -1,7 +1,9 @@ +from __future__ import annotations + import functools import re from datetime import datetime, timedelta -from typing import IO, Any, Iterable, List, Mapping, Optional, Tuple, Union +from typing import IO, Any, Iterable, Mapping import attr import humanize @@ -49,12 +51,12 @@ class MessagePile: """ n: int - messages: List[str] = attr.ib(default=attr.Factory(list), init=False) + messages: list[str] = attr.ib(default=attr.Factory(list), init=False) def send(self, message: str) -> None: self.messages[:] = [message] * self.n - def get(self) -> Optional[str]: + def get(self) -> str | None: if self.messages: return self.messages.pop() return None @@ -106,7 +108,7 @@ def ellipsis(v: str, width: int) -> str: return v[: wl + 1] + "..." + v[-wl:] -def get_duration(duration: Optional[float]) -> float: +def get_duration(duration: float | None) -> float: """Return 0 if the given duration is negative else, return the duration. >>> get_duration(None) @@ -122,7 +124,7 @@ def get_duration(duration: Optional[float]) -> float: @functools.lru_cache(maxsize=2) -def format_duration(duration: Optional[float]) -> Tuple[str, str]: +def format_duration(duration: float | None) -> tuple[str, str]: """Return a string from 'duration' value along with the color for rendering. >>> format_duration(None) @@ -165,7 +167,7 @@ def format_duration(duration: Optional[float]) -> Tuple[str, str]: return ctime, color -def wait_status(value: Union[None, bool, str]) -> str: +def wait_status(value: None | bool | str) -> str: """Display the waiting status of query. >>> wait_status(None) @@ -272,7 +274,7 @@ def clean_str_csv(s: str) -> str: + "\n" ) - def yn_na(value: Optional[bool]) -> str: + def yn_na(value: bool | None) -> str: if value is None: return "N/A" return yn(value) diff --git a/pgactivity/views.py b/pgactivity/views.py index 69c41ff1..a5110506 100644 --- a/pgactivity/views.py +++ b/pgactivity/views.py @@ -1,8 +1,10 @@ +from __future__ import annotations + import functools import inspect import itertools from textwrap import TextWrapper, dedent -from typing import Any, Callable, Iterable, Iterator, List, Optional, Sequence, Tuple +from typing import Any, Callable, Iterable, Iterator, Sequence from blessed import Terminal @@ -47,7 +49,7 @@ def __next__(self) -> int: @functools.lru_cache(maxsize=512) -def shorten(term: Terminal, text: str, width: Optional[int] = None) -> str: +def shorten(term: Terminal, text: str, width: int | None = None) -> str: r"""Truncate 'text' to fit in the given 'width' (or term.width). This is similar to textwrap.shorten() but sequence-aware. @@ -157,7 +159,7 @@ def header( host: Host, pg_version: str, server_information: ServerInformation, - system_info: Optional[SystemInfo] = None, + system_info: SystemInfo | None = None, ) -> Iterator[str]: @functools.singledispatch def render(x: Any) -> str: @@ -188,7 +190,7 @@ def render_iocounter(i: IOCounter) -> str: return f"{term.bold_green(hbytes)} - {term.bold_green(counts)}" def render_columns( - columns: Sequence[List[str]], *, delimiter: str = f"{term.blue(',')} " + columns: Sequence[list[str]], *, delimiter: str = f"{term.blue(',')} " ) -> Iterator[str]: column_widths = [ max(len(column_row) for column_row in column) for column in columns @@ -402,7 +404,7 @@ def processes_rows( ui: UI, processes: SelectableProcesses, maxlines: int, - width: Optional[int], + width: int | None, ) -> Iterator[str]: """Display table rows with processes information.""" @@ -449,7 +451,7 @@ def cell( color_type = "yellow" else: color_type = "default" - text: List[str] = [] + text: list[str] = [] for column in ui.columns(): field = column.key if field != "query": @@ -472,13 +474,13 @@ def cell( yield from (" ".join(text) + term.normal).splitlines() -def footer_message(term: Terminal, message: str, width: Optional[int] = None) -> None: +def footer_message(term: Terminal, message: str, width: int | None = None) -> None: if width is None: width = term.width print(term.center(message[:width]) + term.normal, end="") -def footer_help(term: Terminal, width: Optional[int] = None) -> None: +def footer_help(term: Terminal, width: int | None = None) -> None: """Footer line with help keys.""" query_modes_help = [ ("/".join(keys[:-1]), qm.value) for qm, keys in KEYS_BY_QUERYMODE.items() @@ -493,7 +495,7 @@ def footer_help(term: Terminal, width: Optional[int] = None) -> None: def render_footer( - term: Terminal, footer_values: List[Tuple[str, str]], width: Optional[int] + term: Terminal, footer_values: list[tuple[str, str]], width: int | None ) -> None: if width is None: width = term.width @@ -514,7 +516,7 @@ def render_column(key: str, desc: str) -> str: print(term.ljust(row, width=width, fillchar=term.cyan_reverse(" ")), end="") -def footer_interative_help(term: Terminal, width: Optional[int] = None) -> None: +def footer_interative_help(term: Terminal, width: int | None = None) -> None: """Footer line with help keys for interactive mode.""" assert PROCESS_PIN.name is not None footer_values = [ @@ -535,14 +537,14 @@ def screen( pg_version: str, server_information: ServerInformation, activity_stats: ActivityStats, - message: Optional[str], + message: str | None, render_header: bool = True, render_footer: bool = True, - width: Optional[int] = None, + width: int | None = None, ) -> None: """Display the screen.""" - system_info: Optional[SystemInfo] + system_info: SystemInfo | None if isinstance(activity_stats, tuple): processes, system_info = activity_stats else: diff --git a/pgactivity/widgets.py b/pgactivity/widgets.py index e3bda772..bc8c3ee8 100644 --- a/pgactivity/widgets.py +++ b/pgactivity/widgets.py @@ -1,4 +1,4 @@ -from typing import Optional +from __future__ import annotations from blessed import Terminal @@ -10,7 +10,7 @@ def boxed( border: bool = True, border_color: str = "white", center: bool = False, - width: Optional[int] = None, + width: int | None = None, ) -> str: if border: border_width = term.length(content) + 2 diff --git a/tests/conftest.py b/tests/conftest.py index 94da03cc..27548fc4 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -1,7 +1,9 @@ +from __future__ import annotations + import logging import pathlib import threading -from typing import Any, List, Optional +from typing import Any import psycopg import psycopg.errors @@ -15,7 +17,7 @@ LOGGER.setLevel(logging.DEBUG) -def pytest_report_header(config: Any) -> List[str]: +def pytest_report_header(config: Any) -> list[str]: return [f"psycopg: {pg.__version__}"] @@ -28,7 +30,7 @@ def datadir() -> pathlib.Path: def database_factory(postgresql): dbnames = set() - def createdb(dbname: str, encoding: str, locale: Optional[str] = None) -> None: + def createdb(dbname: str, encoding: str, locale: str | None = None) -> None: with psycopg.connect(postgresql.info.dsn, autocommit=True) as conn: qs = sql.SQL( "CREATE DATABASE {dbname} ENCODING {encoding} TEMPLATE template0" @@ -67,7 +69,7 @@ def execute( query: str, commit: bool = False, autocommit: bool = False, - dbname: Optional[str] = None, + dbname: str | None = None, ) -> None: dsn, kwargs = postgresql.info.dsn, {} if dbname: diff --git a/tests/test_config.py b/tests/test_config.py index 93909b88..9d6b6e21 100644 --- a/tests/test_config.py +++ b/tests/test_config.py @@ -1,5 +1,7 @@ +from __future__ import annotations + from pathlib import Path -from typing import Any, Dict +from typing import Any import attr @@ -75,7 +77,7 @@ def test_flag_load(): def test_lookup(tmp_path: Path) -> None: - def asdict(cfg: Configuration) -> Dict[str, Any]: + def asdict(cfg: Configuration) -> dict[str, Any]: return {k: attr.asdict(v) for k, v in cfg.items()} cfg = Configuration.lookup(user_config_home=tmp_path) diff --git a/tests/test_data.py b/tests/test_data.py index 7fcd6673..4aca9783 100644 --- a/tests/test_data.py +++ b/tests/test_data.py @@ -1,5 +1,6 @@ +from __future__ import annotations + import time -from typing import Optional import attr import psycopg @@ -163,7 +164,7 @@ def test_client_encoding(postgresql, encoding: str) -> None: ], ) def test_postgres_and_python_encoding( - database_factory, pyenc: str, pgenc: str, locale: Optional[str], data, postgresql + database_factory, pyenc: str, pgenc: str, locale: str | None, data, postgresql ) -> None: dbname = pyenc try: