Skip to content

Commit

Permalink
add external ip/port for load balancer for RPC queries (#64)
Browse files Browse the repository at this point in the history
* add external ip/port for load balancer for RPC queries

* pick unique port for rpc load balancer

* address comments
  • Loading branch information
gregcusack authored Aug 1, 2024
1 parent dcef655 commit 6ff57aa
Show file tree
Hide file tree
Showing 4 changed files with 112 additions and 26 deletions.
21 changes: 21 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,27 @@ For steps (2) and (3), when using `--no-bootstrap`, we assume that the directory

Note: We can't deploy heterogeneous clusters across v1.17 and v1.18 due to feature differences. Hope to fix this in the future. Have something where we can specifically define which features to enable.

## Querying the RPC from outside the cluster
The cluster now has an external IP/port that can be queried to reach the cluster RPC. The external RPC port will be logged during cluster boot, e.g.:
```
Deploying Load Balancer Service with external port: 30000
```
1) Get any one of the node IPs in the cluster. Querying the RPC will work with any node IP in the cluster, this includes nodes that are NOT running any of your pods:
```
kubectl get nodes -o wide
```
2) Run your query. e.g.
```
curl -X POST \
-H "Content-Type: application/json" \
-d '{
"jsonrpc": "2.0",
"id": 1,
"method": "getClusterNodes"
}' \
http://<node-ip>:<external-port>
```

