From 6ff57aa186aafc0eca554f03b6111a7eddd78bba Mon Sep 17 00:00:00 2001 From: Greg Cusack Date: Wed, 31 Jul 2024 18:12:21 -0700 Subject: [PATCH] add external ip/port for load balancer for RPC queries (#64) * add external ip/port for load balancer for RPC queries * pick unique port for rpc load balancer * address comments --- README.md | 21 +++++++++++ src/k8s_helpers.rs | 21 +++++------ src/kubernetes.rs | 86 ++++++++++++++++++++++++++++++++++++++++------ src/main.rs | 10 +++--- 4 files changed, 112 insertions(+), 26 deletions(-) diff --git a/README.md b/README.md index 91ac135..2783b04 100644 --- a/README.md +++ b/README.md @@ -157,6 +157,27 @@ For steps (2) and (3), when using `--no-bootstrap`, we assume that the directory Note: We can't deploy heterogeneous clusters across v1.17 and v1.18 due to feature differences. Hope to fix this in the future. Have something where we can specifically define which features to enable. +## Querying the RPC from outside the cluster +The cluster now has an external IP/port that can be queried to reach the cluster RPC. The external RPC port will be logged during cluster boot, e.g.: +``` +Deploying Load Balancer Service with external port: 30000 +``` +1) Get any one of the node IPs in the cluster. Querying the RPC will work with any node IP in the cluster, this includes nodes that are NOT running any of your pods: +``` +kubectl get nodes -o wide +``` +2) Run your query. e.g. +``` +curl -X POST \ +-H "Content-Type: application/json" \ +-d '{ + "jsonrpc": "2.0", + "id": 1, + "method": "getClusterNodes" + }' \ +http://: +``` + ## Kubernetes Cheatsheet Create namespace: ``` diff --git a/src/k8s_helpers.rs b/src/k8s_helpers.rs index 007edf7..4755462 100644 --- a/src/k8s_helpers.rs +++ b/src/k8s_helpers.rs @@ -1,5 +1,5 @@ use { - crate::docker::DockerImage, + crate::{docker::DockerImage, kubernetes::ServiceType}, k8s_openapi::{ api::{ apps::v1::{ReplicaSet, ReplicaSetSpec}, @@ -125,8 +125,12 @@ pub fn create_service( service_name: String, namespace: String, label_selector: BTreeMap, - is_load_balancer: bool, + service_type: ServiceType, ) -> Service { + let (type_, cluster_ip, node_port) = match service_type { + ServiceType::Standard => (None, Some("None".to_string()), None), + ServiceType::LoadBalancer(port) => (Some("LoadBalancer".to_string()), None, Some(port)), + }; Service { metadata: ObjectMeta { name: Some(service_name), @@ -135,20 +139,13 @@ pub fn create_service( }, spec: Some(ServiceSpec { selector: Some(label_selector), - type_: if is_load_balancer { - Some("LoadBalancer".to_string()) - } else { - None - }, - cluster_ip: if is_load_balancer { - None - } else { - Some("None".to_string()) - }, + type_, + cluster_ip, ports: Some(vec![ ServicePort { port: 8899, // RPC Port name: Some("rpc-port".to_string()), + node_port, ..Default::default() }, ServicePort { diff --git a/src/kubernetes.rs b/src/kubernetes.rs index 8f8a9e6..3dea15c 100644 --- a/src/kubernetes.rs +++ b/src/kubernetes.rs @@ -17,12 +17,16 @@ use { apimachinery::pkg::api::resource::Quantity, }, kube::{ - api::{Api, ListParams, PostParams}, + api::{Api, ListParams, ObjectList, PostParams}, Client, }, log::*, solana_sdk::pubkey::Pubkey, - std::{collections::BTreeMap, error::Error, path::Path}, + std::{ + collections::{BTreeMap, HashSet}, + error::Error, + path::Path, + }, }; #[derive(Debug, Clone)] @@ -41,6 +45,12 @@ impl PodRequests { } } +#[derive(Debug, PartialEq)] +pub enum ServiceType { + Standard, + LoadBalancer(/* External Port */ i32), +} + pub struct Kubernetes<'a> { k8s_client: Client, namespace: String, @@ -79,10 +89,14 @@ impl<'a> Kubernetes<'a> { self.validator_config.shred_version = Some(shred_version); } - pub async fn namespace_exists(&self) -> Result { + async fn get_namespaces(&self) -> Result, kube::Error> { let namespaces: Api = Api::all(self.k8s_client.clone()); let namespace_list = namespaces.list(&ListParams::default()).await?; + Ok(namespace_list) + } + pub async fn namespace_exists(&self) -> Result { + let namespace_list = self.get_namespaces().await?; let exists = namespace_list .items .iter() @@ -383,7 +397,7 @@ impl<'a> Kubernetes<'a> { service_name.to_string(), self.namespace.clone(), label_selector.clone(), - false, + ServiceType::Standard, ) } @@ -397,7 +411,7 @@ impl<'a> Kubernetes<'a> { format!("{}-{}-{}", service_name, self.deployment_tag, index), self.namespace.clone(), label_selector.clone(), - false, + ServiceType::Standard, ) } @@ -411,17 +425,20 @@ impl<'a> Kubernetes<'a> { service_api.create(&post_params, service).await } - pub fn create_validator_load_balancer( + pub async fn create_validator_load_balancer( &self, service_name: &str, label_selector: &BTreeMap, - ) -> Service { - k8s_helpers::create_service( + ) -> Result> { + let node_port = self.get_open_external_port_for_rpc_service().await?; + info!("Deploying Load Balancer Service with external port: {node_port}"); + + Ok(k8s_helpers::create_service( service_name.to_string(), self.namespace.clone(), label_selector.clone(), - true, - ) + ServiceType::LoadBalancer(node_port), + )) } pub async fn is_replica_set_ready(&self, replica_set_name: &str) -> Result { @@ -758,4 +775,53 @@ impl<'a> Kubernetes<'a> { None, ) } + + async fn get_open_external_port_for_rpc_service(&self) -> Result> { + let used_ports = self.get_all_used_ports().await?; + + // This Node Port range is standard for kubernetes + const MIN_NODE_PORT: i32 = 30000; + const MAX_NODE_PORT: i32 = 32767; + // Find an available NodePort + let mut available_port = MIN_NODE_PORT; + while used_ports.contains(&available_port) { + available_port += 1; + } + if available_port > MAX_NODE_PORT { + return Err(format!( + "No available NodePort found in the range {MIN_NODE_PORT}-{MAX_NODE_PORT}" + ) + .into()); + } + + Ok(available_port) + } + + async fn get_all_used_ports(&self) -> Result, kube::Error> { + let mut used_ports = HashSet::new(); + let namespaces = self.get_namespaces().await?; + let namespaces: Vec = namespaces + .items + .into_iter() + .filter_map(|ns| ns.metadata.name) + .collect(); + + // Iterate over namespaces to collect used NodePorts + for ns in namespaces { + let services: Api = Api::namespaced(self.k8s_client.clone(), &ns); + let service_list = services.list(&Default::default()).await?; + for svc in service_list { + if let Some(spec) = svc.spec { + if let Some(ports) = spec.ports { + for port in ports { + if let Some(node_port) = port.node_port { + used_ports.insert(node_port); + } + } + } + } + } + } + Ok(used_ports) + } } diff --git a/src/main.rs b/src/main.rs index 72e42ca..e86b515 100644 --- a/src/main.rs +++ b/src/main.rs @@ -755,10 +755,12 @@ async fn main() -> Result<(), Box> { let load_balancer_label = kub_controller.create_selector("load-balancer/name", "load-balancer-selector"); //create load balancer - let load_balancer = kub_controller.create_validator_load_balancer( - "bootstrap-and-rpc-node-lb-service", - &load_balancer_label, - ); + let load_balancer = kub_controller + .create_validator_load_balancer( + "bootstrap-and-rpc-node-lb-service", + &load_balancer_label, + ) + .await?; //deploy load balancer kub_controller.deploy_service(&load_balancer).await?;