Skip to content

Commit

Permalink
Merge pull request #859 from serathius/api-allocations
Browse files Browse the repository at this point in the history
Reduce allocations in api code
  • Loading branch information
k8s-ci-robot authored Oct 18, 2021
2 parents 31be02c + ae7f591 commit 872ebcc
Show file tree
Hide file tree
Showing 18 changed files with 605 additions and 590 deletions.
55 changes: 55 additions & 0 deletions pkg/api/filter.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
// Copyright 2021 The Kubernetes Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package api

import (
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apiserver/pkg/registry/generic"
)

func filterNodes(nodes []*v1.Node, selector fields.Selector) []*v1.Node {
newNodes := make([]*v1.Node, 0, len(nodes))
fields := make(fields.Set, 2)
for _, node := range nodes {
for k := range fields {
delete(fields, k)
}
fieldsSet := generic.AddObjectMetaFieldsSet(fields, &node.ObjectMeta, false)
if !selector.Matches(fieldsSet) {
continue
}
newNodes = append(newNodes, node)
}
return newNodes
}

func filterPartialObjectMetadata(objs []runtime.Object, selector fields.Selector) []runtime.Object {
newObjs := make([]runtime.Object, 0, len(objs))
fields := make(fields.Set, 2)
for _, obj := range objs {
for k := range fields {
delete(fields, k)
}
fieldsSet := generic.AddObjectMetaFieldsSet(fields, &obj.(*metav1.PartialObjectMetadata).ObjectMeta, true)
if !selector.Matches(fieldsSet) {
continue
}
newObjs = append(newObjs, obj)
}
return newObjs
}
12 changes: 6 additions & 6 deletions pkg/api/install.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"k8s.io/apiserver/pkg/registry/rest"
genericapiserver "k8s.io/apiserver/pkg/server"
corev1 "k8s.io/client-go/listers/core/v1"
"k8s.io/client-go/tools/cache"
"k8s.io/metrics/pkg/apis/metrics"
"k8s.io/metrics/pkg/apis/metrics/install"
"k8s.io/metrics/pkg/apis/metrics/v1beta1"
Expand All @@ -40,11 +41,8 @@ func init() {
}