## Kubernetes Cheatsheet
Create namespace:
```
Expand Down
21 changes: 9 additions & 12 deletions src/k8s_helpers.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use {
crate::docker::DockerImage,
crate::{docker::DockerImage, kubernetes::ServiceType},
k8s_openapi::{
api::{
apps::v1::{ReplicaSet, ReplicaSetSpec},
Expand Down Expand Up @@ -125,8 +125,12 @@ pub fn create_service(
service_name: String,
namespace: String,
label_selector: BTreeMap<String, String>,
is_load_balancer: bool,
service_type: ServiceType,
) -> Service {
let (type_, cluster_ip, node_port) = match service_type {
ServiceType::Standard => (None, Some("None".to_string()), None),
ServiceType::LoadBalancer(port) => (Some("LoadBalancer".to_string()), None, Some(port)),
};
Service {
metadata: ObjectMeta {
name: Some(service_name),
Expand All @@ -135,20 +139,13 @@ pub fn create_service(
},
spec: Some(ServiceSpec {
selector: Some(label_selector),
type_: if is_load_balancer {
Some("LoadBalancer".to_string())
} else {
None
},
cluster_ip: if is_load_balancer {
None
} else {
Some("None".to_string())
},
type_,
cluster_ip,
ports: Some(vec![
ServicePort {
port: 8899, // RPC Port
name: Some("rpc-port".to_string()),
node_port,
..Default::default()
},
ServicePort {
Expand Down
86 changes: 76 additions & 10 deletions src/kubernetes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,16 @@ use {
apimachinery::pkg::api::resource::Quantity,
},
kube::{
api::{Api, ListParams, PostParams},
api::{Api, ListParams, ObjectList, PostParams},
Client,
},
log::*,
solana_sdk::pubkey::Pubkey,
std::{collections::BTreeMap, error::Error, path::Path},
std::{
collections::{BTreeMap, HashSet},
error::Error,
path::Path,
},
};

#[derive(Debug, Clone)]
Expand All @@ -41,6 +45,12 @@ impl PodRequests {
}
}

#[derive(Debug, PartialEq)]
pub enum ServiceType {
Standard,
LoadBalancer(/* External Port */ i32),
}

pub struct Kubernetes<'a> {
k8s_client: Client,
namespace: String,
Expand Down Expand Up @@ -79,10 +89,14 @@ impl<'a> Kubernetes<'a> {
self.validator_config.shred_version = Some(shred_version);
}

pub async fn namespace_exists(&self) -> Result<bool, kube::Error> {
async fn get_namespaces(&self) -> Result<ObjectList<Namespace>, kube::Error> {
let namespaces: Api<Namespace> = Api::all(self.k8s_client.clone());
let namespace_list = namespaces.list(&ListParams::default()).await?;
Ok(namespace_list)
}

pub async fn namespace_exists(&self) -> Result<bool, kube::Error> {
let namespace_list = self.get_namespaces().await?;
let exists = namespace_list
.items
.iter()
Expand Down Expand Up @@ -383,7 +397,7 @@ impl<'a> Kubernetes<'a> {
service_name.to_string(),
self.namespace.clone(),
label_selector.clone(),
false,
ServiceType::Standard,
)
}

Expand All @@ -397,7 +411,7 @@ impl<'a> Kubernetes<'a> {
format!("{}-{}-{}", service_name, self.deployment_tag, index),
self.namespace.clone(),
label_selector.clone(),
false,
ServiceType::Standard,
)
}

Expand All @@ -411,17 +425,20 @@ impl<'a> Kubernetes<'a> {
service_api.create(&post_params, service).await
}

pub fn create_validator_load_balancer(
pub async fn create_validator_load_balancer(
&self,
service_name: &str,
label_selector: &BTreeMap<String, String>,
) -> Service {
k8s_helpers::create_service(
) -> Result<Service, Box<dyn Error>> {
let node_port = self.get_open_external_port_for_rpc_service().await?;
info!("Deploying Load Balancer Service with external port: {node_port}");

Ok(k8s_helpers::create_service(
service_name.to_string(),
self.namespace.clone(),
label_selector.clone(),
true,
)
ServiceType::LoadBalancer(node_port),
))
}

pub async fn is_replica_set_ready(&self, replica_set_name: &str) -> Result<bool, kube::Error> {
Expand Down Expand Up @@ -758,4 +775,53 @@ impl<'a> Kubernetes<'a> {
None,
)
}

async fn get_open_external_port_for_rpc_service(&self) -> Result<i32, Box<dyn Error>> {
let used_ports = self.get_all_used_ports().await?;

// This Node Port range is standard for kubernetes
const MIN_NODE_PORT: i32 = 30000;
const MAX_NODE_PORT: i32 = 32767;
// Find an available NodePort
let mut available_port = MIN_NODE_PORT;
while used_ports.contains(&available_port) {
available_port += 1;
}
if available_port > MAX_NODE_PORT {
return Err(format!(
"No available NodePort found in the range {MIN_NODE_PORT}-{MAX_NODE_PORT}"
)
.into());
}

Ok(available_port)
}

async fn get_all_used_ports(&self) -> Result<HashSet<i32>, kube::Error> {
let mut used_ports = HashSet::new();
let namespaces = self.get_namespaces().await?;
let namespaces: Vec<String> = namespaces
.items
.into_iter()
.filter_map(|ns| ns.metadata.name)
.collect();

// Iterate over namespaces to collect used NodePorts
for ns in namespaces {
let services: Api<Service> = Api::namespaced(self.k8s_client.clone(), &ns);
let service_list = services.list(&Default::default()).await?;
for svc in service_list {
if let Some(spec) = svc.spec {
if let Some(ports) = spec.ports {
for port in ports {
if let Some(node_port) = port.node_port {
used_ports.insert(node_port);
}
}
}
}
}
}
Ok(used_ports)
}
}
10 changes: 6 additions & 4 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -755,10 +755,12 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
let load_balancer_label =
kub_controller.create_selector("load-balancer/name", "load-balancer-selector");
//create load balancer
let load_balancer = kub_controller.create_validator_load_balancer(
"bootstrap-and-rpc-node-lb-service",
&load_balancer_label,
);
let load_balancer = kub_controller
.create_validator_load_balancer(
"bootstrap-and-rpc-node-lb-service",
&load_balancer_label,
)
.await?;

//deploy load balancer
kub_controller.deploy_service(&load_balancer).await?;
Expand Down

0 comments on commit 6ff57aa

Please sign in to comment.