Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: test images before creating sessions #239

Draft
wants to merge 1 commit into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -217,3 +217,8 @@ acceptance-tests/cypress/videos
# Helm requirements lock files
helm-chart/amalthea/requirements.lock
helm-chart/amalthea/Chart.lock

# Byte-compiled / optimized / DLL files
__pycache__/
*.py[cod]
*$py.class
14 changes: 13 additions & 1 deletion Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,25 @@ RUN pip install --no-cache-dir --disable-pip-version-check -U pip && \
groupadd -g 1000 amalthea && \
useradd -u 1000 -g 1000 amalthea && \
apt-get update && \
apt-get install tini -y && \
apt-get install tini curl -y && \
rm -rf /var/lib/apt/lists/*

# Install all packages
WORKDIR /app
COPY Pipfile Pipfile.lock ./
RUN pipenv install --system --deploy
RUN curl -L https://github.com/NVIDIA/container-canary/releases/download/v0.2.1/canary_linux_amd64 > canary_linux_amd64 && \
curl -L https://github.com/NVIDIA/container-canary/releases/download/v0.2.1/canary_linux_amd64.sha256sum > canary_linux_amd64.sha256sum && \
sha256sum --check --status canary_linux_amd64.sha256sum && \
chmod +x canary_linux_amd64 && \
mv canary_linux_amd64 /usr/local/bin/canary
# fix pyngrok permissions
RUN mkdir /usr/local/lib/python3.9/site-packages/pyngrok/bin && \
chown :1000 /usr/local/lib/python3.9/site-packages/pyngrok/bin && \
chmod 770 /usr/local/lib/python3.9/site-packages/pyngrok/bin && \
mkdir /home/amalthea/ && \
chown :1000 /home/amalthea && \
chmod 770 /home/amalthea

COPY controller /app/controller
COPY kopf_entrypoint.py ./
Expand Down
4 changes: 2 additions & 2 deletions Pipfile
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ verify_ssl = true
name = "pypi"

[packages]
kopf = "*"
kopf = { version = "*", extras = ["dev"]}
pyyaml = "*"
kubernetes = "*"
jsonpatch = "*"
Expand All @@ -18,7 +18,7 @@ boto3 = "*"

[dev-packages]
black = "*"
flake8 = "*"
flake8 = ">=4.0.0,<5.0.0"
pytest = "*"
pylint = "*"
pytest-black = "*"
Expand Down
159 changes: 95 additions & 64 deletions Pipfile.lock

Large diffs are not rendered by default.

8 changes: 7 additions & 1 deletion controller/config_types.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
@dataclass
class S3Config:
"""The configuration needed to upload metrics to S3."""

endpoint: str
bucket: str
path_prefix: str
Expand All @@ -22,6 +23,7 @@ def __post_init__(self):
@dataclass
class MetricsBaseConfig:
"""Base metrics/auditlog configuration."""

enabled: Union[str, bool] = False
extra_labels: Union[str, List[str]] = field(default_factory=list)

Expand All @@ -35,12 +37,15 @@ def __post_init__(self):
@dataclass
class AuditlogConfig(MetricsBaseConfig):
"""The configuration used for the auditlogs."""

s3: Optional[S3Config] = None

def __post_init__(self):
super().__post_init__()
if self.enabled and not self.s3:
raise ValueError("If auditlog is enabled then the S3 configuration has to be provided.")
raise ValueError(
"If auditlog is enabled then the S3 configuration has to be provided."
)

@classmethod
def dataconf_from_env(cls, prefix="AUDITLOG_"):
Expand All @@ -50,6 +55,7 @@ def dataconf_from_env(cls, prefix="AUDITLOG_"):
@dataclass
class PrometheusMetricsConfig(MetricsBaseConfig):
"""The configuration for prometheus metrics"""

port: Union[str, int] = 8765

def __post_init__(self):
Expand Down
2 changes: 2 additions & 0 deletions controller/metrics/events.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ class MetricEvent:
"""Every element in the metrics queue that is created by
amalthea and consumed by the metrics handlers conforms to this
structure."""

event_timestamp: datetime
session: Dict[str, Any]
sessionCreationTimestamp: Optional[datetime] = None
Expand Down Expand Up @@ -40,6 +41,7 @@ class MetricEventHandler(ABC):
"""Abstract class for the queue workers that will
be doing the final publishing or persisting of any metrics
that are generated by amalthea in the metrics queue."""

@abstractmethod
def publish(self, metric_event: MetricEvent):
pass
57 changes: 39 additions & 18 deletions controller/metrics/prometheus.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,15 @@

from controller.server_status_enum import ServerStatusEnum
from controller.metrics.events import MetricEventHandler, MetricEvent
from controller.metrics.utils import resource_request_from_manifest, additional_labels_from_manifest
from controller.metrics.utils import (
resource_request_from_manifest,
additional_labels_from_manifest,
)


class PrometheusMetricAction(Enum):
"""The different methods that can be used to manipulate prometheus metrics."""

inc = "inc"
dec = "dec"
set = "set"
Expand All @@ -20,23 +24,30 @@ class PrometheusMetricAction(Enum):
class PrometheusMetricType(NamedTuple):
"""A generic prometheus metric "struct" with its allowed
methods and the specific metric type."""

type: MetricWrapperBase
actions: List[PrometheusMetricAction]


class PrometheusMetricTypesEnum(Enum):
"""All the different prometheus metrics supported."""

counter = PrometheusMetricType(Counter, [PrometheusMetricAction.inc])
gauge = PrometheusMetricType(
Gauge,
[PrometheusMetricAction.inc, PrometheusMetricAction.dec, PrometheusMetricAction.set],
[
PrometheusMetricAction.inc,
PrometheusMetricAction.dec,
PrometheusMetricAction.set,
],
)
histogram = PrometheusMetricType(Histogram, [PrometheusMetricAction.observe])
summary = PrometheusMetricType(Summary, [PrometheusMetricAction.observe])


class PrometheusMetric():
class PrometheusMetric:
"""A generic wrapper class for all prometheus metrics."""

_label_name_invalid_first_letter = re.compile(r"^[^a-zA-Z_]")
_label_name_invalid_all_letters = re.compile(r"[^a-zA-Z0-9_]")

Expand All @@ -47,7 +58,7 @@ def __init__(
documentation: str,
labelnames: Optional[List[str]],
*args,
**kwargs
**kwargs,
):
if type(metric_type) is str:
self._metric_type = PrometheusMetricTypesEnum[metric_type].value
Expand All @@ -60,7 +71,9 @@ def __init__(
)
if labelnames is None:
labelnames = []
sanitized_label_names = [self.sanitize_label_name(label) for label in labelnames]
sanitized_label_names = [
self.sanitize_label_name(label) for label in labelnames
]
self._metric = self._metric_type.type(
name, documentation, sanitized_label_names, *args, **kwargs
)
Expand All @@ -73,9 +86,7 @@ def sanitize_label_name(self, val: str) -> str:
return val

def _sanitize_labels(self, labels: Dict[str, str]) -> Dict[str, str]:
return {
self.sanitize_label_name(name): val for name, val in labels.items()
}
return {self.sanitize_label_name(name): val for name, val in labels.items()}

def __call__(
self,
Expand Down Expand Up @@ -106,7 +117,9 @@ def __call__(
else:
sanitized_labels = {}
operation_method = getattr(
self._metric if len(sanitized_labels) == 0 else self._metric.labels(**sanitized_labels),
self._metric
if len(sanitized_labels) == 0
else self._metric.labels(**sanitized_labels),
action.value,
None,
)
Expand All @@ -116,6 +129,7 @@ def __call__(
class PrometheusMetricNames(Enum):
"""Used to avoid errors in metric names and to ensure
that the same set of metric names is always used."""

sessions_total_created = "sessions_total_created"
sessions_total_deleted = "sessions_total_deleted"
sessions_status_changes = "sessions_status_changes"
Expand All @@ -128,6 +142,7 @@ class PrometheusMetricNames(Enum):

class PrometheusMetricHandler(MetricEventHandler):
"""Handles metric events from the queue that are created by amalthea."""

def __init__(self, manifest_labelnames: List[str] = []):
self.manifest_labelnames = manifest_labelnames
self._sessions_total_created = PrometheusMetric(
Expand All @@ -150,46 +165,46 @@ def __init__(self, manifest_labelnames: List[str] = []):
*self.manifest_labelnames,
"status_from",
"status_to",
]
],
)
self._sessions_launch_duration = PrometheusMetric(
PrometheusMetricTypesEnum["histogram"].value,
PrometheusMetricNames["sessions_launch_duration_seconds"].value,
"How long did it take for a session to transition into running state",
self.manifest_labelnames,
unit="seconds",
buckets=[30, 60, 90, 120, 180, 240, 300, 480]
buckets=[30, 60, 90, 120, 180, 240, 300, 480],
)
self._sessions_cpu_request = PrometheusMetric(
PrometheusMetricTypesEnum["histogram"].value,
PrometheusMetricNames["sessions_cpu_request_millicores"].value,
"CPU millicores requested by a user for a session.",
self.manifest_labelnames,
unit="m",
buckets=[100, 500, 1000, 2000, 3000, 4000]
buckets=[100, 500, 1000, 2000, 3000, 4000],
)
self._sessions_memory_request = PrometheusMetric(
PrometheusMetricTypesEnum["histogram"].value,
PrometheusMetricNames["sessions_memory_request_bytes"].value,
"Memory requested by a user for a session.",
self.manifest_labelnames,
unit="byte",
buckets=[500e6, 1e9, 2e9, 4e9, 8e9, 16e9, 32e9]
buckets=[500e6, 1e9, 2e9, 4e9, 8e9, 16e9, 32e9],
)
self._sessions_gpu_request = PrometheusMetric(
PrometheusMetricTypesEnum["histogram"].value,
PrometheusMetricNames["sessions_gpu_request"].value,
"GPUs requested by a user for a session.",
self.manifest_labelnames,
buckets=[0, 1, 2, 3, 4]
buckets=[0, 1, 2, 3, 4],
)
self._sessions_disk_request = PrometheusMetric(
PrometheusMetricTypesEnum["histogram"].value,
PrometheusMetricNames["sessions_disk_request_bytes"].value,
"Disk space requested by a user for a session.",
self.manifest_labelnames,
unit="byte",
buckets=[1e9, 4e9, 16e9, 32e9, 64e9, 128e9]
buckets=[1e9, 4e9, 16e9, 32e9, 64e9, 128e9],
)

def _collect_labels_from_manifest(self, manifest: Dict[str, Any]) -> Dict[str, str]:
Expand Down Expand Up @@ -244,10 +259,16 @@ def _on_any_status_change(self, metric_event: MetricEvent):
if metric_event.status != metric_event.old_status:
status_change_labels = {
**manifest_labels,
"status_from": metric_event.old_status.value if metric_event.old_status else "None",
"status_to": metric_event.status.value if metric_event.status else "None",
"status_from": metric_event.old_status.value
if metric_event.old_status
else "None",
"status_to": metric_event.status.value
if metric_event.status
else "None",
}
self._sessions_status_changes(PrometheusMetricAction.inc, 1, status_change_labels)
self._sessions_status_changes(
PrometheusMetricAction.inc, 1, status_change_labels
)

def publish(self, metric_event: MetricEvent):
"""Publishes (i.e. persists) the proper prometheus metrics
Expand Down
1 change: 1 addition & 0 deletions controller/metrics/queue.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ class MetricsQueue:
metrics handlers subscribe to this queue and persist or further
publish the metrics to the proper place.
"""

def __init__(self, metric_handlers=List[MetricEventHandler]):
self.q = Queue()
self.metric_handlers = metric_handlers
Expand Down
25 changes: 15 additions & 10 deletions controller/metrics/s3.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
@dataclass
class SesionMetricData:
"""The data that is included for each metric event uploaded to S3."""

name: str
namespace: str
uid: str
Expand All @@ -46,7 +47,9 @@ def _default_json_serializer(obj):
return obj.value

def __str__(self):
return json.dumps(asdict(self), default=self._default_json_serializer, indent=None)
return json.dumps(
asdict(self), default=self._default_json_serializer, indent=None
)

@classmethod
def from_metric_event(
Expand All @@ -65,7 +68,8 @@ def from_metric_event(
metric_event.status,
metric_event.old_status,
additional_labels_from_manifest(
metric_event.session, additional_label_names,
metric_event.session,
additional_label_names,
),
)

Expand All @@ -76,11 +80,10 @@ class S3RotatingLogHandler(BaseRotatingHandler):
not kept locally. The maximum rotation period (in seconds) can be
specified.
"""

_datetime_format = "_%Y%m%d_%H%M%S%z"

def __init__(
self, filename, mode, config: S3Config, encoding=None
):
def __init__(self, filename, mode, config: S3Config, encoding=None):
super().__init__(filename, mode, encoding, delay=False)
self._period_timedelta = timedelta(seconds=config.rotation_period_seconds)
self._start_timestamp = pytz.UTC.localize(datetime.utcnow())
Expand Down Expand Up @@ -110,9 +113,7 @@ def _upload(self, fname: str, remove_after_upload: bool = False):
resp = None
if file_stats.st_size > 0:
resp = self._client.upload_file(
fname,
self._bucket,
self._s3_path_prefix + "/" + Path(fname).name
fname, self._bucket, self._s3_path_prefix + "/" + Path(fname).name
)
if remove_after_upload:
os.remove(fname)
Expand All @@ -121,7 +122,9 @@ def _upload(self, fname: str, remove_after_upload: bool = False):
def _namer(self, default_name: str) -> str:
path = Path(default_name)
new_file = path.parent / (
path.stem + self._start_timestamp.strftime(self._datetime_format) + path.suffix
path.stem
+ self._start_timestamp.strftime(self._datetime_format)
+ path.suffix
)
return os.fspath(new_file)

Expand All @@ -147,10 +150,11 @@ def shouldRollover(self, _: str) -> bool:

class S3Formatter(Formatter):
"""Logging formatter that has ISO8601 timestamps and produces valid json logs."""

def __init__(self, validate: bool = True) -> None:
datefmt = "%Y-%m-%dT%H:%M:%S%z"
style = "%"
fmt = "{\"time\":\"%(asctime)s\", \"message\":%(message)s}"
fmt = '{"time":"%(asctime)s", "message":%(message)s}'
super().__init__(fmt, datefmt, style, validate)

def formatTime(self, record: LogRecord, datefmt: Optional[str] = None) -> str:
Expand All @@ -167,6 +171,7 @@ class S3MetricHandler(MetricEventHandler):
"""A simple metric handler that persists the metrics
that are published by Amalthea to an S3 bucket.
"""

def __init__(
self,
logger: Logger,
Expand Down
Loading