Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add external ip/port for load balancer for RPC queries #64

Merged
merged 3 commits into from
Aug 1, 2024
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 18 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,24 @@ For steps (2) and (3), when using `--no-bootstrap`, we assume that the directory

Note: We can't deploy heterogeneous clusters across v1.17 and v1.18 due to feature differences. Hope to fix this in the future. Have something where we can specifically define which features to enable.

## Querying the RPC from outside the cluster
The cluster now has an external IP/port that can be queried to reach the cluster RPC. The external RPC port is 30000.
CriesofCarrots marked this conversation as resolved.
Show resolved Hide resolved
1) Get any one of the node IPs in the cluster. Querying the RPC will work with any node IP in the cluster, this includes nodes that are NOT running any of your pods:
```
kubectl get nodes -o wide
```
2) Run your query. e.g.
```
curl -X POST \
-H "Content-Type: application/json" \
-d '{
"jsonrpc": "2.0",
"id": 1,
"method": "getClusterNodes"
}' \
http://<node-ip>:30000
CriesofCarrots marked this conversation as resolved.
Show resolved Hide resolved
```

## Kubernetes Cheatsheet
Create namespace:
```
Expand Down
21 changes: 9 additions & 12 deletions src/k8s_helpers.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use {
crate::docker::DockerImage,
crate::{docker::DockerImage, kubernetes::ServiceType},
k8s_openapi::{
api::{
apps::v1::{ReplicaSet, ReplicaSetSpec},
Expand Down Expand Up @@ -125,8 +125,12 @@ pub fn create_service(
service_name: String,
namespace: String,
label_selector: BTreeMap<String, String>,
is_load_balancer: bool,
service_type: ServiceType,
) -> Service {
let (type_, cluster_ip, node_port) = match service_type {
ServiceType::Standard => (None, Some("None".to_string()), None),
ServiceType::LoadBalancer(port) => (Some("LoadBalancer".to_string()), None, Some(port)),
};
Service {
metadata: ObjectMeta {
name: Some(service_name),
Expand All @@ -135,20 +139,13 @@ pub fn create_service(
},
spec: Some(ServiceSpec {
selector: Some(label_selector),
type_: if is_load_balancer {
Some("LoadBalancer".to_string())
} else {
None
},
cluster_ip: if is_load_balancer {
None
} else {
Some("None".to_string())
},
type_,
cluster_ip,
ports: Some(vec![
ServicePort {
port: 8899, // RPC Port
name: Some("rpc-port".to_string()),
node_port,
..Default::default()
},
ServicePort {
Expand Down
86 changes: 76 additions & 10 deletions src/kubernetes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,16 @@ use {
apimachinery::pkg::api::resource::Quantity,
},
kube::{
api::{Api, ListParams, PostParams},
api::{Api, ListParams, ObjectList, PostParams},
Client,
},
log::*,
solana_sdk::pubkey::Pubkey,
std::{collections::BTreeMap, error::Error, path::Path},
std::{
collections::{BTreeMap, HashSet},
error::Error,
path::Path,
},
};

#[derive(Debug, Clone)]
Expand All @@ -41,6 +45,12 @@ impl PodRequests {
}
}

#[derive(Debug, PartialEq)]
pub enum ServiceType {
Standard,
LoadBalancer(/* External Port */ i32),
}

pub struct Kubernetes<'a> {
k8s_client: Client,
namespace: String,
Expand Down Expand Up @@ -79,10 +89,14 @@ impl<'a> Kubernetes<'a> {
self.validator_config.shred_version = Some(shred_version);
}

pub async fn namespace_exists(&self) -> Result<bool, kube::Error> {
async fn get_namespaces(&self) -> Result<ObjectList<Namespace>, kube::Error> {
let namespaces: Api<Namespace> = Api::all(self.k8s_client.clone());
let namespace_list = namespaces.list(&ListParams::default()).await?;
Ok(namespace_list)
}

pub async fn namespace_exists(&self) -> Result<bool, kube::Error> {
let namespace_list = self.get_namespaces().await?;
let exists = namespace_list
.items
.iter()
Expand Down Expand Up @@ -383,7 +397,7 @@ impl<'a> Kubernetes<'a> {
service_name.to_string(),
self.namespace.clone(),
label_selector.clone(),
false,
ServiceType::Standard,
)
}

Expand All @@ -397,7 +411,7 @@ impl<'a> Kubernetes<'a> {
format!("{}-{}-{}", service_name, self.deployment_tag, index),
self.namespace.clone(),
label_selector.clone(),
false,
ServiceType::Standard,
)
}

Expand All @@ -411,17 +425,20 @@ impl<'a> Kubernetes<'a> {
service_api.create(&post_params, service).await
}

pub fn create_validator_load_balancer(
pub async fn create_validator_load_balancer(
&self,
service_name: &str,
label_selector: &BTreeMap<String, String>,
) -> Service {
k8s_helpers::create_service(
) -> Result<Service, Box<dyn Error>> {
let node_port = self.get_open_external_port_for_rpc_service().await?;
info!("Deploying Load Balancer Service with external port: {node_port}");

Ok(k8s_helpers::create_service(
service_name.to_string(),
self.namespace.clone(),
label_selector.clone(),
true,
)
ServiceType::LoadBalancer(node_port),
))
}

pub async fn is_replica_set_ready(&self, replica_set_name: &str) -> Result<bool, kube::Error> {
Expand Down Expand Up @@ -758,4 +775,53 @@ impl<'a> Kubernetes<'a> {
None,
)
}

async fn get_open_external_port_for_rpc_service(&self) -> Result<i32, Box<dyn Error>> {
let used_ports = self.get_all_used_ports().await?;

// This Node Port range is standard for kubernetes
const MIN_NODE_PORT: i32 = 30000;
const MAX_NODE_PORT: i32 = 32767;
// Find an available NodePort
let mut available_port = MIN_NODE_PORT;
while used_ports.contains(&available_port) {
available_port += 1;
}
if available_port > MAX_NODE_PORT {
return Err(format!(
"No available NodePort found in the range {MIN_NODE_PORT}-{MAX_NODE_PORT}"
)
.into());
}

Ok(available_port)
}

async fn get_all_used_ports(&self) -> Result<HashSet<i32>, kube::Error> {
let mut used_ports = HashSet::new();
let namespaces = self.get_namespaces().await?;
let namespaces: Vec<String> = namespaces
.items
.into_iter()
.filter_map(|ns| ns.metadata.name)
.collect();

// Iterate over namespaces to collect used NodePorts
for ns in namespaces {
let services: Api<Service> = Api::namespaced(self.k8s_client.clone(), &ns);
let service_list = services.list(&Default::default()).await?;
for svc in service_list {
if let Some(spec) = svc.spec {
if let Some(ports) = spec.ports {
for port in ports {
if let Some(node_port) = port.node_port {
used_ports.insert(node_port);
}
}
}
}
}
}
Ok(used_ports)
}
}
10 changes: 6 additions & 4 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -755,10 +755,12 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
let load_balancer_label =
kub_controller.create_selector("load-balancer/name", "load-balancer-selector");
//create load balancer
let load_balancer = kub_controller.create_validator_load_balancer(
"bootstrap-and-rpc-node-lb-service",
&load_balancer_label,
);
let load_balancer = kub_controller
.create_validator_load_balancer(
"bootstrap-and-rpc-node-lb-service",
&load_balancer_label,
)
.await?;

//deploy load balancer
kub_controller.deploy_service(&load_balancer).await?;
Expand Down