Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft: Swap requests out with httpx #2089

Closed
wants to merge 7 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion cognite/client/_api/data_modeling/graphql.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
from cognite.client.data_classes.data_modeling.graphql import DMLApplyResult
from cognite.client.data_classes.data_modeling.ids import DataModelId
from cognite.client.exceptions import CogniteGraphQLError, GraphQLErrorSpec
from cognite.client.utils._auxiliary import interpolate_and_url_encode
from cognite.client.utils._url import interpolate_and_url_encode


class DataModelingGraphQLAPI(APIClient):
Expand Down
13 changes: 6 additions & 7 deletions cognite/client/_api/datapoints.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@
is_positive,
split_into_chunks,
split_into_n_parts,
unpack_items,
unpack_items_in_payload,
)
from cognite.client.utils._concurrency import ConcurrencySettings, execute_tasks
Expand Down Expand Up @@ -118,12 +119,10 @@ def fetch_all_datapoints_numpy(self) -> DatapointsArrayList:

def _request_datapoints(self, payload: _DatapointsPayload) -> Sequence[DataPointListItem]:
(res := DataPointListResponse()).MergeFromString(
self.dps_client._do_request(
json=payload,
method="POST",
url_path=f"{self.dps_client._RESOURCE_PATH}/list",
accept="application/protobuf",
timeout=self.dps_client._config.timeout,
self.dps_client._post(
f"{self.dps_client._RESOURCE_PATH}/list",
json=payload, # type: ignore [arg-type]
headers={"accept": "application/protobuf"},
).content
)
return res.items
Expand Down Expand Up @@ -2143,5 +2142,5 @@ def fetch_datapoints(self) -> list[dict[str, Any]]:
task_unwrap_fn=unpack_items_in_payload,
task_list_element_unwrap_fn=IdentifierSequenceCore.extract_identifiers,
)
result = tasks_summary.joined_results(lambda res: res.json()["items"])
result = tasks_summary.joined_results(unpack_items)
return self._post_fix_status_codes_and_stringified_floats(result)
4 changes: 2 additions & 2 deletions cognite/client/_api/diagrams.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,6 @@
from math import ceil
from typing import TYPE_CHECKING, Any, Literal, TypeVar, cast, overload

from requests import Response

from cognite.client._api_client import APIClient
from cognite.client.data_classes._base import CogniteResource
from cognite.client.data_classes.contextualization import (
Expand All @@ -23,6 +21,8 @@
from cognite.client.utils.useful_types import SequenceNotStr

if TYPE_CHECKING:
from httpx import Response

from cognite.client import CogniteClient
from cognite.client.config import ClientConfig

Expand Down
34 changes: 16 additions & 18 deletions cognite/client/_api/documents.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
TemporaryLink,
)
from cognite.client.data_classes.filters import _BASIC_FILTERS, Filter, _validate_filter
from cognite.client.utils._url import resolve_url

if TYPE_CHECKING:
from cognite.client import ClientConfig, CogniteClient
Expand Down Expand Up @@ -55,10 +56,9 @@ def download_page_as_png_bytes(self, id: int, page_number: int = 1) -> bytes:
>>> binary_png = client.documents.previews.download_page_as_png_bytes(id=123, page_number=5)
>>> Image(binary_png)
"""
res = self._do_request(
"GET", f"{self._RESOURCE_PATH}/{id}/preview/image/pages/{page_number}", accept="image/png"
)
return res.content
return self._get(
f"{self._RESOURCE_PATH}/{id}/preview/image/pages/{page_number}", headers={"accept": "image/png"}
).content

def download_page_as_png(
self, path: Path | str | IO, id: int, page_number: int = 1, overwrite: bool = False
Expand Down Expand Up @@ -112,8 +112,7 @@ def download_document_as_pdf_bytes(self, id: int) -> bytes:
>>> client = CogniteClient()
>>> content = client.documents.previews.download_document_as_pdf_bytes(id=123)
"""
res = self._do_request("GET", f"{self._RESOURCE_PATH}/{id}/preview/pdf", accept="application/pdf")
return res.content
return self._get(f"{self._RESOURCE_PATH}/{id}/preview/pdf", headers={"accept": "application/pdf"}).content

