Skip to content

Commit

Permalink
Merge pull request #52 from terassyi/fix-not-to-allocate-conflicted-a…
Browse files Browse the repository at this point in the history
…ddress

fix not to allocate already used addresses
  • Loading branch information
terassyi authored Dec 19, 2023
2 parents e9b5386 + 3776dc5 commit 85d7c4c
Show file tree
Hide file tree
Showing 2 changed files with 46 additions and 21 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use futures::StreamExt;
use ipnet::IpNet;
use k8s_openapi::api::{core::v1::Service, discovery::v1::EndpointSlice};
use kube::{
api::{ListParams, PostParams, DeleteParams},
api::{DeleteParams, ListParams, PostParams},
core::ObjectMeta,
runtime::{
controller::Action,
Expand Down Expand Up @@ -108,10 +108,16 @@ async fn reconcile(eps: &EndpointSlice, ctx: Arc<Context>) -> Result<Action, Err
let target_peers = get_target_peers(eps, &svc, &node_bgps).await?;

let bgp_advertisements = Api::<BGPAdvertisement>::namespaced(ctx.client().clone(), ns.as_str());

let label = format!("{}={}", SERVICE_NAME_LABEL, svc_name);
let adv_list = bgp_advertisements.list(&ListParams::default().labels(&label)).await.map_err(Error::Kube)?;
let mut existing_adv_map: HashMap<String, BGPAdvertisement> = adv_list.into_iter().map(|a| (a.spec.cidr.clone(), a)).collect();
let adv_list = bgp_advertisements
.list(&ListParams::default().labels(&label))
.await
.map_err(Error::Kube)?;
let mut existing_adv_map: HashMap<String, BGPAdvertisement> = adv_list
.into_iter()
.map(|a| (a.spec.cidr.clone(), a))
.collect();

// If Endpointslice has no endpoint, requeue and wait for creating endpoints
// In case of creating new BGPAdvertisement, there are the case that endpointslice has no valid endpoint
Expand All @@ -123,8 +129,7 @@ async fn reconcile(eps: &EndpointSlice, ctx: Arc<Context>) -> Result<Action, Err

// get from existing advertisement list
// and remove from its map
match existing_adv_map.remove(&cidr_str)
{
match existing_adv_map.remove(&cidr_str) {
Some(adv) => {
let mut new_adv = adv.clone();

Expand Down Expand Up @@ -194,7 +199,10 @@ async fn reconcile(eps: &EndpointSlice, ctx: Arc<Context>) -> Result<Action, Err
metadata: ObjectMeta {
name: Some(adv_name),
namespace: eps.namespace(),
labels: Some(BTreeMap::from([(SERVICE_NAME_LABEL.to_string(), svc_name.to_string())])),
labels: Some(BTreeMap::from([(
SERVICE_NAME_LABEL.to_string(),
svc_name.to_string(),
)])),
owner_references: Some(vec![create_owner_reference(eps)]),
..Default::default()
},
Expand Down Expand Up @@ -225,7 +233,10 @@ async fn reconcile(eps: &EndpointSlice, ctx: Arc<Context>) -> Result<Action, Err
}
// After handling advertisements related to actually allocated addresses,
for (_, removable) in existing_adv_map.iter() {
bgp_advertisements.delete(&removable.name_any(), &DeleteParams::default()).await.map_err(Error::Kube)?;
bgp_advertisements
.delete(&removable.name_any(), &DeleteParams::default())
.await
.map_err(Error::Kube)?;
}

if need_requeue {
Expand Down Expand Up @@ -400,14 +411,13 @@ fn adv_name_from_eps_and_addr(eps: &EndpointSlice, addr: &IpAddr) -> String {
mod tests {

use crate::fixture::reconciler::{
assert_resource_request,
test_eps, test_svc, timeout_after_1s, ApiServerVerifier
assert_resource_request, test_eps, test_svc, timeout_after_1s, ApiServerVerifier,
};

use super::*;

use http::Response;
use hyper::{body::to_bytes, Body};
use hyper::Body;
use k8s_openapi::api::core::v1::ServiceStatus;
use rstest::rstest;

Expand Down
35 changes: 25 additions & 10 deletions sartd/src/kubernetes/src/controller/reconciler/service_watcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,11 @@ async fn reconcile(
// Get already allocated addresses
let actual_addrs = get_allocated_lb_addrs(svc).unwrap_or_default();

let allocated_addrs = actual_addrs.iter().map(|a| a.to_string()).collect::<Vec<String>>().join(",");
let allocated_addrs = actual_addrs
.iter()
.map(|a| a.to_string())
.collect::<Vec<String>>()
.join(",");
let allocated_addr_str = allocated_addrs.as_str();

// Sync the externalTrafficPolicy value and allocation
Expand Down Expand Up @@ -134,18 +138,23 @@ async fn reconcile(
.annotations_mut()
.insert(SERVICE_ETP_ANNOTATION.to_string(), e.to_string());
need_update = true;

}
// sync allocation
match new_eps.annotations_mut().get_mut(SERVICE_ALLOCATION_ANNOTATION) {
match new_eps
.annotations_mut()
.get_mut(SERVICE_ALLOCATION_ANNOTATION)
{
Some(val) => {
if allocated_addr_str.ne(val.as_str()) {
*val = allocated_addr_str.to_string();
need_update = true;
}
},
}
None => {
new_eps.annotations_mut().insert(SERVICE_ALLOCATION_ANNOTATION.to_string(), allocated_addr_str.to_string());
new_eps.annotations_mut().insert(
SERVICE_ALLOCATION_ANNOTATION.to_string(),
allocated_addr_str.to_string(),
);
need_update = true;
}
}
Expand Down Expand Up @@ -228,22 +237,28 @@ async fn reconcile(
"Update service status by the allocation lb address"
);

let new_allocated_addrs = get_allocated_lb_addrs(&new_svc).map(|v| v.iter().map(|a| a.to_string()).collect::<Vec<String>>()).map(|v| v.join(","));
let new_allocated_addrs = get_allocated_lb_addrs(&new_svc)
.map(|v| v.iter().map(|a| a.to_string()).collect::<Vec<String>>())
.map(|v| v.join(","));
match new_allocated_addrs {
Some(addrs) => {
for eps in epss.iter() {
let mut new_eps = eps.clone();
new_eps.annotations_mut().insert(SERVICE_ALLOCATION_ANNOTATION.to_string(), addrs.clone());
new_eps
.annotations_mut()
.insert(SERVICE_ALLOCATION_ANNOTATION.to_string(), addrs.clone());
endpointslice_api
.replace(&eps.name_any(), &PostParams::default(), &new_eps)
.await
.map_err(Error::Kube)?;
}
},
}
None => {
for eps in epss.iter() {
let mut new_eps = eps.clone();
new_eps.annotations_mut().remove(SERVICE_ALLOCATION_ANNOTATION);
new_eps
.annotations_mut()
.remove(SERVICE_ALLOCATION_ANNOTATION);
endpointslice_api
.replace(&eps.name_any(), &PostParams::default(), &new_eps)
.await
Expand Down Expand Up @@ -439,7 +454,7 @@ fn update_allocations(
MarkedAllocation::Allocate(addr_opt) => match addr_opt {
Some(addr) => {
// Allocate the specified address.
let addr = block.allocator.allocate(addr, true).map_err(Error::Ipam)?;
let addr = block.allocator.allocate(addr, false).map_err(Error::Ipam)?;
remained.push(addr);
}
None => {
Expand Down

0 comments on commit 85d7c4c

Please sign in to comment.