From 3776dc542a7a034a29e90518cebb5649be7ffad4 Mon Sep 17 00:00:00 2001 From: terassyi Date: Wed, 20 Dec 2023 00:52:20 +0900 Subject: [PATCH] fix not to allocate already used addresses Signed-off-by: terassyi --- .../reconciler/endpointslice_watcher.rs | 32 +++++++++++------ .../controller/reconciler/service_watcher.rs | 35 +++++++++++++------ 2 files changed, 46 insertions(+), 21 deletions(-) diff --git a/sartd/src/kubernetes/src/controller/reconciler/endpointslice_watcher.rs b/sartd/src/kubernetes/src/controller/reconciler/endpointslice_watcher.rs index aeaf538..8d66774 100644 --- a/sartd/src/kubernetes/src/controller/reconciler/endpointslice_watcher.rs +++ b/sartd/src/kubernetes/src/controller/reconciler/endpointslice_watcher.rs @@ -9,7 +9,7 @@ use futures::StreamExt; use ipnet::IpNet; use k8s_openapi::api::{core::v1::Service, discovery::v1::EndpointSlice}; use kube::{ - api::{ListParams, PostParams, DeleteParams}, + api::{DeleteParams, ListParams, PostParams}, core::ObjectMeta, runtime::{ controller::Action, @@ -108,10 +108,16 @@ async fn reconcile(eps: &EndpointSlice, ctx: Arc) -> Result::namespaced(ctx.client().clone(), ns.as_str()); - + let label = format!("{}={}", SERVICE_NAME_LABEL, svc_name); - let adv_list = bgp_advertisements.list(&ListParams::default().labels(&label)).await.map_err(Error::Kube)?; - let mut existing_adv_map: HashMap = adv_list.into_iter().map(|a| (a.spec.cidr.clone(), a)).collect(); + let adv_list = bgp_advertisements + .list(&ListParams::default().labels(&label)) + .await + .map_err(Error::Kube)?; + let mut existing_adv_map: HashMap = adv_list + .into_iter() + .map(|a| (a.spec.cidr.clone(), a)) + .collect(); // If Endpointslice has no endpoint, requeue and wait for creating endpoints // In case of creating new BGPAdvertisement, there are the case that endpointslice has no valid endpoint @@ -123,8 +129,7 @@ async fn reconcile(eps: &EndpointSlice, ctx: Arc) -> Result { let mut new_adv = adv.clone(); @@ -194,7 +199,10 @@ async fn reconcile(eps: &EndpointSlice, ctx: Arc) -> Result) -> Result String { mod tests { use crate::fixture::reconciler::{ - assert_resource_request, - test_eps, test_svc, timeout_after_1s, ApiServerVerifier + assert_resource_request, test_eps, test_svc, timeout_after_1s, ApiServerVerifier, }; use super::*; use http::Response; - use hyper::{body::to_bytes, Body}; + use hyper::Body; use k8s_openapi::api::core::v1::ServiceStatus; use rstest::rstest; diff --git a/sartd/src/kubernetes/src/controller/reconciler/service_watcher.rs b/sartd/src/kubernetes/src/controller/reconciler/service_watcher.rs index 87363da..bbc48be 100644 --- a/sartd/src/kubernetes/src/controller/reconciler/service_watcher.rs +++ b/sartd/src/kubernetes/src/controller/reconciler/service_watcher.rs @@ -105,7 +105,11 @@ async fn reconcile( // Get already allocated addresses let actual_addrs = get_allocated_lb_addrs(svc).unwrap_or_default(); - let allocated_addrs = actual_addrs.iter().map(|a| a.to_string()).collect::>().join(","); + let allocated_addrs = actual_addrs + .iter() + .map(|a| a.to_string()) + .collect::>() + .join(","); let allocated_addr_str = allocated_addrs.as_str(); // Sync the externalTrafficPolicy value and allocation @@ -134,18 +138,23 @@ async fn reconcile( .annotations_mut() .insert(SERVICE_ETP_ANNOTATION.to_string(), e.to_string()); need_update = true; - } // sync allocation - match new_eps.annotations_mut().get_mut(SERVICE_ALLOCATION_ANNOTATION) { + match new_eps + .annotations_mut() + .get_mut(SERVICE_ALLOCATION_ANNOTATION) + { Some(val) => { if allocated_addr_str.ne(val.as_str()) { *val = allocated_addr_str.to_string(); need_update = true; } - }, + } None => { - new_eps.annotations_mut().insert(SERVICE_ALLOCATION_ANNOTATION.to_string(), allocated_addr_str.to_string()); + new_eps.annotations_mut().insert( + SERVICE_ALLOCATION_ANNOTATION.to_string(), + allocated_addr_str.to_string(), + ); need_update = true; } } @@ -228,22 +237,28 @@ async fn reconcile( "Update service status by the allocation lb address" ); - let new_allocated_addrs = get_allocated_lb_addrs(&new_svc).map(|v| v.iter().map(|a| a.to_string()).collect::>()).map(|v| v.join(",")); + let new_allocated_addrs = get_allocated_lb_addrs(&new_svc) + .map(|v| v.iter().map(|a| a.to_string()).collect::>()) + .map(|v| v.join(",")); match new_allocated_addrs { Some(addrs) => { for eps in epss.iter() { let mut new_eps = eps.clone(); - new_eps.annotations_mut().insert(SERVICE_ALLOCATION_ANNOTATION.to_string(), addrs.clone()); + new_eps + .annotations_mut() + .insert(SERVICE_ALLOCATION_ANNOTATION.to_string(), addrs.clone()); endpointslice_api .replace(&eps.name_any(), &PostParams::default(), &new_eps) .await .map_err(Error::Kube)?; } - }, + } None => { for eps in epss.iter() { let mut new_eps = eps.clone(); - new_eps.annotations_mut().remove(SERVICE_ALLOCATION_ANNOTATION); + new_eps + .annotations_mut() + .remove(SERVICE_ALLOCATION_ANNOTATION); endpointslice_api .replace(&eps.name_any(), &PostParams::default(), &new_eps) .await @@ -439,7 +454,7 @@ fn update_allocations( MarkedAllocation::Allocate(addr_opt) => match addr_opt { Some(addr) => { // Allocate the specified address. - let addr = block.allocator.allocate(addr, true).map_err(Error::Ipam)?; + let addr = block.allocator.allocate(addr, false).map_err(Error::Ipam)?; remained.push(addr); } None => {