From 4308101ca3e1bfaa74769ca29586f4214f46bbe8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C3=98yvind=20Ingebrigtsen=20=C3=98vergaard?= Date: Tue, 16 Apr 2024 16:08:54 +0200 Subject: [PATCH 01/10] Add ApiMixIn.list_with_meta list_with_meta returns a ModelList which holds ListMeta and a list of Model items --- k8s/base.py | 48 ++++++++++++++-- tests/k8s/test_base.py | 121 ++++++++++++++++++++++++++++++++++------- 2 files changed, 145 insertions(+), 24 deletions(-) diff --git a/k8s/base.py b/k8s/base.py index f3583ac..1b35fd4 100644 --- a/k8s/base.py +++ b/k8s/base.py @@ -21,14 +21,14 @@ import json import logging from collections import namedtuple -from typing import Optional +from typing import Optional, Dict, Type, List import requests import requests.packages.urllib3 as urllib3 from . import config from .client import Client, NotFound -from .fields import Field +from .fields import Field, ReadOnlyField LOG = logging.getLogger(__name__) LOG.addHandler(logging.NullHandler()) @@ -111,7 +111,7 @@ def find(cls, name="", namespace="default", labels=None): return [cls.from_dict(item) for item in resp.json()["items"]] @classmethod - def list(cls, namespace="default"): + def _list_raw(cls, namespace="default"): """List all resources in given namespace""" if namespace is None: if not cls._meta.list_url: @@ -120,8 +120,20 @@ def list(cls, namespace="default"): else: url = cls._build_url(name="", namespace=namespace) resp = cls._client.get(url) + return resp + + @classmethod + def list(cls, namespace="default"): + """List all resources in given namespace""" + resp = cls._list_raw(namespace=namespace) return [cls.from_dict(item) for item in resp.json()["items"]] + @classmethod + def list_with_meta(cls, namespace="default"): + """List all resources in given namespace. Return ModelList""" + resp = cls._list_raw(namespace=namespace) + return ModelList.from_dict(cls, resp.json()) + @classmethod def watch_list(cls, namespace=None, resource_version=None, allow_bookmarks=False): """Return a generator that yields WatchEvents of cls. @@ -383,8 +395,11 @@ class WatchBaseEvent(ABC): __slots__ = ("resource_version",) - def __init__(self, event_json): - self.resource_version = event_json["object"].get("metadata", {}).get("resourceVersion") + def __init__(self, event_json, resource_version=None): + if resource_version is not None: + self.resource_version = resource_version + else: + self.resource_version = event_json["object"].get("metadata", {}).get("resourceVersion") def __eq__(self, other): return self.resource_version == other.resource_version @@ -505,3 +520,26 @@ def __str__(self): @classmethod def match(cls, event_json): return event_json["type"] == "ERROR" and event_json["object"].get("kind") == "Status" + + +class ListMeta(Model): + _continue = Field(str) + remainingItemCount = Field(int) + resourceVersion = ReadOnlyField(str) + + +class ModelList: + """ + Generic type to hold list of Model instances (items) together with ListMeta (metadata), + as returned by list API calls + """ + + def __init__(self, metadata: ListMeta, items: List[Model]): + self.metadata = metadata + self.items = items + + @classmethod + def from_dict(cls, model_cls: Type[Model], list_response_data: Dict): + metadata = ListMeta.from_dict(list_response_data.get('metadata', {})) + items = [model_cls.from_dict(item) for item in list_response_data.get('items', [])] + return cls(metadata, items) diff --git a/tests/k8s/test_base.py b/tests/k8s/test_base.py index a14506a..47de0aa 100644 --- a/tests/k8s/test_base.py +++ b/tests/k8s/test_base.py @@ -21,6 +21,7 @@ import requests.packages.urllib3 as urllib3 from k8s.base import APIServerError, Equality, Exists, Field, In, Inequality, Model, NotIn, WatchBookmark, WatchEvent +from k8s.client import NotFound, ServerError, ClientError from k8s.models.common import DeleteOptions, Preconditions @@ -59,23 +60,23 @@ def test_find_by_name(self, client): Example.find("app_name") client.get.assert_called_once_with("/example", params={"labelSelector": "app=app_name"}) - @pytest.mark.parametrize("value, selector", ( - (Equality("my_value"), "my_key=my_value"), - (Inequality("my_value"), "my_key!=my_value"), - (In(("value1", "value2")), "my_key in (value1,value2)"), - (NotIn(("value1", "value2")), "my_key notin (value1,value2)"), - (Exists(), "my_key"), - ("my_unwrapped_value", "my_key=my_unwrapped_value"), - )) + @pytest.mark.parametrize( + "value, selector", + ( + (Equality("my_value"), "my_key=my_value"), + (Inequality("my_value"), "my_key!=my_value"), + (In(("value1", "value2")), "my_key in (value1,value2)"), + (NotIn(("value1", "value2")), "my_key notin (value1,value2)"), + (Exists(), "my_key"), + ("my_unwrapped_value", "my_key=my_unwrapped_value"), + ), + ) def test_find_by_selectors(self, client, value, selector): Example.find(labels={"my_key": value}) client.get.assert_called_once_with("/example", params={"labelSelector": selector}) def test_repeated_keys_in_label_selector(self, client): - labels = [ - ("foo", Inequality("bar")), - ("foo", Exists()) - ] + labels = [("foo", Inequality("bar")), ("foo", Exists())] Example.find(labels=labels) expected_selector = "foo!=bar,foo" @@ -84,7 +85,6 @@ def test_repeated_keys_in_label_selector(self, client): class TestDeleteList(object): - @pytest.fixture def client(self): with mock.patch.object(Example, "_client") as m: @@ -101,7 +101,7 @@ def test_delete_with_options(self, client): dryRun=[], gracePeriodSeconds=30, preconditions=Preconditions(uid="1234", resourceVersion="12"), - propagationPolicy="Foreground" + propagationPolicy="Foreground", ) Example.delete_list(labels={"foo": "bar"}, delete_options=opts) @@ -109,11 +109,8 @@ def test_delete_with_options(self, client): "apiVersion": "foo/v1", "dryRun": [], "gracePeriodSeconds": 30, - "preconditions": { - "uid": "1234", - "resourceVersion": "12" - }, - "propagationPolicy": "Foreground" + "preconditions": {"uid": "1234", "resourceVersion": "12"}, + "propagationPolicy": "Foreground", } client.delete.assert_called_once_with("/example", params={"labelSelector": "foo=bar"}, body=expected_body) @@ -165,3 +162,89 @@ def test_watch_list_bookmark(self, client): client.get.assert_called_once_with( "/watch/example", stream=True, timeout=270, params={"resourceVersion": 4711, "allowWatchBookmarks": "true"} ) + + +class TestList: + @pytest.fixture + def response(self): + data = { + "metadata": { + "resourceVersion": "1", + "continue": "ENCODED_CONTINUE_TOKEN", + "remainingItemCount": 1, + }, + "items": [ + {"value": 42}, + {"value": 1337}, + ], + } + resp = mock.create_autospec(requests.Response, spec_set=True) + resp.json.return_value = data + yield resp + + @pytest.fixture + def response_empty(self): + data = { + "metadata": { + "resourceVersion": "2", + }, + "items": [], + } + resp = mock.create_autospec(requests.Response, spec_set=True) + resp.json.return_value = data + yield resp + + @pytest.fixture + def client(self): + with mock.patch.object(Example, "_client") as m: + yield m + + def test_list(self, client, response): + client.get.return_value = response + + expected = [ + Example(value=42), + Example(value=1337), + ] + assert Example.list() == expected + + def test_list_empty(self, client, response_empty): + client.get.return_value = response_empty + + assert Example.list() == [] + + def test_list_with_meta_empty(self, client, response_empty): + client.get.return_value = response_empty + + assert Example.list_with_meta().metadata.resourceVersion == "2" + assert Example.list_with_meta().metadata._continue is None + assert Example.list_with_meta().metadata.remainingItemCount is None + assert Example.list_with_meta().items == [] + + @pytest.mark.parametrize( + "exception", + ( + NotFound, + ClientError, + ServerError, + ), + ) + def test_list_error(self, client, exception): + client.get.side_effect = exception + + with pytest.raises(exception): + Example.list() + + @pytest.mark.parametrize( + "exception", + ( + NotFound, + ClientError, + ServerError, + ), + ) + def test_list_with_meta_error(self, client, exception): + client.get.side_effect = exception + + with pytest.raises(exception): + Example.list_with_meta() From 4cfb2a6b15587e34c10ffd163eaca99191b6644a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C3=98yvind=20Ingebrigtsen=20=C3=98vergaard?= Date: Wed, 17 Apr 2024 14:01:59 +0200 Subject: [PATCH 02/10] Reduce risk of yielding stale events from watcher It looks like bookmarks event aren't being sent on the watch connection on the first connect, when the connection is started with resourceVersion=None. In some cases the lack of bookmark events can cause the watcher to yield stale events after reconnecting for the first time. For example if we assume a empty namespace where the following changes happen (in this order): - resource a is added with resourceVersion=1 - resource b is added with resourceVersion=2 - resource b is deleted After that, the watch connection starts, and would yield one event for the only existing resource; - ADDED for resource a with resourceVersion=1 when the watch connection eventually times out and restarts it would use resourceVersion=1, and yield - ADDED for resource b with resourceVersion=2 - DELETED for resource b with resourceVersion=2 since these events happened after resourceVersion 1. To avoid these stale events; instead of relying on the initial list of events from the watch_list call, do an actual list call before starting watch connection, and use resourceVersion from the ListMeta returned from list call when starting watch connection, as suggested in current documentation[1]. This behavour could also be avoided by ensuring bookmark events are sent on the initial watch connection, which could be achieved by enabling the streaming lists[2] feature, but that is currenly only in alpha status as of Kubernetes 1.27. [1]: https://kubernetes.io/docs/reference/using-api/api-concepts/#efficient-detection-of-changes [2]: https://kubernetes.io/docs/reference/using-api/api-concepts/#streaming-lists --- k8s/base.py | 26 ++++- k8s/watcher.py | 16 ++- tests/k8s/test_watcher.py | 227 +++++++++++++++++++++++++++++--------- 3 files changed, 211 insertions(+), 58 deletions(-) diff --git a/k8s/base.py b/k8s/base.py index 1b35fd4..526f68a 100644 --- a/k8s/base.py +++ b/k8s/base.py @@ -28,7 +28,7 @@ from . import config from .client import Client, NotFound -from .fields import Field, ReadOnlyField +from .fields import Field LOG = logging.getLogger(__name__) LOG.addHandler(logging.NullHandler()) @@ -151,7 +151,7 @@ def watch_list(cls, namespace=None, resource_version=None, allow_bookmarks=False # As per https://kubernetes.io/docs/reference/using-api/api-concepts/#semantics-for-watch # only resourceVersion is used for watch queries. params["resourceVersion"] = resource_version - LOG.info("Restarting %s watch at resource version %s", cls.__name__, resource_version) + LOG.info("(Re)starting %s watch at resource version %s", cls.__name__, resource_version) if allow_bookmarks: params["allowWatchBookmarks"] = "true" @@ -430,6 +430,26 @@ def has_object(self): return True +class SyntheticAddedWatchEvent(WatchBaseEvent): + def __init__(self, obj: Model): + # TODO: should SyntheticAddWatchEvent inherit WatchEvent? Does it even need to be its own class? + resource_version = obj.metadata.resourceVersion + super().__init__({}, resource_version) + self.type = WatchEvent.ADDED + self.object = obj + + def __repr__(self): + return "{cls}(type={type}, object={object})".format( + cls=self.__class__.__name__, type=self.type, object=self.object + ) + + def __eq__(self, other): + return self.type == other.type and self.object == other.object + + def has_object(self): + return True + + class WatchBookmark(WatchBaseEvent): """Bookmark events, if enabled, are sent periodically by the API server. They only contain the resourceVersion of the event.""" @@ -525,7 +545,7 @@ def match(cls, event_json): class ListMeta(Model): _continue = Field(str) remainingItemCount = Field(int) - resourceVersion = ReadOnlyField(str) + resourceVersion = Field(str) class ModelList: diff --git a/k8s/watcher.py b/k8s/watcher.py index e81c3b5..cfc2d05 100644 --- a/k8s/watcher.py +++ b/k8s/watcher.py @@ -17,11 +17,14 @@ import cachetools +import logging -from .base import APIServerError, WatchEvent +from .base import APIServerError, WatchEvent, SyntheticAddedWatchEvent DEFAULT_CAPACITY = 1000 +LOG = logging.getLogger(__name__) + class Watcher(object): """Higher-level interface to watch for changes in objects @@ -54,6 +57,17 @@ def watch(self, namespace=None): # Only used on reconnects, the first call to watch does a quorum read. last_seen_resource_version = None while self._run_forever: + if last_seen_resource_version is None: + # list all resources and yield a synthetic ADDED watch event for each + model_list = self._model.list_with_meta() + LOG.info("Got %d %s instances from quorum read", len(model_list.items), self._model.__name__) + for obj in model_list.items: + event = SyntheticAddedWatchEvent(obj) + # _should_yield is mainly called here to feed the self._seen cache + if self._should_yield(event): + yield event + # watch connection should start at the version of the initial list + last_seen_resource_version = model_list.metadata.resourceVersion try: for event in self._model.watch_list( namespace=namespace, resource_version=last_seen_resource_version, allow_bookmarks=True diff --git a/tests/k8s/test_watcher.py b/tests/k8s/test_watcher.py index 860cbbd..9c0afb0 100644 --- a/tests/k8s/test_watcher.py +++ b/tests/k8s/test_watcher.py @@ -19,7 +19,7 @@ import mock import pytest -from k8s.base import APIServerError, Field, Model, WatchBookmark, WatchEvent +from k8s.base import APIServerError, Field, Model, WatchBookmark, WatchEvent, ModelList, ListMeta from k8s.models.common import ObjectMeta from k8s.watcher import Watcher @@ -29,28 +29,109 @@ DELETED = WatchEvent.DELETED +class WatchListExample(Model): + class Meta: + url_template = '/example' + watch_list_url = '/watch/example' + watch_list_url_template = '/watch/{namespace}/example' + + apiVersion = Field(str, "example.com/v1") + kind = Field(str, "Example") + metadata = Field(ObjectMeta) + value = Field(int) + + +def _example_resource(id, rv, namespace="default"): + metadict = {"name": "name{}".format(id), "namespace": namespace, "resourceVersion": str(rv)} + metadata = ObjectMeta.from_dict(metadict) + return WatchListExample(metadata=metadata, value=(id * 100) + rv) + + +def _event(id, event_type, rv, namespace="default"): + wle = _example_resource(id, rv, namespace) + return mock.NonCallableMagicMock(type=event_type, object=wle) + + +def _assert_event(event, id, event_type, rv, namespace="default"): + assert event.type == event_type + o = event.object + assert o.kind == "Example" + assert o.metadata.name == "name{}".format(id) + assert o.metadata.namespace == namespace + assert o.value == (id * 100) + rv + + @pytest.mark.usefixtures("k8s_config", "logger") class TestWatcher(object): + @pytest.fixture + def api_list_with_meta(self): + with mock.patch("k8s.base.ApiMixIn.list_with_meta") as m: + yield m + @pytest.fixture def api_watch_list(self): with mock.patch("k8s.base.ApiMixIn.watch_list") as m: yield m - def test_multiple_events(self, api_watch_list): - number_of_events = 20 - events = [_event(i, ADDED, 1) for i in range(number_of_events)] + @pytest.mark.parametrize( + 'initial_resources,list_resource_version,events', + ( + # 20 initial resources, then 20 watch events + ([_example_resource(i, 100 + i) for i in range(20)], "200", [_event(i, ADDED, 300 + i) for i in range(20)]), + # 20 initial resources, no watch events + ([_example_resource(i, 100 + i) for i in range(20)], "200", []), + # no initial resources, 20 watch events + ([], "1", [_event(i, ADDED, 300 + i) for i in range(20)]), + ), + ) + def test_multiple_events( + self, api_watch_list, api_list_with_meta, initial_resources, list_resource_version, events + ): + model_list = ModelList(metadata=ListMeta(resourceVersion=list_resource_version), items=initial_resources) + api_list_with_meta.return_value = model_list api_watch_list.side_effect = [events] + watcher = Watcher(WatchListExample) gen = watcher.watch() - for i in range(number_of_events): - _assert_event(next(gen), i, ADDED, 1) + # verify that the initial resources are yielded by the watcher first + for i in range(len(initial_resources)): + _assert_event(next(gen), i, ADDED, 100 + i) + + # verify that the events from the watch_list call are yielded by the watcher + for i in range(len(events)): + _assert_event(next(gen), i, ADDED, 300 + i) + + # stop the watcher loop and verify that there are no more events watcher._run_forever = False assert list(gen) == [] - api_watch_list.assert_called_with(namespace=None, resource_version=None, allow_bookmarks=True) + api_list_with_meta.assert_called_with() + # verify watch_list was called with resourceVersion returned by list call + api_watch_list.assert_called_with(namespace=None, resource_version=list_resource_version, allow_bookmarks=True) + + def test_no_events(self, api_watch_list, api_list_with_meta): + list_resource_version = "1" + model_list = ModelList(metadata=ListMeta(resourceVersion=list_resource_version), items=[]) + api_list_with_meta.return_value = model_list + + def stop_iteration(*args, **kwargs): + watcher._run_forever = False + return [] + + api_watch_list.side_effect = stop_iteration + + watcher = Watcher(WatchListExample) + gen = watcher.watch() + + assert list(gen) == [] + + api_list_with_meta.assert_called_with() + # verify watch_list was called with resourceVersion returned by list call + api_watch_list.assert_called_with(namespace=None, resource_version=list_resource_version, allow_bookmarks=True) - def test_handle_reconnect(self, api_watch_list): + def test_handle_reconnect(self, api_watch_list, api_list_with_meta): + # TODO: what does this test? events = [_event(0, ADDED, 1)] api_watch_list.side_effect = [events, events] watcher = Watcher(WatchListExample) @@ -60,7 +141,10 @@ def test_handle_reconnect(self, api_watch_list): watcher._run_forever = False assert list(gen) == [] - def test_handle_changes(self, api_watch_list): + def test_handle_changes(self, api_watch_list, api_list_with_meta): + model_list = ModelList(metadata=ListMeta(resourceVersion="1"), items=[]) + api_list_with_meta.return_value = model_list + events = [_event(0, ADDED, 1), _event(0, MODIFIED, 2)] api_watch_list.side_effect = [events] watcher = Watcher(WatchListExample) @@ -72,7 +156,11 @@ def test_handle_changes(self, api_watch_list): watcher._run_forever = False assert list(gen) == [] - def test_complicated(self, api_watch_list): + def test_complicated(self, api_watch_list, api_list_with_meta): + initial_resources = [_example_resource(1, 0), _example_resource(2, 0)] + model_list = ModelList(metadata=ListMeta(resourceVersion="1"), items=initial_resources) + api_list_with_meta.return_value = model_list + first = [_event(0, ADDED, 1), _event(1, ADDED, 1), _event(2, ADDED, 1)] second = [_event(0, ADDED, 1), _event(1, ADDED, 2), _event(2, ADDED, 1), _event(0, MODIFIED, 2)] third = [_event(0, ADDED, 2), _event(1, DELETED, 2), _event(2, ADDED, 1), _event(2, MODIFIED, 2)] @@ -81,30 +169,36 @@ def test_complicated(self, api_watch_list): watcher = Watcher(WatchListExample) gen = watcher.watch() - # First batch + # Synthetic added events for the initial resources + _assert_event(next(gen), 1, ADDED, 0) + _assert_event(next(gen), 2, ADDED, 0) + + # First batch of events _assert_event(next(gen), 0, ADDED, 1) _assert_event(next(gen), 1, ADDED, 1) _assert_event(next(gen), 2, ADDED, 1) - # Second batch + # Second batch of events _assert_event(next(gen), 1, ADDED, 2) _assert_event(next(gen), 0, MODIFIED, 2) - # Third batch + # Third batch of events _assert_event(next(gen), 1, DELETED, 2) _assert_event(next(gen), 2, MODIFIED, 2) - # Fourth batch + # Fourth batch of events _assert_event(next(gen), 0, ADDED, 1, "other") _assert_event(next(gen), 0, MODIFIED, 2, "other") watcher._run_forever = False assert list(gen) == [] - def test_namespace(self, api_watch_list): + def test_namespace(self, api_watch_list, api_list_with_meta): namespace = "the-namespace" watcher = Watcher(WatchListExample) + api_list_with_meta.return_value = ModelList(metadata=ListMeta(), items=[]) + def stop_iteration(*args, **kwargs): watcher._run_forever = False return [] @@ -115,33 +209,90 @@ def stop_iteration(*args, **kwargs): assert list(gen) == [] + api_list_with_meta.assert_called_with() api_watch_list.assert_called_with(namespace=namespace, resource_version=None, allow_bookmarks=True) - def test_handle_410(self, api_watch_list): + def test_handle_410_list(self, api_watch_list, api_list_with_meta): + # the initial list call should not receive 410, since it doesn't send a resourceversion. If it does, something + # is probably wrong, and the exception should be propagated to the caller. + api_list_with_meta.side_effect = APIServerError({"code": 410, "message": "Gone"}) + + watcher = Watcher(WatchListExample) + with pytest.raises(APIServerError, match="Gone"): + next(watcher.watch()) + + def test_handle_410_watch(self, api_watch_list, api_list_with_meta): + # 410 response can occur if watch connection starts with a too old resourceVersion + # - this can happen on reconnect if last_seen_resource_version is too old, for example if the apiserver + # doesn't send bookmark events, or sends them at intervals longer than the client watch timeout + # (k8s.config.stream_timeout, default 4.5 minutes) + # - in theory the resourceVersion returned by the list call could be too old when the watch connection starts + # if it takes too long to yield syntetic added watch events for the items returned by the list call. + # How long this takes depends on the consumer of the generator returned by watcher.watch(). If this happens, + # the watcher will do another quorum read. Since there is a cache of seen items in the watcher, as long as + # all items fit in the cache, the number of events yielded for items from the list call approach zero, + # eventually allowing the watch connection to start. watcher = Watcher(WatchListExample) + first_list_resource_version = "1" + second_list_resource_version = "4" + api_list_with_meta.side_effect = [ + ModelList(metadata=ListMeta(resourceVersion=first_list_resource_version), items=[_example_resource(0, 0)]), + ModelList(metadata=ListMeta(resourceVersion=second_list_resource_version), items=[_example_resource(0, 0)]), + ] + api_watch_list.return_value.__getitem__.side_effect = [ - _event(0, ADDED, 1), + _event(1, ADDED, 2), APIServerError({"code": 410, "message": "Gone"}), - _event(0, MODIFIED, 2), + _event(1, MODIFIED, 3), ] # Seal the mock to make sure __iter__ is not used instead of __getitem__ mock.seal(api_watch_list) gen = watcher.watch() - _assert_event(next(gen), 0, ADDED, 1) - _assert_event(next(gen), 0, MODIFIED, 2) + + # synthetic added event for initial resource and added event + _assert_event(next(gen), 0, ADDED, 0) + _assert_event(next(gen), 1, ADDED, 2) + api_list_with_meta.called_once() + api_watch_list.assert_called_once_with( + namespace=None, resource_version=first_list_resource_version, allow_bookmarks=True + ) + + # next will raise 410 from watch_list, call list and watch_list again, then yield the last event + _assert_event(next(gen), 1, MODIFIED, 3) + # verify list and watch_list has now been called twice, and each call of watch_list used the resourceVersion + # returned by the preceding list call + assert api_list_with_meta.call_args_list == [mock.call(), mock.call()] + api_watch_list.call_args_list == [ + mock.call(namespace=None, resource_version=first_list_resource_version, allow_bookmarks=True), + mock.call(namespace=None, resource_version=second_list_resource_version, allow_bookmarks=True), + ] + + # no more events watcher._run_forever = False assert list(gen) == [] - def test_other_apierror(self, api_watch_list): + def test_other_apierror_list(self, api_list_with_meta): watcher = Watcher(WatchListExample) + api_list_with_meta.side_effect = APIServerError({"code": 400, "message": "Bad Request"}) + + with pytest.raises(APIServerError, match="Bad Request"): + next(watcher.watch()) + + def test_other_apierror_watch(self, api_watch_list, api_list_with_meta): + watcher = Watcher(WatchListExample) + + api_list_with_meta.return_value = ModelList(metadata=ListMeta(), items=[]) api_watch_list.side_effect = APIServerError({"code": 400, "message": "Bad Request"}) with pytest.raises(APIServerError, match="Bad Request"): next(watcher.watch()) - def test_bookmark(self, api_watch_list): + def test_bookmark(self, api_watch_list, api_list_with_meta): + watcher = Watcher(WatchListExample) + api_list_with_meta.return_value = ModelList(metadata=ListMeta(), items=[]) + watcher = Watcher(WatchListExample) api_watch_list.return_value.__getitem__.side_effect = [ @@ -157,35 +308,3 @@ def test_bookmark(self, api_watch_list): _assert_event(next(gen), 1, MODIFIED, 3) watcher._run_forever = False assert list(gen) == [] - - -def _event(id, event_type, rv, namespace="default"): - metadict = {"name": "name{}".format(id), "namespace": namespace, "resourceVersion": rv} - metadata = ObjectMeta.from_dict(metadict) - wle = WatchListExample(metadata=metadata, value=(id * 100) + rv) - return mock.NonCallableMagicMock(type=event_type, object=wle) - - -def _assert_event(event, id, event_type, rv, namespace="default"): - assert event.type == event_type - o = event.object - assert o.kind == "Example" - assert o.metadata.name == "name{}".format(id) - assert o.metadata.namespace == namespace - assert o.value == (id * 100) + rv - - -class WatchListExample(Model): - class Meta: - url_template = '/example' - watch_list_url = '/watch/example' - watch_list_url_template = '/watch/{namespace}/example' - - apiVersion = Field(str, "example.com/v1") - kind = Field(str, "Example") - metadata = Field(ObjectMeta) - value = Field(int) - - -class SentinelException(Exception): - pass From 4f20de91a277fd9a73c8eb77bc6608b59a136a27 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C3=98yvind=20Ingebrigtsen=20=C3=98vergaard?= Date: Thu, 18 Apr 2024 14:11:53 +0200 Subject: [PATCH 03/10] Fix warnings - add missing assert - avoid shadowing id --- tests/k8s/test_watcher.py | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 deletions(-) diff --git a/tests/k8s/test_watcher.py b/tests/k8s/test_watcher.py index 9c0afb0..dbed2ea 100644 --- a/tests/k8s/test_watcher.py +++ b/tests/k8s/test_watcher.py @@ -41,24 +41,24 @@ class Meta: value = Field(int) -def _example_resource(id, rv, namespace="default"): - metadict = {"name": "name{}".format(id), "namespace": namespace, "resourceVersion": str(rv)} +def _example_resource(_id, rv, namespace="default"): + metadict = {"name": "name{}".format(_id), "namespace": namespace, "resourceVersion": str(rv)} metadata = ObjectMeta.from_dict(metadict) - return WatchListExample(metadata=metadata, value=(id * 100) + rv) + return WatchListExample(metadata=metadata, value=(_id * 100) + rv) -def _event(id, event_type, rv, namespace="default"): - wle = _example_resource(id, rv, namespace) +def _event(_id, event_type, rv, namespace="default"): + wle = _example_resource(_id, rv, namespace) return mock.NonCallableMagicMock(type=event_type, object=wle) -def _assert_event(event, id, event_type, rv, namespace="default"): +def _assert_event(event, _id, event_type, rv, namespace="default"): assert event.type == event_type o = event.object assert o.kind == "Example" - assert o.metadata.name == "name{}".format(id) + assert o.metadata.name == "name{}".format(_id) assert o.metadata.namespace == namespace - assert o.value == (id * 100) + rv + assert o.value == (_id * 100) + rv @pytest.mark.usefixtures("k8s_config", "logger") @@ -264,7 +264,7 @@ def test_handle_410_watch(self, api_watch_list, api_list_with_meta): # verify list and watch_list has now been called twice, and each call of watch_list used the resourceVersion # returned by the preceding list call assert api_list_with_meta.call_args_list == [mock.call(), mock.call()] - api_watch_list.call_args_list == [ + assert api_watch_list.call_args_list == [ mock.call(namespace=None, resource_version=first_list_resource_version, allow_bookmarks=True), mock.call(namespace=None, resource_version=second_list_resource_version, allow_bookmarks=True), ] From b210d07600be72b8c58ffe76579aee8c79407e7e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C3=98yvind=20Ingebrigtsen=20=C3=98vergaard?= Date: Thu, 18 Apr 2024 16:16:10 +0200 Subject: [PATCH 04/10] Add missing test --- tests/k8s/test_base.py | 25 +++++++++++++++++++++---- 1 file changed, 21 insertions(+), 4 deletions(-) diff --git a/tests/k8s/test_base.py b/tests/k8s/test_base.py index 47de0aa..92c95ca 100644 --- a/tests/k8s/test_base.py +++ b/tests/k8s/test_base.py @@ -208,6 +208,21 @@ def test_list(self, client, response): ] assert Example.list() == expected + def test_list_with_meta(self, client, response): + client.get.return_value = response + + expected_items = [ + Example(value=42), + Example(value=1337), + ] + + actual = Example.list_with_meta() + + assert actual.metadata.resourceVersion == "1" + assert actual.metadata._continue == "ENCODED_CONTINUE_TOKEN" + assert actual.metadata.remainingItemCount == 1 + assert actual.items == expected_items + def test_list_empty(self, client, response_empty): client.get.return_value = response_empty @@ -216,10 +231,12 @@ def test_list_empty(self, client, response_empty): def test_list_with_meta_empty(self, client, response_empty): client.get.return_value = response_empty - assert Example.list_with_meta().metadata.resourceVersion == "2" - assert Example.list_with_meta().metadata._continue is None - assert Example.list_with_meta().metadata.remainingItemCount is None - assert Example.list_with_meta().items == [] + actual = Example.list_with_meta() + + assert actual.metadata.resourceVersion == "2" + assert actual.metadata._continue is None + assert actual.metadata.remainingItemCount is None + assert actual.items == [] @pytest.mark.parametrize( "exception", From cba48a938b4380a56b7a1daf81fcdf73fbff9d19 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C3=98yvind=20Ingebrigtsen=20=C3=98vergaard?= Date: Fri, 19 Apr 2024 15:06:44 +0200 Subject: [PATCH 05/10] Simplify WatchEvent and SyntheticAddedWatchEvent classes Note: This changes the signature of WatchBaseEvent.__init__() and WatchEvent.__init(). WatchBaseEvent shouldn't need to be used by clients of this library. WatchEvent is emitted by Model.watch_list and Watcher.watch, and clients should not usually need to create instances of it, except for maybe in tests. - WatchEventBase.__init__ used to take event_json, but since it only needs resourceVersion it has been changed to only take that as parameter, and subclasses are responsible for providing that value. If extending this ABC, change super calls from `super().__init__(event_json)` to `super().__init__(event_json["object"].get("metadata", {}).get("resourceVersion"))` or equivalent - WatchEvent.__init__ used to take event_json and Model class, this is now supported via the WatchEvent.from_dict classmethod with the same signature. WatchEvent.__init__ is changed to take WatchEvent type and Model instance, to make it easier to extend, for example by SyntheticAddedWatchEvent. If creating instances of WatchEvent, change calls from `WatchEvent(event_json, cls)` to `WatchEvent.from_dict(event_json, cls)`. --- k8s/base.py | 48 +++++++++++++++++----------------------- tests/k8s/test_base.py | 12 +++++----- tests/k8s/test_client.py | 18 +++++---------- 3 files changed, 32 insertions(+), 46 deletions(-) diff --git a/k8s/base.py b/k8s/base.py index 526f68a..acc078d 100644 --- a/k8s/base.py +++ b/k8s/base.py @@ -212,7 +212,7 @@ def _parse_watch_event(cls, line) -> Optional[WatchBaseEvent]: event = WatchBookmark(event_json) else: LOG.debug("Received watch event from API server: %s", event_json) - event = WatchEvent(event_json, cls) + event = WatchEvent.from_dict(event_json, cls) return event except TypeError: LOG.exception( @@ -395,11 +395,8 @@ class WatchBaseEvent(ABC): __slots__ = ("resource_version",) - def __init__(self, event_json, resource_version=None): - if resource_version is not None: - self.resource_version = resource_version - else: - self.resource_version = event_json["object"].get("metadata", {}).get("resourceVersion") + def __init__(self, resource_version): + self.resource_version = resource_version def __eq__(self, other): return self.resource_version == other.resource_version @@ -413,10 +410,19 @@ class WatchEvent(WatchBaseEvent): MODIFIED = "MODIFIED" DELETED = "DELETED" - def __init__(self, event_json, cls): - super(WatchEvent, self).__init__(event_json) - self.type = event_json["type"] - self.object = cls.from_dict(event_json["object"]) + def __init__(self, _type: str, _object: Model): + # resource_version is effectively optional here to replicate the previous behavior + # in practice, watch events with None resourceVersion will break the caching in Watcher.watch() + resource_version = getattr(getattr(_object, "metadata", None), "resourceVersion", None) + super(WatchEvent, self).__init__(resource_version) + self.type = _type + self.object = _object + + @classmethod + def from_dict(cls, event_json: Dict, model_cls: Model) -> WatchEvent: + _type = event_json["type"] + _object = model_cls.from_dict(event_json["object"]) + return cls(_type, _object) def __repr__(self): return "{cls}(type={type}, object={object})".format( @@ -430,24 +436,9 @@ def has_object(self): return True -class SyntheticAddedWatchEvent(WatchBaseEvent): +class SyntheticAddedWatchEvent(WatchEvent): def __init__(self, obj: Model): - # TODO: should SyntheticAddWatchEvent inherit WatchEvent? Does it even need to be its own class? - resource_version = obj.metadata.resourceVersion - super().__init__({}, resource_version) - self.type = WatchEvent.ADDED - self.object = obj - - def __repr__(self): - return "{cls}(type={type}, object={object})".format( - cls=self.__class__.__name__, type=self.type, object=self.object - ) - - def __eq__(self, other): - return self.type == other.type and self.object == other.object - - def has_object(self): - return True + super(SyntheticAddedWatchEvent, self).__init__(WatchEvent.ADDED, obj) class WatchBookmark(WatchBaseEvent): @@ -455,7 +446,8 @@ class WatchBookmark(WatchBaseEvent): They only contain the resourceVersion of the event.""" def __init__(self, event_json): - super(WatchBookmark, self).__init__(event_json) + resource_version = event_json["object"].get("metadata", {}).get("resourceVersion") + super(WatchBookmark, self).__init__(resource_version) @classmethod def match(cls, event_json): diff --git a/tests/k8s/test_base.py b/tests/k8s/test_base.py index 92c95ca..68998f1 100644 --- a/tests/k8s/test_base.py +++ b/tests/k8s/test_base.py @@ -22,7 +22,7 @@ from k8s.base import APIServerError, Equality, Exists, Field, In, Inequality, Model, NotIn, WatchBookmark, WatchEvent from k8s.client import NotFound, ServerError, ClientError -from k8s.models.common import DeleteOptions, Preconditions +from k8s.models.common import DeleteOptions, Preconditions, ObjectMeta class Example(Model): @@ -35,17 +35,17 @@ class Meta: class TestWatchEvent(object): def test_watch_event_added(self): - watch_event = WatchEvent({"type": "ADDED", "object": {"value": 42}}, Example) + watch_event = WatchEvent.from_dict({"type": "ADDED", "object": {"value": 42}}, Example) assert watch_event.type == WatchEvent.ADDED assert watch_event.object == Example(value=42) def test_watch_event_modified(self): - watch_event = WatchEvent({"type": "MODIFIED", "object": {"value": 42}}, Example) + watch_event = WatchEvent.from_dict({"type": "MODIFIED", "object": {"value": 42}}, Example) assert watch_event.type == WatchEvent.MODIFIED assert watch_event.object == Example(value=42) def test_watch_event_deleted(self): - watch_event = WatchEvent({"type": "DELETED", "object": {"value": 42}}, Example) + watch_event = WatchEvent.from_dict({"type": "DELETED", "object": {"value": 42}}, Example) assert watch_event.type == WatchEvent.DELETED assert watch_event.object == Example(value=42) @@ -126,7 +126,7 @@ def test_watch_list(self, client): '{"type": "ADDED", "object": {"value": 1}}', ] gen = Example.watch_list() - assert next(gen) == WatchEvent({"type": "ADDED", "object": {"value": 1}}, Example) + assert next(gen) == WatchEvent.from_dict({"type": "ADDED", "object": {"value": 1}}, Example) client.get.assert_called_once_with("/watch/example", stream=True, timeout=270, params={}) assert list(gen) == [] @@ -139,7 +139,7 @@ def test_watch_list_with_timeout(self, client): # Seal to avoid __iter__ being used instead of __getitem__ mock.seal(client) gen = Example.watch_list() - assert next(gen) == WatchEvent({"type": "ADDED", "object": {"value": 1}}, Example) + assert next(gen) == WatchEvent.from_dict({"type": "ADDED", "object": {"value": 1}}, Example) assert list(gen) == [] assert client.get.return_value.iter_lines.return_value.__getitem__.call_count == 2 client.get.assert_called_once_with("/watch/example", stream=True, timeout=270, params={}) diff --git a/tests/k8s/test_client.py b/tests/k8s/test_client.py index 55d0aeb..fae3163 100644 --- a/tests/k8s/test_client.py +++ b/tests/k8s/test_client.py @@ -223,8 +223,8 @@ def test_watch_list_payload_ok(self, get): get.return_value = response expected = [ - _create_watchevent(WatchEvent.ADDED, WatchListExample(value=1, requiredValue=2)), - _create_watchevent(WatchEvent.MODIFIED, WatchListExample(value=3, requiredValue=4)), + WatchEvent(WatchEvent.ADDED, WatchListExample(value=1, requiredValue=2)), + WatchEvent(WatchEvent.MODIFIED, WatchListExample(value=3, requiredValue=4)), ] items = list(WatchListExample.watch_list()) @@ -257,9 +257,9 @@ def test_watch_list_payload_invalid_json(self, get): get.return_value = response expected = [ - _create_watchevent(WatchEvent.ADDED, WatchListExample(value=1, requiredValue=2)), + WatchEvent(WatchEvent.ADDED, WatchListExample(value=1, requiredValue=2)), # "definitely not valid json" should be discarded - _create_watchevent(WatchEvent.ADDED, WatchListExample(value=5, requiredValue=6)), + WatchEvent(WatchEvent.ADDED, WatchListExample(value=5, requiredValue=6)), ] items = list(WatchListExample.watch_list()) @@ -297,21 +297,15 @@ def test_watch_list_payload_invalid_object(self, get): get.return_value = response expected = [ - _create_watchevent(WatchEvent.ADDED, WatchListExample(value=1, requiredValue=2)), + WatchEvent(WatchEvent.ADDED, WatchListExample(value=1, requiredValue=2)), # event with value=10 and requiredValue missing should be discarded - _create_watchevent(WatchEvent.ADDED, WatchListExample(value=5, requiredValue=6)), + WatchEvent(WatchEvent.ADDED, WatchListExample(value=5, requiredValue=6)), ] items = list(WatchListExample.watch_list()) assert items == expected -def _create_watchevent(event_type, event_object): - """factory function for WatchEvent to make it easier to create test data from actual objects, as the constructor - takes a dict (unmarshaled json)""" - return WatchEvent({"type": event_type, "object": event_object.as_dict()}, event_object.__class__) - - def _absolute_url(url): return config.api_server + url From 930c58bd31ab45437434d725358952110f3145e5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C3=98yvind=20Ingebrigtsen=20=C3=98vergaard?= Date: Fri, 19 Apr 2024 15:16:59 +0200 Subject: [PATCH 06/10] Add metadata field to Example test model metadata is required to use the Watcher, so it is better that the test data has this field --- tests/k8s/test_base.py | 38 +++++++++++++++++++++++++++----------- 1 file changed, 27 insertions(+), 11 deletions(-) diff --git a/tests/k8s/test_base.py b/tests/k8s/test_base.py index 68998f1..75cf02b 100644 --- a/tests/k8s/test_base.py +++ b/tests/k8s/test_base.py @@ -30,24 +30,38 @@ class Meta: url_template = '/example' watch_list_url = '/watch/example' + metadata = Field(ObjectMeta) value = Field(int) +def _example_object(value=42, resource_version="1"): + # Since metadata.resourceVersion is a ReadOnlyField values set are ignored. To avoid this we have to use from_dict + # to set the field value, like when deserializing an API response. + metadata = ObjectMeta.from_dict({"resourceVersion": resource_version}) + return Example(metadata=metadata, value=value) + + class TestWatchEvent(object): def test_watch_event_added(self): - watch_event = WatchEvent.from_dict({"type": "ADDED", "object": {"value": 42}}, Example) + obj = _example_object(42, "1") + event_dict = {"type": "ADDED", "object": {"metadata": {"resourceVersion": "1"}, "value": 42}} + watch_event = WatchEvent.from_dict(event_dict, Example) assert watch_event.type == WatchEvent.ADDED - assert watch_event.object == Example(value=42) + assert watch_event.object == obj def test_watch_event_modified(self): - watch_event = WatchEvent.from_dict({"type": "MODIFIED", "object": {"value": 42}}, Example) + obj = _example_object(42, "1") + event_dict = {"type": "MODIFIED", "object": {"metadata": {"resourceVersion": "1"}, "value": 42}} + watch_event = WatchEvent.from_dict(event_dict, Example) assert watch_event.type == WatchEvent.MODIFIED - assert watch_event.object == Example(value=42) + assert watch_event.object == obj def test_watch_event_deleted(self): - watch_event = WatchEvent.from_dict({"type": "DELETED", "object": {"value": 42}}, Example) + obj = _example_object(42, "1") + event_dict = {"type": "DELETED", "object": {"metadata": {"resourceVersion": "1"}, "value": 42}} + watch_event = WatchEvent.from_dict(event_dict, Example) assert watch_event.type == WatchEvent.DELETED - assert watch_event.object == Example(value=42) + assert watch_event.object == obj class TestFind(object): @@ -123,23 +137,25 @@ def client(self): def test_watch_list(self, client): client.get.return_value.iter_lines.return_value = [ - '{"type": "ADDED", "object": {"value": 1}}', + '{"type": "ADDED", "object": {"metadata": {"resourceVersion": "1"}, "value": 1}}', ] gen = Example.watch_list() - assert next(gen) == WatchEvent.from_dict({"type": "ADDED", "object": {"value": 1}}, Example) + event_dict = {"type": "ADDED", "object": {"metadata": {"resourceVersion": "1"}, "value": 1}} + assert next(gen) == WatchEvent.from_dict(event_dict, Example) client.get.assert_called_once_with("/watch/example", stream=True, timeout=270, params={}) assert list(gen) == [] def test_watch_list_with_timeout(self, client): client.get.return_value.iter_lines.return_value.__getitem__.side_effect = [ - '{"type": "ADDED", "object": {"value": 1}}', + '{"type": "ADDED", "object": {"metadata": {"resourceVersion": "1"}, "value": 1}}', requests.ConnectionError(urllib3.exceptions.ReadTimeoutError("", "", "")), - '{"type": "MODIFIED", "object": {"value": 2}}', # Not reached + '{"type": "MODIFIED", "object": {"metadata": {"resourceVersion": "2"}, "value": 2}}', # Not reached ] # Seal to avoid __iter__ being used instead of __getitem__ mock.seal(client) gen = Example.watch_list() - assert next(gen) == WatchEvent.from_dict({"type": "ADDED", "object": {"value": 1}}, Example) + event_dict = {"type": "ADDED", "object": {"metadata": {"resourceVersion": "1"}, "value": 1}} + assert next(gen) == WatchEvent.from_dict(event_dict, Example) assert list(gen) == [] assert client.get.return_value.iter_lines.return_value.__getitem__.call_count == 2 client.get.assert_called_once_with("/watch/example", stream=True, timeout=270, params={}) From eed341cdeaabbf767b90c94029b84e832cc25b93 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C3=98yvind=20Ingebrigtsen=20=C3=98vergaard?= Date: Fri, 19 Apr 2024 15:17:49 +0200 Subject: [PATCH 07/10] Use real WatchEvent instance as event in watcher tests --- tests/k8s/test_watcher.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/k8s/test_watcher.py b/tests/k8s/test_watcher.py index dbed2ea..54c066a 100644 --- a/tests/k8s/test_watcher.py +++ b/tests/k8s/test_watcher.py @@ -49,7 +49,7 @@ def _example_resource(_id, rv, namespace="default"): def _event(_id, event_type, rv, namespace="default"): wle = _example_resource(_id, rv, namespace) - return mock.NonCallableMagicMock(type=event_type, object=wle) + return WatchEvent(event_type, wle) def _assert_event(event, _id, event_type, rv, namespace="default"): From 6cc1eb8bb1e0f402df2168b7dfd77a1609ea4e38 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C3=98yvind=20Ingebrigtsen=20=C3=98vergaard?= Date: Wed, 17 Apr 2024 14:01:59 +0200 Subject: [PATCH 08/10] Test both list and watch calls in watcher cache tests --- tests/k8s/test_watcher.py | 53 +++++++++++++++++++++++++++++++++++---- 1 file changed, 48 insertions(+), 5 deletions(-) diff --git a/tests/k8s/test_watcher.py b/tests/k8s/test_watcher.py index 54c066a..25319b1 100644 --- a/tests/k8s/test_watcher.py +++ b/tests/k8s/test_watcher.py @@ -130,15 +130,58 @@ def stop_iteration(*args, **kwargs): # verify watch_list was called with resourceVersion returned by list call api_watch_list.assert_called_with(namespace=None, resource_version=list_resource_version, allow_bookmarks=True) - def test_handle_reconnect(self, api_watch_list, api_list_with_meta): - # TODO: what does this test? - events = [_event(0, ADDED, 1)] - api_watch_list.side_effect = [events, events] + def test_handle_watcher_cache_watch(self, api_watch_list, api_list_with_meta): + # if the same event (same name, namespace and resource version) is returned by watch_list multiple times, it + # should only be yielded once. If a DELETED event is received with the same resourceVersion, it should be + # yielded. + # If a DELETED event is received with the same resourceVersion as a previous event, it should be yielded. + model_list = ModelList(metadata=ListMeta(resourceVersion="1"), items=[]) + api_list_with_meta.return_value = model_list + + # yield event with resource twice, and stop the watcher after yielding the second event + event = _event(0, ADDED, 1) + delete_event = _event(0, DELETED, 1) + + def side_effect(*args, **kwargs): + yield event + yield event + yield delete_event + watcher._run_forever = False + + api_watch_list.side_effect = side_effect + watcher = Watcher(WatchListExample) gen = watcher.watch() _assert_event(next(gen), 0, ADDED, 1) - watcher._run_forever = False + _assert_event(next(gen), 0, DELETED, 1) + assert list(gen) == [] + + def test_handle_watcher_cache_list(self, api_watch_list, api_list_with_meta): + # if the same event (same name, namespace and resource version) is returned by list and watch_list multiple + # times, it should only be yielded once. + # If a DELETED event is received with the same resourceVersion as a previous event, it should be yielded. + resource = _example_resource(0, 1) + model_list = ModelList(metadata=ListMeta(resourceVersion="1"), items=[resource]) + api_list_with_meta.return_value = model_list + + # yield event twice, and stop the watcher after yielding the second event + event = WatchEvent(WatchEvent.ADDED, resource) + delete_event = WatchEvent(WatchEvent.DELETED, resource) + + def side_effect(*args, **kwargs): + yield event + yield event + yield delete_event + watcher._run_forever = False + + api_watch_list.side_effect = side_effect + + watcher = Watcher(WatchListExample) + gen = watcher.watch() + + _assert_event(next(gen), 0, ADDED, 1) + _assert_event(next(gen), 0, DELETED, 1) assert list(gen) == [] def test_handle_changes(self, api_watch_list, api_list_with_meta): From 35fd364e13965202757b24ae34146c9878a15950 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C3=98yvind=20Ingebrigtsen=20=C3=98vergaard?= Date: Mon, 29 Apr 2024 11:45:13 +0200 Subject: [PATCH 09/10] Keep Watch{Base,}Event.__init__ backward compatible Partially rework cba48a938b4380a56b7a1daf81fcdf73fbff9d19 to preserve the function signatures of WatchBaseEvent.__init__ and WatchEvent.__init__. Make all parameters keyword args. WatchEventBase takes either event_json or resource_version; WatchEventBase takes either event_json and Model class, or WatchEvent type and Model object instance, and raise ValueError for other combinations. --- k8s/base.py | 46 +++++++++++++++++++----------------- tests/k8s/test_base.py | 49 +++++++++++++++++++++++++++++++++++---- tests/k8s/test_client.py | 12 +++++----- tests/k8s/test_watcher.py | 6 ++--- 4 files changed, 78 insertions(+), 35 deletions(-) diff --git a/k8s/base.py b/k8s/base.py index acc078d..de6a995 100644 --- a/k8s/base.py +++ b/k8s/base.py @@ -21,7 +21,7 @@ import json import logging from collections import namedtuple -from typing import Optional, Dict, Type, List +from typing import Optional, Dict, List import requests import requests.packages.urllib3 as urllib3 @@ -212,7 +212,7 @@ def _parse_watch_event(cls, line) -> Optional[WatchBaseEvent]: event = WatchBookmark(event_json) else: LOG.debug("Received watch event from API server: %s", event_json) - event = WatchEvent.from_dict(event_json, cls) + event = WatchEvent(event_json, cls) return event except TypeError: LOG.exception( @@ -395,8 +395,11 @@ class WatchBaseEvent(ABC): __slots__ = ("resource_version",) - def __init__(self, resource_version): - self.resource_version = resource_version + def __init__(self, event_json=None, resource_version=None): + if event_json is not None: + self.resource_version = event_json["object"].get("metadata", {}).get("resourceVersion") + else: + self.resource_version = resource_version def __eq__(self, other): return self.resource_version == other.resource_version @@ -410,19 +413,21 @@ class WatchEvent(WatchBaseEvent): MODIFIED = "MODIFIED" DELETED = "DELETED" - def __init__(self, _type: str, _object: Model): - # resource_version is effectively optional here to replicate the previous behavior - # in practice, watch events with None resourceVersion will break the caching in Watcher.watch() - resource_version = getattr(getattr(_object, "metadata", None), "resourceVersion", None) - super(WatchEvent, self).__init__(resource_version) - self.type = _type - self.object = _object - - @classmethod - def from_dict(cls, event_json: Dict, model_cls: Model) -> WatchEvent: - _type = event_json["type"] - _object = model_cls.from_dict(event_json["object"]) - return cls(_type, _object) + def __init__(self, event_json: dict = None, cls: type[Model] = None, _type: str = None, _object: Model = None): + if event_json is not None and cls is not None: + super(WatchEvent, self).__init__(event_json=event_json) + self.type = event_json["type"] + self.object = cls.from_dict(event_json["object"]) + elif _type is not None and _object is not None: + # resource_version is effectively optional here to match the behavior for event_json in WatchBaseEvent + # in practice, watch events with None resourceVersion will break the caching in Watcher.watch() + resource_version = getattr(getattr(_object, "metadata", None), "resourceVersion", None) + super(WatchEvent, self).__init__(resource_version=resource_version) + self.type = _type + self.object = _object + else: + raise ValueError("requires either event_json and cls or _type and _object, " + + f"got {event_json=}, {cls=}, {_type=}, {_object=}") def __repr__(self): return "{cls}(type={type}, object={object})".format( @@ -438,7 +443,7 @@ def has_object(self): class SyntheticAddedWatchEvent(WatchEvent): def __init__(self, obj: Model): - super(SyntheticAddedWatchEvent, self).__init__(WatchEvent.ADDED, obj) + super(SyntheticAddedWatchEvent, self).__init__(_type=WatchEvent.ADDED, _object=obj) class WatchBookmark(WatchBaseEvent): @@ -446,8 +451,7 @@ class WatchBookmark(WatchBaseEvent): They only contain the resourceVersion of the event.""" def __init__(self, event_json): - resource_version = event_json["object"].get("metadata", {}).get("resourceVersion") - super(WatchBookmark, self).__init__(resource_version) + super(WatchBookmark, self).__init__(event_json) @classmethod def match(cls, event_json): @@ -551,7 +555,7 @@ def __init__(self, metadata: ListMeta, items: List[Model]): self.items = items @classmethod - def from_dict(cls, model_cls: Type[Model], list_response_data: Dict): + def from_dict(cls, model_cls: type[Model], list_response_data: Dict): metadata = ListMeta.from_dict(list_response_data.get('metadata', {})) items = [model_cls.from_dict(item) for item in list_response_data.get('items', [])] return cls(metadata, items) diff --git a/tests/k8s/test_base.py b/tests/k8s/test_base.py index 75cf02b..e05a3ca 100644 --- a/tests/k8s/test_base.py +++ b/tests/k8s/test_base.py @@ -45,24 +45,63 @@ class TestWatchEvent(object): def test_watch_event_added(self): obj = _example_object(42, "1") event_dict = {"type": "ADDED", "object": {"metadata": {"resourceVersion": "1"}, "value": 42}} - watch_event = WatchEvent.from_dict(event_dict, Example) + watch_event = WatchEvent(event_dict, Example) assert watch_event.type == WatchEvent.ADDED assert watch_event.object == obj def test_watch_event_modified(self): obj = _example_object(42, "1") event_dict = {"type": "MODIFIED", "object": {"metadata": {"resourceVersion": "1"}, "value": 42}} - watch_event = WatchEvent.from_dict(event_dict, Example) + watch_event = WatchEvent(event_dict, Example) assert watch_event.type == WatchEvent.MODIFIED assert watch_event.object == obj def test_watch_event_deleted(self): obj = _example_object(42, "1") event_dict = {"type": "DELETED", "object": {"metadata": {"resourceVersion": "1"}, "value": 42}} - watch_event = WatchEvent.from_dict(event_dict, Example) + watch_event = WatchEvent(event_dict, Example) assert watch_event.type == WatchEvent.DELETED assert watch_event.object == obj + @pytest.mark.parametrize( + "_type", + ( + WatchEvent.ADDED, + WatchEvent.MODIFIED, + WatchEvent.DELETED, + ), + ) + def test_watch_event_type_object(self, _type): + obj = _example_object(42, "1") + watch_event = WatchEvent(_type=_type, _object=obj) + assert watch_event.type == _type + assert watch_event.object == obj + + @pytest.mark.parametrize( + "kwargs", + ( + # invalid combinations of keyword arguments + dict( + event_json={"type": "MODIFIED", "object": {"metadata": {"resourceVersion": "1"}, "value": 42}}, + _type=WatchEvent.MODIFIED, + ), + dict( + event_json={"type": "MODIFIED", "object": {"metadata": {"resourceVersion": "1"}, "value": 42}}, + _object=_example_object(42, "1"), + ), + dict(cls=Example, _type=WatchEvent.MODIFIED), + dict(cls=Example, _object=_example_object(42, "1")), + # passed only one keyword argument, but a correct pair of arguments is required + dict(event_json={"type": "MODIFIED", "object": {"metadata": {"resourceVersion": "1"}, "value": 42}}), + dict(_object=_example_object(42, "1")), + dict(_type=WatchEvent.MODIFIED), + dict(cls=Example), + ), + ) + def test_watch_event_invalid_params(self, kwargs): + with pytest.raises(ValueError): + WatchEvent(**kwargs) + class TestFind(object): @pytest.fixture @@ -141,7 +180,7 @@ def test_watch_list(self, client): ] gen = Example.watch_list() event_dict = {"type": "ADDED", "object": {"metadata": {"resourceVersion": "1"}, "value": 1}} - assert next(gen) == WatchEvent.from_dict(event_dict, Example) + assert next(gen) == WatchEvent(event_dict, Example) client.get.assert_called_once_with("/watch/example", stream=True, timeout=270, params={}) assert list(gen) == [] @@ -155,7 +194,7 @@ def test_watch_list_with_timeout(self, client): mock.seal(client) gen = Example.watch_list() event_dict = {"type": "ADDED", "object": {"metadata": {"resourceVersion": "1"}, "value": 1}} - assert next(gen) == WatchEvent.from_dict(event_dict, Example) + assert next(gen) == WatchEvent(event_dict, Example) assert list(gen) == [] assert client.get.return_value.iter_lines.return_value.__getitem__.call_count == 2 client.get.assert_called_once_with("/watch/example", stream=True, timeout=270, params={}) diff --git a/tests/k8s/test_client.py b/tests/k8s/test_client.py index fae3163..307462e 100644 --- a/tests/k8s/test_client.py +++ b/tests/k8s/test_client.py @@ -223,8 +223,8 @@ def test_watch_list_payload_ok(self, get): get.return_value = response expected = [ - WatchEvent(WatchEvent.ADDED, WatchListExample(value=1, requiredValue=2)), - WatchEvent(WatchEvent.MODIFIED, WatchListExample(value=3, requiredValue=4)), + WatchEvent(_type=WatchEvent.ADDED, _object=WatchListExample(value=1, requiredValue=2)), + WatchEvent(_type=WatchEvent.MODIFIED, _object=WatchListExample(value=3, requiredValue=4)), ] items = list(WatchListExample.watch_list()) @@ -257,9 +257,9 @@ def test_watch_list_payload_invalid_json(self, get): get.return_value = response expected = [ - WatchEvent(WatchEvent.ADDED, WatchListExample(value=1, requiredValue=2)), + WatchEvent(_type=WatchEvent.ADDED, _object=WatchListExample(value=1, requiredValue=2)), # "definitely not valid json" should be discarded - WatchEvent(WatchEvent.ADDED, WatchListExample(value=5, requiredValue=6)), + WatchEvent(_type=WatchEvent.ADDED, _object=WatchListExample(value=5, requiredValue=6)), ] items = list(WatchListExample.watch_list()) @@ -297,9 +297,9 @@ def test_watch_list_payload_invalid_object(self, get): get.return_value = response expected = [ - WatchEvent(WatchEvent.ADDED, WatchListExample(value=1, requiredValue=2)), + WatchEvent(_type=WatchEvent.ADDED, _object=WatchListExample(value=1, requiredValue=2)), # event with value=10 and requiredValue missing should be discarded - WatchEvent(WatchEvent.ADDED, WatchListExample(value=5, requiredValue=6)), + WatchEvent(_type=WatchEvent.ADDED, _object=WatchListExample(value=5, requiredValue=6)), ] items = list(WatchListExample.watch_list()) diff --git a/tests/k8s/test_watcher.py b/tests/k8s/test_watcher.py index 25319b1..dd50a20 100644 --- a/tests/k8s/test_watcher.py +++ b/tests/k8s/test_watcher.py @@ -49,7 +49,7 @@ def _example_resource(_id, rv, namespace="default"): def _event(_id, event_type, rv, namespace="default"): wle = _example_resource(_id, rv, namespace) - return WatchEvent(event_type, wle) + return WatchEvent(_type=event_type, _object=wle) def _assert_event(event, _id, event_type, rv, namespace="default"): @@ -166,8 +166,8 @@ def test_handle_watcher_cache_list(self, api_watch_list, api_list_with_meta): api_list_with_meta.return_value = model_list # yield event twice, and stop the watcher after yielding the second event - event = WatchEvent(WatchEvent.ADDED, resource) - delete_event = WatchEvent(WatchEvent.DELETED, resource) + event = WatchEvent(_type=WatchEvent.ADDED, _object=resource) + delete_event = WatchEvent(_type=WatchEvent.DELETED, _object=resource) def side_effect(*args, **kwargs): yield event From 107c9cdaf1b40f1b1a6e9cc78100951fa90863a1 Mon Sep 17 00:00:00 2001 From: Thomas Geirhovd Date: Wed, 22 May 2024 12:28:07 +0200 Subject: [PATCH 10/10] Fix failing assert in test_watcher --- tests/k8s/test_watcher.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/k8s/test_watcher.py b/tests/k8s/test_watcher.py index dd50a20..6fcd73c 100644 --- a/tests/k8s/test_watcher.py +++ b/tests/k8s/test_watcher.py @@ -297,7 +297,7 @@ def test_handle_410_watch(self, api_watch_list, api_list_with_meta): # synthetic added event for initial resource and added event _assert_event(next(gen), 0, ADDED, 0) _assert_event(next(gen), 1, ADDED, 2) - api_list_with_meta.called_once() + api_list_with_meta.assert_called_once() api_watch_list.assert_called_once_with( namespace=None, resource_version=first_list_resource_version, allow_bookmarks=True )