def download_document_as_pdf(self, path: Path | str | IO, id: int, overwrite: bool = False) -> None:
"""`Downloads a pdf preview of the specified document. <https://developer.cognite.com/api#tag/Document-preview/operation/documentsPreviewPdf>`_
Expand All @@ -134,6 +133,7 @@ def download_document_as_pdf(self, path: Path | str | IO, id: int, overwrite: bo
>>> client.documents.previews.download_document_as_pdf("previews", id=123)
"""
if isinstance(path, IO):
# TODO(doctrino): This seems impossible to trigger
content = self.download_document_as_pdf_bytes(id)
path.write(content)
return
Expand Down Expand Up @@ -476,7 +476,6 @@ def retrieve_content(self, id: int) -> bytes:
in order to reduce the size of the returned payload. If you want the whole text for a document,
you can use this endpoint.


Args:
id (int): The server-generated ID for the document you want to retrieve the content of.

Expand All @@ -491,10 +490,7 @@ def retrieve_content(self, id: int) -> bytes:
>>> client = CogniteClient()
>>> content = client.documents.retrieve_content(id=123)
"""

body = {"id": id}
response = self._do_request("POST", f"{self._RESOURCE_PATH}/content", accept="text/plain", json=body)
return response.content
return self._post(f"{self._RESOURCE_PATH}/content", headers={"accept": "text/plain"}, json={"id": id}).content

def retrieve_content_buffer(self, id: int, buffer: BinaryIO) -> None:
"""`Retrieve document content into buffer <https://developer.cognite.com/api#tag/Documents/operation/documentsContent>`_
Expand All @@ -506,7 +502,6 @@ def retrieve_content_buffer(self, id: int, buffer: BinaryIO) -> None:
in order to reduce the size of the returned payload. If you want the whole text for a document,
you can use this endpoint.


Args:
id (int): The server-generated ID for the document you want to retrieve the content of.
buffer (BinaryIO): The document content is streamed directly into the buffer. This is useful for retrieving large documents.
Expand All @@ -521,12 +516,15 @@ def retrieve_content_buffer(self, id: int, buffer: BinaryIO) -> None:
>>> with Path("my_file.txt").open("wb") as buffer:
... client.documents.retrieve_content_buffer(id=123, buffer=buffer)
"""
with self._do_request(
"GET", f"{self._RESOURCE_PATH}/{id}/content", stream=True, accept="text/plain"
) as response:
for chunk in response.iter_content(chunk_size=2**21):
if chunk: # filter out keep-alive new chunks
buffer.write(chunk)
from cognite.client import global_config

_, full_url = resolve_url("GET", f"{self._RESOURCE_PATH}/{id}/content", self._api_version, self._config)
stream = self._stream(
"GET", full_url=full_url, headers={"accept": "text/plain"}, timeout=self._config.file_transfer_timeout
)
with stream as resp:
for chunk in resp.iter_bytes(chunk_size=global_config.file_download_chunk_size):
buffer.write(chunk)

@overload
def search(
Expand Down
54 changes: 27 additions & 27 deletions cognite/client/_api/files.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
from collections.abc import Iterator, Sequence
from io import BufferedReader
from pathlib import Path
from typing import Any, BinaryIO, Literal, TextIO, cast, overload
from typing import Any, BinaryIO, Literal, cast, overload
from urllib.parse import urljoin, urlparse

from cognite.client._api_client import APIClient
Expand All @@ -28,7 +28,7 @@
)
from cognite.client.data_classes.data_modeling import NodeId
from cognite.client.exceptions import CogniteAPIError, CogniteAuthorizationError, CogniteFileUploadError
from cognite.client.utils._auxiliary import find_duplicates
from cognite.client.utils._auxiliary import find_duplicates, unpack_items
from cognite.client.utils._concurrency import execute_tasks
from cognite.client.utils._identifier import Identifier, IdentifierSequence
from cognite.client.utils._validation import process_asset_subtree_ids, process_data_set_ids
Expand Down Expand Up @@ -634,21 +634,21 @@ def upload_content_bytes(

return self._upload_bytes(content, res.json()["items"][0])

def _upload_bytes(self, content: bytes | TextIO | BinaryIO, returned_file_metadata: dict) -> FileMetadata:
def _upload_bytes(self, content: bytes | BinaryIO, returned_file_metadata: dict) -> FileMetadata:
upload_url = returned_file_metadata["uploadUrl"]
if urlparse(upload_url).netloc:
full_upload_url = upload_url
else:
full_upload_url = urljoin(self._config.base_url, upload_url)
file_metadata = FileMetadata._load(returned_file_metadata)
upload_response = self._http_client_with_retry.request(
upload_response = self._request(
"PUT",
full_upload_url,
data=content,
timeout=self._config.file_transfer_timeout,
content=content,
headers={"Content-Type": file_metadata.mime_type, "accept": "*/*"},
timeout=self._config.file_transfer_timeout,
)
if not upload_response.ok:
if not upload_response.is_success:
raise CogniteFileUploadError(message=upload_response.text, code=upload_response.status_code)
return file_metadata

Expand Down Expand Up @@ -885,26 +885,26 @@ def multipart_upload_content_session(
FileMetadata._load(returned_file_metadata), upload_urls, upload_id, self._cognite_client
)

def _upload_multipart_part(self, upload_url: str, content: str | bytes | TextIO | BinaryIO) -> None:
def _upload_multipart_part(self, upload_url: str, content: str | bytes | BinaryIO) -> None:
"""Upload part of a file to an upload URL returned from `multipart_upload_session`.
Note that if `content` does not somehow expose its length, this method may not work
on Azure. See `requests.utils.super_len`.

