Skip to content

Commit

Permalink
merge: Creating objects on RoleTagAccessor and/or non-list attributes
Browse files Browse the repository at this point in the history
  • Loading branch information
Wuestengecko committed Nov 24, 2023
2 parents 0f26cb3 + 0f1a25b commit 1c25a6a
Show file tree
Hide file tree
Showing 10 changed files with 285 additions and 34 deletions.
2 changes: 1 addition & 1 deletion capellambse/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
from .cli_helpers import *
from .filehandler import *
from .model import MelodyModel
from .model.common import ModelObject
from .model.common import ModelObject, new_object

_has_loaded_extensions = False

Expand Down
5 changes: 4 additions & 1 deletion capellambse/extensions/reqif/_capellareq.py
Original file line number Diff line number Diff line change
Expand Up @@ -213,8 +213,11 @@ def insert(
self,
elmlist: c.ElementListCouplingMixin,
index: int,
value: c.ModelObject,
value: c.ModelObject | c.new_object,
) -> None:
if isinstance(value, c.new_object):
raise NotImplementedError("Cannot insert new objects yet")

if isinstance(value, CapellaOutgoingRelation):
parent = value.target._element
else:
Expand Down
1 change: 1 addition & 0 deletions capellambse/model/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
GenericElement,
ModelObject,
NonUniqueMemberError,
new_object,
)

LOGGER = logging.getLogger(__name__)
Expand Down
1 change: 1 addition & 0 deletions capellambse/model/common/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,7 @@ def register_xtype_handler(cls: type[T]) -> type[T]:


from .accessors import *
from .accessors import _NewObject as new_object
from .element import *
from .properties import *

Expand Down
208 changes: 182 additions & 26 deletions capellambse/model/common/accessors.py
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,26 @@ def __str__(self) -> str:
)


class _NewObject:
"""A marker class that indicates that a new object should be created.
This object can be assigned to an attribute of a model object, and
will be replaced with a new object of the correct type by the
attribute's accessor.
In the future, this class will be replaced with a function that
simply returns a new object of the correct type.
"""

def __init__(self, /, *type_hint: str, **kw: t.Any) -> None:
self._type_hint = type_hint
self._kw = kw

def __repr__(self) -> str:
kw = ", ".join(f"{k}={v!r}" for k, v in self._kw.items())
return f"<new object {self._type_hint!r} ({kw})>"


class Accessor(t.Generic[T], metaclass=abc.ABCMeta):
"""Super class for all Accessor types."""