// Build constructs APIGroupInfo the metrics.k8s.io API group using the given getters.
func Build(m MetricsGetter, podLister corev1.PodLister, nodeLister corev1.NodeLister) genericapiserver.APIGroupInfo {
func Build(pod, node rest.Storage) genericapiserver.APIGroupInfo {
apiGroupInfo := genericapiserver.NewDefaultAPIGroupInfo(metrics.GroupName, Scheme, metav1.ParameterCodec, Codecs)

node := newNodeMetrics(metrics.Resource("nodemetrics"), m, nodeLister)
pod := newPodMetrics(metrics.Resource("podmetrics"), m, podLister)
metricsServerResources := map[string]rest.Storage{
"nodes": node,
"pods": pod,
Expand All @@ -55,7 +53,9 @@ func Build(m MetricsGetter, podLister corev1.PodLister, nodeLister corev1.NodeLi
}

// Install builds the metrics for the metrics.k8s.io API, and then installs it into the given API metrics-server.
func Install(metrics MetricsGetter, podLister corev1.PodLister, nodeLister corev1.NodeLister, server *genericapiserver.GenericAPIServer) error {
info := Build(metrics, podLister, nodeLister)
func Install(m MetricsGetter, podMetadataLister cache.GenericLister, nodeLister corev1.NodeLister, server *genericapiserver.GenericAPIServer) error {
node := newNodeMetrics(metrics.Resource("nodemetrics"), m, nodeLister)
pod := newPodMetrics(metrics.Resource("podmetrics"), m, podMetadataLister)
info := Build(pod, node)
return server.InstallAPIGroup(&info)
}
10 changes: 4 additions & 6 deletions pkg/api/interfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,8 @@ import (
"time"

corev1 "k8s.io/api/core/v1"
apitypes "k8s.io/apimachinery/pkg/types"
metrics "k8s.io/metrics/pkg/apis/metrics"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/metrics/pkg/apis/metrics"
)

// MetricsGetter is both a PodMetricsGetter and a NodeMetricsGetter
Expand Down Expand Up @@ -50,14 +50,12 @@ type TimeInfo struct {
type PodMetricsGetter interface {
// GetPodMetrics gets the latest metrics for all containers in each listed pod,
// returning both the metrics and the associated collection timestamp.
// If a pod is missing, the container metrics should be nil for that pod.
GetPodMetrics(pods ...apitypes.NamespacedName) ([]TimeInfo, [][]metrics.ContainerMetrics, error)
GetPodMetrics(pods ...*metav1.PartialObjectMetadata) ([]metrics.PodMetrics, error)
}

// NodeMetricsGetter knows how to fetch metrics for a node.
type NodeMetricsGetter interface {
// GetNodeMetrics gets the latest metrics for the given nodes,
// returning both the metrics and the associated collection timestamp.
// If a node is missing, the resourcelist should be nil for that node.
GetNodeMetrics(nodes ...string) ([]TimeInfo, []corev1.ResourceList, error)
GetNodeMetrics(nodes ...*corev1.Node) ([]metrics.NodeMetrics, error)
}
129 changes: 30 additions & 99 deletions pkg/api/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,16 +19,14 @@ import (
"fmt"
"sort"

v1 "k8s.io/api/core/v1"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
metainternalversion "k8s.io/apimachinery/pkg/apis/meta/internalversion"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
metav1beta1 "k8s.io/apimachinery/pkg/apis/meta/v1beta1"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apiserver/pkg/registry/generic"
"k8s.io/apiserver/pkg/registry/rest"
v1listers "k8s.io/client-go/listers/core/v1"
"k8s.io/klog/v2"
Expand Down Expand Up @@ -74,44 +72,33 @@ func (m *nodeMetrics) NewList() runtime.Object {

// List implements rest.Lister interface
func (m *nodeMetrics) List(ctx context.Context, options *metainternalversion.ListOptions) (runtime.Object, error) {
nodes, err := m.nodes(ctx, options)
if err != nil {
return &metrics.NodeMetricsList{}, err
}

ms, err := m.getMetrics(nodes...)
if err != nil {
klog.ErrorS(err, "Failed reading nodes metrics")
return &metrics.NodeMetricsList{}, fmt.Errorf("failed reading nodes metrics: %w", err)
}
return &metrics.NodeMetricsList{Items: ms}, nil
}

func (m *nodeMetrics) nodes(ctx context.Context, options *metainternalversion.ListOptions) ([]*corev1.Node, error) {
labelSelector := labels.Everything()
if options != nil && options.LabelSelector != nil {
labelSelector = options.LabelSelector
}
nodes, err := m.nodeLister.List(labelSelector)
if err != nil {
klog.ErrorS(err, "Failed listing nodes", "labelSelector", labelSelector)
return &metrics.NodeMetricsList{}, fmt.Errorf("failed listing nodes: %w", err)
}

// maintain the same ordering invariant as the Kube API would over nodes
sort.Slice(nodes, func(i, j int) bool {
return nodes[i].Name < nodes[j].Name
})

metricsItems, err := m.getNodeMetrics(nodes...)
if err != nil {
klog.ErrorS(err, "Failed reading nodes metrics", "labelSelector", labelSelector)
return &metrics.NodeMetricsList{}, fmt.Errorf("failed reading nodes metrics: %w", err)
return nil, fmt.Errorf("failed listing nodes: %w", err)
}

if options != nil && options.FieldSelector != nil {
newMetrics := make([]metrics.NodeMetrics, 0, len(metricsItems))
fields := make(fields.Set, 2)
for _, metric := range metricsItems {
for k := range fields {
delete(fields, k)
}
fieldsSet := generic.AddObjectMetaFieldsSet(fields, &metric.ObjectMeta, false)
if !options.FieldSelector.Matches(fieldsSet) {
continue
}
newMetrics = append(newMetrics, metric)
}
metricsItems = newMetrics
nodes = filterNodes(nodes, options.FieldSelector)
}

return &metrics.NodeMetricsList{Items: metricsItems}, nil
return nodes, nil
}

// Get implements rest.Getter interface
Expand All @@ -128,15 +115,15 @@ func (m *nodeMetrics) Get(ctx context.Context, name string, opts *metav1.GetOpti
if node == nil {
return nil, errors.NewNotFound(m.groupResource, name)
}
nodeMetrics, err := m.getNodeMetrics(node)
ms, err := m.getMetrics(node)
if err != nil {
klog.ErrorS(err, "Failed reading node metrics", "node", klog.KRef("", name))
return nil, fmt.Errorf("failed reading node metrics: %w", err)
}
if len(nodeMetrics) == 0 {
if len(ms) == 0 {
return nil, errors.NewNotFound(m.groupResource, name)
}
return &nodeMetrics[0], nil
return &ms[0], nil
}

// ConvertToTable implements rest.TableConvertor interface
Expand All @@ -159,75 +146,19 @@ func (m *nodeMetrics) ConvertToTable(ctx context.Context, object runtime.Object,
return &table, nil
}

func addNodeMetricsToTable(table *metav1beta1.Table, nodes ...metrics.NodeMetrics) {
var names []string
for i, node := range nodes {
if names == nil {
for k := range node.Usage {
names = append(names, string(k))
}
sort.Strings(names)

table.ColumnDefinitions = []metav1beta1.TableColumnDefinition{
{Name: "Name", Type: "string", Format: "name", Description: "Name of the resource"},
}
for _, name := range names {
table.ColumnDefinitions = append(table.ColumnDefinitions, metav1beta1.TableColumnDefinition{
Name: name,
Type: "string",
Format: "quantity",
})
}
table.ColumnDefinitions = append(table.ColumnDefinitions, metav1beta1.TableColumnDefinition{
Name: "Window",
Type: "string",
Format: "duration",
})
}
row := make([]interface{}, 0, len(names)+1)
row = append(row, node.Name)
for _, name := range names {
v := node.Usage[v1.ResourceName(name)]
row = append(row, v.String())
}
row = append(row, node.Window.Duration.String())
table.Rows = append(table.Rows, metav1beta1.TableRow{
Cells: row,
Object: runtime.RawExtension{Object: &nodes[i]},
})
}
}

func (m *nodeMetrics) getNodeMetrics(nodes ...*v1.Node) ([]metrics.NodeMetrics, error) {
names := make([]string, len(nodes))
for i, node := range nodes {
names[i] = node.Name
}
timestamps, usages, err := m.metrics.GetNodeMetrics(names...)
func (m *nodeMetrics) getMetrics(nodes ...*corev1.Node) ([]metrics.NodeMetrics, error) {
ms, err := m.metrics.GetNodeMetrics(nodes...)
if err != nil {
return nil, err
}

res := make([]metrics.NodeMetrics, 0, len(names))

for i, node := range nodes {
if usages[i] == nil {
continue
}
res = append(res, metrics.NodeMetrics{
ObjectMeta: metav1.ObjectMeta{
Name: node.Name,
CreationTimestamp: metav1.NewTime(myClock.Now()),
Labels: node.Labels,
},
Timestamp: metav1.NewTime(timestamps[i].Timestamp),
Window: metav1.Duration{Duration: timestamps[i].Window},
Usage: usages[i],
})
metricFreshness.WithLabelValues().Observe(myClock.Since(timestamps[i].Timestamp).Seconds())
for _, m := range ms {
metricFreshness.WithLabelValues().Observe(myClock.Since(m.Timestamp.Time).Seconds())
}

return res, nil
// maintain the same ordering invariant as the Kube API would over nodes
sort.Slice(ms, func(i, j int) bool {
return ms[i].Name < ms[j].Name
})
return ms, nil
}

// NamespaceScoped implements rest.Scoper interface
Expand Down
Loading

0 comments on commit 872ebcc

Please sign in to comment.