Skip to content

Commit

Permalink
fix: kubecost extractor (#358)
Browse files Browse the repository at this point in the history
* extract current resource requests for kubecost extractor

* wipe resource status from cache
  • Loading branch information
zreigz authored Jan 30, 2025
1 parent f30c2a3 commit 2325913
Show file tree
Hide file tree
Showing 2 changed files with 83 additions and 17 deletions.
99 changes: 82 additions & 17 deletions internal/controller/kubecostextractor_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,18 +30,16 @@ import (
"strings"
"time"

"github.com/pluralsh/polly/algorithms"

"sigs.k8s.io/controller-runtime/pkg/reconcile"

"github.com/opencost/opencost/core/pkg/opencost"
cmap "github.com/orcaman/concurrent-map/v2"
console "github.com/pluralsh/console/go/client"
"github.com/pluralsh/deployment-operator/api/v1alpha1"
"github.com/pluralsh/deployment-operator/internal/utils"
consoleclient "github.com/pluralsh/deployment-operator/pkg/client"
"github.com/pluralsh/polly/algorithms"
"github.com/samber/lo"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime"
Expand All @@ -52,6 +50,7 @@ import (
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/log"
"sigs.k8s.io/controller-runtime/pkg/reconcile"
)

const kubeCostJitter = time.Minute * 5
Expand Down Expand Up @@ -403,7 +402,7 @@ func (r *KubecostExtractorReconciler) getClusterID(ctx context.Context, srv *cor
return resp.Data.ClusterID, nil
}

func (r *KubecostExtractorReconciler) getObjectInfo(ctx context.Context, resourceType console.ScalingRecommendationType, namespace, name string) (container, serviceId *string, err error) {
func (r *KubecostExtractorReconciler) getObjectInfo(ctx context.Context, resourceType console.ScalingRecommendationType, namespace, name string) (container, serviceId *string, resourceRequest *ResourceRequests, err error) {
gvk := schema.GroupVersionKind{
Group: "apps",
Version: "v1",
Expand All @@ -416,7 +415,7 @@ func (r *KubecostExtractorReconciler) getObjectInfo(ctx context.Context, resourc
case console.ScalingRecommendationTypeStatefulset:
gvk.Kind = "StatefulSet"
default:
return nil, nil, nil
return nil, nil, nil, nil
}
obj := &unstructured.Unstructured{}
obj.SetGroupVersionKind(gvk)
Expand All @@ -428,9 +427,74 @@ func (r *KubecostExtractorReconciler) getObjectInfo(ctx context.Context, resourc
serviceId = lo.ToPtr(svcId)
}

containersResourceRequest := ExtractResourceRequests(obj)
if len(containersResourceRequest) > 0 {
// get resource requests from the first container
resourceRequests := algorithms.MapValues(containersResourceRequest)
resourceRequest = resourceRequests[0]
}

return
}

type ResourceRequests struct {
CPU float64
Memory float64
}

// ExtractResourceRequests fetches CPU and memory requests from an Unstructured Kubernetes workload object.
func ExtractResourceRequests(obj *unstructured.Unstructured) map[string]*ResourceRequests {
// Extract the spec.template.spec.containers field
containers, found, err := unstructured.NestedSlice(obj.Object, "spec", "template", "spec", "containers")
if err != nil || !found {
return nil
}

containersResourceRequests := make(map[string]*ResourceRequests)

// Iterate over containers
for _, container := range containers {
containerMap, ok := container.(map[string]interface{})
if !ok {
continue
}

name, found, _ := unstructured.NestedString(containerMap, "name")
if !found {
continue
}

requests, found, _ := unstructured.NestedMap(containerMap, "resources", "requests")
if !found {
continue
}

cpuFloat := float64(0)
memoryFloat := float64(0)
cpu, ok := requests["cpu"].(string)
if ok {
cpuQuantity, err := resource.ParseQuantity(cpu)
if err == nil {
cpuFloat = cpuQuantity.AsApproximateFloat64()
}
}
memory, ok := requests["memory"].(string)
if ok {
memoryQuantity, err := resource.ParseQuantity(memory)
if err == nil {
memoryFloat = memoryQuantity.AsApproximateFloat64()
}
}

containersResourceRequests[name] = &ResourceRequests{
CPU: cpuFloat,
Memory: memoryFloat,
}
}

return containersResourceRequests
}

// SetupWithManager sets up the controller with the Manager.
func (r *KubecostExtractorReconciler) SetupWithManager(mgr ctrl.Manager) error {
return ctrl.NewControllerManagedBy(mgr).
Expand All @@ -452,15 +516,13 @@ type clusterinfoResponse struct {
func (r *KubecostExtractorReconciler) convertClusterRecommendationAttributes(ctx context.Context, allocation opencost.Allocation, name, resourceType string) *console.ClusterRecommendationAttributes {
resourceTypeEnum := console.ScalingRecommendationType(strings.ToUpper(resourceType))
result := &console.ClusterRecommendationAttributes{
Type: lo.ToPtr(resourceTypeEnum),
Name: lo.ToPtr(name),
MemoryRequest: lo.ToPtr(allocation.RAMBytesRequestAverage),
CPURequest: lo.ToPtr(allocation.CPUCoreRequestAverage),
CPUCost: lo.ToPtr(allocation.CPUCost),
MemoryCost: lo.ToPtr(allocation.RAMCost),
GpuCost: lo.ToPtr(allocation.GPUCost),
CPUUtil: lo.ToPtr(allocation.CPUCoreUsageAverage),
MemoryUtil: lo.ToPtr(allocation.RAMBytesUsageAverage),
Type: lo.ToPtr(resourceTypeEnum),
Name: lo.ToPtr(name),
CPUCost: lo.ToPtr(allocation.CPUCost),
MemoryCost: lo.ToPtr(allocation.RAMCost),
GpuCost: lo.ToPtr(allocation.GPUCost),
CPUUtil: lo.ToPtr(allocation.CPUCoreUsageAverage),
MemoryUtil: lo.ToPtr(allocation.RAMBytesUsageAverage),
}
if allocation.Properties != nil {
namespace, ok := allocation.Properties.NamespaceLabels["kubernetes_io_metadata_name"]
Expand All @@ -476,13 +538,16 @@ func (r *KubecostExtractorReconciler) convertClusterRecommendationAttributes(ctx
namespace = *result.Namespace
}

container, serviceID, err := r.getObjectInfo(ctx, resourceTypeEnum, namespace, name)
container, serviceID, resourceRequest, err := r.getObjectInfo(ctx, resourceTypeEnum, namespace, name)
if err != nil {
return result
}
result.Container = container
result.ServiceID = serviceID

if resourceRequest != nil {
result.CPURequest = lo.ToPtr(resourceRequest.CPU)
result.MemoryRequest = lo.ToPtr(resourceRequest.Memory)
}
return result
}

Expand Down
1 change: 1 addition & 0 deletions pkg/cache/resource_cache_entry.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ type ResourceCacheEntry struct {
func (in *ResourceCacheEntry) Expire() {
in.manifestSHA = nil
in.applySHA = nil
in.status = nil
}

// SetSHA updates shaType with SHA calculated based on the provided resource.
Expand Down

0 comments on commit 2325913

Please sign in to comment.