Expand Down Expand Up @@ -198,6 +218,13 @@ def __init__(
else:
self.aslist = None

def __set__(
self,
obj: element.ModelObject,
value: T | _NewObject | cabc.Iterable[T | _NewObject],
) -> None:
raise TypeError("Cannot set this type of attribute")

def create(
self,
elmlist: ElementListCouplingMixin,
Expand Down Expand Up @@ -237,7 +264,7 @@ def insert(
self,
elmlist: ElementListCouplingMixin,
index: int,
value: element.ModelObject,
value: element.ModelObject | _NewObject,
) -> None:
"""Insert the ``value`` object into the model.
Expand All @@ -255,6 +282,29 @@ def delete(
"""Delete the ``obj`` from the model."""
raise NotImplementedError("Objects in this list cannot be deleted")

def _create(
self,
parent: element.ModelObject,
xmltag: str | None,
/,
*type_hints: str | None,
**kw: t.Any,
) -> T:
if type_hints:
elmclass, kw["xtype"] = self._match_xtype(*type_hints)
else:
elmclass, kw["xtype"] = self._guess_xtype()
assert elmclass is not None

want_id: str | None = None
if "uuid" in kw:
want_id = kw.pop("uuid")

pelem = parent._element
with parent._model._loader.new_uuid(pelem, want=want_id) as obj_id:
obj = elmclass(parent._model, pelem, xmltag, uuid=obj_id, **kw)
return obj

def _make_list(self, parent_obj, elements):
assert hasattr(self, "class_")
assert hasattr(self, "list_extra_args")
Expand Down Expand Up @@ -325,6 +375,15 @@ def match_xt(xtp: S, itr: cabc.Iterable[S]) -> S:
objtype = match_xt(objtype, candidate_classes)
return candidate_classes[objtype], objtype

def _guess_xtype(self) -> tuple[type[T], str]:
try:
super_guess = super()._guess_xtype # type: ignore[misc]
except AttributeError:
pass
else:
return super_guess()
raise TypeError(f"{self._qualname} requires a type hint")

def purge_references(
self, obj: element.ModelObject, target: element.ModelObject
) -> contextlib.AbstractContextManager[None]:
Expand Down Expand Up @@ -580,7 +639,9 @@ def __get__(self, obj, objtype=None):
return rv

def __set__(
self, obj: element.ModelObject, value: str | T | cabc.Iterable[str | T]
self,
obj: element.ModelObject,
value: str | T | _NewObject | cabc.Iterable[str | T | _NewObject],
) -> None:
if getattr(obj, "_constructed", True):
sys.audit("capellambse.setattr", obj, self.__name__, value)
Expand All @@ -599,9 +660,15 @@ def __set__(
else:
if isinstance(value, cabc.Iterable) and not isinstance(value, str):
raise TypeError("Cannot set non-list attribute to an iterable")
raise NotImplementedError(
"Moving model objects is not supported yet"
)
if not isinstance(value, _NewObject):
raise NotImplementedError(
"Moving model objects is not supported yet"
)
if self.__get__(obj) is not None:
raise NotImplementedError(
"Replacing model objects is not supported yet"
)
self._create(obj, None, *value._type_hint, **value._kw)

def __delete__(self, obj: element.ModelObject) -> None:
if self.rootelem:
Expand Down Expand Up @@ -684,26 +751,20 @@ def create(
if self.rootelem:
raise TypeError("Cannot create objects here")

if type_hints:
elmclass, kw["xtype"] = self._match_xtype(*type_hints)
else:
elmclass, kw["xtype"] = self._guess_xtype()
assert elmclass is not None

parent = elmlist._parent._element
want_id: str | None = None
if "uuid" in kw:
want_id = kw.pop("uuid")
with elmlist._model._loader.new_uuid(parent, want=want_id) as obj_id:
obj = elmclass(elmlist._model, parent, uuid=obj_id, **kw)
return obj
return self._create(elmlist._parent, None, *type_hints, **kw)

def insert(
self,
elmlist: ElementListCouplingMixin,
index: int,
value: element.ModelObject,
value: element.ModelObject | _NewObject,
) -> None:
if isinstance(value, _NewObject):
raise NotImplementedError(
"Creating new objects in lists with new_object() is not"
" supported yet"
)

if value._model is not elmlist._model:
raise ValueError("Cannot move elements between models")
try:
Expand Down Expand Up @@ -909,12 +970,14 @@ def insert(
self,
elmlist: ElementListCouplingMixin,
index: int,
value: element.ModelObject,
value: element.ModelObject | _NewObject,
) -> None:
if self.aslist is None:
raise TypeError("Cannot insert: This is not a list (bug?)")
if self.tag is None:
raise NotImplementedError("Cannot insert: XML tag not set")
if isinstance(value, _NewObject):
raise NotImplementedError("Cannot insert new objects yet")

self.__create_link(
elmlist._parent,
Expand Down Expand Up @@ -1011,7 +1074,9 @@ def __get__(self, obj, objtype=None):
return rv

def __set__(
self, obj: element.ModelObject, values: T | cabc.Iterable[T]
self,
obj: element.ModelObject,
values: T | _NewObject | cabc.Iterable[T | _NewObject],
) -> None:
if getattr(obj, "_constructed", True):
sys.audit("capellambse.setattr", obj, self.__name__, values)
Expand All @@ -1023,8 +1088,11 @@ def __set__(
f"{self._qualname} requires a single item, not an iterable"
)

if any(isinstance(i, _NewObject) for i in values):
raise NotImplementedError("Cannot create new objects here")

assert isinstance(values, cabc.Iterable)
self.__set_links(obj, values)
self.__set_links(obj, values) # type: ignore[arg-type]

def __delete__(self, obj: element.ModelObject) -> None:
if getattr(obj, "_constructed", True):
Expand All @@ -1036,9 +1104,11 @@ def insert(
self,
elmlist: ElementListCouplingMixin,
index: int,
value: element.ModelObject,
value: element.ModelObject | _NewObject,
) -> None:
assert self.aslist is not None
if isinstance(value, _NewObject):
raise NotImplementedError("Cannot insert new objects yet")
if value._model is not elmlist._parent._model:
raise ValueError("Cannot insert elements from different models")
objs = [*elmlist[:index], value, *elmlist[index:]]
Expand Down Expand Up @@ -1551,7 +1621,9 @@ def __get__(
return rv

def __set__(
self, obj: element.ModelObject, value: cabc.Iterable[T]
self,
obj: element.ModelObject,
value: T | _NewObject | cabc.Iterable[T | _NewObject],
) -> None:
if obj._constructed:
sys.audit("capellambse.setattr", obj, self.__name__, value)
Expand All @@ -1562,6 +1634,10 @@ def __set__(
value = list(value)
else:
raise TypeError(f"Expected list, got {type(value).__name__}")

if any(isinstance(i, _NewObject) for i in value):
raise NotImplementedError("Cannot create new objects here")

if not all(isinstance(i, self.class_) for i in value):
raise TypeError(
f"Expected {self.class_.__name__}, got {type(value).__name__}"
Expand Down Expand Up @@ -1592,8 +1668,10 @@ def insert(
self,
elmlist: ElementListCouplingMixin,
index: int,
value: element.ModelObject,
value: element.ModelObject | _NewObject,
) -> None:
if isinstance(value, _NewObject):
raise NotImplementedError("Cannot insert new objects yet")
if not isinstance(value, self.class_):
raise TypeError(
f"Expected {self.class_.__name__}, got {type(value).__name__}"
Expand All @@ -1620,9 +1698,12 @@ def purge_references(
yield


class RoleTagAccessor(PhysicalAccessor):
class RoleTagAccessor(WritableAccessor, PhysicalAccessor):
__slots__ = ("role_tag",)

aslist: type[ElementListCouplingMixin] | None
class_: type[element.GenericElement]

def __init__(
self,
role_tag: str,
Expand Down Expand Up @@ -1650,6 +1731,81 @@ def __get__(self, obj, objtype=None):
sys.audit("capellambse.getattr", obj, self.__name__, rv)
return rv

def __set__(
self,
obj: element.ModelObject,
value: (
str
| element.GenericElement
| _NewObject
| cabc.Iterable[str | element.GenericElement | _NewObject]
),
) -> None:
if getattr(obj, "_constructed", True):
sys.audit("capellambse.setattr", obj, self.__name__, value)

if self.aslist:
raise NotImplementedError(
"Setting lists of model objects is not supported yet"
)
else:
if isinstance(value, cabc.Iterable) and not isinstance(value, str):
raise TypeError("Cannot set non-list attribute to an iterable")
if not isinstance(value, _NewObject):
raise NotImplementedError(
"Moving model objects is not supported yet"
)
if self.__get__(obj) is not None:
raise NotImplementedError(
"Replacing model objects is not supported yet"
)
self._create(obj, self.role_tag, *value._type_hint, **value._kw)

def create(
self,
elmlist: ElementListCouplingMixin,
/,
*type_hints: str | None,
**kw: t.Any,
) -> element.GenericElement:
return self._create(elmlist._parent, self.role_tag, *type_hints, **kw)

def insert(
self,
elmlist: ElementListCouplingMixin,
index: int,
value: element.ModelObject | _NewObject,
) -> None:
if isinstance(value, _NewObject):
raise NotImplementedError("Cannot insert new objects yet")
if value._model is not elmlist._model:
raise ValueError("Cannot move elements between models")
try:
indexof = elmlist._parent._element.index
if index > 0:
parent_index = indexof(elmlist._elements[index - 1]) + 1
elif index < -1:
parent_index = indexof(elmlist._elements[index + 1]) - 1
else:
parent_index = index
except ValueError:
parent_index = len(elmlist._parent._element)
elmlist._parent._element.insert(parent_index, value._element)
elmlist._model._loader.idcache_index(value._element)

def delete(
self,
elmlist: ElementListCouplingMixin,
obj: element.ModelObject,
) -> None:
raise NotImplementedError("NYI")

@contextlib.contextmanager
def purge_references(
self, obj: element.ModelObject, target: element.ModelObject
) -> cabc.Iterator[None]:
yield


def no_list(
desc: Accessor,
Expand Down
Loading

0 comments on commit 1c25a6a

Please sign in to comment.