Args:
upload_url (str): URL to upload file chunk to.
content (str | bytes | TextIO | BinaryIO): The content to upload.
content (str | bytes | BinaryIO): The content to upload.
"""
if isinstance(content, str):
content = content.encode("utf-8")

upload_response = self._http_client_with_retry.request(
upload_response = self._request(
"PUT",
upload_url,
data=content,
timeout=self._config.file_transfer_timeout,
full_url=upload_url,
content=content,
headers={"accept": "*/*"},
timeout=self._config.file_transfer_timeout,
)
if not upload_response.ok:
if not upload_response.is_success:
raise CogniteFileUploadError(message=upload_response.text, code=upload_response.status_code)

def _complete_multipart_upload(self, session: FileMultipartUploadSession) -> None:
Expand Down Expand Up @@ -949,7 +949,7 @@ def retrieve_download_urls(
]
tasks_summary = execute_tasks(self._post, tasks, max_workers=self._config.max_workers)
tasks_summary.raise_compound_exception_if_failed_tasks()
results = tasks_summary.joined_results(unwrap_fn=lambda res: res.json()["items"])
results = tasks_summary.joined_results(unpack_items)
return {
result.get("id") or result.get("externalId") or NodeId.load(result["instanceId"]): result["downloadUrl"]
for result in results
Expand Down Expand Up @@ -1124,14 +1124,15 @@ def _process_file_download(
download_link = self._get_download_link(identifier)
self._download_file_to_path(download_link, file_path_absolute)

def _download_file_to_path(self, download_link: str, path: Path, chunk_size: int = 2**21) -> None:
with self._http_client_with_retry.request(
"GET", download_link, headers={"accept": "*/*"}, stream=True, timeout=self._config.file_transfer_timeout
) as r:
with path.open("wb") as f:
for chunk in r.iter_content(chunk_size=chunk_size):
if chunk: # filter out keep-alive new chunks
f.write(chunk)
def _download_file_to_path(self, download_link: str, path: Path) -> None:
from cognite.client import global_config

stream = self._stream(
"GET", download_link, headers={"accept": "*/*"}, timeout=self._config.file_transfer_timeout
)
with stream as r, path.open("wb") as f:
for chunk in r.iter_bytes(chunk_size=global_config.file_download_chunk_size):
f.write(chunk)

def download_to_path(
self, path: Path | str, id: int | None = None, external_id: str | None = None, instance_id: NodeId | None = None
Expand Down Expand Up @@ -1185,10 +1186,9 @@ def download_bytes(
return self._download_file(download_link)

def _download_file(self, download_link: str) -> bytes:
res = self._http_client_with_retry.request(
"GET", download_link, headers={"accept": "*/*"}, timeout=self._config.file_transfer_timeout
)
return res.content
return self._request(
"GET", full_url=download_link, headers={"accept": "*/*"}, timeout=self._config.file_transfer_timeout
).content

def list(
self,
Expand Down
41 changes: 13 additions & 28 deletions cognite/client/_api/geospatial.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
from cognite.client.exceptions import CogniteConnectionError
from cognite.client.utils import _json
from cognite.client.utils._identifier import IdentifierSequence
from cognite.client.utils._url import resolve_url
from cognite.client.utils.useful_types import SequenceNotStr


Expand Down Expand Up @@ -708,13 +709,13 @@ def stream_features(
"allowCrsTransformation": allow_crs_transformation,
"allowDimensionalityMismatch": allow_dimensionality_mismatch,
}
res = self._do_request("POST", url_path=resource_path, json=payload, timeout=self._config.timeout, stream=True)

try:
for line in res.iter_lines():
yield Feature._load(_json.loads(line))
except (ChunkedEncodingError, ConnectionError) as e:
raise CogniteConnectionError(e)
_, full_url = resolve_url("POST", resource_path, self._api_version, self._config)
with self._stream("POST", full_url=full_url, json=payload) as resp:
try:
for line in resp.iter_lines():
yield Feature._load(_json.loads(line))
except (ChunkedEncodingError, ConnectionError) as e:
raise CogniteConnectionError(e)

def aggregate_features(
self,
Expand Down Expand Up @@ -968,13 +969,7 @@ def put_raster(
)
with open(file, "rb") as fh:
data = fh.read()
res = self._do_request(
"PUT",
url_path,
data=data,
headers={"Content-Type": "application/binary"},
timeout=self._config.timeout,
)
res = self._put(url_path, content=data, headers={"Content-Type": "application/binary"})
return RasterMetadata.load(res.json(), cognite_client=self._cognite_client)

def delete_raster(
Expand All @@ -1001,14 +996,9 @@ def delete_raster(
>>> raster_property_name = ...
>>> client.geospatial.delete_raster(feature_type.external_id, feature.external_id, raster_property_name)
"""
url_path = (
self._post(
self._raster_resource_path(feature_type_external_id, feature_external_id, raster_property_name) + "/delete"
)
self._do_request(
"POST",
url_path,
timeout=self._config.timeout,
)

def get_raster(
self,
Expand Down Expand Up @@ -1051,10 +1041,8 @@ def get_raster(
... raster_property_name, "XYZ", {"SIGNIFICANT_DIGITS": "4"})
"""
url_path = self._raster_resource_path(feature_type_external_id, feature_external_id, raster_property_name)
res = self._do_request(
"POST",
return self._post(
url_path,
timeout=self._config.timeout,
json={
"format": raster_format,
"options": raster_options,
Expand All @@ -1063,8 +1051,7 @@ def get_raster(
"scaleX": raster_scale_x,
"scaleY": raster_scale_y,
},
)
return res.content
).content

def compute(
self,
Expand All @@ -1088,10 +1075,8 @@ def compute(
>>> compute_function = GeospatialGeometryTransformComputeFunction(GeospatialGeometryValueComputeFunction("SRID=4326;POLYGON((0 0,10 0,10 10,0 10,0 0))"), srid=23031)
>>> compute_result = client.geospatial.compute(output = {"output": compute_function})
"""
res = self._do_request(
"POST",
res = self._post(
f"{GeospatialAPI._RESOURCE_PATH}/compute",
timeout=self._config.timeout,
json={"output": {k: v.to_json_payload() for k, v in output.items()}},
)
return GeospatialComputedResponse._load(res.json(), cognite_client=self._cognite_client)
2 changes: 1 addition & 1 deletion cognite/client/_api/postgres_gateway/tables.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,9 @@
import cognite.client.data_classes.postgres_gateway.tables as pg
from cognite.client._api_client import APIClient
from cognite.client._constants import DEFAULT_LIMIT_READ
from cognite.client.utils._auxiliary import interpolate_and_url_encode
from cognite.client.utils._experimental import FeaturePreviewWarning
from cognite.client.utils._identifier import TablenameSequence
from cognite.client.utils._url import interpolate_and_url_encode
from cognite.client.utils.useful_types import SequenceNotStr

if TYPE_CHECKING:
Expand Down
2 changes: 1 addition & 1 deletion cognite/client/_api/raw.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@
from cognite.client.data_classes.raw import Database, DatabaseList, Row, RowCore, RowList, RowWrite
from cognite.client.utils._auxiliary import (
find_duplicates,
interpolate_and_url_encode,
is_finite,
is_unlimited,
split_into_chunks,
Expand All @@ -23,6 +22,7 @@
from cognite.client.utils._concurrency import ConcurrencySettings, execute_tasks
from cognite.client.utils._identifier import Identifier
from cognite.client.utils._importing import local_import
from cognite.client.utils._url import interpolate_and_url_encode
from cognite.client.utils._validation import assert_type
from cognite.client.utils.useful_types import SequenceNotStr

Expand Down
Loading
Loading