Skip to content

Commit

Permalink
Fixes
Browse files Browse the repository at this point in the history
  • Loading branch information
itsbalamurali committed Jan 18, 2024
1 parent f77db60 commit 6fa7e73
Show file tree
Hide file tree
Showing 4 changed files with 272 additions and 34 deletions.
94 changes: 65 additions & 29 deletions main.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,12 +49,43 @@ def create_tenant(spec, name, namespace, **_):
kopf.info(spec, reason='CreatingTenant', message=f'Creating {namespace}/{name}.')
kubernetes.config.load_incluster_config()
kube_client = kubernetes.client.ApiClient()
check_for_pageserver(kube_client, namespace, name)
check_for_pre_requisites(kube_client, namespace, name)
# TODO: Get the NeonDeployment object linked to this tenant
# Get the storage credentials from the NeonDeployment object
# Get the resource limits from the NeonDeployment object
# neon_deployment = (kubernetes.client
# .ApiextensionsV1Api(kube_client)
# (name="neon-deployment", namespace=namespace))
pageserver_resources = spec.get('pageServer').get('resources')
if pageserver_resources is None:
pageserver_resources = default_resource_limits()
remote_storage_bucket_endpoint = spec.get('storageConfig').get('endpoint')
remote_storage_bucket_name = spec.get('storageConfig').get('bucketName')
remote_storage_bucket_region = spec.get('storageConfig').get('bucketRegion')
remote_storage_prefix_in_bucket = spec.get('storageConfig').get('prefixInBucket')
compute_node_resources = spec.get('computeNode').get('resources')
if compute_node_resources is None:
compute_node_resources = default_resource_limits()
# TODO: Update the tenant crd with the tenant id
# Deploy the pageserver
resources.pageserver.deploy_pageserver(kube_client=kube_client,
namespace=namespace,
resources=pageserver_resources,
remote_storage_endpoint=remote_storage_bucket_endpoint,
remote_storage_bucket_name=remote_storage_bucket_name,
remote_storage_bucket_region=remote_storage_bucket_region,
remote_storage_prefix_in_bucket=remote_storage_prefix_in_bucket)
# Deploy the compute nodes
resources.compute_node.deploy_compute_node(kube_client=kube_client,
namespace=namespace,
resources=compute_node_resources)
pageserver_url = f"http://pageserver.{namespace}.svc.cluster.local:6400"
# Call the api to create the tenant using requests post method to pageserver_url/v1/tenant
# If the response is not 200, raise kopf.PermanentError(f"Failed to create tenant {namespace}/{name}")
# If the response is 200, kopf.adopt the tenant
request = {}
request = {

}
response = requests.post(f"{pageserver_url}/v1/tenant", json=request)
if response.status_code != 200:
raise kopf.PermanentError(f"Failed to create tenant {namespace}/{name}")
Expand All @@ -69,7 +100,7 @@ def update_tenant(spec, name, namespace, **_):
kopf.info(spec, reason='UpdatingTenant', message=f'Updating {namespace}/{name}.')
kubernetes.config.load_incluster_config()
kube_client = kubernetes.client.ApiClient()
check_for_pageserver(kube_client, namespace, name)
check_for_pre_requisites(kube_client, namespace, name)
pageserver_url = f"http://pageserver.{namespace}.svc.cluster.local:6400"
# Call the api to update the tenant
request = {}
Expand All @@ -81,7 +112,7 @@ def delete_tenant(spec, name, namespace, **_):
kopf.info(spec, reason='DeletingTenant', message=f'Deleting {namespace}/{name}.')
kubernetes.config.load_incluster_config()
kube_client = kubernetes.client.ApiClient()
check_for_pageserver(kube_client, namespace, name)
check_for_pre_requisites(kube_client, namespace, name)
pageserver_url = f"http://pageserver.{namespace}.svc.cluster.local:6400"


