Skip to content
This repository has been archived by the owner on Nov 21, 2024. It is now read-only.

Commit

Permalink
dev-uxmt: multiple minor fixes and improvements (#613)
Browse files Browse the repository at this point in the history
* allow any payload for topology

* refactor security policy api

* fix policy items

* bump v0.33.4dev2
  • Loading branch information
sbasan authored Apr 28, 2024
1 parent 04234f3 commit 884f4b5
Show file tree
Hide file tree
Showing 12 changed files with 110 additions and 83 deletions.
38 changes: 23 additions & 15 deletions catalystwan/api/config_group_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

from __future__ import annotations

from typing import TYPE_CHECKING, Optional, Union
from typing import TYPE_CHECKING, Any, Optional, overload
from uuid import UUID

from catalystwan.typed_list import DataSequence
Expand Down Expand Up @@ -32,8 +32,8 @@

class ConfigGroupAPI:
def __init__(self, session: ManagerSession):
self.session = session
self.endpoint = ConfigurationGroup(session)
self._session = session
self._endpoints = ConfigurationGroup(session)

def associate(self, cg_id: str, device_ids: list) -> None:
"""
Expand All @@ -46,7 +46,7 @@ def associate(self, cg_id: str, device_ids: list) -> None:

payload = ConfigGroupAssociatePayload(devices=devices)

self.endpoint.associate(config_group_id=cg_id, payload=payload)
self._endpoints.associate(config_group_id=cg_id, payload=payload)

def create(self, name: str, description: str, solution: Solution, profile_ids: list) -> ConfigGroupCreationResponse:
"""
Expand All @@ -60,7 +60,7 @@ def create(self, name: str, description: str, solution: Solution, profile_ids: l
name=name, description=description, solution=solution, profiles=profiles
)

return self.endpoint.create_config_group(cg_payload)
return self._endpoints.create_config_group(cg_payload)

def create_variables(
self, cg_id: str, device_ids: list, suggestions: bool = True
Expand All @@ -69,13 +69,13 @@ def create_variables(
Creates device specific variable data in given config-group
"""
payload = ConfigGroupVariablesCreatePayload(deviceIds=device_ids, suggestions=suggestions)
return self.endpoint.create_variables(config_group_id=cg_id, payload=payload)
return self._endpoints.create_variables(config_group_id=cg_id, payload=payload)

def delete(self, cg_id: str) -> None:
"""
Deletes existing config-group with given ID
"""
self.endpoint.delete_config_group(cg_id)
self._endpoints.delete_config_group(cg_id)

def deploy(self, cg_id: str, device_ids: list) -> ConfigGroupDeployResponse:
"""
Expand All @@ -86,7 +86,7 @@ def deploy(self, cg_id: str, device_ids: list) -> ConfigGroupDeployResponse:
devices.append(DeviceId(id=device_id))

payload = ConfigGroupDeployPayload(devices=devices)
return self.endpoint.deploy(config_group_id=cg_id, payload=payload)
return self._endpoints.deploy(config_group_id=cg_id, payload=payload)

def disassociate(self, cg_id: str, device_ids: list) -> ConfigGroupDisassociateResponse:
"""
Expand All @@ -97,7 +97,7 @@ def disassociate(self, cg_id: str, device_ids: list) -> ConfigGroupDisassociateR
devices.append(DeviceId(id=device_id))

payload = ConfigGroupAssociatePayload(devices=devices)
return self.endpoint.disassociate(config_group_id=cg_id, payload=payload)
return self._endpoints.disassociate(config_group_id=cg_id, payload=payload)

def edit(
self, cg_id: str, name: str, description: str, solution: Solution, profile_ids: list
Expand All @@ -111,21 +111,29 @@ def edit(
profiles.append(ProfileId(id=profile_id))
payload = ConfigGroupEditPayload(name=name, description=description, solution=solution, profiles=profiles)

return self.endpoint.edit_config_group(config_group_id=cg_id, payload=payload)
return self._endpoints.edit_config_group(config_group_id=cg_id, payload=payload)

def get(self, group_id: Optional[UUID] = None) -> Union[DataSequence[ConfigGroup], ConfigGroup, None]:
@overload
def get(self) -> DataSequence[ConfigGroup]:
...

@overload
def get(self, group_id: UUID) -> ConfigGroup:
...

def get(self, group_id: Optional[UUID] = None) -> Any:
"""
Gets list of existing config-groups or single config-group with given ID
If given ID is not correct return None
If given ID is not correct return None
"""
if group_id is None:
return self.endpoint.get()
return self.endpoint.get().filter(id=group_id).single_or_default()
return self._endpoints.get()
return self._endpoints.get().filter(id=group_id).single_or_default()

def update_variables(self, cg_id: str, solution: Solution, device_variables: list) -> None:
"""
Updates device specific variable data in given config-group
"""
payload = ConfigGroupVariablesEditPayload(solution=solution, devices=device_variables)

self.endpoint.update_variables(config_group_id=cg_id, payload=payload)
self._endpoints.update_variables(config_group_id=cg_id, payload=payload)
6 changes: 3 additions & 3 deletions catalystwan/api/policy_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,7 @@
from catalystwan.models.policy.policy_list import PolicyListBase
from catalystwan.models.policy.security import (
AnySecurityPolicy,
AnySecurityPolicyInfo,
AnySecurityPolicyInfoList,
SecurityPolicy,
SecurityPolicyEditResponse,
UnifiedSecurityPolicy,
Expand Down Expand Up @@ -328,7 +328,7 @@ def delete(self, id: UUID) -> None:
self._endpoints.delete_security_template(id)

@overload
def get(self) -> List[AnySecurityPolicyInfo]:
def get(self) -> AnySecurityPolicyInfoList:
...

@overload
Expand All @@ -338,7 +338,7 @@ def get(self, id: UUID) -> AnySecurityPolicy:
def get(self, id: Optional[UUID] = None) -> Any:
if id is not None:
return self._endpoints.get_security_template(id).root
return [info.root for info in self._endpoints.generate_security_template_list()]
return self._endpoints.generate_security_template_list()


class PolicyListsAPI:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,10 @@
from catalystwan.endpoints import JSON, APIEndpoints, delete, get, post, put
from catalystwan.models.policy.security import (
AnySecurityPolicy,
AnySecurityPolicyInfoList,
SecurityPolicyEditResponse,
SecurityPolicyInfoRoot,
SecurityPolicyRoot,
)
from catalystwan.typed_list import DataSequence


class ConfigurationSecurityTemplatePolicy(APIEndpoints):
Expand All @@ -36,7 +35,7 @@ def generate_security_policy_summary(self):
...

@get("/template/policy/security", "data")
def generate_security_template_list(self) -> DataSequence[SecurityPolicyInfoRoot]:
def generate_security_template_list(self) -> AnySecurityPolicyInfoList:
...

def get_device_list_by_id(self):
Expand Down
4 changes: 2 additions & 2 deletions catalystwan/endpoints/configuration_group.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

# mypy: disable-error-code="empty-body"
from datetime import datetime
from typing import List, Optional
from typing import Any, List, Optional
from uuid import UUID

from pydantic import BaseModel, ConfigDict, Field
Expand Down Expand Up @@ -58,7 +58,7 @@ class ConfigGroup(BaseModel):
serialization_alias="numberOfDevicesUpToDate", validation_alias="numberOfDevicesUpToDate"
)
origin: Optional[str] = None
topology: Optional[str] = None
topology: Any = None
full_config_cli: Optional[bool] = Field(
default=None, serialization_alias="fullConfigCli", validation_alias="fullConfigCli"
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ class AsPathEntry(BaseModel):
class AsPathParcel(_ParcelBase):
type_: Literal["as-path"] = Field(default="as-path", exclude=True)
as_path_list_num: Global[int] = Field(validation_alias=AliasPath("data", "asPathListNum"))
entries: List[AsPathEntry] = Field(validation_alias=AliasPath("data", "entries"))
entries: List[AsPathEntry] = Field(default=[], validation_alias=AliasPath("data", "entries"))

def add_as_path(self, as_path: str):
self.entries.append(AsPathEntry(as_path=as_global(as_path)))
2 changes: 1 addition & 1 deletion catalystwan/models/policy/definition/amp.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ class AdvancedMalwareProtectionDefinition(BaseModel):

class AdvancedMalwareProtectionPolicy(PolicyDefinitionBase):
type: Literal["advancedMalwareProtection"] = "advancedMalwareProtection"
mode: AMPPolicyType
mode: AMPPolicyType = "security"
definition: AdvancedMalwareProtectionDefinition


Expand Down
4 changes: 4 additions & 0 deletions catalystwan/models/policy/definition/zone_based_firewall.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
DestinationIPEntry,
DestinationPortEntry,
DestinationPortListEntry,
DestinationScalableGroupTagListEntry,
LogAction,
Match,
PolicyActionType,
Expand All @@ -40,6 +41,7 @@
SourceIPEntry,
SourcePortEntry,
SourcePortListEntry,
SourceScalableGroupTagListEntry,
)

ZoneBasedFWPolicySequenceEntry = Annotated[
Expand All @@ -53,6 +55,7 @@
DestinationIPEntry,
DestinationPortEntry,
DestinationPortListEntry,
DestinationScalableGroupTagListEntry,
ProtocolEntry,
ProtocolNameEntry,
ProtocolNameListEntry,
Expand All @@ -65,6 +68,7 @@
SourceIPEntry,
SourcePortEntry,
SourcePortListEntry,
SourceScalableGroupTagListEntry,
],
Field(discriminator="field"),
]
Expand Down
12 changes: 12 additions & 0 deletions catalystwan/models/policy/policy_definition.py
Original file line number Diff line number Diff line change
Expand Up @@ -567,11 +567,21 @@ class SourcePortListEntry(BaseModel):
ref: UUID


class SourceScalableGroupTagListEntry(BaseModel):
field: Literal["sourceScalableGroupTagList"] = "sourceScalableGroupTagList"
ref: UUID


class DestinationPortListEntry(BaseModel):
field: Literal["destinationPortList"] = "destinationPortList"
ref: UUID


class DestinationScalableGroupTagListEntry(BaseModel):
field: Literal["destinationScalableGroupTagList"] = "destinationScalableGroupTagList"
ref: UUID


class RuleSetListEntry(BaseModel):
field: Literal["ruleSetList"] = "ruleSetList"
ref: str
Expand Down Expand Up @@ -858,6 +868,7 @@ class ActionSet(BaseModel):
DestinationPortEntry,
DestinationPortListEntry,
DestinationRegionEntry,
DestinationScalableGroupTagListEntry,
DNSAppListEntry,
DNSEntry,
DomainIDEntry,
Expand Down Expand Up @@ -894,6 +905,7 @@ class ActionSet(BaseModel):
SourceIPv6Entry,
SourcePortEntry,
SourcePortListEntry,
SourceScalableGroupTagListEntry,
TCPEntry,
TLOCEntry,
TLOCListEntry,
Expand Down
19 changes: 16 additions & 3 deletions catalystwan/models/policy/security.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
URLFilteringAssemblyItem,
ZoneBasedFWAssemblyItem,
)
from catalystwan.typed_list import DataSequence

SecurityPolicyAssemblyItem = Annotated[
Union[
Expand Down Expand Up @@ -121,7 +122,7 @@ class UnifiedSecurityPolicyDefinition(PolicyDefinition):


class SecurityPolicy(PolicyCreationPayload):
policy_mode: Union[Literal["security"], None] = Field(
policy_mode: Literal[None, "security"] = Field(
default="security", serialization_alias="policyMode", validation_alias="policyMode"
)
policy_type: str = Field(default="feature", serialization_alias="policyType", validation_alias="policyType")
Expand Down Expand Up @@ -223,5 +224,17 @@ class UnifiedSecurityPolicyInfo(UnifiedSecurityPolicy, PolicyInfo):
AnySecurityPolicyInfo = Union[SecurityPolicyInfo, UnifiedSecurityPolicyInfo]


class SecurityPolicyInfoRoot(RootModel):
root: AnySecurityPolicyInfo
class AnySecurityPolicyInfoList(RootModel):
root: List[AnySecurityPolicyInfo]

@property
def all(self) -> List[AnySecurityPolicyInfo]:
return self.root

@property
def security(self) -> DataSequence[SecurityPolicyInfo]:
return DataSequence(SecurityPolicyInfo, [p for p in self.root if p.policy_mode == "security"])

@property
def unified(self) -> DataSequence[UnifiedSecurityPolicyInfo]:
return DataSequence(UnifiedSecurityPolicyInfo, [p for p in self.root if p.policy_mode == "unified"])
Loading

0 comments on commit 884f4b5

Please sign in to comment.