Go unhealthy if not ready before timeout #306

Open
wants to merge 12 commits into main
3 changes: 3 additions & 0 deletions azimuth_capi/config.py
@@ -207,6 +207,9 @@ class Configuration(
#: The number of seconds to wait between timer executions
timer_interval: conint(gt = 0) = 60

#: The number of minutes to wait before marking a cluster as unhealthy
cluster_timeout_minutes: conint(gt = 0) = 30

#: The field manager name to use for server-side apply
easykube_field_manager: constr(min_length = 1) = "azimuth-capi-operator"

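Taken together with the timer_interval default above, these two settings bound how long a stuck cluster can go unnoticed; a quick back-of-the-envelope check (plain Python using the defaults shown in the diff, not part of the change):

timer_interval = 60            # seconds between timer executions
cluster_timeout_minutes = 30   # minutes before a stuck cluster is marked unhealthy

# The timer only fires every timer_interval seconds, so the worst-case
# detection latency is the timeout plus one timer tick
worst_case_seconds = cluster_timeout_minutes * 60 + timer_interval
print(worst_case_seconds)      # 1860, i.e. just over 31 minutes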
56 changes: 55 additions & 1 deletion azimuth_capi/operator.py
@@ -34,7 +34,6 @@
CLUSTER_API_CONTROLPLANE_VERSION = f"controlplane.{CLUSTER_API_VERSION}"
AZIMUTH_SCHEDULING_VERSION = "scheduling.azimuth.stackhpc.com/v1alpha1"


# Create an easykube client from the environment
from pydantic.json import pydantic_encoder
ekclient = (
@@ -417,6 +416,16 @@ async def on_cluster_create(logger, instance, name, namespace, patch, **kwargs):
if instance.spec.paused:
logger.info("reconciliation is paused - no action taken")
return

# Add a timestamp so we can time out pending changes that fail early,
# e.g. due to failing to get a lease
last_updated_annotation_name = f"{settings.api_group}/last-updated-timestamp"
if last_updated_annotation_name not in instance.metadata.annotations:
logger.info("adding last updated timestamp")
now = dt.datetime.now(dt.timezone.utc)
annotations = patch.setdefault("metadata", {}).setdefault("annotations", {})
annotations[last_updated_annotation_name] = now.isoformat()

# Make sure that the secret exists
eksecrets = await ekclient.api("v1").resource("secrets")
secret = await eksecrets.fetch(
@@ -450,6 +459,7 @@ async def on_cluster_create(logger, instance, name, namespace, patch, **kwargs):
for ng in helm_values.get("nodeGroups", [])
]
else:
# TODO(johngarbutt) look for when a lease has an error, and go unhealthy
raise kopf.TemporaryError("lease is not active", delay = 15)
if settings.zenith.enabled:
helm_values = mergeconcat(
@@ -481,6 +491,11 @@ async def on_cluster_create(logger, instance, name, namespace, patch, **kwargs):
labels = patch.setdefault("metadata", {}).setdefault("labels", {})
labels[f"{settings.api_group}/cluster-template"] = instance.spec.template_name

# Always reset the timeout counter now that changes have been made
annotations = patch.setdefault("metadata", {}).setdefault("annotations", {})
now = dt.datetime.now(dt.timezone.utc)
annotations[last_updated_annotation_name] = now.isoformat()


@model_handler(api.Cluster, kopf.on.delete)
async def on_cluster_delete(logger, instance, name, namespace, **kwargs):
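The last-updated annotation written above is parsed back with datetime.fromisoformat in the new timer handler below, so the value has to survive an isoformat round trip with its UTC offset intact; a quick standalone check of that assumption (plain Python, not operator code):

import datetime as dt

now = dt.datetime.now(dt.timezone.utc)
restored = dt.datetime.fromisoformat(now.isoformat())
# Timezone-aware UTC timestamps round-trip exactly, offset included
assert restored == now and restored.tzinfo is not None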
@@ -575,6 +590,45 @@ async def on_cluster_resume(instance, name, namespace, **kwargs):
await save_cluster_status(instance)


@model_handler(
api.Cluster,
kopf.timer,
# Since we have create and update handlers, we want to idle after a change
interval = settings.timer_interval)
async def check_cluster_timeout(logger, instance, patch, **kwargs):
# If cluster reconciliation is paused, there is nothing else to do
if instance.spec.paused:
logger.info("reconciliation is paused - no action taken")
return

# Remove the timeout counter if the cluster goes ready
last_updated_annotation_name = f"{settings.api_group}/last-updated-timestamp"
if (instance.status.phase == api.ClusterPhase.READY and
last_updated_annotation_name in instance.metadata.annotations):
annotations = patch.setdefault("metadata", {}).setdefault("annotations", {})
annotations[last_updated_annotation_name] = None
logger.info("cluster gone ready, reset timeout timestamp")
return

# Check if the cluster is in a transition state
if instance.status.phase not in [api.ClusterPhase.PENDING,
api.ClusterPhase.RECONCILING,
api.ClusterPhase.UPGRADING]:
logger.info("cluster is not in a transition state - no action taken")
return

# If we have not been updated in time, mark as unhealthy
last_updated_str = instance.metadata.annotations.get(last_updated_annotation_name)
if last_updated_str:
last_updated = dt.datetime.fromisoformat(last_updated_str)
now = dt.datetime.now(dt.timezone.utc)
logger.info(f"Time since last updated: {now - last_updated} vs {settings.cluster_timeout_minutes} minutes")
if (now - last_updated).total_seconds() > (60 * settings.cluster_timeout_minutes):
instance.status.phase = api.ClusterPhase.UNHEALTHY
await save_cluster_status(instance)
logger.info("cluster has timed out, marked as unhealthy")


def on_related_object_event(
*args,
# This function maps an object to a cluster name
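A minimal, self-contained rehearsal of the decision check_cluster_timeout makes (plain Python; the 30 minute threshold mirrors the new default, while the annotation key assumes settings.api_group resolves to azimuth.stackhpc.com):

import datetime as dt

# Assumed annotation key; the real one is built from settings.api_group
ANNOTATION = "azimuth.stackhpc.com/last-updated-timestamp"

# Pretend the cluster was last patched 45 minutes ago and never went ready
last_updated_str = (dt.datetime.now(dt.timezone.utc) - dt.timedelta(minutes = 45)).isoformat()
annotations = {ANNOTATION: last_updated_str}

last_updated = dt.datetime.fromisoformat(annotations[ANNOTATION])
elapsed = dt.datetime.now(dt.timezone.utc) - last_updated
print(elapsed.total_seconds() > 60 * 30)   # True, so the cluster would be marked unhealthy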
3 changes: 3 additions & 0 deletions azimuth_capi/status.py
@@ -54,6 +54,9 @@ def _reconcile_cluster_phase(cluster):
LeasePhase.PENDING,
LeasePhase.STARTING
}:
if cluster.status.phase == ClusterPhase.UNHEALTHY:
# Preserve the unhealthy state set by the timeout handler
return
cluster.status.phase = ClusterPhase.PENDING
return
if cluster.status.lease_phase == LeasePhase.ERROR:
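A minimal sketch of the precedence this change introduces (standalone Python with string stand-ins for the ClusterPhase enum values):

def phase_for_pending_lease(current_phase: str) -> str:
    # A lease that is still creating/pending/starting normally forces the
    # cluster back to Pending, but an Unhealthy verdict from the timeout
    # handler is sticky and must not be overwritten
    return current_phase if current_phase == "Unhealthy" else "Pending"

assert phase_for_pending_lease("Unhealthy") == "Unhealthy"
assert phase_for_pending_lease("Reconciling") == "Pending"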