Expand All @@ -90,7 +121,7 @@ def create_timeline(spec, name, namespace, **_):
kopf.info(spec, reason='CreatingTimeline', message=f'Creating {namespace}/{name}.')
kubernetes.config.load_incluster_config()
kube_client = kubernetes.client.ApiClient()
check_for_pageserver(kube_client, namespace, name)
check_for_pre_requisites(kube_client, namespace, name)
pageserver_url = f"http://pageserver.{namespace}.svc.cluster.local:6400"
# Call the api to create the timeline
request = {}
Expand All @@ -108,7 +139,7 @@ def update_timeline(spec, name, namespace, **_):
kopf.info(spec, reason='UpdatingTimeline', message=f'Updating {namespace}/{name}.')
kubernetes.config.load_incluster_config()
kube_client = kubernetes.client.ApiClient()
check_for_pageserver(kube_client, namespace, name)
check_for_pre_requisites(kube_client, namespace, name)
pageserver_url = f"http://pageserver.{namespace}.svc.cluster.local:6400"
# Call the api to update the timeline
request = {}
Expand Down Expand Up @@ -181,18 +212,7 @@ def create_deployment(spec, name, namespace, **_):
resources.control_plane.deploy_control_plane(kube_client=kube_client,
namespace=namespace,
resources=control_plane_resources)
# Deploy the pageserver
resources.pageserver.deploy_pageserver(kube_client=kube_client,
namespace=namespace,
resources=pageserver_resources,
remote_storage_endpoint=remote_storage_bucket_endpoint,
remote_storage_bucket_name=remote_storage_bucket_name,
remote_storage_bucket_region=remote_storage_bucket_region,
remote_storage_prefix_in_bucket=remote_storage_prefix_in_bucket)
# Deploy the compute nodes
resources.compute_node.deploy_compute_node(kube_client=kube_client,
namespace=namespace,
resources=compute_node_resources)

except Exception as e:
raise kopf.PermanentError(f"Failed to create NeonDeployment {namespace}/{name}: {e}")

Expand Down Expand Up @@ -291,14 +311,30 @@ async def cleanup_fn(logger, **kwargs):
pass


def check_for_pageserver(kube_client, namespace, name):
pageserver_statefulset = kubernetes.client.AppsV1Api(kube_client).read_namespaced_stateful_set(name="pageserver",
namespace=namespace)
if pageserver_statefulset is None:
raise kopf.PermanentError(f"Pageserver statefulset is missing for NeonTimeline {namespace}/{name}")
if pageserver_statefulset['status']['ready_replicas'] is None:
raise kopf.PermanentError(f"Pageserver statefulset is not ready for NeonTimeline {namespace}/{name}")
pageserver_svc = kubernetes.client.CoreV1Api(kube_client).read_namespaced_service(name="pageserver",
namespace=namespace)
if pageserver_svc is None:
raise kopf.PermanentError(f"Pageserver service is missing for NeonTimeline {namespace}/{name}")
def check_for_pre_requisites(kube_client, namespace, name):
"""
Check for the pre-requisites for NeonTenant deployment
"""
safekeepker_statefulset = (kubernetes.client
.AppsV1Api(kube_client)
.read_namespaced_stateful_set(name="safekeeper", namespace=namespace))
if safekeepker_statefulset is None:
raise kopf.PermanentError(f"Safekeeper statefulset is missing for NeonTenant {namespace}/{name}")
if safekeepker_statefulset['status']['ready_replicas'] is None:
raise kopf.PermanentError(f"Safekeeper statefulset is not ready for NeonTenant {namespace}/{name}")
safekeepker_svc = kubernetes.client.CoreV1Api(kube_client).read_namespaced_service(name="pageserver",
safekeeper=namespace)
if safekeepker_svc is None:
raise kopf.PermanentError(f"Safekeeper service is missing for NeonTenant {namespace}/{name}")
# Check for storage_broker deployment
storage_broker_deployment = (kubernetes.client
.AppsV1Api(kube_client)
.read_namespaced_deployment(name="storage-broker", namespace=namespace))
if storage_broker_deployment is None:
raise kopf.PermanentError(f"Storage broker deployment is missing for NeonTenant {namespace}/{name}")
if storage_broker_deployment['status']['ready_replicas'] is None:
raise kopf.PermanentError(f"Storage broker deployment is not ready for NeonTenant {namespace}/{name}")
storage_broker_svc = kubernetes.client.CoreV1Api(kube_client).read_namespaced_service(name="storage-broker",
namespace=namespace)
if storage_broker_svc is None:
raise kopf.PermanentError(f"Storage broker service is missing for NeonTenant {namespace}/{name}")
1 change: 0 additions & 1 deletion resources/pageserver.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
# Creates a Kubernetes deployment for the pageserver using the kubernetes python client
import kopf
import kubernetes
from kubernetes.client import V1ResourceRequirements, ApiException
Expand Down
202 changes: 202 additions & 0 deletions resources/pgbouncer.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,202 @@
import kopf
import kubernetes
from kubernetes.client import V1ResourceRequirements, ApiException


def deploy_pgbouncer(
namespace: str,
):
"""
Deploy the pgbouncer proxy to the cluster
:return:
"""
deployment = pgbouncer_deployment()
service = pgbouncer_service()
kopf.adopt(deployment)
kopf.adopt(service)

apps_client = kubernetes.client.AppsV1Api()
core_client = kubernetes.client.CoreV1Api()
try:
apps_client.create_namespaced_deployment(namespace=namespace, body=deployment)
core_client.create_namespaced_service(namespace=namespace, body=service)
except ApiException as e:
print("Exception when calling Api: %s\n" % e)


