From 9beaa00b2c3ef7b0ec50f907ada607f4fd4e2127 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?H=C3=A5kon=20V=2E=20Treider?= Date: Tue, 27 Aug 2024 10:15:07 +0200 Subject: [PATCH 1/7] add httpx and poetry update --- poetry.lock | 99 +++++++++++++++++++++++++++++++++++++++++++++++--- pyproject.toml | 4 +- 2 files changed, 96 insertions(+), 7 deletions(-) diff --git a/poetry.lock b/poetry.lock index 5b93a8bb2..271eb42ee 100644 --- a/poetry.lock +++ b/poetry.lock @@ -11,6 +11,28 @@ files = [ {file = "alabaster-0.7.16.tar.gz", hash = "sha256:75a8b99c28a5dad50dd7f8ccdd447a121ddb3892da9e53d1ca5cca3106d58d65"}, ] +[[package]] +name = "anyio" +version = "4.7.0" +description = "High level compatibility layer for multiple asynchronous event loop implementations" +optional = false +python-versions = ">=3.9" +files = [ + {file = "anyio-4.7.0-py3-none-any.whl", hash = "sha256:ea60c3723ab42ba6fff7e8ccb0488c898ec538ff4df1f1d5e642c3601d07e352"}, + {file = "anyio-4.7.0.tar.gz", hash = "sha256:2f834749c602966b7d456a7567cafcb309f96482b5081d14ac93ccd457f9dd48"}, +] + +[package.dependencies] +exceptiongroup = {version = ">=1.0.2", markers = "python_version < \"3.11\""} +idna = ">=2.8" +sniffio = ">=1.1" +typing_extensions = {version = ">=4.5", markers = "python_version < \"3.13\""} + +[package.extras] +doc = ["Sphinx (>=7.4,<8.0)", "packaging", "sphinx-autodoc-typehints (>=1.2.0)", "sphinx_rtd_theme"] +test = ["anyio[trio]", "coverage[toml] (>=7)", "exceptiongroup (>=1.2.0)", "hypothesis (>=4.0)", "psutil (>=5.9)", "pytest (>=7.0)", "pytest-mock (>=3.6.1)", "trustme", "truststore (>=0.9.1)", "uvloop (>=0.21)"] +trio = ["trio (>=0.26.1)"] + [[package]] name = "asttokens" version = "3.0.0" @@ -527,6 +549,62 @@ shapely = ">=2.0.0" all = ["GeoAlchemy2", "SQLAlchemy (>=1.3)", "folium", "geopy", "mapclassify", "matplotlib (>=3.5.0)", "psycopg-binary (>=3.1.0)", "pyarrow (>=8.0.0)", "xyzservices"] dev = ["black", "codecov", "pre-commit", "pytest (>=3.1.0)", "pytest-cov", "pytest-xdist"] +[[package]] 
+name = "h11" +version = "0.14.0" +description = "A pure-Python, bring-your-own-I/O implementation of HTTP/1.1" +optional = false +python-versions = ">=3.7" +files = [ + {file = "h11-0.14.0-py3-none-any.whl", hash = "sha256:e3fe4ac4b851c468cc8363d500db52c2ead036020723024a109d37346efaa761"}, + {file = "h11-0.14.0.tar.gz", hash = "sha256:8f19fbbe99e72420ff35c00b27a34cb9937e902a8b810e2c88300c6f0a3b699d"}, +] + +[[package]] +name = "httpcore" +version = "1.0.7" +description = "A minimal low-level HTTP client." +optional = false +python-versions = ">=3.8" +files = [ + {file = "httpcore-1.0.7-py3-none-any.whl", hash = "sha256:a3fff8f43dc260d5bd363d9f9cf1830fa3a458b332856f34282de498ed420edd"}, + {file = "httpcore-1.0.7.tar.gz", hash = "sha256:8551cb62a169ec7162ac7be8d4817d561f60e08eaa485234898414bb5a8a0b4c"}, +] + +[package.dependencies] +certifi = "*" +h11 = ">=0.13,<0.15" + +[package.extras] +asyncio = ["anyio (>=4.0,<5.0)"] +http2 = ["h2 (>=3,<5)"] +socks = ["socksio (==1.*)"] +trio = ["trio (>=0.22.0,<1.0)"] + +[[package]] +name = "httpx" +version = "0.28.1" +description = "The next generation HTTP client." +optional = false +python-versions = ">=3.8" +files = [ + {file = "httpx-0.28.1-py3-none-any.whl", hash = "sha256:d909fcccc110f8c7faf814ca82a9a4d816bc5a6dbfea25d6591d6985b8ba59ad"}, + {file = "httpx-0.28.1.tar.gz", hash = "sha256:75e98c5f16b0f35b567856f597f06ff2270a374470a5c2392242528e3e3e42fc"}, +] + +[package.dependencies] +anyio = "*" +certifi = "*" +httpcore = "==1.*" +idna = "*" + +[package.extras] +brotli = ["brotli", "brotlicffi"] +cli = ["click (==8.*)", "pygments (==2.*)", "rich (>=10,<14)"] +http2 = ["h2 (>=3,<5)"] +socks = ["socksio (==1.*)"] +zstd = ["zstandard (>=0.18.0)"] + [[package]] name = "icdiff" version = "2.0.7" @@ -1871,13 +1949,13 @@ use-chardet-on-py3 = ["chardet (>=3.0.2,<6)"] [[package]] name = "requests-oauthlib" -version = "1.3.1" +version = "2.0.0" description = "OAuthlib authentication support for Requests." 
optional = false -python-versions = ">=2.7, !=3.0.*, !=3.1.*, !=3.2.*, !=3.3.*" +python-versions = ">=3.4" files = [ - {file = "requests-oauthlib-1.3.1.tar.gz", hash = "sha256:75beac4a47881eeb94d5ea5d6ad31ef88856affe2332b9aafb52c6452ccf0d7a"}, - {file = "requests_oauthlib-1.3.1-py2.py3-none-any.whl", hash = "sha256:2577c501a2fb8d05a304c09d090d6e47c306fef15809d102b327cf8364bddab5"}, + {file = "requests-oauthlib-2.0.0.tar.gz", hash = "sha256:b3dffaebd884d8cd778494369603a9e7b58d29111bf6b41bdc2dcd87203af4e9"}, + {file = "requests_oauthlib-2.0.0-py2.py3-none-any.whl", hash = "sha256:7dd8a5c40426b779b0868c404bdef9768deccf22749cde15852df527e6269b36"}, ] [package.dependencies] @@ -2037,6 +2115,17 @@ files = [ {file = "six-1.17.0.tar.gz", hash = "sha256:ff70335d468e7eb6ec65b95b99d3a2836546063f63acc5171de367e834932a81"}, ] +[[package]] +name = "sniffio" +version = "1.3.1" +description = "Sniff out which async library your code is running under" +optional = false +python-versions = ">=3.7" +files = [ + {file = "sniffio-1.3.1-py3-none-any.whl", hash = "sha256:2f6da418d1f1e0fddd844478f41680e794e6051915791a034ff65e5f100525a2"}, + {file = "sniffio-1.3.1.tar.gz", hash = "sha256:f4324edc670a0f49750a81b895f35c3adb843cca46f0530f79fc1babb23789dc"}, +] + [[package]] name = "snowballstemmer" version = "2.2.0" @@ -2508,4 +2597,4 @@ yaml = ["PyYAML"] [metadata] lock-version = "2.0" python-versions = "^3.10" -content-hash = "f2cb7ece0e531691a61fa99ebc481daeeb5d94f2ff4fcfccad0eeafbacf027b9" +content-hash = "200369f0d87db4da9d7113d39914af70568437b5df401363f2107e6857a56fb5" diff --git a/pyproject.toml b/pyproject.toml index 60a65a009..14facdd5f 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -22,9 +22,9 @@ packages = [{ include="cognite", from="." 
}] [tool.poetry.dependencies] python = "^3.10" -requests = "^2.27" -requests_oauthlib = "^1" +httpx = "^0" msal = "^1.31" +requests-oauthlib = "^2" protobuf = ">=4" packaging = ">=20" pip = ">=20.0.0" # make optional once poetry doesn't auto-remove it on "simple install" From 1e703a12a0b4d0b143cd2a296c1a86d3d6567c13 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?H=C3=A5kon=20V=2E=20Treider?= Date: Fri, 20 Dec 2024 10:42:36 +0100 Subject: [PATCH 2/7] add: unpack_items function --- cognite/client/_api/datapoints.py | 3 ++- cognite/client/_api/files.py | 4 ++-- cognite/client/_api_client.py | 9 +++++---- cognite/client/utils/_auxiliary.py | 4 ++++ 4 files changed, 13 insertions(+), 7 deletions(-) diff --git a/cognite/client/_api/datapoints.py b/cognite/client/_api/datapoints.py index 9a26a5475..6507e07e9 100644 --- a/cognite/client/_api/datapoints.py +++ b/cognite/client/_api/datapoints.py @@ -53,6 +53,7 @@ is_positive, split_into_chunks, split_into_n_parts, + unpack_items, unpack_items_in_payload, ) from cognite.client.utils._concurrency import ConcurrencySettings, execute_tasks @@ -2143,5 +2144,5 @@ def fetch_datapoints(self) -> list[dict[str, Any]]: task_unwrap_fn=unpack_items_in_payload, task_list_element_unwrap_fn=IdentifierSequenceCore.extract_identifiers, ) - result = tasks_summary.joined_results(lambda res: res.json()["items"]) + result = tasks_summary.joined_results(unpack_items) return self._post_fix_status_codes_and_stringified_floats(result) diff --git a/cognite/client/_api/files.py b/cognite/client/_api/files.py index d87ced3d2..83aaf1d32 100644 --- a/cognite/client/_api/files.py +++ b/cognite/client/_api/files.py @@ -28,7 +28,7 @@ ) from cognite.client.data_classes.data_modeling import NodeId from cognite.client.exceptions import CogniteAPIError, CogniteAuthorizationError, CogniteFileUploadError -from cognite.client.utils._auxiliary import find_duplicates +from cognite.client.utils._auxiliary import find_duplicates, unpack_items from 
cognite.client.utils._concurrency import execute_tasks from cognite.client.utils._identifier import Identifier, IdentifierSequence from cognite.client.utils._validation import process_asset_subtree_ids, process_data_set_ids @@ -949,7 +949,7 @@ def retrieve_download_urls( ] tasks_summary = execute_tasks(self._post, tasks, max_workers=self._config.max_workers) tasks_summary.raise_compound_exception_if_failed_tasks() - results = tasks_summary.joined_results(unwrap_fn=lambda res: res.json()["items"]) + results = tasks_summary.joined_results(unpack_items) return { result.get("id") or result.get("externalId") or NodeId.load(result["instanceId"]): result["downloadUrl"] for result in results diff --git a/cognite/client/_api_client.py b/cognite/client/_api_client.py index 5e3846f0b..1aa06143b 100644 --- a/cognite/client/_api_client.py +++ b/cognite/client/_api_client.py @@ -49,6 +49,7 @@ interpolate_and_url_encode, is_unlimited, split_into_chunks, + unpack_items, unpack_items_in_payload, ) from cognite.client.utils._concurrency import TaskExecutor, execute_tasks @@ -407,7 +408,7 @@ def _retrieve_multiple( ) return (loaded[0] if loaded else None) if identifiers.is_singleton() else loaded - retrieved_items = tasks_summary.joined_results(lambda res: res.json()["items"]) + retrieved_items = tasks_summary.joined_results(unpack_items) if identifiers.is_singleton(): if retrieved_items: @@ -950,7 +951,7 @@ def str_format_element(el: T) -> str | dict | T: task_list_element_unwrap_fn=unwrap_element, str_format_element_fn=str_format_element, ) - created_resources = summary.joined_results(lambda res: res.json()["items"]) + created_resources = summary.joined_results(unpack_items) if single_item: return resource_cls._load(created_resources[0], cognite_client=self._cognite_client) @@ -987,7 +988,7 @@ def _delete_multiple( task_list_element_unwrap_fn=identifiers.unwrap_identifier, ) if returns_items: - return summary.joined_results(lambda res: res.json()["items"]) + return 
summary.joined_results(unpack_items) else: return None @@ -1073,7 +1074,7 @@ def _update_multiple( task_unwrap_fn=unpack_items_in_payload, task_list_element_unwrap_fn=lambda el: IdentifierSequenceCore.unwrap_identifier(el), ) - updated_items = tasks_summary.joined_results(lambda res: res.json()["items"]) + updated_items = tasks_summary.joined_results(unpack_items) if single_item: return resource_cls._load(updated_items[0], cognite_client=self._cognite_client) diff --git a/cognite/client/utils/_auxiliary.py b/cognite/client/utils/_auxiliary.py index a0940af60..81e7101f6 100644 --- a/cognite/client/utils/_auxiliary.py +++ b/cognite/client/utils/_auxiliary.py @@ -263,3 +263,7 @@ def flatten_dict(d: dict[str, Any], parent_keys: tuple[str, ...], sep: str = "." else: items.append((sep.join((*parent_keys, key)), value)) return dict(items) + + +def unpack_items(res: Response) -> list[Any]: + return res.json()["items"] From 4f8ea00201beaeb63e80e9a9993d8952222d9888 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?H=C3=A5kon=20V=2E=20Treider?= Date: Fri, 20 Dec 2024 10:59:20 +0100 Subject: [PATCH 3/7] refactor: use compiled regexes --- cognite/client/_api_client.py | 72 +++++++++++++++++------------------ 1 file changed, 35 insertions(+), 37 deletions(-) diff --git a/cognite/client/_api_client.py b/cognite/client/_api_client.py index 1aa06143b..a485d23f3 100644 --- a/cognite/client/_api_client.py +++ b/cognite/client/_api_client.py @@ -75,42 +75,42 @@ VALID_AGGREGATIONS = {"count", "cardinalityValues", "cardinalityProperties", "uniqueValues", "uniqueProperties"} +RETRYABLE_POST_ENDPOINT_REGEX = "|".join( + rf"^/{path}(\?.*)?$" + for path in ( + "(assets|events|files|timeseries|sequences|datasets|relationships|labels)/(list|byids|search|aggregate)", + "files/downloadlink", + "timeseries/(data(/(list|latest|delete))?|synthetic/query)", + "sequences/data(/(list|delete))?", + "raw/dbs/[^/]+/tables/[^/]+/rows(/delete)?", + "context/entitymatching/(byids|list|jobs)", + "sessions/revoke", + 
"models/.*", + ".*/graphql", + "units/.*", + "annotations/(list|byids|reverselookup)", + r"functions/(list|byids|status|schedules/(list|byids)|\d+/calls/(list|byids))", + r"3d/models/\d+/revisions/\d+/(mappings/list|nodes/(list|byids))", + "documents/(aggregate|list|search)", + "profiles/(byids|search)", + "geospatial/(compute|crs/byids|featuretypes/(byids|list))", + "geospatial/featuretypes/[A-Za-z][A-Za-z0-9_]{0,31}/features/(aggregate|list|byids|search|search-streaming|[A-Za-z][A-Za-z0-9_]{0,255}/rasters/[A-Za-z][A-Za-z0-9_]{0,31})", + "transformations/(filter|byids|jobs/byids|schedules/byids|query/run)", + "simulators/list", + "extpipes/(list|byids|runs/list)", + "workflows/.*", + "hostedextractors/.*", + "postgresgateway/.*", + "context/diagram/.*", + ) +) + class APIClient: _RESOURCE_PATH: str - # TODO: When Cognite Experimental SDK is deprecated, remove frozenset in favour of re.compile: - _RETRYABLE_POST_ENDPOINT_REGEX_PATTERNS: ClassVar[frozenset[str]] = frozenset( - [ - r"|".join( - rf"^/{path}(\?.*)?$" - for path in ( - "(assets|events|files|timeseries|sequences|datasets|relationships|labels)/(list|byids|search|aggregate)", - "files/downloadlink", - "timeseries/(data(/(list|latest|delete))?|synthetic/query)", - "sequences/data(/(list|delete))?", - "raw/dbs/[^/]+/tables/[^/]+/rows(/delete)?", - "context/entitymatching/(byids|list|jobs)", - "sessions/revoke", - "models/.*", - ".*/graphql", - "units/.*", - "annotations/(list|byids|reverselookup)", - r"functions/(list|byids|status|schedules/(list|byids)|\d+/calls/(list|byids))", - r"3d/models/\d+/revisions/\d+/(mappings/list|nodes/(list|byids))", - "documents/(aggregate|list|search)", - "profiles/(byids|search)", - "geospatial/(compute|crs/byids|featuretypes/(byids|list))", - "geospatial/featuretypes/[A-Za-z][A-Za-z0-9_]{0,31}/features/(aggregate|list|byids|search|search-streaming|[A-Za-z][A-Za-z0-9_]{0,255}/rasters/[A-Za-z][A-Za-z0-9_]{0,31})", - 
"transformations/(filter|byids|jobs/byids|schedules/byids|query/run)", - "simulators/list", - "extpipes/(list|byids|runs/list)", - "workflows/.*", - "hostedextractors/.*", - "postgresgateway/.*", - "context/diagram/.*", - ) - ) - ] + _RETRYABLE_POST_ENDPOINT_REGEX_PATTERN: ClassVar[re.Pattern[str]] = re.compile(RETRYABLE_POST_ENDPOINT_REGEX) + _VALID_URL_PATTERN = re.compile( # TODO: Remove playground? + r"^https?://[a-z\d.:\-]+(?:/api/(?:v1|playground)/projects/[^/]+)?((/[^\?]+)?(\?.+)?)" ) def __init__(self, config: ClientConfig, api_version: str | None, cognite_client: CogniteClient) -> None: @@ -294,12 +294,10 @@ def _is_retryable(self, method: str, path: str) -> bool: @classmethod @functools.lru_cache(64) def _url_is_retryable(cls, url: str) -> bool: - valid_url_pattern = r"^https?://[a-z\d.:\-]+(?:/api/(?:v1|playground)/projects/[^/]+)?((/[^\?]+)?(\?.+)?)" - match = re.match(valid_url_pattern, url) - if not match: + if not (match := cls._VALID_URL_PATTERN.match(url)): raise ValueError(f"URL {url} is not valid. 
Cannot resolve whether or not it is retryable") path = match.group(1) - return any(re.match(pattern, path) for pattern in cls._RETRYABLE_POST_ENDPOINT_REGEX_PATTERNS) + return bool(cls._RETRYABLE_POST_ENDPOINT_REGEX_PATTERN.match(path)) def _retrieve( self, From ce54f0c11e261c4dd8ad2ee3c4496a686b14e788 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?H=C3=A5kon=20V=2E=20Treider?= Date: Fri, 20 Dec 2024 14:33:59 +0100 Subject: [PATCH 4/7] refactor: move url methods to utils/_url.py --- cognite/client/_api_client.py | 68 +--------------------------------- cognite/client/utils/_url.py | 70 +++++++++++++++++++++++++++++++++++ 2 files changed, 71 insertions(+), 67 deletions(-) create mode 100644 cognite/client/utils/_url.py diff --git a/cognite/client/_api_client.py b/cognite/client/_api_client.py index a485d23f3..bc5c82cd4 100644 --- a/cognite/client/_api_client.py +++ b/cognite/client/_api_client.py @@ -4,21 +4,18 @@ import gzip import itertools import logging -import re import warnings from collections import UserList from collections.abc import Iterator, Mapping, MutableMapping, Sequence from typing import ( TYPE_CHECKING, Any, - ClassVar, Literal, NoReturn, TypeVar, cast, overload, ) -from urllib.parse import urljoin import requests.utils from requests import Response @@ -62,6 +59,7 @@ ) from cognite.client.utils._json import JSONDecodeError from cognite.client.utils._text import convert_all_keys_to_camel_case, shorten, to_camel_case, to_snake_case +from cognite.client.utils._url import resolve_url from cognite.client.utils._validation import assert_type, verify_limit from cognite.client.utils.useful_types import SequenceNotStr @@ -75,43 +73,9 @@ VALID_AGGREGATIONS = {"count", "cardinalityValues", "cardinalityProperties", "uniqueValues", "uniqueProperties"} -RETRYABLE_POST_ENDPOINT_REGEX = "|".join( - rf"^/{path}(\?.*)?$" - for path in ( - "(assets|events|files|timeseries|sequences|datasets|relationships|labels)/(list|byids|search|aggregate)", - "files/downloadlink", - 
"timeseries/(data(/(list|latest|delete))?|synthetic/query)", - "sequences/data(/(list|delete))?", - "raw/dbs/[^/]+/tables/[^/]+/rows(/delete)?", - "context/entitymatching/(byids|list|jobs)", - "sessions/revoke", - "models/.*", - ".*/graphql", - "units/.*", - "annotations/(list|byids|reverselookup)", - r"functions/(list|byids|status|schedules/(list|byids)|\d+/calls/(list|byids))", - r"3d/models/\d+/revisions/\d+/(mappings/list|nodes/(list|byids))", - "documents/(aggregate|list|search)", - "profiles/(byids|search)", - "geospatial/(compute|crs/byids|featuretypes/(byids|list))", - "geospatial/featuretypes/[A-Za-z][A-Za-z0-9_]{0,31}/features/(aggregate|list|byids|search|search-streaming|[A-Za-z][A-Za-z0-9_]{0,255}/rasters/[A-Za-z][A-Za-z0-9_]{0,31})", - "transformations/(filter|byids|jobs/byids|schedules/byids|query/run)", - "simulators/list", - "extpipes/(list|byids|runs/list)", - "workflows/.*", - "hostedextractors/.*", - "postgresgateway/.*", - "context/diagram/.*", - ) -) - class APIClient: _RESOURCE_PATH: str - _RETRYABLE_POST_ENDPOINT_REGEX_PATTERN: ClassVar[re.Pattern[str]] = re.compile(RETRYABLE_POST_ENDPOINT_REGEX) - _VALID_URL_PATTERN = re.compile( # TODO: Remove playground? 
- r"^https?://[a-z\d.:\-]+(?:/api/(?:v1|playground)/projects/[^/]+)?((/[^\?]+)?(\?.+)?)" - ) def __init__(self, config: ClientConfig, api_version: str | None, cognite_client: CogniteClient) -> None: self._config = config @@ -269,36 +233,6 @@ def _refresh_auth_header(self, headers: MutableMapping[str, Any]) -> None: auth_header_name, auth_header_value = self._config.credentials.authorization_header() headers[auth_header_name] = auth_header_value - def _resolve_url(self, method: str, url_path: str) -> tuple[bool, str]: - if not url_path.startswith("/"): - raise ValueError("URL path must start with '/'") - base_url = self._get_base_url_with_base_path() - full_url = base_url + url_path - is_retryable = self._is_retryable(method, full_url) - return is_retryable, full_url - - def _get_base_url_with_base_path(self) -> str: - base_path = "" - if self._api_version: - base_path = f"/api/{self._api_version}/projects/{self._config.project}" - return urljoin(self._config.base_url, base_path) - - def _is_retryable(self, method: str, path: str) -> bool: - valid_methods = ["GET", "POST", "PUT", "DELETE", "PATCH"] - - if method not in valid_methods: - raise ValueError(f"Method {method} is not valid. Must be one of {valid_methods}") - - return method in ["GET", "PUT", "PATCH"] or (method == "POST" and self._url_is_retryable(path)) - - @classmethod - @functools.lru_cache(64) - def _url_is_retryable(cls, url: str) -> bool: - if not (match := cls._VALID_URL_PATTERN.match(url)): - raise ValueError(f"URL {url} is not valid. 
Cannot resolve whether or not it is retryable") - path = match.group(1) - return bool(cls._RETRYABLE_POST_ENDPOINT_REGEX_PATTERN.match(path)) - def _retrieve( self, identifier: IdentifierCore, diff --git a/cognite/client/utils/_url.py b/cognite/client/utils/_url.py new file mode 100644 index 000000000..75b1c988a --- /dev/null +++ b/cognite/client/utils/_url.py @@ -0,0 +1,70 @@ +import re +from urllib.parse import urljoin + +from cognite.client.config import ClientConfig + +RETRYABLE_POST_ENDPOINT_REGEX_PATTERN: re.Pattern[str] = re.compile( + "|".join( + rf"^/{path}(\?.*)?$" + for path in ( + "(assets|events|files|timeseries|sequences|datasets|relationships|labels)/(list|byids|search|aggregate)", + "files/downloadlink", + "timeseries/(data(/(list|latest|delete))?|synthetic/query)", + "sequences/data(/(list|delete))?", + "raw/dbs/[^/]+/tables/[^/]+/rows(/delete)?", + "context/entitymatching/(byids|list|jobs)", + "sessions/revoke", + "models/.*", + ".*/graphql", + "units/.*", + "annotations/(list|byids|reverselookup)", + r"functions/(list|byids|status|schedules/(list|byids)|\d+/calls/(list|byids))", + r"3d/models/\d+/revisions/\d+/(mappings/list|nodes/(list|byids))", + "documents/(aggregate|list|search)", + "profiles/(byids|search)", + "geospatial/(compute|crs/byids|featuretypes/(byids|list))", + "geospatial/featuretypes/[A-Za-z][A-Za-z0-9_]{0,31}/features/(aggregate|list|byids|search|search-streaming|[A-Za-z][A-Za-z0-9_]{0,255}/rasters/[A-Za-z][A-Za-z0-9_]{0,31})", + "transformations/(filter|byids|jobs/byids|schedules/byids|query/run)", + "simulators/list", + "extpipes/(list|byids|runs/list)", + "workflows/.*", + "hostedextractors/.*", + "postgresgateway/.*", + "context/diagram/.*", + ) + ) +) +VALID_URL_PATTERN = re.compile( # TODO: Remove playground? 
+ r"^https?://[a-z\d.:\-]+(?:/api/(?:v1|playground)/projects/[^/]+)?((/[^\?]+)?(\?.+)?)" +) +VALID_METHODS = {"GET", "POST", "PUT", "DELETE", "PATCH"} + + +def resolve_url(method: str, url_path: str, api_version: str | None, config: ClientConfig) -> tuple[bool, str]: + if not url_path.startswith("/"): + raise ValueError("URL path must start with '/'") + elif method not in VALID_METHODS: + raise ValueError(f"Method {method} is not valid. Must be one of {VALID_METHODS}") + + full_url = get_base_url_with_base_path(api_version, config) + url_path + if valid_url := VALID_URL_PATTERN.match(full_url): + is_retryable = can_be_retried(method, valid_url.group(1)) + return is_retryable, full_url + + raise ValueError(f"URL {full_url} is not valid. Cannot resolve whether or not it is retryable") + + +def get_base_url_with_base_path(api_version: str | None, config: ClientConfig) -> str: + if api_version: + return urljoin(config.base_url, f"/api/{api_version}/projects/{config.project}") + return config.base_url + + +def can_be_retried(method: str, path: str) -> bool: + match method: + case "GET" | "PUT" | "PATCH": + return True + case "POST" if RETRYABLE_POST_ENDPOINT_REGEX_PATTERN.match(path): + return True + case _: + return False From c5040e859bbc7861213921e2ff858ada80b96758 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?H=C3=A5kon=20V=2E=20Treider?= Date: Fri, 20 Dec 2024 15:02:30 +0100 Subject: [PATCH 5/7] refactor: split out basic api methods in BasicAPIClient --- cognite/client/_api_client.py | 288 +-------------------------- cognite/client/_basic_api_client.py | 289 ++++++++++++++++++++++++++++ 2 files changed, 294 insertions(+), 283 deletions(-) create mode 100644 cognite/client/_basic_api_client.py diff --git a/cognite/client/_api_client.py b/cognite/client/_api_client.py index bc5c82cd4..cdd03b0ab 100644 --- a/cognite/client/_api_client.py +++ b/cognite/client/_api_client.py @@ -1,29 +1,21 @@ from __future__ import annotations import functools -import gzip import itertools 
import logging import warnings from collections import UserList -from collections.abc import Iterator, Mapping, MutableMapping, Sequence +from collections.abc import Iterator, Mapping, Sequence from typing import ( TYPE_CHECKING, Any, Literal, - NoReturn, TypeVar, cast, overload, ) -import requests.utils -from requests import Response -from requests.exceptions import JSONDecodeError as RequestsJSONDecodeError -from requests.structures import CaseInsensitiveDict - -from cognite.client._http_client import HTTPClient, HTTPClientConfig, get_global_requests_session -from cognite.client.config import global_config +from cognite.client._basic_api_client import BasicAPIClient from cognite.client.data_classes._base import ( CogniteFilter, CogniteObject, @@ -38,11 +30,8 @@ ) from cognite.client.data_classes.aggregations import AggregationFilter, UniqueResultList from cognite.client.data_classes.filters import Filter -from cognite.client.exceptions import CogniteAPIError, CogniteNotFoundError, CogniteProjectAccessError -from cognite.client.utils import _json +from cognite.client.exceptions import CogniteAPIError, CogniteNotFoundError from cognite.client.utils._auxiliary import ( - get_current_sdk_version, - get_user_agent, interpolate_and_url_encode, is_unlimited, split_into_chunks, @@ -57,16 +46,10 @@ IdentifierSequenceCore, SingletonIdentifierSequence, ) -from cognite.client.utils._json import JSONDecodeError -from cognite.client.utils._text import convert_all_keys_to_camel_case, shorten, to_camel_case, to_snake_case -from cognite.client.utils._url import resolve_url +from cognite.client.utils._text import convert_all_keys_to_camel_case, to_camel_case, to_snake_case from cognite.client.utils._validation import assert_type, verify_limit from cognite.client.utils.useful_types import SequenceNotStr -if TYPE_CHECKING: - from cognite.client import CogniteClient - from cognite.client.config import ClientConfig - logger = logging.getLogger(__name__) T = TypeVar("T", 
bound=CogniteObject) @@ -74,165 +57,9 @@ VALID_AGGREGATIONS = {"count", "cardinalityValues", "cardinalityProperties", "uniqueValues", "uniqueProperties"} -class APIClient: +class APIClient(BasicAPIClient): _RESOURCE_PATH: str - def __init__(self, config: ClientConfig, api_version: str | None, cognite_client: CogniteClient) -> None: - self._config = config - self._api_version = api_version - self._api_subversion = config.api_subversion - self._cognite_client = cognite_client - self._init_http_clients() - - self._CREATE_LIMIT = 1000 - self._LIST_LIMIT = 1000 - self._RETRIEVE_LIMIT = 1000 - self._DELETE_LIMIT = 1000 - self._UPDATE_LIMIT = 1000 - - def _init_http_clients(self) -> None: - session = get_global_requests_session() - self._http_client = HTTPClient( - config=HTTPClientConfig( - status_codes_to_retry={429}, - backoff_factor=0.5, - max_backoff_seconds=global_config.max_retry_backoff, - max_retries_total=global_config.max_retries, - max_retries_read=0, - max_retries_connect=global_config.max_retries_connect, - max_retries_status=global_config.max_retries, - ), - session=session, - refresh_auth_header=self._refresh_auth_header, - ) - self._http_client_with_retry = HTTPClient( - config=HTTPClientConfig( - status_codes_to_retry=global_config.status_forcelist, - backoff_factor=0.5, - max_backoff_seconds=global_config.max_retry_backoff, - max_retries_total=global_config.max_retries, - max_retries_read=global_config.max_retries, - max_retries_connect=global_config.max_retries_connect, - max_retries_status=global_config.max_retries, - ), - session=session, - refresh_auth_header=self._refresh_auth_header, - ) - - def _delete( - self, url_path: str, params: dict[str, Any] | None = None, headers: dict[str, Any] | None = None - ) -> Response: - return self._do_request("DELETE", url_path, params=params, headers=headers, timeout=self._config.timeout) - - def _get( - self, url_path: str, params: dict[str, Any] | None = None, headers: dict[str, Any] | None = None - ) -> 
Response: - return self._do_request("GET", url_path, params=params, headers=headers, timeout=self._config.timeout) - - def _post( - self, - url_path: str, - json: dict[str, Any] | None = None, - params: dict[str, Any] | None = None, - headers: dict[str, Any] | None = None, - api_subversion: str | None = None, - ) -> Response: - return self._do_request( - "POST", - url_path, - json=json, - headers=headers, - params=params, - timeout=self._config.timeout, - api_subversion=api_subversion, - ) - - def _put( - self, url_path: str, json: dict[str, Any] | None = None, headers: dict[str, Any] | None = None - ) -> Response: - return self._do_request("PUT", url_path, json=json, headers=headers, timeout=self._config.timeout) - - def _do_request( - self, - method: str, - url_path: str, - accept: str = "application/json", - api_subversion: str | None = None, - **kwargs: Any, - ) -> Response: - is_retryable, full_url = self._resolve_url(method, url_path) - json_payload = kwargs.pop("json", None) - headers = self._configure_headers( - accept, - additional_headers=self._config.headers.copy(), - api_subversion=api_subversion, - ) - headers.update(kwargs.get("headers") or {}) - - if json_payload is not None: - try: - data = _json.dumps(json_payload, allow_nan=False) - except ValueError as e: - # A lot of work to give a more human friendly error message when nans and infs are present: - msg = "Out of range float values are not JSON compliant" - if msg in str(e): # exc. might e.g. contain an extra ": nan", depending on build (_json.make_encoder) - raise ValueError(f"{msg}. Make sure your data does not contain NaN(s) or +/- Inf!").with_traceback( - e.__traceback__ - ) from None - raise - kwargs["data"] = data - if method in ["PUT", "POST"] and not global_config.disable_gzip: - kwargs["data"] = gzip.compress(data.encode()) - headers["Content-Encoding"] = "gzip" - - kwargs["headers"] = headers - - # requests will by default follow redirects. 
This can be an SSRF-hazard if - # the client can be tricked to request something with an open redirect, in - # addition to leaking the token, as requests will send the headers to the - # redirected-to endpoint. - # If redirects are to be followed in a call, this should be opted into instead. - kwargs.setdefault("allow_redirects", False) - - if is_retryable: - res = self._http_client_with_retry.request(method=method, url=full_url, **kwargs) - else: - res = self._http_client.request(method=method, url=full_url, **kwargs) - - match res.status_code: - case 200 | 201 | 202 | 204: - pass - case 401: - self._raise_no_project_access_error(res) - case _: - self._raise_api_error(res, payload=json_payload) - - stream = kwargs.get("stream") - self._log_request(res, payload=json_payload, stream=stream) - return res - - def _configure_headers( - self, accept: str, additional_headers: dict[str, str], api_subversion: str | None = None - ) -> MutableMapping[str, Any]: - headers: MutableMapping[str, Any] = CaseInsensitiveDict() - headers.update(requests.utils.default_headers()) - self._refresh_auth_header(headers) - headers["content-type"] = "application/json" - headers["accept"] = accept - headers["x-cdp-sdk"] = f"CognitePythonSDK:{get_current_sdk_version()}" - headers["x-cdp-app"] = self._config.client_name - headers["cdf-version"] = api_subversion or self._api_subversion - if "User-Agent" in headers: - headers["User-Agent"] += f" {get_user_agent()}" - else: - headers["User-Agent"] = get_user_agent() - headers.update(additional_headers) - return headers - - def _refresh_auth_header(self, headers: MutableMapping[str, Any]) -> None: - auth_header_name, auth_header_value = self._config.credentials.authorization_header() - headers[auth_header_name] = auth_header_value - def _retrieve( self, identifier: IdentifierCore, @@ -1219,108 +1046,3 @@ def _clear_all_attributes(update_attributes: list[PropertySpec]) -> dict[str, di continue cleared[to_camel_case(prop.name)] = clear_with return 
cleared - - def _raise_no_project_access_error(self, res: Response) -> NoReturn: - raise CogniteProjectAccessError( - client=self._cognite_client, - project=self._cognite_client._config.project, - x_request_id=res.headers.get("X-Request-Id"), - cluster=self._config.cdf_cluster, - ) - - def _raise_api_error(self, res: Response, payload: dict) -> NoReturn: - x_request_id = res.headers.get("X-Request-Id") - code = res.status_code - missing = None - duplicated = None - extra = {} - try: - error = res.json()["error"] - if isinstance(error, str): - msg = error - elif isinstance(error, dict): - msg = error["message"] - missing = error.get("missing") - duplicated = error.get("duplicated") - for k, v in error.items(): - if k not in ["message", "missing", "duplicated", "code"]: - extra[k] = v - else: - msg = res.content.decode() - except Exception: - msg = res.content.decode() - - error_details: dict[str, Any] = {"X-Request-ID": x_request_id} - if payload: - error_details["payload"] = payload - if missing: - error_details["missing"] = missing - if duplicated: - error_details["duplicated"] = duplicated - error_details["headers"] = res.request.headers.copy() - self._sanitize_headers(error_details["headers"]) - error_details["response_payload"] = shorten(self._get_response_content_safe(res), 500) - error_details["response_headers"] = res.headers - - if res.history: - for res_hist in res.history: - logger.debug( - f"REDIRECT AFTER HTTP Error {res_hist.status_code} {res_hist.request.method} {res_hist.request.url}: {res_hist.content.decode()}" - ) - logger.debug(f"HTTP Error {code} {res.request.method} {res.request.url}: {msg}", extra=error_details) - raise CogniteAPIError( - msg, - code, - x_request_id, - missing=missing, - duplicated=duplicated, - extra=extra, - cluster=self._config.cdf_cluster, - ) - - def _log_request(self, res: Response, **kwargs: Any) -> None: - method = res.request.method - url = res.request.url - status_code = res.status_code - - extra = kwargs.copy() - 
extra["headers"] = res.request.headers.copy() - self._sanitize_headers(extra["headers"]) - if extra["payload"] is None: - del extra["payload"] - - stream = kwargs.get("stream") - if not stream and self._config.debug is True: - extra["response_payload"] = shorten(self._get_response_content_safe(res), 500) - extra["response_headers"] = res.headers - - try: - http_protocol = f"HTTP/{'.'.join(str(res.raw.version))}" - except AttributeError: - # If this fails, it means we are running in a browser (pyodide) with patched requests package: - http_protocol = "XMLHTTP" - - logger.debug(f"{http_protocol} {method} {url} {status_code}", extra=extra) - - @staticmethod - def _get_response_content_safe(res: Response) -> str: - try: - return _json.dumps(res.json()) - except (JSONDecodeError, RequestsJSONDecodeError): - pass - - try: - return res.content.decode() - except UnicodeDecodeError: - pass - - return "" - - @staticmethod - def _sanitize_headers(headers: dict[str, Any] | None) -> None: - if headers is None: - return None - if "api-key" in headers: - headers["api-key"] = "***" - if "Authorization" in headers: - headers["Authorization"] = "***" diff --git a/cognite/client/_basic_api_client.py b/cognite/client/_basic_api_client.py new file mode 100644 index 000000000..ffea536fd --- /dev/null +++ b/cognite/client/_basic_api_client.py @@ -0,0 +1,289 @@ +from __future__ import annotations + +import gzip +import logging +from collections.abc import MutableMapping +from typing import TYPE_CHECKING, Any, NoReturn + +import httpx + +from cognite.client._http_client import HTTPClientWithRetry, HTTPClientWithRetryConfig +from cognite.client.config import global_config +from cognite.client.exceptions import CogniteAPIError, CogniteProjectAccessError +from cognite.client.utils import _json +from cognite.client.utils._auxiliary import get_current_sdk_version, get_user_agent +from cognite.client.utils._text import shorten +from cognite.client.utils._url import resolve_url + +if 
TYPE_CHECKING: + from cognite.client._cognite_client import CogniteClient + from cognite.client.config import ClientConfig + + +logger = logging.getLogger(__name__) + +try: + from httpx._client import USER_AGENT +except ImportError: + USER_AGENT = "python-httpx/" + + +class BasicAPIClient: + def __init__(self, config: ClientConfig, api_version: str | None, cognite_client: CogniteClient) -> None: + self._config = config + self._api_version = api_version + self._api_subversion = config.api_subversion + self._cognite_client = cognite_client + self._init_http_clients() + + self._CREATE_LIMIT = 1000 + self._LIST_LIMIT = 1000 + self._RETRIEVE_LIMIT = 1000 + self._DELETE_LIMIT = 1000 + self._UPDATE_LIMIT = 1000 + + def _init_http_clients(self) -> None: + self._http_client = HTTPClientWithRetry( + config=HTTPClientWithRetryConfig(status_codes_to_retry={429}, max_retries_read=0), + refresh_auth_header=self._refresh_auth_header, + ) + self._http_client_with_retry = HTTPClientWithRetry( + config=HTTPClientWithRetryConfig(), + refresh_auth_header=self._refresh_auth_header, + ) + + def _delete( + self, url_path: str, params: dict[str, Any] | None = None, headers: dict[str, Any] | None = None + ) -> httpx.Response: + return self._do_request("DELETE", url_path, params=params, headers=headers, timeout=self._config.timeout) + + def _get( + self, + url_path: str, + params: dict[str, Any] | None = None, + headers: dict[str, Any] | None = None, + follow_redirects=False, + api_subversion: str | None = None, + ) -> httpx.Response: + # full_url = resolve_url(url_path, self._api_version, self._config) + full_headers = self._configure_headers(additional_headers=headers, api_subversion=api_subversion) + try: + res = self._http_client_with_retry.get( + full_url, + params=params, + headers=full_headers, + follow_redirects=follow_redirects, + timeout=self._config.timeout, + ) + except httpx.HTTPStatusError as err: + self._handle_status_error(err) + + self._log_request(res, payload=None, 
stream=None) + return res + + def _post( + self, + url_path: str, + json: dict[str, Any] | None = None, + params: dict[str, Any] | None = None, + headers: dict[str, Any] | None = None, + api_subversion: str | None = None, + ) -> httpx.Response: + return self._do_request( + "POST", + url_path, + json=json, + headers=headers, + params=params, + timeout=self._config.timeout, + api_subversion=api_subversion, + ) + + def _put( + self, url_path: str, json: dict[str, Any] | None = None, headers: dict[str, Any] | None = None + ) -> httpx.Response: + return self._do_request("PUT", url_path, json=json, headers=headers, timeout=self._config.timeout) + + def _configure_headers( + self, additional_headers: dict[str, str] | None, api_subversion: str | None + ) -> dict[str, str]: + headers = { + "content-type": "application/json", + "accept": "*/*", + "x-cdp-sdk": "CognitePythonSDK:" + get_current_sdk_version(), + "x-cdp-app": self._config.client_name, + "cdf-version": api_subversion or self._api_subversion, + "user-agent": USER_AGENT + get_user_agent(), + **self._config.headers, + **(additional_headers or {}), + } + self._refresh_auth_header(headers) + return headers + + def _refresh_auth_header(self, headers: MutableMapping[str, Any]) -> None: + auth_header_name, auth_header_value = self._config.credentials.authorization_header() + headers[auth_header_name] = auth_header_value + + def _prepare_request( + self, + method: str, + url_path: str, + api_subversion: str | None = None, + json: dict[str, Any] | None = None, + headers: dict[str, Any] | None = None, + ) -> Any: + is_retryable, full_url = resolve_url(method, url_path, self._api_version, self._config) + full_headers = self._configure_headers(additional_headers=headers, api_subversion=api_subversion) + if json is not None: + try: + data = _json.dumps(json, allow_nan=False) + except ValueError as e: + # A lot of work to give a more human friendly error message when nans and infs are present: + msg = "Out of range float values 
are not JSON compliant" + if msg in str(e): # exc. might e.g. contain an extra ": nan", depending on build (_json.make_encoder) + raise ValueError(f"{msg}. Make sure your data does not contain NaN(s) or +/- Inf!").with_traceback( + e.__traceback__ + ) from None + raise + + if method in ("PUT", "POST") and not global_config.disable_gzip: + data = gzip.compress(data.encode()) + full_headers["Content-Encoding"] = "gzip" + + return full_url, full_headers, data, is_retryable + + # if is_retryable: + # res = self._http_client_with_retry.request(method=method, url=full_url, **kwargs) + # else: + # res = self._http_client.request(method=method, url=full_url, **kwargs) + + # match res.status_code: + # case 200 | 201 | 202 | 204: + # pass + # case 401: + # self._raise_no_project_access_error(res) + # case _: + # self._raise_api_error(res, payload=json_payload) + + # stream = kwargs.get("stream") + # self._log_request(res, payload=json_payload, stream=stream) + # return res + + # def _inspect_response( + # self, + # response: httpx.Response, + # json_payload: dict[str, Any] | None = None, + # stream: Any = None, + # ) -> None: + # if not self._status_ok(response.status_code): + # self._raise_api_error(response, payload=json_payload) + # self._log_request(response, payload=json_payload, stream=stream) + # return response + + def _handle_status_error(self, error: httpx.HTTPStatusError, payload: dict | None = None) -> NoReturn: + # The response had an error HTTP status of 4xx or 5xx: + match error.response.status_code: + case 401: + self._raise_no_project_access_error(error.response) + case _: + self._raise_api_error(error, payload) + + def _raise_no_project_access_error(self, response: httpx.Response) -> NoReturn: + raise CogniteProjectAccessError( + client=self._cognite_client, + project=self._cognite_client._config.project, + x_request_id=response.headers.get("x-request-id"), + cluster=self._config.cdf_cluster, + ) + + def _raise_api_error(self, err: httpx.HTTPStatusError, 
payload: dict | None = None) -> NoReturn: + response, request = err.response, err.request + x_request_id = response.headers.get("X-Request-Id") + code, extra, missing, duplicated = response.status_code, {}, None, None + try: + match error := response.json()["error"]: + case str(): + msg = error + case dict(): + extra = error.copy() + msg = extra.pop("message") + missing = extra.pop("missing", None) + duplicated = extra.pop("duplicated", None) + case _: + msg = response.content.decode() + except KeyError | _json.JSONDecodeError: + msg = response.content.decode() + + error_details: dict[str, Any] = { + "X-Request-ID": x_request_id, + "headers": self._sanitize_headers(request.headers), + "response_payload": shorten(self._get_response_content_safe(response), 1000), + "response_headers": response.headers, + } + if payload: + error_details["payload"] = payload + if missing: + error_details["missing"] = missing + if duplicated: + error_details["duplicated"] = duplicated + + if response.history: + for res_hist in response.history: + logger.debug( + f"REDIRECT AFTER HTTP Error {res_hist.status_code} {res_hist.request.method} " + f"{res_hist.request.url}: {res_hist.content.decode()}" + ) + logger.debug(f"HTTP Error {code} {res.request.method} {res.request.url}: {msg}", extra=error_details) + raise CogniteAPIError( + msg, + code, + x_request_id, + missing=missing, + duplicated=duplicated, + extra=extra, + cluster=self._config.cdf_cluster, + ) from err + + def _log_request(self, res: httpx.Response, **kwargs: Any) -> None: + method = res.request.method + url = res.request.url + status_code = res.status_code + + extra = kwargs.copy() + extra["headers"] = res.request.headers.copy() + self._sanitize_headers(extra["headers"]) + if extra["payload"] is None: + del extra["payload"] + + stream = kwargs.get("stream") + if not stream and self._config.debug is True: + extra["response_payload"] = shorten(self._get_response_content_safe(res), 500) + extra["response_headers"] = res.headers 
+ + try: + http_protocol = res.http_version + except AttributeError: + # If this fails, it means we are running in a browser (pyodide) with patched requests package: + http_protocol = "XMLHTTP" + + logger.debug(f"{http_protocol} {method} {url} {status_code}", extra=extra) + + @staticmethod + def _get_response_content_safe(res: httpx.Response) -> str: + try: + return _json.dumps(res.json()) + except _json.JSONDecodeError: + pass + + try: + return res.content.decode() + except UnicodeDecodeError: + pass + + return "" + + @staticmethod + def _sanitize_headers(headers: httpx.Headers | None) -> httpx.Headers | None: + if headers and "Authorization" in headers: + headers["Authorization"] = "***" + return headers From 38970a5aeb6a87360de769b114ac2cb695d56708 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?H=C3=A5kon=20V=2E=20Treider?= Date: Mon, 13 Jan 2025 14:08:45 +0100 Subject: [PATCH 6/7] TMP --- cognite/client/_api/data_modeling/graphql.py | 2 +- cognite/client/_api/datapoints.py | 10 +- cognite/client/_api/documents.py | 2 +- cognite/client/_api/files.py | 23 +- cognite/client/_api/geospatial.py | 1 + .../client/_api/postgres_gateway/tables.py | 2 +- cognite/client/_api/raw.py | 2 +- cognite/client/_api/templates.py | 2 +- cognite/client/_api/three_d.py | 3 +- cognite/client/_api/transformations/jobs.py | 2 +- cognite/client/_api/transformations/schema.py | 2 +- cognite/client/_api/workflows.py | 3 +- cognite/client/_api_client.py | 7 +- cognite/client/_basic_api_client.py | 316 ++++++++------ cognite/client/_cognite_client.py | 35 +- cognite/client/_http_client.py | 391 ++++++++++-------- cognite/client/config.py | 2 + cognite/client/exceptions.py | 23 +- cognite/client/utils/_auxiliary.py | 35 +- cognite/client/utils/_concurrency.py | 1 + cognite/client/utils/_json.py | 13 + cognite/client/utils/_pyodide_helpers.py | 49 +-- cognite/client/utils/_url.py | 13 +- cognite/client/utils/_version_checker.py | 4 +- tests/tests_unit/test_http_client.py | 42 +- 25 files changed, 
542 insertions(+), 443 deletions(-) diff --git a/cognite/client/_api/data_modeling/graphql.py b/cognite/client/_api/data_modeling/graphql.py index c1bbc41d9..7060043d6 100644 --- a/cognite/client/_api/data_modeling/graphql.py +++ b/cognite/client/_api/data_modeling/graphql.py @@ -8,7 +8,7 @@ from cognite.client.data_classes.data_modeling.graphql import DMLApplyResult from cognite.client.data_classes.data_modeling.ids import DataModelId from cognite.client.exceptions import CogniteGraphQLError, GraphQLErrorSpec -from cognite.client.utils._auxiliary import interpolate_and_url_encode +from cognite.client.utils._url import interpolate_and_url_encode class DataModelingGraphQLAPI(APIClient): diff --git a/cognite/client/_api/datapoints.py b/cognite/client/_api/datapoints.py index 6507e07e9..df490699f 100644 --- a/cognite/client/_api/datapoints.py +++ b/cognite/client/_api/datapoints.py @@ -119,12 +119,10 @@ def fetch_all_datapoints_numpy(self) -> DatapointsArrayList: def _request_datapoints(self, payload: _DatapointsPayload) -> Sequence[DataPointListItem]: (res := DataPointListResponse()).MergeFromString( - self.dps_client._do_request( - json=payload, - method="POST", - url_path=f"{self.dps_client._RESOURCE_PATH}/list", - accept="application/protobuf", - timeout=self.dps_client._config.timeout, + self.dps_client._post( + f"{self.dps_client._RESOURCE_PATH}/list", + json=payload, # type: ignore [arg-type] + headers={"accept": "application/protobuf"}, ).content ) return res.items diff --git a/cognite/client/_api/documents.py b/cognite/client/_api/documents.py index 1c367c5c3..506cb4b5f 100644 --- a/cognite/client/_api/documents.py +++ b/cognite/client/_api/documents.py @@ -506,7 +506,6 @@ def retrieve_content_buffer(self, id: int, buffer: BinaryIO) -> None: in order to reduce the size of the returned payload. If you want the whole text for a document, you can use this endpoint. - Args: id (int): The server-generated ID for the document you want to retrieve the content of. 
buffer (BinaryIO): The document content is streamed directly into the buffer. This is useful for retrieving large documents. @@ -521,6 +520,7 @@ def retrieve_content_buffer(self, id: int, buffer: BinaryIO) -> None: >>> with Path("my_file.txt").open("wb") as buffer: ... client.documents.retrieve_content_buffer(id=123, buffer=buffer) """ + # TODO: Needs an httpx checkup, stream, iter_content, etc. with self._do_request( "GET", f"{self._RESOURCE_PATH}/{id}/content", stream=True, accept="text/plain" ) as response: diff --git a/cognite/client/_api/files.py b/cognite/client/_api/files.py index 83aaf1d32..00698abbb 100644 --- a/cognite/client/_api/files.py +++ b/cognite/client/_api/files.py @@ -634,21 +634,20 @@ def upload_content_bytes( return self._upload_bytes(content, res.json()["items"][0]) - def _upload_bytes(self, content: bytes | TextIO | BinaryIO, returned_file_metadata: dict) -> FileMetadata: + def _upload_bytes(self, content: bytes | BinaryIO, returned_file_metadata: dict) -> FileMetadata: upload_url = returned_file_metadata["uploadUrl"] if urlparse(upload_url).netloc: full_upload_url = upload_url else: full_upload_url = urljoin(self._config.base_url, upload_url) file_metadata = FileMetadata._load(returned_file_metadata) - upload_response = self._http_client_with_retry.request( - "PUT", + upload_response = self._put( full_upload_url, - data=content, - timeout=self._config.file_transfer_timeout, + content=content, headers={"Content-Type": file_metadata.mime_type, "accept": "*/*"}, + timeout=self._config.file_transfer_timeout, ) - if not upload_response.ok: + if not upload_response.is_success: raise CogniteFileUploadError(message=upload_response.text, code=upload_response.status_code) return file_metadata @@ -1125,8 +1124,9 @@ def _process_file_download( self._download_file_to_path(download_link, file_path_absolute) def _download_file_to_path(self, download_link: str, path: Path, chunk_size: int = 2**21) -> None: - with self._http_client_with_retry.request( - 
"GET", download_link, headers={"accept": "*/*"}, stream=True, timeout=self._config.file_transfer_timeout + # TODO: Needs an httpx checkup, stream, iter_content, etc. + with self._stream( + "GET", download_link, headers={"accept": "*/*"}, timeout=self._config.file_transfer_timeout ) as r: with path.open("wb") as f: for chunk in r.iter_content(chunk_size=chunk_size): @@ -1185,10 +1185,9 @@ def download_bytes( return self._download_file(download_link) def _download_file(self, download_link: str) -> bytes: - res = self._http_client_with_retry.request( - "GET", download_link, headers={"accept": "*/*"}, timeout=self._config.file_transfer_timeout - ) - return res.content + return self._request( + "GET", full_url=download_link, headers={"accept": "*/*"}, timeout=self._config.file_transfer_timeout + ).content def list( self, diff --git a/cognite/client/_api/geospatial.py b/cognite/client/_api/geospatial.py index b83273783..1385c7da9 100644 --- a/cognite/client/_api/geospatial.py +++ b/cognite/client/_api/geospatial.py @@ -708,6 +708,7 @@ def stream_features( "allowCrsTransformation": allow_crs_transformation, "allowDimensionalityMismatch": allow_dimensionality_mismatch, } + # TODO: TODO: Needs an httpx checkup due to stream=True res = self._do_request("POST", url_path=resource_path, json=payload, timeout=self._config.timeout, stream=True) try: diff --git a/cognite/client/_api/postgres_gateway/tables.py b/cognite/client/_api/postgres_gateway/tables.py index f7ddb3d40..38b2bac67 100644 --- a/cognite/client/_api/postgres_gateway/tables.py +++ b/cognite/client/_api/postgres_gateway/tables.py @@ -6,9 +6,9 @@ import cognite.client.data_classes.postgres_gateway.tables as pg from cognite.client._api_client import APIClient from cognite.client._constants import DEFAULT_LIMIT_READ -from cognite.client.utils._auxiliary import interpolate_and_url_encode from cognite.client.utils._experimental import FeaturePreviewWarning from cognite.client.utils._identifier import TablenameSequence 
+from cognite.client.utils._url import interpolate_and_url_encode from cognite.client.utils.useful_types import SequenceNotStr if TYPE_CHECKING: diff --git a/cognite/client/_api/raw.py b/cognite/client/_api/raw.py index 3c3063bb1..ad6d40475 100644 --- a/cognite/client/_api/raw.py +++ b/cognite/client/_api/raw.py @@ -14,7 +14,6 @@ from cognite.client.data_classes.raw import Database, DatabaseList, Row, RowCore, RowList, RowWrite from cognite.client.utils._auxiliary import ( find_duplicates, - interpolate_and_url_encode, is_finite, is_unlimited, split_into_chunks, @@ -23,6 +22,7 @@ from cognite.client.utils._concurrency import ConcurrencySettings, execute_tasks from cognite.client.utils._identifier import Identifier from cognite.client.utils._importing import local_import +from cognite.client.utils._url import interpolate_and_url_encode from cognite.client.utils._validation import assert_type from cognite.client.utils.useful_types import SequenceNotStr diff --git a/cognite/client/_api/templates.py b/cognite/client/_api/templates.py index 598d240e2..46e12ce06 100644 --- a/cognite/client/_api/templates.py +++ b/cognite/client/_api/templates.py @@ -23,8 +23,8 @@ ViewResolveList, ViewWrite, ) -from cognite.client.utils._auxiliary import interpolate_and_url_encode from cognite.client.utils._identifier import IdentifierSequence +from cognite.client.utils._url import interpolate_and_url_encode from cognite.client.utils.useful_types import SequenceNotStr if TYPE_CHECKING: diff --git a/cognite/client/_api/three_d.py b/cognite/client/_api/three_d.py index 09b6725d4..e2af42423 100644 --- a/cognite/client/_api/three_d.py +++ b/cognite/client/_api/three_d.py @@ -22,9 +22,10 @@ ThreeDNodeList, ) from cognite.client.utils import _json -from cognite.client.utils._auxiliary import interpolate_and_url_encode, split_into_chunks, unpack_items_in_payload +from cognite.client.utils._auxiliary import split_into_chunks, unpack_items_in_payload from cognite.client.utils._concurrency import 
execute_tasks from cognite.client.utils._identifier import IdentifierSequence, InternalId +from cognite.client.utils._url import interpolate_and_url_encode from cognite.client.utils._validation import assert_type from cognite.client.utils.useful_types import SequenceNotStr diff --git a/cognite/client/_api/transformations/jobs.py b/cognite/client/_api/transformations/jobs.py index 536db7dbd..3669dbe39 100644 --- a/cognite/client/_api/transformations/jobs.py +++ b/cognite/client/_api/transformations/jobs.py @@ -11,8 +11,8 @@ TransformationJobMetric, TransformationJobMetricList, ) -from cognite.client.utils._auxiliary import interpolate_and_url_encode from cognite.client.utils._identifier import IdentifierSequence +from cognite.client.utils._url import interpolate_and_url_encode class TransformationJobsAPI(APIClient): diff --git a/cognite/client/_api/transformations/schema.py b/cognite/client/_api/transformations/schema.py index 26658e0de..1152f2db5 100644 --- a/cognite/client/_api/transformations/schema.py +++ b/cognite/client/_api/transformations/schema.py @@ -6,7 +6,7 @@ TransformationSchemaColumn, TransformationSchemaColumnList, ) -from cognite.client.utils._auxiliary import interpolate_and_url_encode +from cognite.client.utils._url import interpolate_and_url_encode class TransformationSchemaAPI(APIClient): diff --git a/cognite/client/_api/workflows.py b/cognite/client/_api/workflows.py index 64ffc023c..dfa9e5d9b 100644 --- a/cognite/client/_api/workflows.py +++ b/cognite/client/_api/workflows.py @@ -28,13 +28,14 @@ WorkflowVersionUpsert, ) from cognite.client.exceptions import CogniteAPIError -from cognite.client.utils._auxiliary import at_least_one_is_not_none, interpolate_and_url_encode, split_into_chunks +from cognite.client.utils._auxiliary import at_least_one_is_not_none, split_into_chunks from cognite.client.utils._concurrency import execute_tasks from cognite.client.utils._identifier import ( IdentifierSequence, WorkflowVersionIdentifierSequence, ) from 
cognite.client.utils._session import create_session_and_return_nonce +from cognite.client.utils._url import interpolate_and_url_encode from cognite.client.utils._validation import assert_type from cognite.client.utils.useful_types import SequenceNotStr diff --git a/cognite/client/_api_client.py b/cognite/client/_api_client.py index cdd03b0ab..06155638f 100644 --- a/cognite/client/_api_client.py +++ b/cognite/client/_api_client.py @@ -7,7 +7,6 @@ from collections import UserList from collections.abc import Iterator, Mapping, Sequence from typing import ( - TYPE_CHECKING, Any, Literal, TypeVar, @@ -32,7 +31,6 @@ from cognite.client.data_classes.filters import Filter from cognite.client.exceptions import CogniteAPIError, CogniteNotFoundError from cognite.client.utils._auxiliary import ( - interpolate_and_url_encode, is_unlimited, split_into_chunks, unpack_items, @@ -47,6 +45,7 @@ SingletonIdentifierSequence, ) from cognite.client.utils._text import convert_all_keys_to_camel_case, to_camel_case, to_snake_case +from cognite.client.utils._url import interpolate_and_url_encode from cognite.client.utils._validation import assert_type, verify_limit from cognite.client.utils.useful_types import SequenceNotStr @@ -866,7 +865,7 @@ def _upsert_multiple( ) except CogniteNotFoundError as not_found_error: items_by_external_id = {item.external_id: item for item in items if item.external_id is not None} # type: ignore [attr-defined] - items_by_id = {item.id: item for item in items if hasattr(item, "id") and item.id is not None} + items_by_id = {item.id: item for item in items if getattr(item, "id", None) is not None} # Not found must have an external id as they do not exist in CDF: try: missing_external_ids = {entry["externalId"] for entry in not_found_error.not_found} @@ -944,7 +943,7 @@ def _upsert_multiple( # Reorder to match the order of the input items result.data = [ result.get( - **Identifier.load(item.id if hasattr(item, "id") else None, item.external_id).as_dict( # type: 
ignore [attr-defined] + **Identifier.load(getattr(item, "id", None), item.external_id).as_dict( # type: ignore [attr-defined] camel_case=False ) ) diff --git a/cognite/client/_basic_api_client.py b/cognite/client/_basic_api_client.py index ffea536fd..bfccf1170 100644 --- a/cognite/client/_basic_api_client.py +++ b/cognite/client/_basic_api_client.py @@ -1,9 +1,11 @@ from __future__ import annotations +import functools import gzip import logging -from collections.abc import MutableMapping -from typing import TYPE_CHECKING, Any, NoReturn +import platform +from collections.abc import Iterable, Iterator, MutableMapping +from typing import TYPE_CHECKING, Any, Literal, NoReturn import httpx @@ -11,7 +13,6 @@ from cognite.client.config import global_config from cognite.client.exceptions import CogniteAPIError, CogniteProjectAccessError from cognite.client.utils import _json -from cognite.client.utils._auxiliary import get_current_sdk_version, get_user_agent from cognite.client.utils._text import shorten from cognite.client.utils._url import resolve_url @@ -22,10 +23,37 @@ logger = logging.getLogger(__name__) -try: - from httpx._client import USER_AGENT -except ImportError: - USER_AGENT = "python-httpx/" + +def handle_json_dump(json: dict[str, Any] | None, full_headers: MutableMapping[str, str]) -> bytes | str | None: + if json is None: + return None + + content = _json.dumps_no_nan_or_inf(json) + if global_config.disable_gzip: + return content + + full_headers["Content-Encoding"] = "gzip" + return gzip.compress(content.encode()) + + +@functools.cache +def get_user_agent() -> str: + from cognite.client import __version__ + + try: + from httpx._client import USER_AGENT + except ImportError: + USER_AGENT = "python-httpx/" + + sdk_version = f"CognitePythonSDK/{__version__}" + python_version = ( + f"{platform.python_implementation()}/{platform.python_version()} " + f"({platform.python_build()};{platform.python_compiler()})" + ) + os_version_info = [platform.release(), 
platform.machine(), platform.architecture()[0]] + operating_system = f"{platform.system()}/{'-'.join(s for s in os_version_info if s)}" + + return f"{USER_AGENT} {sdk_version} {python_version} {operating_system}" class BasicAPIClient: @@ -52,23 +80,58 @@ def _init_http_clients(self) -> None: refresh_auth_header=self._refresh_auth_header, ) - def _delete( - self, url_path: str, params: dict[str, Any] | None = None, headers: dict[str, Any] | None = None + def select_http_client(self, is_retryable: bool) -> HTTPClientWithRetry: + return self._http_client_with_retry if is_retryable else self._http_client + + def _request( + self, + method: str, + /, + full_url: str, + headers: dict[str, Any] | None = None, + timeout: float | None = None, ) -> httpx.Response: - return self._do_request("DELETE", url_path, params=params, headers=headers, timeout=self._config.timeout) + """Make a request to something that is outside Cognite Data Fusion""" + client = self.select_http_client(method in {"GET", "PUT", "HEAD"}) + try: + res = client("GET", full_url, headers=headers, timeout=timeout or self._config.timeout) + except httpx.HTTPStatusError as err: + self._handle_status_error(err) + + self._log_successful_request(res) + return res + + def _stream( + self, + method: Literal["GET", "POST"], + /, + full_url: str, + headers: dict[str, Any] | None = None, + timeout: float | None = None, + ) -> Iterator[httpx.Response]: + try: + res = self._http_client_with_retry.stream( + method, full_url, headers=headers, timeout=timeout or self._config.timeout + ) + except httpx.HTTPStatusError as err: + self._handle_status_error(err) + + self._log_successful_request(res, stream=True) + return res def _get( self, url_path: str, params: dict[str, Any] | None = None, headers: dict[str, Any] | None = None, - follow_redirects=False, + follow_redirects: bool = False, api_subversion: str | None = None, ) -> httpx.Response: - # full_url = resolve_url(url_path, self._api_version, self._config) + _, full_url = 
resolve_url("GET", url_path, self._api_version, self._config) full_headers = self._configure_headers(additional_headers=headers, api_subversion=api_subversion) try: - res = self._http_client_with_retry.get( + res = self._http_client_with_retry( + "GET", full_url, params=params, headers=full_headers, @@ -78,7 +141,7 @@ def _get( except httpx.HTTPStatusError as err: self._handle_status_error(err) - self._log_request(res, payload=None, stream=None) + self._log_successful_request(res) return res def _post( @@ -87,33 +150,75 @@ def _post( json: dict[str, Any] | None = None, params: dict[str, Any] | None = None, headers: dict[str, Any] | None = None, + follow_redirects: bool = False, api_subversion: str | None = None, ) -> httpx.Response: - return self._do_request( - "POST", - url_path, - json=json, - headers=headers, - params=params, - timeout=self._config.timeout, - api_subversion=api_subversion, - ) + is_retryable, full_url = resolve_url("POST", url_path, self._api_version, self._config) + full_headers = self._configure_headers(additional_headers=headers, api_subversion=api_subversion) + # We want to control json dumping, so we pass it along to httpx.Client.post as 'content' + content = handle_json_dump(json, full_headers) + + http_client = self.select_http_client(is_retryable) + try: + res = http_client( + "POST", + full_url, + content=content, + params=params, + headers=full_headers, + follow_redirects=follow_redirects, + timeout=self._config.timeout, + ) + except httpx.HTTPStatusError as err: + self._handle_status_error(err) + + self._log_successful_request(res, payload=json) + return res def _put( - self, url_path: str, json: dict[str, Any] | None = None, headers: dict[str, Any] | None = None + self, + url_path: str, + content: str | bytes | Iterable[bytes] | None = None, + json: dict[str, Any] | None = None, + params: dict[str, Any] | None = None, + headers: dict[str, Any] | None = None, + follow_redirects: bool = False, + api_subversion: str | None = None, + 
timeout: float | None = None, ) -> httpx.Response: - return self._do_request("PUT", url_path, json=json, headers=headers, timeout=self._config.timeout) + is_retryable, full_url = resolve_url("PUT", url_path, self._api_version, self._config) + full_headers = self._configure_headers(additional_headers=headers, api_subversion=api_subversion) + if content is None: + content = handle_json_dump(json, full_headers) + + try: + res = self._http_client_with_retry( + "PUT", + full_url, + content=content, + params=params, + headers=full_headers, + follow_redirects=follow_redirects, + timeout=timeout or self._config.timeout, + ) + except httpx.HTTPStatusError as err: + self._handle_status_error(err) + + self._log_successful_request(res, payload=json) + return res def _configure_headers( self, additional_headers: dict[str, str] | None, api_subversion: str | None ) -> dict[str, str]: + from cognite.client import __version__ + headers = { "content-type": "application/json", "accept": "*/*", - "x-cdp-sdk": "CognitePythonSDK:" + get_current_sdk_version(), + "x-cdp-sdk": "CognitePythonSDK:" + __version__, "x-cdp-app": self._config.client_name, "cdf-version": api_subversion or self._api_subversion, - "user-agent": USER_AGENT + get_user_agent(), + "user-agent": get_user_agent(), **self._config.headers, **(additional_headers or {}), } @@ -124,82 +229,41 @@ def _refresh_auth_header(self, headers: MutableMapping[str, Any]) -> None: auth_header_name, auth_header_value = self._config.credentials.authorization_header() headers[auth_header_name] = auth_header_value - def _prepare_request( - self, - method: str, - url_path: str, - api_subversion: str | None = None, - json: dict[str, Any] | None = None, - headers: dict[str, Any] | None = None, - ) -> Any: - is_retryable, full_url = resolve_url(method, url_path, self._api_version, self._config) - full_headers = self._configure_headers(additional_headers=headers, api_subversion=api_subversion) - if json is not None: - try: - data = 
_json.dumps(json, allow_nan=False) - except ValueError as e: - # A lot of work to give a more human friendly error message when nans and infs are present: - msg = "Out of range float values are not JSON compliant" - if msg in str(e): # exc. might e.g. contain an extra ": nan", depending on build (_json.make_encoder) - raise ValueError(f"{msg}. Make sure your data does not contain NaN(s) or +/- Inf!").with_traceback( - e.__traceback__ - ) from None - raise - - if method in ("PUT", "POST") and not global_config.disable_gzip: - data = gzip.compress(data.encode()) - full_headers["Content-Encoding"] = "gzip" - - return full_url, full_headers, data, is_retryable - - # if is_retryable: - # res = self._http_client_with_retry.request(method=method, url=full_url, **kwargs) - # else: - # res = self._http_client.request(method=method, url=full_url, **kwargs) - - # match res.status_code: - # case 200 | 201 | 202 | 204: - # pass - # case 401: - # self._raise_no_project_access_error(res) - # case _: - # self._raise_api_error(res, payload=json_payload) - - # stream = kwargs.get("stream") - # self._log_request(res, payload=json_payload, stream=stream) - # return res - - # def _inspect_response( - # self, - # response: httpx.Response, - # json_payload: dict[str, Any] | None = None, - # stream: Any = None, - # ) -> None: - # if not self._status_ok(response.status_code): - # self._raise_api_error(response, payload=json_payload) - # self._log_request(response, payload=json_payload, stream=stream) - # return response - def _handle_status_error(self, error: httpx.HTTPStatusError, payload: dict | None = None) -> NoReturn: # The response had an error HTTP status of 4xx or 5xx: match error.response.status_code: case 401: - self._raise_no_project_access_error(error.response) + self._raise_no_project_access_error(error, payload) case _: self._raise_api_error(error, payload) - def _raise_no_project_access_error(self, response: httpx.Response) -> NoReturn: + def 
_raise_no_project_access_error(self, err: httpx.HTTPStatusError, payload: dict | None = None) -> NoReturn: + self._log_failed_request(err, *self._extract_error_details(err), payload) raise CogniteProjectAccessError( client=self._cognite_client, project=self._cognite_client._config.project, - x_request_id=response.headers.get("x-request-id"), + x_request_id=err.response.headers.get("x-request-id"), cluster=self._config.cdf_cluster, ) def _raise_api_error(self, err: httpx.HTTPStatusError, payload: dict | None = None) -> NoReturn: + msg, error_details, missing, duplicated = self._extract_error_details(err) + self._log_failed_request(err, msg, error_details, missing, duplicated, payload) + raise CogniteAPIError( + msg, + code=err.response.status_code, + x_request_id=error_details.get("x-request-id"), + missing=missing, + duplicated=duplicated, + extra=error_details, + cluster=self._config.cdf_cluster, + ) from err + + def _extract_error_details( + self, err: httpx.HTTPStatusError + ) -> tuple[str, dict[str, Any], list[str] | None, list[str] | None]: response, request = err.response, err.request - x_request_id = response.headers.get("X-Request-Id") - code, extra, missing, duplicated = response.status_code, {}, None, None + extra, missing, duplicated = {}, None, None try: match error := response.json()["error"]: case str(): @@ -210,16 +274,27 @@ def _raise_api_error(self, err: httpx.HTTPStatusError, payload: dict | None = No missing = extra.pop("missing", None) duplicated = extra.pop("duplicated", None) case _: - msg = response.content.decode() - except KeyError | _json.JSONDecodeError: - msg = response.content.decode() + msg = response.text + except (KeyError, _json.JSONDecodeError): + msg = response.text error_details: dict[str, Any] = { - "X-Request-ID": x_request_id, + "x-request-id": response.headers.get("x-request-id"), "headers": self._sanitize_headers(request.headers), "response_payload": shorten(self._get_response_content_safe(response), 1000), 
"response_headers": response.headers, } + return msg, error_details, missing, duplicated + + @staticmethod + def _log_failed_request( + err: httpx.HTTPStatusError, + msg: str, + error_details: dict[str, Any], + missing: list[str] | None, + duplicated: list[str] | None, + payload: dict | None = None, + ) -> None: if payload: error_details["payload"] = payload if missing: @@ -227,60 +302,43 @@ def _raise_api_error(self, err: httpx.HTTPStatusError, payload: dict | None = No if duplicated: error_details["duplicated"] = duplicated + response, request = err.response, err.request if response.history: for res_hist in response.history: logger.debug( f"REDIRECT AFTER HTTP Error {res_hist.status_code} {res_hist.request.method} " f"{res_hist.request.url}: {res_hist.content.decode()}" ) - logger.debug(f"HTTP Error {code} {res.request.method} {res.request.url}: {msg}", extra=error_details) - raise CogniteAPIError( - msg, - code, - x_request_id, - missing=missing, - duplicated=duplicated, - extra=extra, - cluster=self._config.cdf_cluster, - ) from err - - def _log_request(self, res: httpx.Response, **kwargs: Any) -> None: - method = res.request.method - url = res.request.url - status_code = res.status_code - - extra = kwargs.copy() - extra["headers"] = res.request.headers.copy() - self._sanitize_headers(extra["headers"]) - if extra["payload"] is None: - del extra["payload"] - - stream = kwargs.get("stream") - if not stream and self._config.debug is True: - extra["response_payload"] = shorten(self._get_response_content_safe(res), 500) - extra["response_headers"] = res.headers - + logger.debug(f"HTTP Error {response.status_code} {request.method} {request.url}: {msg}", extra=error_details) + + def _log_successful_request( + self, res: httpx.Response, payload: dict[str, Any] | None = None, stream: bool = False + ) -> None: + extra: dict[str, Any] = { + "headers": self._sanitize_headers(res.request.headers.copy()), + "response_headers": res.headers, + } + if payload: + 
extra["payload"] = payload + if not stream and self._config.debug: + extra["response_payload"] = shorten(self._get_response_content_safe(res), 1_000) try: http_protocol = res.http_version except AttributeError: - # If this fails, it means we are running in a browser (pyodide) with patched requests package: + # If this fails, it prob. means we are running in a browser (pyodide) with patched httpx package: http_protocol = "XMLHTTP" - logger.debug(f"{http_protocol} {method} {url} {status_code}", extra=extra) + logger.debug(f"{http_protocol} {res.request.method} {res.url} {res.status_code}", extra=extra) @staticmethod def _get_response_content_safe(res: httpx.Response) -> str: try: return _json.dumps(res.json()) except _json.JSONDecodeError: - pass - - try: - return res.content.decode() - except UnicodeDecodeError: - pass - - return "" + try: + return res.text + except UnicodeDecodeError: + return "" @staticmethod def _sanitize_headers(headers: httpx.Headers | None) -> httpx.Headers | None: diff --git a/cognite/client/_cognite_client.py b/cognite/client/_cognite_client.py index 9c9c0a410..c35a7f1a8 100644 --- a/cognite/client/_cognite_client.py +++ b/cognite/client/_cognite_client.py @@ -1,8 +1,6 @@ from __future__ import annotations -from typing import Any - -from requests import Response +from typing import TYPE_CHECKING, Any from cognite.client._api.annotations import AnnotationsAPI from cognite.client._api.assets import AssetsAPI @@ -34,7 +32,10 @@ from cognite.client._api_client import APIClient from cognite.client.config import ClientConfig, global_config from cognite.client.credentials import CredentialProvider, OAuthClientCredentials, OAuthInteractive -from cognite.client.utils._auxiliary import get_current_sdk_version, load_resource_to_dict +from cognite.client.utils._auxiliary import load_resource_to_dict + +if TYPE_CHECKING: + import httpx class CogniteClient: @@ -89,27 +90,31 @@ def __init__(self, config: ClientConfig | None = None) -> None: # APIs just 
using base_url: self._api_client = APIClient(self._config, api_version=None, cognite_client=self) - def get(self, url: str, params: dict[str, Any] | None = None, headers: dict[str, Any] | None = None) -> Response: + def get( + self, url: str, params: dict[str, Any] | None = None, headers: dict[str, Any] | None = None + ) -> httpx.Response: """Perform a GET request to an arbitrary path in the API.""" return self._api_client._get(url, params=params, headers=headers) def post( self, url: str, - json: dict[str, Any], + json: dict[str, Any] | None = None, params: dict[str, Any] | None = None, headers: dict[str, Any] | None = None, - ) -> Response: + ) -> httpx.Response: """Perform a POST request to an arbitrary path in the API.""" return self._api_client._post(url, json=json, params=params, headers=headers) - def put(self, url: str, json: dict[str, Any] | None = None, headers: dict[str, Any] | None = None) -> Response: + def put( + self, + url: str, + json: dict[str, Any] | None = None, + params: dict[str, Any] | None = None, + headers: dict[str, Any] | None = None, + ) -> httpx.Response: """Perform a PUT request to an arbitrary path in the API.""" - return self._api_client._put(url, json=json, headers=headers) - - def delete(self, url: str, params: dict[str, Any] | None = None, headers: dict[str, Any] | None = None) -> Response: - """Perform a DELETE request to an arbitrary path in the API.""" - return self._api_client._delete(url, params=params, headers=headers) + return self._api_client._put(url, json=json, params=params, headers=headers) @property def version(self) -> str: @@ -118,7 +123,9 @@ def version(self) -> str: Returns: str: The current SDK version """ - return get_current_sdk_version() + from cognite.client import __version__ + + return __version__ @property def config(self) -> ClientConfig: diff --git a/cognite/client/_http_client.py b/cognite/client/_http_client.py index 1cec55e81..319cdc66d 100644 --- a/cognite/client/_http_client.py +++ 
b/cognite/client/_http_client.py
@@ -2,225 +2,267 @@
 import functools
+import logging
 import random
-import socket
 import time
-from collections.abc import Callable, Iterable, MutableMapping
-from http import cookiejar
-from typing import Any, Literal
+from collections.abc import AsyncIterable, Callable, Iterable, Iterator, Mapping, MutableMapping
+from contextlib import suppress
+from http.cookiejar import Cookie, CookieJar
+from json import JSONDecodeError
+from typing import (
+    Any,
+    Literal,
+    TypeVar,
+)
 
-import requests
-import requests.adapters
-import urllib3
+import httpx
 
 from cognite.client.config import global_config
-from cognite.client.exceptions import CogniteConnectionError, CogniteConnectionRefused, CogniteReadTimeout
-from cognite.client.utils.useful_types import SupportsRead
+from cognite.client.exceptions import (
+    CogniteConnectionError,
+    CogniteConnectionRefused,
+    CogniteReadTimeout,
+    CogniteRequestError,
+)
 
+_T = TypeVar("_T")
+logger = logging.getLogger(__name__)
 
-class BlockAll(cookiejar.CookiePolicy):
-    def no(*args: Any, **kwargs: Any) -> Literal[False]:
-        return False
 
-    return_ok = set_ok = domain_return_ok = path_return_ok = no
-    netscape = True
-    rfc2965 = hide_cookie2 = False
+class NoCookiesPlease(CookieJar):
+    def set_cookie(self, cookie: Cookie) -> None:
+        pass
 
 
-@functools.lru_cache(1)
-def get_global_requests_session() -> requests.Session:
-    session = requests.Session()
-    session.cookies.set_policy(BlockAll())
-    adapter = requests.adapters.HTTPAdapter(
-        pool_maxsize=global_config.max_connection_pool_size, max_retries=urllib3.Retry(False)
+@functools.cache
+def get_global_httpx_client() -> httpx.Client:
+    client = httpx.Client(
+        transport=httpx.HTTPTransport(
+            retries=0,
+            limits=httpx.Limits(max_connections=global_config.max_connection_pool_size),
+        ),
+        timeout=None,  # httpx has strict-ish defaults, as opposed to requests. 
We want to specify per request + follow_redirects=global_config.allow_redirects, + cookies=NoCookiesPlease(), + verify=not global_config.disable_ssl, + proxies=global_config.proxies, # TODO: httpx has deprecated 'proxies' ) - session.mount("http://", adapter) - session.mount("https://", adapter) if global_config.disable_ssl: - urllib3.disable_warnings() - session.verify = False - if global_config.proxies is not None: - session.proxies.update(global_config.proxies) - return session + # TODO: httpx uses httpcore, not urllib3 -> figure out how to disable warnings (if any?) + pass + return client -class HTTPClientConfig: +class HTTPClientWithRetryConfig: def __init__( self, - status_codes_to_retry: set[int], - backoff_factor: float, - max_backoff_seconds: int, - max_retries_total: int, - max_retries_status: int, - max_retries_read: int, - max_retries_connect: int, + status_codes_to_retry: set[int] | None = None, + backoff_factor: float = 0.5, + max_backoff_seconds: int | None = None, + max_retries_total: int | None = None, + max_retries_status: int | None = None, + max_retries_read: int | None = None, + max_retries_connect: int | None = None, ) -> None: - self.status_codes_to_retry = status_codes_to_retry + self._status_codes_to_retry = status_codes_to_retry self.backoff_factor = backoff_factor - self.max_backoff_seconds = max_backoff_seconds - self.max_retries_total = max_retries_total - self.max_retries_status = max_retries_status - self.max_retries_read = max_retries_read - self.max_retries_connect = max_retries_connect + self._max_backoff_seconds = max_backoff_seconds + self._max_retries_total = max_retries_total + self._max_retries_status = max_retries_status + self._max_retries_read = max_retries_read + self._max_retries_connect = max_retries_connect + @property + def status_codes_to_retry(self) -> set[int]: + if self._status_codes_to_retry is None: + # Changes to the global config need to take effect immediately + return global_config.status_forcelist + return 
self._status_codes_to_retry + + @property + def max_backoff_seconds(self) -> int: + if self._max_backoff_seconds is None: + return global_config.max_retry_backoff + return self._max_backoff_seconds + + @property + def max_retries_total(self) -> int: + if self._max_retries_total is None: + return global_config.max_retries + return self._max_retries_total + + @property + def max_retries_status(self) -> int: + if self._max_retries_status is None: + return global_config.max_retries + return self._max_retries_status + + @property + def max_retries_read(self) -> int: + if self._max_retries_read is None: + return global_config.max_retries + return self._max_retries_read + + @property + def max_retries_connect(self) -> int: + if self._max_retries_connect is None: + return global_config.max_retries_connect + return self._max_retries_connect -class _RetryTracker: - def __init__(self, config: HTTPClientConfig) -> None: + +class RetryTracker: + def __init__(self, config: HTTPClientWithRetryConfig) -> None: self.config = config - self.status = 0 - self.read = 0 - self.connect = 0 + self.status = self.read = self.connect = 0 + self.last_failed_reason = "" @property def total(self) -> int: return self.status + self.read + self.connect - def _max_backoff_and_jitter(self, t: int) -> int: - return int(min(t, self.config.max_backoff_seconds) * random.uniform(0, 1.0)) - - def get_backoff_time(self) -> int: - backoff_time = self.config.backoff_factor * (2**self.total) - backoff_time_adjusted = self._max_backoff_and_jitter(backoff_time) - return backoff_time_adjusted - - def should_retry(self, status_code: int | None, is_auto_retryable: bool = False) -> bool: - if self.total >= self.config.max_retries_total: - return False - if self.status > 0 and self.status >= self.config.max_retries_status: - return False - if self.read > 0 and self.read >= self.config.max_retries_read: - return False - if self.connect > 0 and self.connect >= self.config.max_retries_connect: - return False - if 
status_code and status_code not in self.config.status_codes_to_retry and not is_auto_retryable:
-            return False
-        return True
-
-
-class HTTPClient:
+    def get_backoff_time(self) -> float:
+        backoff_time = self.config.backoff_factor * 2**self.total
+        return random.random() * min(backoff_time, self.config.max_backoff_seconds)
+
+    def back_off(self, url: str) -> None:
+        backoff_time = self.get_backoff_time()
+        logger.debug(
+            f"Retrying failed request, attempt #{self.total}, backoff time: {backoff_time=:.4f} sec, "
+            f"reason: {self.last_failed_reason!r}, url: {url}"
+        )
+        time.sleep(backoff_time)
+
+    @property
+    def should_retry_total(self) -> bool:
+        return self.total < self.config.max_retries_total
+
+    def should_retry_status_code(self, status_code: int, is_auto_retryable: bool) -> bool:
+        self.last_failed_reason = f"{status_code=}"
+        return (
+            self.should_retry_total
+            and self.status < self.config.max_retries_status
+            and (is_auto_retryable or status_code in self.config.status_codes_to_retry)
+        )
+
+    def should_retry_connect_error(self, error: httpx.RequestError) -> bool:
+        self.last_failed_reason = type(error).__name__
+        return self.should_retry_total and self.connect < self.config.max_retries_connect
+
+    def should_retry_timeout(self, error: httpx.RequestError) -> bool:
+        self.last_failed_reason = type(error).__name__
+        return self.should_retry_total and self.read < self.config.max_retries_read
+
+
+class HTTPClientWithRetry:
     def __init__(
         self,
-        config: HTTPClientConfig,
-        session: requests.Session,
-        refresh_auth_header: Callable[[MutableMapping[str, Any]], None],
-        retry_tracker_factory: Callable[[HTTPClientConfig], _RetryTracker] = _RetryTracker,
+        config: HTTPClientWithRetryConfig,
+        refresh_auth_header: Callable[[MutableMapping[str, str]], None],
+        httpx_client: httpx.Client | None = None,
     ) -> None:
-        self.session = session
         self.config = config
         self.refresh_auth_header = refresh_auth_header
-        self.retry_tracker_factory = retry_tracker_factory  # 
needed for tests
+        self.httpx_client = httpx_client or get_global_httpx_client()
 
-    def request(
+    def __call__(
         self,
-        method: str,
+        method: Literal["GET", "POST", "PUT", "PATCH", "DELETE", "HEAD", "OPTIONS"],
+        /,
         url: str,
-        data: str | bytes | Iterable[bytes] | SupportsRead | None = None,
-        headers: MutableMapping[str, Any] | None = None,
+        *,
+        content: str | bytes | Iterable[bytes] | AsyncIterable[bytes] | None = None,
+        data: Mapping[str, Any] | None = None,
+        json: Any = None,
+        params: Mapping[str, str] | None = None,
+        headers: MutableMapping[str, str] | None = None,
+        follow_redirects: bool = False,
         timeout: float | None = None,
-        params: dict[str, Any] | str | bytes | None = None,
-        stream: bool | None = None,
-        allow_redirects: bool = False,
-    ) -> requests.Response:
-        retry_tracker = self.retry_tracker_factory(self.config)
+    ) -> httpx.Response:
+        fn: Callable[..., httpx.Response] = functools.partial(
+            self.httpx_client.request,
+            method,
+            url,
+            content=content,
+            data=data,
+            json=json,
+            params=params,
+            headers=headers,
+            follow_redirects=follow_redirects,
+            timeout=timeout,
+        )
+        return self._with_retry(fn, url=url, headers=headers)
+
+    def stream(
+        self,
+        method: Literal["GET", "POST"],
+        /,
+        url: str,
+        *,
+        json: Any = None,
+        headers: MutableMapping[str, str] | None = None,
+        timeout: float | None = None,
+    ) -> Iterator[httpx.Response]:
+        fn: Callable[..., Iterator[httpx.Response]] = functools.partial(
+            self.httpx_client.stream,
+            method,
+            url,
+            content=None,
+            json=json,
+            headers=headers,
+            timeout=timeout,
+        )
+        return self._with_retry(fn, url=url, headers=headers)
+
+    def _with_retry(
+        self,
+        fn: Callable[..., _T],
+        *,
+        url: str,
+        headers: MutableMapping[str, str] | None,
+        is_auto_retryable: bool = False,
+    ) -> _T:
+        retry_tracker = RetryTracker(self.config)
         accepts_json = (headers or {}).get("accept") == "application/json"
-        is_auto_retryable = False
         while True:
             try:
-                res = self._do_request(
-                    method=method,
-                    
url=url, - data=data, - headers=headers, - timeout=timeout, - params=params, - stream=stream, - allow_redirects=allow_redirects, - ) + res = fn() if accepts_json: # Cache .json() return value in order to avoid redecoding JSON if called multiple times - res.json = functools.lru_cache(maxsize=1)(res.json) # type: ignore[assignment] - try: - is_auto_retryable = res.json().get("error", {}).get("isAutoRetryable", False) - except Exception: - # if the response is not JSON or it doesn't conform to the api design guide, + res.json = functools.cache(res.json) # type: ignore[assignment] + return res.raise_for_status() + + except httpx.HTTPStatusError: # only raised from raise_for_status() -> status code is guaranteed + if accepts_json: + with suppress(JSONDecodeError, AttributeError): + # If the response is not JSON or it doesn't conform to the api design guide, # we assume it's not auto-retryable - pass + # TODO: Can we just check the header now? 'cdf-is-auto-retryable' + is_auto_retryable = res.json().get("error", {}).get("isAutoRetryable", False) retry_tracker.status += 1 - if not retry_tracker.should_retry(status_code=res.status_code, is_auto_retryable=is_auto_retryable): - return res + if not retry_tracker.should_retry_status_code(res.status_code, is_auto_retryable): + raise - except CogniteReadTimeout as e: - retry_tracker.read += 1 - if not retry_tracker.should_retry(status_code=None, is_auto_retryable=True): - raise e - except CogniteConnectionError as e: + except httpx.ConnectError as err: + retry_tracker.connect += 1 + if not retry_tracker.should_retry_connect_error(err): + raise CogniteConnectionRefused from err + + except (httpx.NetworkError, httpx.ConnectTimeout) as err: retry_tracker.connect += 1 - if not retry_tracker.should_retry(status_code=None, is_auto_retryable=True): - raise e + if not retry_tracker.should_retry_connect_error(err): + raise CogniteConnectionError from err + + except httpx.TimeoutException as err: + retry_tracker.read += 1 + if not 
retry_tracker.should_retry_timeout(err): + raise CogniteReadTimeout from err + except httpx.RequestError as err: + # We want to avoid raising a non-Cognite error (from the underlying library). httpx.RequestError is the + # base class for all exceptions that can be raised during a request, so we use it here as a fallback. + raise CogniteRequestError from err + + retry_tracker.back_off(url) # During a backoff loop, our credentials might expire, so we check and maybe refresh: - time.sleep(retry_tracker.get_backoff_time()) if headers is not None: - # TODO: Refactoring needed to make this "prettier" - self.refresh_auth_header(headers) - - def _do_request( - self, - method: str, - url: str, - data: str | bytes | Iterable[bytes] | SupportsRead | None = None, - headers: MutableMapping[str, Any] | None = None, - timeout: float | None = None, - params: dict[str, Any] | str | bytes | None = None, - stream: bool | None = None, - allow_redirects: bool = False, - ) -> requests.Response: - """requests/urllib3 adds 2 or 3 layers of exceptions on top of built-in networking exceptions. - - Sometimes the appropriate built-in networking exception is not in the context, sometimes the requests - exception is not in the context, so we need to check for the appropriate built-in exceptions, - urllib3 exceptions, and requests exceptions. 
- """ - try: - res = self.session.request( - method=method, - url=url, - data=data, - headers=headers, - timeout=timeout, - params=params, - stream=stream, - allow_redirects=allow_redirects, - ) - return res - except Exception as e: - if self._any_exception_in_context_isinstance( - e, (socket.timeout, urllib3.exceptions.ReadTimeoutError, requests.exceptions.ReadTimeout) - ): - raise CogniteReadTimeout from e - if self._any_exception_in_context_isinstance( - e, - ( - ConnectionError, - urllib3.exceptions.ConnectionError, - urllib3.exceptions.ConnectTimeoutError, - requests.exceptions.ConnectionError, - ), - ): - if self._any_exception_in_context_isinstance(e, ConnectionRefusedError): - raise CogniteConnectionRefused from e - raise CogniteConnectionError from e - raise e - - @classmethod - def _any_exception_in_context_isinstance( - cls, exc: BaseException, exc_types: tuple[type[BaseException], ...] | type[BaseException] - ) -> bool: - """requests does not use the "raise ... from ..." syntax, so we need to access the underlying exceptions using - the __context__ attribute. - """ - if isinstance(exc, exc_types): - return True - if exc.__context__ is None: - return False - return cls._any_exception_in_context_isinstance(exc.__context__, exc_types) + self.refresh_auth_header(headers) # TODO: Refactoring needed to make this "prettier" diff --git a/cognite/client/config.py b/cognite/client/config.py index c68f0f1e5..6ad63bd3a 100644 --- a/cognite/client/config.py +++ b/cognite/client/config.py @@ -32,6 +32,7 @@ class GlobalConfig: max_workers (int | None): Max number of workers to spawn when parallelizing API calls. Defaults to 5. silence_feature_preview_warnings (bool): Whether or not to silence warnings triggered by using alpha or beta features. Defaults to False. + allow_redirects (bool): Whether or not to allow redirects. Defaults to False. 
""" def __new__(cls) -> GlobalConfig: @@ -58,6 +59,7 @@ def __init__(self) -> None: self.proxies: dict[str, str] | None = {} self.max_workers: int = 5 self.silence_feature_preview_warnings: bool = False + self.allow_redirects: bool = False def apply_settings(self, settings: dict[str, Any] | str) -> None: """Apply settings to the global configuration object from a YAML/JSON string or dict. diff --git a/cognite/client/exceptions.py b/cognite/client/exceptions.py index a74197ab8..eabe1774d 100644 --- a/cognite/client/exceptions.py +++ b/cognite/client/exceptions.py @@ -8,6 +8,7 @@ from cognite.client._constants import _RUNNING_IN_BROWSER from cognite.client.utils import _json from cognite.client.utils._auxiliary import no_op +from cognite.client.utils._url import resolve_url if TYPE_CHECKING: from cognite.client._cognite_client import CogniteClient @@ -33,16 +34,18 @@ def __init__( def _attempt_to_get_projects(client: CogniteClient) -> list[str] | None: # To avoid an infinte loop, we can't just use client.iam.token.inspect(), but use http_client directly: api_client = client.iam.token - _, full_url = api_client._resolve_url("GET", "/api/v1/token/inspect") - headers = api_client._configure_headers("application/json", client._config.headers.copy()) # type: ignore [has-type] + _, full_url = resolve_url("GET", "/api/v1/token/inspect", api_client._api_version, api_client._config) + full_headers = api_client._configure_headers(additional_headers=None, api_subversion=api_client._api_version) try: - token_inspect = api_client._http_client.request("GET", url=full_url, headers=headers) + token_inspect = api_client._http_client_with_retry( + "GET", full_url, headers=full_headers, timeout=api_client._config.timeout + ) return sorted({proj["projectUrlName"] for proj in token_inspect.json()["projects"]}) except Exception: return None def __str__(self) -> str: - msg = f"You don't have access to the requested CDF project={self.project!r}" + msg = f"You don't have access to the 
requested CDF project={self.project!r} (the token may have expired)" if self.maybe_projects: msg += f". Did you intend to use one of: {self.maybe_projects}?" msg += f" | code: 401 | X-Request-ID: {self.x_request_id}" @@ -86,16 +89,20 @@ def __init__(self, errors: list[GraphQLErrorSpec]) -> None: self.errors = errors -class CogniteConnectionError(CogniteException): +class CogniteRequestError(CogniteException): + pass + + +class CogniteConnectionError(CogniteRequestError): pass class CogniteConnectionRefused(CogniteConnectionError): - def __str__(self) -> str: - return "Cognite API connection refused. Please try again later." + def __init__(self) -> None: + super().__init__("Cognite API connection refused. Please try again later.") -class CogniteReadTimeout(CogniteException): +class CogniteReadTimeout(CogniteRequestError): pass diff --git a/cognite/client/utils/_auxiliary.py b/cognite/client/utils/_auxiliary.py index 81e7101f6..d2d5d5722 100644 --- a/cognite/client/utils/_auxiliary.py +++ b/cognite/client/utils/_auxiliary.py @@ -2,7 +2,6 @@ import functools import math -import platform import warnings from collections.abc import Hashable, Iterable, Iterator, Sequence from typing import ( @@ -12,7 +11,8 @@ TypeVar, overload, ) -from urllib.parse import quote + +from requests import Response from cognite.client.utils import _json from cognite.client.utils._importing import local_import @@ -20,7 +20,6 @@ convert_all_keys_to_camel_case, convert_all_keys_to_snake_case, to_camel_case, - to_snake_case, ) from cognite.client.utils.useful_types import SequenceNotStr @@ -126,36 +125,6 @@ def handle_renamed_argument( return old_arg -def handle_deprecated_camel_case_argument(new_arg: T, old_arg_name: str, fn_name: str, kw_dct: dict[str, Any]) -> T: - new_arg_name = to_snake_case(old_arg_name) - return handle_renamed_argument(new_arg, new_arg_name, old_arg_name, fn_name, kw_dct) - - -def interpolate_and_url_encode(path: str, *args: Any) -> str: - return 
path.format(*[quote(str(arg), safe="") for arg in args]) - - -def get_current_sdk_version() -> str: - from cognite.client import __version__ - - return __version__ - - -@functools.lru_cache(maxsize=1) -def get_user_agent() -> str: - sdk_version = f"CognitePythonSDK/{get_current_sdk_version()}" - python_version = ( - f"{platform.python_implementation()}/{platform.python_version()} " - f"({platform.python_build()};{platform.python_compiler()})" - ) - os_version_info = [platform.release(), platform.machine(), platform.architecture()[0]] - os_version_info = [s for s in os_version_info if s] # Ignore empty strings - os_version_info_str = "-".join(os_version_info) - operating_system = f"{platform.system()}/{os_version_info_str}" - - return f"{sdk_version} {python_version} {operating_system}" - - @overload def split_into_n_parts(seq: list[T], *, n: int) -> Iterator[list[T]]: ... diff --git a/cognite/client/utils/_concurrency.py b/cognite/client/utils/_concurrency.py index cc81c77c3..ea124f36d 100644 --- a/cognite/client/utils/_concurrency.py +++ b/cognite/client/utils/_concurrency.py @@ -204,6 +204,7 @@ def get_thread_pool_executor(cls, max_workers: int) -> ThreadPoolExecutor: except NameError: # TPE has not been initialized executor = _THREAD_POOL_EXECUTOR_SINGLETON = ThreadPoolExecutor(max_workers) + # TODO: Warning if max_workers != executor._max_workers? 
return executor @classmethod diff --git a/cognite/client/utils/_json.py b/cognite/client/utils/_json.py index 7c6bf0c2f..1b1639124 100644 --- a/cognite/client/utils/_json.py +++ b/cognite/client/utils/_json.py @@ -85,3 +85,16 @@ def convert_nonfinite_float_to_str(value: float | str | None) -> float | str | N if math.isnan(value): # type: ignore [arg-type] return "NaN" raise + + +def dumps_no_nan_or_inf(obj: Any) -> str: + try: + return dumps(obj, allow_nan=False) + except ValueError as e: + # A lot of work to give a more human friendly error message when nans and infs are present: + msg = "Out of range float values are not JSON compliant" + if msg in str(e): # exc. might e.g. contain an extra ": nan", depending on build (_json.make_encoder) + raise ValueError(f"{msg}. Make sure your data does not contain NaN(s) or +/- Inf!").with_traceback( + e.__traceback__ + ) from None + raise diff --git a/cognite/client/utils/_pyodide_helpers.py b/cognite/client/utils/_pyodide_helpers.py index 95dd8e183..90d3814be 100644 --- a/cognite/client/utils/_pyodide_helpers.py +++ b/cognite/client/utils/_pyodide_helpers.py @@ -1,28 +1,28 @@ from __future__ import annotations import os -import warnings -from collections.abc import Callable, MutableMapping -from typing import TYPE_CHECKING, Any +from typing import TYPE_CHECKING import cognite.client as cc # Do not import individual entities -from cognite.client._http_client import _RetryTracker + +# from cognite.client._http_client import RetryTracker from cognite.client.config import ClientConfig, global_config from cognite.client.credentials import CredentialProvider if TYPE_CHECKING: - from requests import Session + pass + - from cognite.client._http_client import HTTPClient, HTTPClientConfig +# TODO: Make pyodide work with httpx def patch_sdk_for_pyodide() -> None: # ------------------- # Patch Pyodide related issues # - Patch 'requests' as it does not work in pyodide (socket not implemented): - from pyodide_http import patch_all + # 
from pyodide_http import patch_all - patch_all() + # patch_all() # ----------------- # Patch Cognite SDK @@ -33,8 +33,8 @@ def patch_sdk_for_pyodide() -> None: global_config.disable_gzip = True # - Use another HTTP adapter: - cc._http_client.HTTPClient._old__init__ = cc._http_client.HTTPClient.__init__ # type: ignore [attr-defined] - cc._http_client.HTTPClient.__init__ = http_client__init__ # type: ignore [method-assign] + # cc._http_client.HTTPClient._old__init__ = cc._http_client.HTTPClient.__init__ # type: ignore [attr-defined] + # cc._http_client.HTTPClient.__init__ = http_client__init__ # type: ignore [method-assign] # - Inject these magic classes into the correct modules so that the user may import them normally: cc.config.FusionNotebookConfig = FusionNotebookConfig # type: ignore [attr-defined] @@ -42,13 +42,6 @@ def patch_sdk_for_pyodide() -> None: # - Set all usage of thread pool executors to use dummy/serial-implementations: cc.utils._concurrency.ConcurrencySettings.executor_type = "mainthread" - # - Auto-ignore protobuf warning for the user (as they can't fix this): - warnings.filterwarnings( - action="ignore", - category=UserWarning, - message="Your installation of 'protobuf' is missing compiled C binaries", - ) - # - If we are running inside of a JupyterLite Notebook spawned from Cognite Data Fusion, we set # the default config to FusionNotebookConfig(). 
This allows the user to: # >>> from cognite.client import CogniteClient @@ -57,18 +50,18 @@ def patch_sdk_for_pyodide() -> None: global_config.default_client_config = FusionNotebookConfig() -def http_client__init__( - self: HTTPClient, - config: HTTPClientConfig, - session: Session, - refresh_auth_header: Callable[[MutableMapping[str, Any]], None], - retry_tracker_factory: Callable[[HTTPClientConfig], _RetryTracker] = _RetryTracker, -) -> None: - import pyodide_http +# def http_client__init__( +# self: HTTPClientWithRetry, +# config: HTTPClientWithRetryConfig, +# session: Session, +# refresh_auth_header: Callable[[MutableMapping[str, Any]], None], +# retry_tracker_factory: Callable[[HTTPClientWithRetryConfig], RetryTracker] = RetryTracker, +# ) -> None: +# import pyodide_http - self._old__init__(config, session, refresh_auth_header, retry_tracker_factory) # type: ignore [attr-defined] - self.session.mount("https://", pyodide_http._requests.PyodideHTTPAdapter()) - self.session.mount("http://", pyodide_http._requests.PyodideHTTPAdapter()) +# self._old__init__(config, session, refresh_auth_header, retry_tracker_factory) # type: ignore [attr-defined] +# self.session.mount("https://", pyodide_http._requests.PyodideHTTPAdapter()) +# self.session.mount("http://", pyodide_http._requests.PyodideHTTPAdapter()) class EnvVarToken(CredentialProvider): diff --git a/cognite/client/utils/_url.py b/cognite/client/utils/_url.py index 75b1c988a..714ca245f 100644 --- a/cognite/client/utils/_url.py +++ b/cognite/client/utils/_url.py @@ -1,7 +1,12 @@ +from __future__ import annotations + import re -from urllib.parse import urljoin +from typing import TYPE_CHECKING, Any +from urllib.parse import quote, urljoin + +if TYPE_CHECKING: + from cognite.client.config import ClientConfig -from cognite.client.config import ClientConfig RETRYABLE_POST_ENDPOINT_REGEX_PATTERN: re.Pattern[str] = re.compile( "|".join( @@ -68,3 +73,7 @@ def can_be_retried(method: str, path: str) -> bool: return True 
case _: return False + + +def interpolate_and_url_encode(path: str, *args: Any) -> str: + return path.format(*[quote(str(arg), safe="") for arg in args]) diff --git a/cognite/client/utils/_version_checker.py b/cognite/client/utils/_version_checker.py index 71b10b419..1552a3ee0 100644 --- a/cognite/client/utils/_version_checker.py +++ b/cognite/client/utils/_version_checker.py @@ -6,7 +6,7 @@ from contextlib import suppress from threading import Thread -import requests +import httpx from packaging import version @@ -14,7 +14,7 @@ def get_all_sdk_versions() -> list[version.Version]: from cognite.client.config import global_config verify_ssl = not global_config.disable_ssl - res = requests.get("https://pypi.org/simple/cognite-sdk/", verify=verify_ssl, timeout=5) + res = httpx.get("https://pypi.org/simple/cognite-sdk/", verify=verify_ssl, timeout=5) return list(map(version.parse, re.findall(r"cognite[_-]sdk-(\d+\.\d+.[\dabrc]+)", res.text))) diff --git a/tests/tests_unit/test_http_client.py b/tests/tests_unit/test_http_client.py index f161db367..c4e9ec3ac 100644 --- a/tests/tests_unit/test_http_client.py +++ b/tests/tests_unit/test_http_client.py @@ -4,10 +4,10 @@ import requests.exceptions import urllib3.exceptions -from cognite.client._http_client import HTTPClient, HTTPClientConfig, _RetryTracker +from cognite.client._http_client import HTTPClientWithRetry, HTTPClientWithRetryConfig, RetryTracker from cognite.client.exceptions import CogniteConnectionError, CogniteConnectionRefused, CogniteReadTimeout -DEFAULT_CONFIG = HTTPClientConfig( +DEFAULT_CONFIG = HTTPClientWithRetryConfig( status_codes_to_retry={429}, backoff_factor=0.5, max_backoff_seconds=30, @@ -20,7 +20,7 @@ class TestRetryTracker: def test_total_retries_exceeded(self): - rt = _RetryTracker(config=DEFAULT_CONFIG) + rt = RetryTracker(config=DEFAULT_CONFIG) rt.config.max_retries_total = 10 rt.status = 4 rt.connect = 4 @@ -30,40 +30,40 @@ def test_total_retries_exceeded(self): assert rt.should_retry(200) is 
False def test_status_retries_exceeded(self): - rt = _RetryTracker(config=DEFAULT_CONFIG) + rt = RetryTracker(config=DEFAULT_CONFIG) rt.config.max_retries_status = 1 assert rt.should_retry(None) is True rt.status = 1 assert rt.should_retry(None) is False def test_read_retries_exceeded(self): - rt = _RetryTracker(config=DEFAULT_CONFIG) + rt = RetryTracker(config=DEFAULT_CONFIG) rt.config.max_retries_read = 1 assert rt.should_retry(None) is True rt.read = 1 assert rt.should_retry(None) is False def test_connect_retries_exceeded(self): - rt = _RetryTracker(config=DEFAULT_CONFIG) + rt = RetryTracker(config=DEFAULT_CONFIG) rt.config.max_retries_connect = 1 assert rt.should_retry(None) is True rt.connect = 1 assert rt.should_retry(None) is False def test_correct_retry_of_status(self): - rt = _RetryTracker(config=DEFAULT_CONFIG) + rt = RetryTracker(config=DEFAULT_CONFIG) assert rt.should_retry(500) is False rt.config.status_codes_to_retry = {500, 429} assert rt.should_retry(500) is True def test_get_backoff_time(self): - rt = _RetryTracker(config=DEFAULT_CONFIG) + rt = RetryTracker(config=DEFAULT_CONFIG) for i in range(1000): rt.read += 1 assert 0 <= rt.get_backoff_time() <= DEFAULT_CONFIG.max_backoff_seconds def test_is_auto_retryable(self): - rt = _RetryTracker(config=DEFAULT_CONFIG) + rt = RetryTracker(config=DEFAULT_CONFIG) rt.config.max_retries_status = 1 # 409 is not in the list of status codes to retry, but we set is_auto_retryable=True, which should override it @@ -85,8 +85,8 @@ class TestHTTPClient: def test_read_timeout_errors(self): cnf = DEFAULT_CONFIG cnf.max_backoff_seconds = 0 - retry_tracker = _RetryTracker(cnf) - c = HTTPClient( + retry_tracker = RetryTracker(cnf) + c = HTTPClientWithRetry( config=cnf, refresh_auth_header=lambda headers: None, retry_tracker_factory=lambda _: retry_tracker, @@ -109,8 +109,8 @@ def test_read_timeout_errors(self): def test_connect_errors(self, exc_type): cnf = DEFAULT_CONFIG cnf.max_backoff_seconds = 0 - retry_tracker = 
_RetryTracker(cnf) - c = HTTPClient( + retry_tracker = RetryTracker(cnf) + c = HTTPClientWithRetry( config=cnf, refresh_auth_header=lambda headers: None, retry_tracker_factory=lambda _: retry_tracker, @@ -132,8 +132,8 @@ def test_connect_errors(self, exc_type): def test_connection_refused_retried(self): cnf = DEFAULT_CONFIG cnf.max_backoff_seconds = 0 - retry_tracker = _RetryTracker(cnf) - c = HTTPClient( + retry_tracker = RetryTracker(cnf) + c = HTTPClientWithRetry( config=cnf, refresh_auth_header=lambda headers: None, retry_tracker_factory=lambda _: retry_tracker, @@ -154,8 +154,8 @@ def test_connection_refused_retried(self): def test_status_errors(self): cnf = DEFAULT_CONFIG cnf.max_backoff_seconds = 0 - retry_tracker = _RetryTracker(cnf) - c = HTTPClient( + retry_tracker = RetryTracker(cnf) + c = HTTPClientWithRetry( config=cnf, refresh_auth_header=lambda headers: None, retry_tracker_factory=lambda _: retry_tracker, @@ -180,13 +180,13 @@ def test_get_underlying_exception_does_not_exist_in_context(self): try: raise Exception except Exception as e: - assert not HTTPClient._any_exception_in_context_isinstance(e, UnderlyingException) + assert not HTTPClientWithRetry._any_exception_in_context_isinstance(e, UnderlyingException) def test_get_underlying_exception_no_context(self): try: raise UnderlyingException except Exception as e: - assert HTTPClient._any_exception_in_context_isinstance(e, (UnderlyingException, Exception)) + assert HTTPClientWithRetry._any_exception_in_context_isinstance(e, (UnderlyingException, Exception)) def test_get_underlying_exception_nested_1_layer(self): def testcase(): @@ -198,7 +198,7 @@ def testcase(): try: testcase() except Exception as e: - assert HTTPClient._any_exception_in_context_isinstance(e, UnderlyingException) + assert HTTPClientWithRetry._any_exception_in_context_isinstance(e, UnderlyingException) def test_get_underlying_exception_nested_2_layers(self): def testcase(): @@ -213,4 +213,4 @@ def testcase(): try: testcase() except 
Exception as e: - assert HTTPClient._any_exception_in_context_isinstance(e, UnderlyingException) + assert HTTPClientWithRetry._any_exception_in_context_isinstance(e, UnderlyingException) From 0d5eb3b661dd027fe39df79a8c4b3e7932c3e986 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?H=C3=A5kon=20V=2E=20Treider?= Date: Tue, 14 Jan 2025 00:03:00 +0100 Subject: [PATCH 7/7] EVERYTHING (yes) --- cognite/client/_api/diagrams.py | 4 +- cognite/client/_api/documents.py | 34 +++++++-------- cognite/client/_api/files.py | 35 ++++++++-------- cognite/client/_api/geospatial.py | 42 ++++++------------- cognite/client/_api/sequences.py | 6 +-- cognite/client/_api_client.py | 2 +- cognite/client/_basic_api_client.py | 21 ++++++---- cognite/client/_http_client.py | 38 +++++++---------- cognite/client/config.py | 2 + .../client/data_classes/contextualization.py | 5 +-- cognite/client/data_classes/files.py | 6 +-- 11 files changed, 87 insertions(+), 108 deletions(-) diff --git a/cognite/client/_api/diagrams.py b/cognite/client/_api/diagrams.py index d01441741..d337961fe 100644 --- a/cognite/client/_api/diagrams.py +++ b/cognite/client/_api/diagrams.py @@ -4,8 +4,6 @@ from math import ceil from typing import TYPE_CHECKING, Any, Literal, TypeVar, cast, overload -from requests import Response - from cognite.client._api_client import APIClient from cognite.client.data_classes._base import CogniteResource from cognite.client.data_classes.contextualization import ( @@ -23,6 +21,8 @@ from cognite.client.utils.useful_types import SequenceNotStr if TYPE_CHECKING: + from httpx import Response + from cognite.client import CogniteClient from cognite.client.config import ClientConfig diff --git a/cognite/client/_api/documents.py b/cognite/client/_api/documents.py index 506cb4b5f..1cf674e8e 100644 --- a/cognite/client/_api/documents.py +++ b/cognite/client/_api/documents.py @@ -19,6 +19,7 @@ TemporaryLink, ) from cognite.client.data_classes.filters import _BASIC_FILTERS, Filter, _validate_filter +from 
cognite.client.utils._url import resolve_url if TYPE_CHECKING: from cognite.client import ClientConfig, CogniteClient @@ -55,10 +56,9 @@ def download_page_as_png_bytes(self, id: int, page_number: int = 1) -> bytes: >>> binary_png = client.documents.previews.download_page_as_png_bytes(id=123, page_number=5) >>> Image(binary_png) """ - res = self._do_request( - "GET", f"{self._RESOURCE_PATH}/{id}/preview/image/pages/{page_number}", accept="image/png" - ) - return res.content + return self._get( + f"{self._RESOURCE_PATH}/{id}/preview/image/pages/{page_number}", headers={"accept": "image/png"} + ).content def download_page_as_png( self, path: Path | str | IO, id: int, page_number: int = 1, overwrite: bool = False @@ -112,8 +112,7 @@ def download_document_as_pdf_bytes(self, id: int) -> bytes: >>> client = CogniteClient() >>> content = client.documents.previews.download_document_as_pdf_bytes(id=123) """ - res = self._do_request("GET", f"{self._RESOURCE_PATH}/{id}/preview/pdf", accept="application/pdf") - return res.content + return self._get(f"{self._RESOURCE_PATH}/{id}/preview/pdf", headers={"accept": "application/pdf"}).content def download_document_as_pdf(self, path: Path | str | IO, id: int, overwrite: bool = False) -> None: """`Downloads a pdf preview of the specified document. `_ @@ -134,6 +133,7 @@ def download_document_as_pdf(self, path: Path | str | IO, id: int, overwrite: bo >>> client.documents.previews.download_document_as_pdf("previews", id=123) """ if isinstance(path, IO): + # TODO(doctrino): This seems impossible to trigger content = self.download_document_as_pdf_bytes(id) path.write(content) return @@ -476,7 +476,6 @@ def retrieve_content(self, id: int) -> bytes: in order to reduce the size of the returned payload. If you want the whole text for a document, you can use this endpoint. - Args: id (int): The server-generated ID for the document you want to retrieve the content of. 
@@ -491,10 +490,7 @@ def retrieve_content(self, id: int) -> bytes: >>> client = CogniteClient() >>> content = client.documents.retrieve_content(id=123) """ - - body = {"id": id} - response = self._do_request("POST", f"{self._RESOURCE_PATH}/content", accept="text/plain", json=body) - return response.content + return self._post(f"{self._RESOURCE_PATH}/content", headers={"accept": "text/plain"}, json={"id": id}).content def retrieve_content_buffer(self, id: int, buffer: BinaryIO) -> None: """`Retrieve document content into buffer `_ @@ -520,13 +516,15 @@ def retrieve_content_buffer(self, id: int, buffer: BinaryIO) -> None: >>> with Path("my_file.txt").open("wb") as buffer: ... client.documents.retrieve_content_buffer(id=123, buffer=buffer) """ - # TODO: Needs an httpx checkup, stream, iter_content, etc. - with self._do_request( - "GET", f"{self._RESOURCE_PATH}/{id}/content", stream=True, accept="text/plain" - ) as response: - for chunk in response.iter_content(chunk_size=2**21): - if chunk: # filter out keep-alive new chunks - buffer.write(chunk) + from cognite.client import global_config + + _, full_url = resolve_url("GET", f"{self._RESOURCE_PATH}/{id}/content", self._api_version, self._config) + stream = self._stream( + "GET", full_url=full_url, headers={"accept": "text/plain"}, timeout=self._config.file_transfer_timeout + ) + with stream as resp: + for chunk in resp.iter_bytes(chunk_size=global_config.file_download_chunk_size): + buffer.write(chunk) @overload def search( diff --git a/cognite/client/_api/files.py b/cognite/client/_api/files.py index 00698abbb..493d090cc 100644 --- a/cognite/client/_api/files.py +++ b/cognite/client/_api/files.py @@ -7,7 +7,7 @@ from collections.abc import Iterator, Sequence from io import BufferedReader from pathlib import Path -from typing import Any, BinaryIO, Literal, TextIO, cast, overload +from typing import Any, BinaryIO, Literal, cast, overload from urllib.parse import urljoin, urlparse from cognite.client._api_client import 
APIClient @@ -641,7 +641,8 @@ def _upload_bytes(self, content: bytes | BinaryIO, returned_file_metadata: dict) else: full_upload_url = urljoin(self._config.base_url, upload_url) file_metadata = FileMetadata._load(returned_file_metadata) - upload_response = self._put( + upload_response = self._request( + "PUT", full_upload_url, content=content, headers={"Content-Type": file_metadata.mime_type, "accept": "*/*"}, @@ -884,26 +885,26 @@ def multipart_upload_content_session( FileMetadata._load(returned_file_metadata), upload_urls, upload_id, self._cognite_client ) - def _upload_multipart_part(self, upload_url: str, content: str | bytes | TextIO | BinaryIO) -> None: + def _upload_multipart_part(self, upload_url: str, content: str | bytes | BinaryIO) -> None: """Upload part of a file to an upload URL returned from `multipart_upload_session`. Note that if `content` does not somehow expose its length, this method may not work on Azure. See `requests.utils.super_len`. Args: upload_url (str): URL to upload file chunk to. - content (str | bytes | TextIO | BinaryIO): The content to upload. + content (str | bytes | BinaryIO): The content to upload. 
""" if isinstance(content, str): content = content.encode("utf-8") - upload_response = self._http_client_with_retry.request( + upload_response = self._request( "PUT", - upload_url, - data=content, - timeout=self._config.file_transfer_timeout, + full_url=upload_url, + content=content, headers={"accept": "*/*"}, + timeout=self._config.file_transfer_timeout, ) - if not upload_response.ok: + if not upload_response.is_success: raise CogniteFileUploadError(message=upload_response.text, code=upload_response.status_code) def _complete_multipart_upload(self, session: FileMultipartUploadSession) -> None: @@ -1123,15 +1124,15 @@ def _process_file_download( download_link = self._get_download_link(identifier) self._download_file_to_path(download_link, file_path_absolute) - def _download_file_to_path(self, download_link: str, path: Path, chunk_size: int = 2**21) -> None: - # TODO: Needs an httpx checkup, stream, iter_content, etc. - with self._stream( + def _download_file_to_path(self, download_link: str, path: Path) -> None: + from cognite.client import global_config + + stream = self._stream( "GET", download_link, headers={"accept": "*/*"}, timeout=self._config.file_transfer_timeout - ) as r: - with path.open("wb") as f: - for chunk in r.iter_content(chunk_size=chunk_size): - if chunk: # filter out keep-alive new chunks - f.write(chunk) + ) + with stream as r, path.open("wb") as f: + for chunk in r.iter_bytes(chunk_size=global_config.file_download_chunk_size): + f.write(chunk) def download_to_path( self, path: Path | str, id: int | None = None, external_id: str | None = None, instance_id: NodeId | None = None diff --git a/cognite/client/_api/geospatial.py b/cognite/client/_api/geospatial.py index 1385c7da9..027897aeb 100644 --- a/cognite/client/_api/geospatial.py +++ b/cognite/client/_api/geospatial.py @@ -30,6 +30,7 @@ from cognite.client.exceptions import CogniteConnectionError from cognite.client.utils import _json from cognite.client.utils._identifier import 
IdentifierSequence +from cognite.client.utils._url import resolve_url from cognite.client.utils.useful_types import SequenceNotStr @@ -708,14 +709,13 @@ def stream_features( "allowCrsTransformation": allow_crs_transformation, "allowDimensionalityMismatch": allow_dimensionality_mismatch, } - # TODO: TODO: Needs an httpx checkup due to stream=True - res = self._do_request("POST", url_path=resource_path, json=payload, timeout=self._config.timeout, stream=True) - - try: - for line in res.iter_lines(): - yield Feature._load(_json.loads(line)) - except (ChunkedEncodingError, ConnectionError) as e: - raise CogniteConnectionError(e) + _, full_url = resolve_url("POST", resource_path, self._api_version, self._config) + with self._stream("POST", full_url=full_url, json=payload) as resp: + try: + for line in resp.iter_lines(): + yield Feature._load(_json.loads(line)) + except (ChunkedEncodingError, ConnectionError) as e: + raise CogniteConnectionError(e) def aggregate_features( self, @@ -969,13 +969,7 @@ def put_raster( ) with open(file, "rb") as fh: data = fh.read() - res = self._do_request( - "PUT", - url_path, - data=data, - headers={"Content-Type": "application/binary"}, - timeout=self._config.timeout, - ) + res = self._put(url_path, content=data, headers={"Content-Type": "application/binary"}) return RasterMetadata.load(res.json(), cognite_client=self._cognite_client) def delete_raster( @@ -1002,14 +996,9 @@ def delete_raster( >>> raster_property_name = ... >>> client.geospatial.delete_raster(feature_type.external_id, feature.external_id, raster_property_name) """ - url_path = ( + self._post( self._raster_resource_path(feature_type_external_id, feature_external_id, raster_property_name) + "/delete" ) - self._do_request( - "POST", - url_path, - timeout=self._config.timeout, - ) def get_raster( self, @@ -1052,10 +1041,8 @@ def get_raster( ... 
raster_property_name, "XYZ", {"SIGNIFICANT_DIGITS": "4"}) """ url_path = self._raster_resource_path(feature_type_external_id, feature_external_id, raster_property_name) - res = self._do_request( - "POST", + return self._post( url_path, - timeout=self._config.timeout, json={ "format": raster_format, "options": raster_options, @@ -1064,8 +1051,7 @@ def get_raster( "scaleX": raster_scale_x, "scaleY": raster_scale_y, }, - ) - return res.content + ).content def compute( self, @@ -1089,10 +1075,8 @@ def compute( >>> compute_function = GeospatialGeometryTransformComputeFunction(GeospatialGeometryValueComputeFunction("SRID=4326;POLYGON((0 0,10 0,10 10,0 10,0 0))"), srid=23031) >>> compute_result = client.geospatial.compute(output = {"output": compute_function}) """ - res = self._do_request( - "POST", + res = self._post( f"{GeospatialAPI._RESOURCE_PATH}/compute", - timeout=self._config.timeout, json={"output": {k: v.to_json_payload() for k, v in output.items()}}, ) return GeospatialComputedResponse._load(res.json(), cognite_client=self._cognite_client) diff --git a/cognite/client/_api/sequences.py b/cognite/client/_api/sequences.py index 6ebb56130..d1ba29210 100644 --- a/cognite/client/_api/sequences.py +++ b/cognite/client/_api/sequences.py @@ -1324,10 +1324,8 @@ def retrieve_last_row( """ columns = handle_renamed_argument(columns, "columns", "column_external_ids", "insert", kwargs, False) identifier = Identifier.of_either(id, external_id).as_dict() - res = self._do_request( - "POST", self._DATA_PATH + "/latest", json={**identifier, "before": before, "columns": columns} - ).json() - return SequenceRows._load(res) + res = self._post(self._DATA_PATH + "/latest", json={**identifier, "before": before, "columns": columns}) + return SequenceRows._load(res.json()) def retrieve_dataframe( self, diff --git a/cognite/client/_api_client.py b/cognite/client/_api_client.py index 06155638f..e930aa927 100644 --- a/cognite/client/_api_client.py +++ b/cognite/client/_api_client.py @@ 
-865,7 +865,7 @@ def _upsert_multiple( ) except CogniteNotFoundError as not_found_error: items_by_external_id = {item.external_id: item for item in items if item.external_id is not None} # type: ignore [attr-defined] - items_by_id = {item.id: item for item in items if getattr(item, "id", None) is not None} + items_by_id = {item.id: item for item in items if hasattr(item, "id") and item.id is not None} # Not found must have an external id as they do not exist in CDF: try: missing_external_ids = {entry["externalId"] for entry in not_found_error.not_found} diff --git a/cognite/client/_basic_api_client.py b/cognite/client/_basic_api_client.py index bfccf1170..b5f37f6da 100644 --- a/cognite/client/_basic_api_client.py +++ b/cognite/client/_basic_api_client.py @@ -5,6 +5,7 @@ import logging import platform from collections.abc import Iterable, Iterator, MutableMapping +from contextlib import contextmanager from typing import TYPE_CHECKING, Any, Literal, NoReturn import httpx @@ -85,40 +86,42 @@ def select_http_client(self, is_retryable: bool) -> HTTPClientWithRetry: def _request( self, - method: str, + method: Literal["GET", "PUT", "HEAD"], /, full_url: str, + content: str | bytes | Iterable[bytes] | None = None, headers: dict[str, Any] | None = None, timeout: float | None = None, ) -> httpx.Response: """Make a request to something that is outside Cognite Data Fusion""" client = self.select_http_client(method in {"GET", "PUT", "HEAD"}) try: - res = client("GET", full_url, headers=headers, timeout=timeout or self._config.timeout) + res = client(method, full_url, content=content, headers=headers, timeout=timeout or self._config.timeout) except httpx.HTTPStatusError as err: self._handle_status_error(err) self._log_successful_request(res) return res + @contextmanager def _stream( self, method: Literal["GET", "POST"], /, full_url: str, + json: Any = None, headers: dict[str, Any] | None = None, timeout: float | None = None, ) -> Iterator[httpx.Response]: try: - res = 
self._http_client_with_retry.stream( - method, full_url, headers=headers, timeout=timeout or self._config.timeout - ) + with self._http_client_with_retry.stream( + method, full_url, json=json, headers=headers, timeout=timeout or self._config.timeout + ) as resp: + self._log_successful_request(resp, stream=True) + yield resp except httpx.HTTPStatusError as err: self._handle_status_error(err) - self._log_successful_request(res, stream=True) - return res - def _get( self, url_path: str, @@ -186,7 +189,7 @@ def _put( api_subversion: str | None = None, timeout: float | None = None, ) -> httpx.Response: - is_retryable, full_url = resolve_url("PUT", url_path, self._api_version, self._config) + _, full_url = resolve_url("PUT", url_path, self._api_version, self._config) full_headers = self._configure_headers(additional_headers=headers, api_subversion=api_subversion) if content is None: content = handle_json_dump(json, full_headers) diff --git a/cognite/client/_http_client.py b/cognite/client/_http_client.py index 319cdc66d..80bc72669 100644 --- a/cognite/client/_http_client.py +++ b/cognite/client/_http_client.py @@ -3,15 +3,11 @@ import functools import random import time -from collections.abc import AsyncIterable, Callable, Iterable, Iterator, Mapping, MutableMapping -from contextlib import suppress +from collections.abc import AsyncIterable, Callable, Iterable, Mapping, MutableMapping +from contextlib import AbstractContextManager, suppress from http.cookiejar import Cookie, CookieJar from json import JSONDecodeError -from typing import ( - Any, - Literal, - TypeVar, -) +from typing import Any, Literal, TypeVar from venv import logger import httpx @@ -24,7 +20,7 @@ CogniteRequestError, ) -_T = TypeVar("_T") +_T = TypeVar("_T", httpx.Response, AbstractContextManager[httpx.Response]) class NoCookiesPlease(CookieJar): @@ -43,7 +39,8 @@ def get_global_httpx_client() -> httpx.Client: follow_redirects=global_config.allow_redirects, cookies=NoCookiesPlease(), verify=not 
global_config.disable_ssl, - proxies=global_config.proxies, # TODO: httpx has deprecated 'proxies' + # TODO: httpx has deprecated 'proxies' + proxies=global_config.proxies, # type: ignore ) if global_config.disable_ssl: # TODO: httpx uses httpcore, not urllib3 -> figure out how to disable warnings (if any?) @@ -199,17 +196,11 @@ def stream( json: Any = None, headers: MutableMapping[str, str] | None = None, timeout: float | None = None, - ) -> Iterator[httpx.Response]: - fn: Callable[..., Iterator[httpx.Response]] = functools.partial( - self.httpx_client.stream, - method, - url, - content=content, - json=json, - headers=headers, - timeout=timeout, + ) -> AbstractContextManager[httpx.Response]: + fn: Callable[..., AbstractContextManager[httpx.Response]] = functools.partial( + self.httpx_client.stream, method, url, json=json, headers=headers, timeout=timeout ) - return self._with_retry(fn, url=url, headers=headers) + return self._with_retry(fn, url=url, headers=headers, stream=True) def _with_retry( self, @@ -218,16 +209,19 @@ def _with_retry( url: str, headers: MutableMapping[str, str] | None, is_auto_retryable: bool = False, + stream: bool = False, ) -> _T: retry_tracker = RetryTracker(self.config) accepts_json = (headers or {}).get("accept") == "application/json" while True: try: - res = fn() + if stream: + return fn() + res: httpx.Response = fn() # type: ignore if accepts_json: # Cache .json() return value in order to avoid redecoding JSON if called multiple times - res.json = functools.cache(res.json) # type: ignore[assignment] - return res.raise_for_status() + res.json = functools.cache(res.json) # type: ignore [assignment] + return res.raise_for_status() # type: ignore [return-value] except httpx.HTTPStatusError: # only raised from raise_for_status() -> status code is guaranteed if accepts_json: diff --git a/cognite/client/config.py b/cognite/client/config.py index 6ad63bd3a..1df038635 100644 --- a/cognite/client/config.py +++ b/cognite/client/config.py @@ 
-30,6 +30,7 @@ class GlobalConfig: disable_ssl (bool): Whether or not to disable SSL. Defaults to False proxies (Dict[str, str]): Dictionary mapping from protocol to url. e.g. {"https": "http://10.10.1.10:1080"} max_workers (int | None): Max number of workers to spawn when parallelizing API calls. Defaults to 5. + file_download_chunk_size (int | None): Override the chunk size for streaming file downloads. Defaults to None (auto). silence_feature_preview_warnings (bool): Whether or not to silence warnings triggered by using alpha or beta features. Defaults to False. allow_redirects (bool): Whether or not to allow redirects. Defaults to False. @@ -58,6 +59,7 @@ def __init__(self) -> None: self.disable_ssl: bool = False self.proxies: dict[str, str] | None = {} self.max_workers: int = 5 + self.file_download_chunk_size: int | None = None self.silence_feature_preview_warnings: bool = False self.allow_redirects: bool = False diff --git a/cognite/client/data_classes/contextualization.py b/cognite/client/data_classes/contextualization.py index 71de0c25f..2027b8fae 100644 --- a/cognite/client/data_classes/contextualization.py +++ b/cognite/client/data_classes/contextualization.py @@ -2,12 +2,11 @@ import time import warnings -from collections.abc import Sequence +from collections.abc import MutableMapping, Sequence from dataclasses import dataclass from enum import Enum from typing import TYPE_CHECKING, Any, TypeVar, cast -from requests.utils import CaseInsensitiveDict from typing_extensions import Self from cognite.client.data_classes import Annotation @@ -164,7 +163,7 @@ def _load_with_status( cls: type[T_ContextualizationJob], *, data: dict[str, Any], - headers: CaseInsensitiveDict[str], + headers: MutableMapping[str, str], status_path: str, cognite_client: Any, ) -> T_ContextualizationJob: diff --git a/cognite/client/data_classes/files.py b/cognite/client/data_classes/files.py index 2565f513c..1e614dfa3 100644 --- a/cognite/client/data_classes/files.py +++ 
b/cognite/client/data_classes/files.py @@ -3,7 +3,7 @@ from abc import ABC from collections.abc import Sequence from types import TracebackType -from typing import TYPE_CHECKING, Any, BinaryIO, Literal, TextIO, TypeVar, cast +from typing import TYPE_CHECKING, Any, BinaryIO, Literal, TypeVar, cast from cognite.client.data_classes._base import ( CogniteFilter, @@ -544,14 +544,14 @@ def __init__( self._in_context = False self._cognite_client = cognite_client - def upload_part(self, part_no: int, content: str | bytes | TextIO | BinaryIO) -> None: + def upload_part(self, part_no: int, content: str | bytes | BinaryIO) -> None: """Upload part of a file. Note that if `content` does not somehow expose its length, this method may not work on Azure. See `requests.utils.super_len`. Args: part_no (int): Which part number this is, must be between 0 and `parts` given to `multipart_upload_session` - content (str | bytes | TextIO | BinaryIO): The content to upload. + content (str | bytes | BinaryIO): The content to upload. """ if part_no < 0 or part_no > len(self._uploaded_urls): raise ValueError(f"Index out of range: {part_no}, must be between 0 and {len(self._uploaded_urls)}")