Skip to content

Commit

Permalink
- missing typings added
Browse files Browse the repository at this point in the history
- `httpurl_to_str` utils function removed
- `CustomBaseModel` created and used for all Pydantic model classes
- `RandomAuditLogSettings` model renamed to `BulkAuditLogOptions`
  • Loading branch information
bulletinmybeard committed Mar 30, 2024
1 parent 31293ff commit e20b4bb
Show file tree
Hide file tree
Showing 14 changed files with 71 additions and 57 deletions.
2 changes: 1 addition & 1 deletion audit_logger/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ async def validation_exception_handler(


class BulkLimitExceededError(HTTPException):
def __init__(self, limit: int):
def __init__(self, limit: int) -> None:
super().__init__(
status_code=400,
detail=f"The maximum number of items allowed in a bulk operation is {limit}.",
Expand Down
8 changes: 4 additions & 4 deletions audit_logger/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,8 @@
from audit_logger.middlewares import add_middleware
from audit_logger.models import (
AuditLogEntry,
BulkAuditLogOptions,
GenericResponse,
RandomAuditLogSettings,
SearchParamsV2,
SearchResults,
)
Expand Down Expand Up @@ -109,7 +109,7 @@ async def create_bulk_audit_log_entries(
Raises:
Union[HTTPException, BulkLimitExceededError]
"""
bulk_limit = 250
bulk_limit = 350
if len(audit_logs) > bulk_limit:
raise BulkLimitExceededError(limit=bulk_limit)
return await process_audit_logs(
Expand All @@ -121,7 +121,7 @@ async def create_bulk_audit_log_entries(

@app.post("/create/create-bulk-auto")
async def create_fake_audit_log_entries(
settings: RandomAuditLogSettings,
options: BulkAuditLogOptions,
) -> GenericResponse:
"""
Generates and stores a single random audit log entry.
Expand All @@ -135,7 +135,7 @@ async def create_fake_audit_log_entries(
return await process_audit_logs(
elastic,
cast(str, env_vars.elastic_index_name),
generate_audit_log_entries_with_fake_data(settings),
generate_audit_log_entries_with_fake_data(options),
)


Expand Down
2 changes: 1 addition & 1 deletion audit_logger/models/__init__.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
from .actor_details import ActorDetails
from .audit_log_entry import AuditLogEntry
from .config import APIMiddlewares, AppConfig, CORSSettings
from .request import RandomAuditLogSettings
from .request import BulkAuditLogOptions
from .resource import ResourceDetails
from .response_models import (
ActorDetails,
Expand Down
6 changes: 4 additions & 2 deletions audit_logger/models/actor_details.py
Original file line number Diff line number Diff line change
@@ -1,15 +1,17 @@
from enum import Enum
from typing import Optional

from pydantic import BaseModel, Field, IPvAnyAddress
from pydantic import Field, IPvAnyAddress

from audit_logger.models.custom_base import CustomBaseModel


class ActorType(str, Enum):
USER = "user"
SYSTEM = "system"


class ActorDetails(BaseModel):
class ActorDetails(CustomBaseModel):
identifier: Optional[str] = Field(
default=None,
description="Unique identifier of the actor. "
Expand Down
9 changes: 3 additions & 6 deletions audit_logger/models/audit_log_entry.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,14 @@
from typing import Any, Dict, Optional

from pydantic import BaseModel, Extra, Field
from pydantic import Field

from audit_logger.models.actor_details import ActorDetails
from audit_logger.models.custom_base import CustomBaseModel
from audit_logger.models.resource import ResourceDetails
from audit_logger.models.server_details import ServerDetails


class AuditLogEntry(BaseModel):
class AuditLogEntry(CustomBaseModel):
timestamp: Optional[str] = Field(
default=None,
description="The date and time when the event occurred, in ISO 8601 format.",
Expand Down Expand Up @@ -44,7 +45,3 @@ class AuditLogEntry(BaseModel):
meta: Dict[str, Any] = Field(
default={}, description="Optional metadata about the event."
)

# Forbid extra fields and raise an exception if any are found.
class Config:
extra = Extra.forbid
10 changes: 6 additions & 4 deletions audit_logger/models/config.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
from typing import List, Optional

from pydantic import BaseModel, Field
from pydantic import Field

from audit_logger.models.custom_base import CustomBaseModel

class CORSSettings(BaseModel):

class CORSSettings(CustomBaseModel):
allow_origins: Optional[List[str]] = Field(
default=["*"],
description="List of allowed origin URLs. Use '*' to allow all origins.",
Expand All @@ -22,11 +24,11 @@ class CORSSettings(BaseModel):
)


class APIMiddlewares(BaseModel):
class APIMiddlewares(CustomBaseModel):
cors: CORSSettings = Field(description="CORS middleware settings")


class AppConfig(BaseModel):
class AppConfig(CustomBaseModel):
middlewares: Optional[APIMiddlewares] = Field(
description="API Middlewares settings",
)
12 changes: 12 additions & 0 deletions audit_logger/models/custom_base.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
from typing import Any

from pydantic import BaseModel, Extra


class CustomBaseModel(BaseModel):
    """Shared base class for all of the project's Pydantic models.

    Centralizes model configuration so every model rejects unknown
    fields and malformed payloads fail fast instead of being silently
    accepted.

    Note: the previous pass-through ``__init__(self, **kwargs)`` override
    was removed — it only delegated to ``super().__init__`` and added no
    behavior, while hiding pydantic's synthesized signature from static
    analysis. Removal is behavior-identical and backward-compatible.
    """

    # Forbid extra fields and raise an exception if any are found.
    class Config:
        extra = Extra.forbid
6 changes: 4 additions & 2 deletions audit_logger/models/env_vars.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
from typing import Optional

from pydantic import BaseModel, Field, HttpUrl, field_validator
from pydantic import Field, HttpUrl, field_validator

from audit_logger.models.custom_base import CustomBaseModel

class EnvVars(BaseModel):

class EnvVars(CustomBaseModel):
elastic_index_name: str = Field(...)
elastic_url: str = Field(...)
elastic_username: Optional[str] = Field(None)
Expand Down
11 changes: 5 additions & 6 deletions audit_logger/models/request.py
Original file line number Diff line number Diff line change
@@ -1,16 +1,15 @@
from typing import Optional

from pydantic import BaseModel, Extra, Field
from pydantic import Field

from audit_logger.models.custom_base import CustomBaseModel

class RandomAuditLogSettings(BaseModel):

class BulkAuditLogOptions(CustomBaseModel):
    """Options controlling bulk generation of fake audit log entries.

    NOTE(review): the field is exposed to API callers under the alias
    ``bulk_limit`` (see ``alias=``). Since the base model forbids extra
    fields, request payloads presumably must use the alias rather than
    ``bulk_count`` unless population by field name is enabled — confirm
    against the callers of /create/create-bulk-auto.
    """

    # Number of fake entries to create (1..350); code reads it as
    # `.bulk_count`, payloads supply it as "bulk_limit".
    bulk_count: Optional[int] = Field(
        1,
        ge=1,
        le=350,
        description="Specifies the number of fake audit log entries to generate.",
        alias="bulk_limit",
    )

# Forbid extra fields and raise an exception if any are found.
class Config:
extra = Extra.forbid
6 changes: 4 additions & 2 deletions audit_logger/models/resource.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
from typing import Optional

from pydantic import BaseModel, Field
from pydantic import Field

from audit_logger.models.custom_base import CustomBaseModel

class ResourceDetails(BaseModel):

class ResourceDetails(CustomBaseModel):
type: Optional[str] = Field(
default=None,
description="Type of the resource that was acted upon.",
Expand Down
9 changes: 5 additions & 4 deletions audit_logger/models/response_models.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,13 @@
from typing import Any, Dict, List, Optional

from pydantic import BaseModel, Field
from pydantic import Field

from audit_logger.models.actor_details import ActorDetails
from audit_logger.models.custom_base import CustomBaseModel
from audit_logger.models.resource import ResourceDetails


class LogEntryDetails(BaseModel):
class LogEntryDetails(CustomBaseModel):
timestamp: Optional[str] = Field(
default=None, description="The timestamp of the log entry."
)
Expand Down Expand Up @@ -44,7 +45,7 @@ class LogEntryDetails(BaseModel):
)


class SearchResults(BaseModel):
class SearchResults(CustomBaseModel):
hits: int = Field(default=0, description="The number of hits for the search query.")
docs: List[Any] = Field(
default=[], description="A list of documents that match the search query."
Expand All @@ -54,7 +55,7 @@ class SearchResults(BaseModel):
)


class GenericResponse(BaseModel):
class GenericResponse(CustomBaseModel):
status: str = Field(default=None, description="The status of the response.")
success_count: int = Field(
default=None, description="The number of successful items in the response."
Expand Down
19 changes: 8 additions & 11 deletions audit_logger/models/search_params.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,15 @@
from enum import Enum
from typing import Any, Dict, List, Optional, Union

from pydantic import BaseModel, Extra, Field, field_validator, model_validator
from pydantic import Field, field_validator, model_validator

from audit_logger.models.custom_base import CustomBaseModel
from audit_logger.utils import is_valid_ip_v4_address, validate_date

logger = logging.getLogger("audit_logger")


class MaxResultsMixin(BaseModel):
class MaxResultsMixin(CustomBaseModel):
max_results: Optional[int] = Field(
500,
ge=1,
Expand Down Expand Up @@ -70,7 +71,7 @@ class AggregationTypeEnum(str, Enum):
DATE_HISTOGRAM = "date_histogram"


class SearchFilterParams(BaseModel):
class SearchFilterParams(CustomBaseModel):
field: FieldIdentifierEnum = Field(
default=None,
description="",
Expand Down Expand Up @@ -135,16 +136,16 @@ def check_type_is_valid(cls, v):
return v


class AggregationFilterParams(BaseModel):
class AggregationFilterParams(CustomBaseModel):
range: Optional[Dict[str, Dict[str, str]]]


class SubAggregationConfig(BaseModel):
class SubAggregationConfig(CustomBaseModel):
type: AggregationTypeEnum
field: FieldIdentifierEnum


class AggregationSetup(MaxResultsMixin, BaseModel):
class AggregationSetup(MaxResultsMixin, CustomBaseModel):
type: AggregationTypeEnum = Field(
..., description="Specifies the type of aggregation."
)
Expand All @@ -161,7 +162,7 @@ class AggregationSetup(MaxResultsMixin, BaseModel):
)


class SearchParamsV2(MaxResultsMixin, BaseModel):
class SearchParamsV2(MaxResultsMixin, CustomBaseModel):
fields: Optional[List[FieldIdentifierEnum]] = Field(
default=None,
description="Fields to include in results. Empty includes all fields.",
Expand All @@ -186,10 +187,6 @@ class SearchParamsV2(MaxResultsMixin, BaseModel):
description="Aggregations to apply for data summarization/analysis.",
)

# Forbid extra fields and raise an exception if any are found.
class Config:
extra = Extra.forbid

@field_validator("sort_order")
def sort_order_valid(cls, v: Optional[SortOrderEnum]) -> Optional[SortOrderEnum]:
if v and v not in SortOrderEnum:
Expand Down
6 changes: 4 additions & 2 deletions audit_logger/models/server_details.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
from typing import Optional

from pydantic import BaseModel, Field, IPvAnyAddress
from pydantic import Field, IPvAnyAddress

from audit_logger.models.custom_base import CustomBaseModel

class ServerDetails(BaseModel):

class ServerDetails(CustomBaseModel):
hostname: Optional[str] = Field(
default=None,
description="Hostname of the server where the event occurred.",
Expand Down
22 changes: 10 additions & 12 deletions audit_logger/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,14 +8,14 @@
from elasticsearch import Elasticsearch, SerializationError, helpers
from faker import Faker
from fastapi import HTTPException
from pydantic import HttpUrl, ValidationError
from pydantic import ValidationError

from audit_logger.custom_logger import get_logger
from audit_logger.models import (
ActorDetails,
AuditLogEntry,
BulkAuditLogOptions,
GenericResponse,
RandomAuditLogSettings,
ResourceDetails,
)
from audit_logger.models.env_vars import EnvVars
Expand Down Expand Up @@ -98,7 +98,9 @@ def is_valid_ip_v4_address(ip_address: str) -> bool:
return False


def create_bulk_operations(index_name: str, log_entries: List[Dict]) -> List[Dict]:
def create_bulk_operations(
index_name: str, log_entries: List[AuditLogEntry]
) -> List[Dict]:
"""
This bulk helper function prepares a list of operations for the Elasticsearch bulk API
based on the provided log entries.
Expand Down Expand Up @@ -129,28 +131,28 @@ def create_bulk_operations(index_name: str, log_entries: List[Dict]) -> List[Dic


def generate_audit_log_entries_with_fake_data(
settings: RandomAuditLogSettings,
settings: BulkAuditLogOptions,
) -> List[Dict]:
"""
Generates a random audit log entry using the Faker library.
Returns:
- List[Dict]: A list of audit log entries with fake data.
"""
return [generate_fake_log_entry().dict() for _ in range(settings.bulk_count)]
return [generate_log_entry().dict() for _ in range(settings.bulk_count)]


# ) -> GenericResponse:
async def process_audit_logs(
elastic: Elasticsearch, elastic_index_name: str, log_entries: List[Dict]
elastic: Elasticsearch, elastic_index_name: str, log_entries: List[AuditLogEntry]
) -> Any:
"""
Processes a list of audit log entries by sending them to Elasticsearch using the bulk API.
Args:
- elastic: An instance of the Elasticsearch client.
- elastic_index_name (str): The name of the Elasticsearch index.
- log_entries (List[Dict]): A list of audit log entries to be processed.
- log_entries (List[AuditLogEntry]): A list of audit log entries to be processed.
Returns:
- GenericResponse
Expand Down Expand Up @@ -207,7 +209,7 @@ def validate_date(date_str: str) -> bool:
return False


def generate_fake_log_entry() -> AuditLogEntry:
def generate_log_entry() -> AuditLogEntry:
"""Create a fake audit log entry using the Faker library."""
return AuditLogEntry(
timestamp=fake.date_time_this_year().isoformat(),
Expand Down Expand Up @@ -246,10 +248,6 @@ def generate_fake_log_entry() -> AuditLogEntry:
)


def httpurl_to_str(url: HttpUrl) -> str:
return str(url)


def load_env_vars() -> EnvVars:
"""
Load all environment variables and validate them against the EnvVars Pydantic model.
Expand Down

0 comments on commit e20b4bb

Please sign in to comment.