Skip to content

Commit

Permalink
feat: test images before creating sessions
Browse files Browse the repository at this point in the history
  • Loading branch information
Panaetius committed Sep 8, 2022
1 parent 378ff93 commit 950ad81
Show file tree
Hide file tree
Showing 16 changed files with 254 additions and 115 deletions.
5 changes: 5 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -217,3 +217,8 @@ acceptance-tests/cypress/videos
# Helm requirements lock files
helm-chart/amalthea/requirements.lock
helm-chart/amalthea/Chart.lock

# Byte-compiled / optimized / DLL files
__pycache__/
*.py[cod]
*$py.class
7 changes: 6 additions & 1 deletion Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,18 @@ RUN pip install --no-cache-dir --disable-pip-version-check -U pip && \
groupadd -g 1000 amalthea && \
useradd -u 1000 -g 1000 amalthea && \
apt-get update && \
apt-get install tini -y && \
apt-get install tini curl -y && \
rm -rf /var/lib/apt/lists/*

# Install all packages
WORKDIR /app
COPY Pipfile Pipfile.lock ./
RUN pipenv install --system --deploy
RUN curl -L https://github.com/NVIDIA/container-canary/releases/download/v0.2.1/canary_linux_amd64 > canary_linux_amd64 && \
curl -L https://github.com/NVIDIA/container-canary/releases/download/v0.2.1/canary_linux_amd64.sha256sum > canary_linux_amd64.sha256sum && \
sha256sum --check --status canary_linux_amd64.sha256sum && \
chmod +x canary_linux_amd64 && \
mv canary_linux_amd64 /usr/local/bin/canary

COPY controller /app/controller
COPY kopf_entrypoint.py ./
Expand Down
4 changes: 2 additions & 2 deletions Pipfile
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ verify_ssl = true
name = "pypi"

[packages]
kopf = "*"
kopf = { version = "*", extras = ["dev"]}
pyyaml = "*"
kubernetes = "*"
jsonpatch = "*"
Expand All @@ -18,7 +18,7 @@ boto3 = "*"

[dev-packages]
black = "*"
flake8 = "*"
flake8 = ">=4.0.0,<5.0.0"
pytest = "*"
pylint = "*"
pytest-black = "*"
Expand Down
159 changes: 95 additions & 64 deletions Pipfile.lock

Large diffs are not rendered by default.

8 changes: 7 additions & 1 deletion controller/config_types.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
@dataclass
class S3Config:
"""The configuration needed to upload metrics to S3."""

endpoint: str
bucket: str
path_prefix: str
Expand All @@ -22,6 +23,7 @@ def __post_init__(self):
@dataclass
class MetricsBaseConfig:
"""Base metrics/auditlog configuration."""

enabled: Union[str, bool] = False
extra_labels: Union[str, List[str]] = field(default_factory=list)

Expand All @@ -35,12 +37,15 @@ def __post_init__(self):
@dataclass
class AuditlogConfig(MetricsBaseConfig):
"""The configuration used for the auditlogs."""

s3: Optional[S3Config] = None

def __post_init__(self):
super().__post_init__()
if self.enabled and not self.s3:
raise ValueError("If auditlog is enabled then the S3 configuration has to be provided.")
raise ValueError(
"If auditlog is enabled then the S3 configuration has to be provided."
)

@classmethod
def dataconf_from_env(cls, prefix="AUDITLOG_"):
Expand All @@ -50,6 +55,7 @@ def dataconf_from_env(cls, prefix="AUDITLOG_"):
@dataclass
class PrometheusMetricsConfig(MetricsBaseConfig):
"""The configuration for prometheus metrics"""

port: Union[str, int] = 8765

def __post_init__(self):
Expand Down
2 changes: 2 additions & 0 deletions controller/metrics/events.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ class MetricEvent:
"""Every element in the metrics queue that is created by
amalthea and consumed by the metrics handlers conforms to this
structure."""

event_timestamp: datetime
session: Dict[str, Any]
sessionCreationTimestamp: Optional[datetime] = None
Expand Down Expand Up @@ -40,6 +41,7 @@ class MetricEventHandler(ABC):
"""Abstract class for the queue workers that will
be doing the final publishing or persisting of any metrics
that are generated by amalthea in the metrics queue."""

@abstractmethod
def publish(self, metric_event: MetricEvent):
pass
57 changes: 39 additions & 18 deletions controller/metrics/prometheus.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,15 @@

from controller.server_status_enum import ServerStatusEnum
from controller.metrics.events import MetricEventHandler, MetricEvent
from controller.metrics.utils import resource_request_from_manifest, additional_labels_from_manifest
from controller.metrics.utils import (
resource_request_from_manifest,
additional_labels_from_manifest,
)


class PrometheusMetricAction(Enum):
"""The different methods that can be used to manipulate prometheus metrics."""

inc = "inc"
dec = "dec"
set = "set"
Expand All @@ -20,23 +24,30 @@ class PrometheusMetricAction(Enum):
class PrometheusMetricType(NamedTuple):
"""A generic prometheus metric "struct" with its allowed
methods and the specific metric type."""

type: MetricWrapperBase
actions: List[PrometheusMetricAction]


class PrometheusMetricTypesEnum(Enum):
"""All the different prometheus metrics supported."""

counter = PrometheusMetricType(Counter, [PrometheusMetricAction.inc])
gauge = PrometheusMetricType(
Gauge,
[PrometheusMetricAction.inc, PrometheusMetricAction.dec, PrometheusMetricAction.set],
[
PrometheusMetricAction.inc,
PrometheusMetricAction.dec,
PrometheusMetricAction.set,
],
)
histogram = PrometheusMetricType(Histogram, [PrometheusMetricAction.observe])
summary = PrometheusMetricType(Summary, [PrometheusMetricAction.observe])


class PrometheusMetric():
class PrometheusMetric:
"""A generic wrapper class for all prometheus metrics."""

_label_name_invalid_first_letter = re.compile(r"^[^a-zA-Z_]")
_label_name_invalid_all_letters = re.compile(r"[^a-zA-Z0-9_]")

Expand All @@ -47,7 +58,7 @@ def __init__(
documentation: str,
labelnames: Optional[List[str]],
*args,
**kwargs
**kwargs,
):
if type(metric_type) is str:
self._metric_type = PrometheusMetricTypesEnum[metric_type].value
Expand All @@ -60,7 +71,9 @@ def __init__(
)
if labelnames is None:
labelnames = []
sanitized_label_names = [self.sanitize_label_name(label) for label in labelnames]
sanitized_label_names = [
self.sanitize_label_name(label) for label in labelnames
]
self._metric = self._metric_type.type(
name, documentation, sanitized_label_names, *args, **kwargs
)
Expand All @@ -73,9 +86,7 @@ def sanitize_label_name(self, val: str) -> str:
return val

def _sanitize_labels(self, labels: Dict[str, str]) -> Dict[str, str]:
return {
self.sanitize_label_name(name): val for name, val in labels.items()
}
return {self.sanitize_label_name(name): val for name, val in labels.items()}

def __call__(
self,
Expand Down Expand Up @@ -106,7 +117,9 @@ def __call__(
else:
sanitized_labels = {}
operation_method = getattr(
self._metric if len(sanitized_labels) == 0 else self._metric.labels(**sanitized_labels),
self._metric
if len(sanitized_labels) == 0
else self._metric.labels(**sanitized_labels),
action.value,
None,
)
Expand All @@ -116,6 +129,7 @@ def __call__(
class PrometheusMetricNames(Enum):
"""Used to avoid errors in metric names and to ensure
that always the same set of metric names are used."""

sessions_total_created = "sessions_total_created"
sessions_total_deleted = "sessions_total_deleted"
sessions_status_changes = "sessions_status_changes"
Expand All @@ -128,6 +142,7 @@ class PrometheusMetricNames(Enum):

class PrometheusMetricHandler(MetricEventHandler):
"""Handles metric events from the queue that are created by amalthea."""

def __init__(self, manifest_labelnames: List[str] = []):
self.manifest_labelnames = manifest_labelnames
self._sessions_total_created = PrometheusMetric(
Expand All @@ -150,46 +165,46 @@ def __init__(self, manifest_labelnames: List[str] = []):
*self.manifest_labelnames,
"status_from",
"status_to",
]
],
)
self._sessions_launch_duration = PrometheusMetric(
PrometheusMetricTypesEnum["histogram"].value,
PrometheusMetricNames["sessions_launch_duration_seconds"].value,
"How long did it take for a session to transition into running state",
self.manifest_labelnames,
unit="seconds",
buckets=[30, 60, 90, 120, 180, 240, 300, 480]
buckets=[30, 60, 90, 120, 180, 240, 300, 480],
)
self._sessions_cpu_request = PrometheusMetric(
PrometheusMetricTypesEnum["histogram"].value,
PrometheusMetricNames["sessions_cpu_request_millicores"].value,
"CPU millicores requested by a user for a session.",
self.manifest_labelnames,
unit="m",
buckets=[100, 500, 1000, 2000, 3000, 4000]
buckets=[100, 500, 1000, 2000, 3000, 4000],
)
self._sessions_memory_request = PrometheusMetric(
PrometheusMetricTypesEnum["histogram"].value,
PrometheusMetricNames["sessions_memory_request_bytes"].value,
"Memory requested by a user for a session.",
self.manifest_labelnames,
unit="byte",
buckets=[500e6, 1e9, 2e9, 4e9, 8e9, 16e9, 32e9]
buckets=[500e6, 1e9, 2e9, 4e9, 8e9, 16e9, 32e9],
)
self._sessions_gpu_request = PrometheusMetric(
PrometheusMetricTypesEnum["histogram"].value,
PrometheusMetricNames["sessions_gpu_request"].value,
"GPUs requested by a user for a session.",
self.manifest_labelnames,
buckets=[0, 1, 2, 3, 4]
buckets=[0, 1, 2, 3, 4],
)
self._sessions_disk_request = PrometheusMetric(
PrometheusMetricTypesEnum["histogram"].value,
PrometheusMetricNames["sessions_disk_request_bytes"].value,
"Disk space requested by a user for a session.",
self.manifest_labelnames,
unit="byte",
buckets=[1e9, 4e9, 16e9, 32e9, 64e9, 128e9]
buckets=[1e9, 4e9, 16e9, 32e9, 64e9, 128e9],
)

def _collect_labels_from_manifest(self, manifest: Dict[str, Any]) -> Dict[str, str]:
Expand Down Expand Up @@ -244,10 +259,16 @@ def _on_any_status_change(self, metric_event: MetricEvent):
if metric_event.status != metric_event.old_status:
status_change_labels = {
**manifest_labels,
"status_from": metric_event.old_status.value if metric_event.old_status else "None",
"status_to": metric_event.status.value if metric_event.status else "None",
"status_from": metric_event.old_status.value
if metric_event.old_status
else "None",
"status_to": metric_event.status.value
if metric_event.status
else "None",
}
self._sessions_status_changes(PrometheusMetricAction.inc, 1, status_change_labels)
self._sessions_status_changes(
PrometheusMetricAction.inc, 1, status_change_labels
)

def publish(self, metric_event: MetricEvent):
"""Publishes (i.e. persists) the proper prometheus metrics
Expand Down
1 change: 1 addition & 0 deletions controller/metrics/queue.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ class MetricsQueue:
metrics handlers subscribe to this queue and persist or further
publish the metrics to the proper place.
"""

def __init__(self, metric_handlers=List[MetricEventHandler]):
self.q = Queue()
self.metric_handlers = metric_handlers
Expand Down
25 changes: 15 additions & 10 deletions controller/metrics/s3.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
@dataclass
class SesionMetricData:
"""The data that is included for each metric event uploaded to S3."""

name: str
namespace: str
uid: str
Expand All @@ -46,7 +47,9 @@ def _default_json_serializer(obj):
return obj.value

def __str__(self):
return json.dumps(asdict(self), default=self._default_json_serializer, indent=None)
return json.dumps(
asdict(self), default=self._default_json_serializer, indent=None
)

@classmethod
def from_metric_event(
Expand All @@ -65,7 +68,8 @@ def from_metric_event(
metric_event.status,
metric_event.old_status,
additional_labels_from_manifest(
metric_event.session, additional_label_names,
metric_event.session,
additional_label_names,
),
)

Expand All @@ -76,11 +80,10 @@ class S3RotatingLogHandler(BaseRotatingHandler):
not kept locally. The maximum rotation period (in seconds) can be
specified.
"""

_datetime_format = "_%Y%m%d_%H%M%S%z"

def __init__(
self, filename, mode, config: S3Config, encoding=None
):
def __init__(self, filename, mode, config: S3Config, encoding=None):
super().__init__(filename, mode, encoding, delay=False)
self._period_timedelta = timedelta(seconds=config.rotation_period_seconds)
self._start_timestamp = pytz.UTC.localize(datetime.utcnow())
Expand Down Expand Up @@ -110,9 +113,7 @@ def _upload(self, fname: str, remove_after_upload: bool = False):
resp = None
if file_stats.st_size > 0:
resp = self._client.upload_file(
fname,
self._bucket,
self._s3_path_prefix + "/" + Path(fname).name
fname, self._bucket, self._s3_path_prefix + "/" + Path(fname).name
)
if remove_after_upload:
os.remove(fname)
Expand All @@ -121,7 +122,9 @@ def _upload(self, fname: str, remove_after_upload: bool = False):
def _namer(self, default_name: str) -> str:
path = Path(default_name)
new_file = path.parent / (
path.stem + self._start_timestamp.strftime(self._datetime_format) + path.suffix
path.stem
+ self._start_timestamp.strftime(self._datetime_format)
+ path.suffix
)
return os.fspath(new_file)

Expand All @@ -147,10 +150,11 @@ def shouldRollover(self, _: str) -> bool:

class S3Formatter(Formatter):
"""Logging formatter that has ISO8601 timestamps and produces valid json logs."""

def __init__(self, validate: bool = True) -> None:
datefmt = "%Y-%m-%dT%H:%M:%S%z"
style = "%"
fmt = "{\"time\":\"%(asctime)s\", \"message\":%(message)s}"
fmt = '{"time":"%(asctime)s", "message":%(message)s}'
super().__init__(fmt, datefmt, style, validate)

def formatTime(self, record: LogRecord, datefmt: Optional[str] = None) -> str:
Expand All @@ -167,6 +171,7 @@ class S3MetricHandler(MetricEventHandler):
"""A simple metric handler that persists the metrics
that are published by Amalthea to a S3 bucket.
"""

def __init__(
self,
logger: Logger,
Expand Down
Loading

0 comments on commit 950ad81

Please sign in to comment.