def update_pgbouncer(
namespace: str,
):
deployment = pgbouncer_deployment()
service = pgbouncer_service()
kopf.adopt(deployment)
kopf.adopt(service)

apps_client = kubernetes.client.AppsV1Api()
core_client = kubernetes.client.CoreV1Api()
try:
apps_client.patch_namespaced_deployment(namespace=namespace, name="pgbouncer", body=deployment)
core_client.patch_namespaced_service(namespace=namespace, name="pgbouncer", body=service)
except ApiException as e:
print("Exception when calling Api: %s\n" % e)


def delete_pgbouncer(
namespace: str,
):
apps_client = kubernetes.client.AppsV1Api()
core_client = kubernetes.client.CoreV1Api()
try:
apps_client.delete_namespaced_deployment(namespace=namespace, name="pgbouncer")
core_client.delete_namespaced_service(namespace=namespace, name="pgbouncer")
except ApiException as e:
print("Exception when calling Api: %s\n" % e)


def pgbouncer_deployment(
namespace: str,
resources: V1ResourceRequirements,
image: str = "bitnami/pgbouncer:latest",
replicas: int = 1,

) -> kubernetes.client.V1Deployment:
"""
Generate a deployment for the pgbouncer proxy
:return:
"""
return kubernetes.client.V1Deployment(
api_version="apps/v1",
kind="Deployment",
metadata=kubernetes.client.V1ObjectMeta(
name="pgbouncer",
namespace=namespace,
labels={
"app": "pgbouncer",
},
),
spec=kubernetes.client.V1DeploymentSpec(
replicas=1,
selector=kubernetes.client.V1LabelSelector(
match_labels={
"app": "pgbouncer",
},
),
template=kubernetes.client.V1PodTemplateSpec(
metadata=kubernetes.client.V1ObjectMeta(
labels={
"app": "pgbouncer",
},
),
spec=kubernetes.client.V1PodSpec(
containers=[
kubernetes.client.V1Container(
name="pgbouncer",
image=image,
ports=[
kubernetes.client.V1ContainerPort(
container_port=5432,
),
],
resources=resources,
env=[
kubernetes.client.V1EnvVar(
name="POSTGRESQL_HOST",
value=f"postgres-compute.{namespace}.svc.cluster.local:5432",
),
kubernetes.client.V1EnvVar(
name="PGBOUNCER_AUTH_TYPE",
value="trust",
),
],
),
kubernetes.client.V1Container(
name="pgbouncer-metrics",
image="prometheuscommunity/pgbouncer-exporter:latest",
ports=[
kubernetes.client.V1ContainerPort(
container_port=9127,
),
],
resources=kubernetes.client.V1ResourceRequirements(
requests={
"cpu": "100m",
"memory": "100Mi",
},
limits={
"cpu": "100m",
"memory": "100Mi",
},
),
)
],
),
),
),
)


def pgbouncer_configmap(
namespace: str,
) -> kubernetes.client.V1ConfigMap:
"""
Generate a configmap for the pgbouncer proxy
:return:
"""
return kubernetes.client.V1ConfigMap(
api_version="v1",
kind="ConfigMap",
metadata=kubernetes.client.V1ObjectMeta(
name="pgbouncer",
namespace=namespace,
labels={
"app": "pgbouncer",
},
),
data={
"pgbouncer.ini": """
[databases]
* = host=postgres port=5432
}
""",
"userlist.txt": """
"postgres" "postgres"
""",
},
)


def pgbouncer_service(
namespace: str,
) -> kubernetes.client.V1Service:
"""
Generate a service for the pgbouncer proxy
:return:
"""
return kubernetes.client.V1Service(
api_version="v1",
kind="Service",
metadata=kubernetes.client.V1ObjectMeta(
name="pgbouncer",
namespace=namespace,
labels={
"app": "pgbouncer",
},
),
spec=kubernetes.client.V1ServiceSpec(
selector={
"app": "pgbouncer",
},
ports=[
kubernetes.client.V1ServicePort(
port=5432,
name="pgbouncer",
target_port=5432,
),
kubernetes.client.V1ServicePort(
port=6432,
name="pgbouncer-metrics",
target_port=6432,
),
],
),
)
9 changes: 5 additions & 4 deletions resources/storage_broker.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,11 @@ def deploy_storage_broker(
replicas: int = 1,
resources: V1ResourceRequirements = None,
):
deployment = storage_broker_deployment(namespace=namespace,
image=image,
replicas=replicas,
resources=resources)
deployment = storage_broker_deployment(
namespace=namespace,
image=image,
replicas=replicas,
resources=resources)
service = storage_broker_service(namespace)
kopf.adopt(deployment)
kopf.adopt(service)
Expand Down

0 comments on commit 6fa7e73

Please sign in to comment.