Skip to content

Commit

Permalink
Collects ClusterId for Telemetry Object Collector (#1565)
Browse files Browse the repository at this point in the history
* feat: add clusterID to telemetry objects
  • Loading branch information
salonichf5 authored Feb 15, 2024
1 parent 9e719b0 commit 501f8f9
Show file tree
Hide file tree
Showing 2 changed files with 99 additions and 52 deletions.
19 changes: 19 additions & 0 deletions internal/mode/static/telemetry/collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (

appsv1 "k8s.io/api/apps/v1"
v1 "k8s.io/api/core/v1"
meta "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"sigs.k8s.io/controller-runtime/pkg/client"

Expand Down Expand Up @@ -49,6 +50,7 @@ type ProjectMetadata struct {
// Note: this type might change once https://github.com/nginxinc/nginx-gateway-fabric/issues/1318 is implemented.
type Data struct {
ProjectMetadata ProjectMetadata
ClusterID string
NodeCount int
NGFResourceCounts NGFResourceCounts
NGFReplicaCount int
Expand Down Expand Up @@ -99,6 +101,11 @@ func (c DataCollectorImpl) Collect(ctx context.Context) (Data, error) {
return Data{}, fmt.Errorf("failed to collect NGF replica count: %w", err)
}

var clusterID string
if clusterID, err = collectClusterID(ctx, c.cfg.K8sClientReader); err != nil {
return Data{}, fmt.Errorf("failed to collect clusterID: %w", err)
}

data := Data{
NodeCount: nodeCount,
NGFResourceCounts: graphResourceCount,
Expand All @@ -107,6 +114,7 @@ func (c DataCollectorImpl) Collect(ctx context.Context) (Data, error) {
Version: c.cfg.Version,
},
NGFReplicaCount: ngfReplicaCount,
ClusterID: clusterID,
}

return data, nil
Expand Down Expand Up @@ -193,3 +201,14 @@ func collectNGFReplicaCount(ctx context.Context, k8sClient client.Reader, podNSN

return int(*replicaSet.Spec.Replicas), nil
}

func collectClusterID(ctx context.Context, k8sClient client.Reader) (string, error) {
key := types.NamespacedName{
Name: meta.NamespaceSystem,
}
var kubeNamespace v1.Namespace
if err := k8sClient.Get(ctx, key, &kubeNamespace); err != nil {
return "", fmt.Errorf("failed to get kube-system namespace: %w", err)
}
return string(kubeNamespace.GetUID()), nil
}
132 changes: 80 additions & 52 deletions internal/mode/static/telemetry/collector_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
. "github.com/onsi/gomega"
appsv1 "k8s.io/api/apps/v1"
v1 "k8s.io/api/core/v1"
meta "k8s.io/apimachinery/pkg/apis/meta/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"sigs.k8s.io/controller-runtime/pkg/client"
Expand Down Expand Up @@ -41,12 +42,14 @@ func createListCallsFunc(nodes []v1.Node) func(
}
}

func createGetCallsFunc(objects ...client.Object) func(
type getCallsFunc = func(
context.Context,
types.NamespacedName,
client.Object,
...client.GetOption,
) error {
) error

func createGetCallsFunc(objects ...client.Object) getCallsFunc {
return func(_ context.Context, _ types.NamespacedName, object client.Object, option ...client.GetOption) error {
Expect(option).To(BeEmpty())

Expand All @@ -57,7 +60,6 @@ func createGetCallsFunc(objects ...client.Object) func(
}
}

Fail(fmt.Sprintf("unknown type: %T", object))
return nil
}
}
Expand All @@ -74,6 +76,9 @@ var _ = Describe("Collector", Ordered, func() {
podNSName types.NamespacedName
ngfPod *v1.Pod
ngfReplicaSet *appsv1.ReplicaSet
kubeNamespace *v1.Namespace

baseGetCalls getCallsFunc
)

BeforeAll(func() {
Expand Down Expand Up @@ -103,6 +108,13 @@ var _ = Describe("Collector", Ordered, func() {
Namespace: "nginx-gateway",
Name: "ngf-pod",
}

kubeNamespace = &v1.Namespace{
ObjectMeta: metav1.ObjectMeta{
Name: meta.NamespaceSystem,
UID: "test-uid",
},
}
})

BeforeEach(func() {
Expand All @@ -111,6 +123,7 @@ var _ = Describe("Collector", Ordered, func() {
NodeCount: 0,
NGFResourceCounts: telemetry.NGFResourceCounts{},
NGFReplicaCount: 1,
ClusterID: string(kubeNamespace.GetUID()),
}

k8sClientReader = &eventsfakes.FakeReader{}
Expand All @@ -127,21 +140,43 @@ var _ = Describe("Collector", Ordered, func() {
Version: version,
PodNSName: podNSName,
})
k8sClientReader.GetCalls(createGetCallsFunc(ngfPod, ngfReplicaSet))

baseGetCalls = createGetCallsFunc(ngfPod, ngfReplicaSet, kubeNamespace)
k8sClientReader.GetCalls(baseGetCalls)
})

mergeGetCallsWithBase := func(f getCallsFunc) getCallsFunc {
return func(
ctx context.Context,
nsName types.NamespacedName,
object client.Object,
option ...client.GetOption,
) error {
err := baseGetCalls(ctx, nsName, object, option...)
Expect(err).ToNot(HaveOccurred())

return f(ctx, nsName, object, option...)
}
}

Describe("Normal case", func() {
When("collecting telemetry data", func() {
It("collects all fields", func() {
nodes := []v1.Node{
{
ObjectMeta: metav1.ObjectMeta{Name: "node1"},
ObjectMeta: metav1.ObjectMeta{
Name: "node1",
},
},
{
ObjectMeta: metav1.ObjectMeta{Name: "node2"},
ObjectMeta: metav1.ObjectMeta{
Name: "node2",
},
},
{
ObjectMeta: metav1.ObjectMeta{Name: "node3"},
ObjectMeta: metav1.ObjectMeta{
Name: "node3",
},
},
}

Expand Down Expand Up @@ -239,6 +274,27 @@ var _ = Describe("Collector", Ordered, func() {
})
})

Describe("clusterID collector", func() {
When("collecting clusterID", func() {
When("it encounters an error while collecting data", func() {
It("should error if the kubernetes client errored when getting the namespace", func() {
expectedError := errors.New("there was an error getting clusterID")
k8sClientReader.GetCalls(mergeGetCallsWithBase(
func(_ context.Context, _ types.NamespacedName, object client.Object, _ ...client.GetOption) error {
switch object.(type) {
case *v1.Namespace:
return expectedError
}
return nil
}))

_, err := dataCollector.Collect(ctx)
Expect(err).To(MatchError(expectedError))
})
})
})
})

Describe("node count collector", func() {
When("collecting node count data", func() {
It("collects correct data for no nodes", func() {
Expand Down Expand Up @@ -425,41 +481,38 @@ var _ = Describe("Collector", Ordered, func() {
When("it encounters an error while collecting data", func() {
It("should error if the kubernetes client errored when getting the Pod", func() {
expectedErr := errors.New("there was an error getting the Pod")
k8sClientReader.GetCalls(
func(_ context.Context, _ client.ObjectKey, object client.Object, option ...client.GetOption) error {
Expect(option).To(BeEmpty())

switch typedObj := object.(type) {
k8sClientReader.GetCalls(mergeGetCallsWithBase(
func(_ context.Context, _ client.ObjectKey, object client.Object, _ ...client.GetOption) error {
switch object.(type) {
case *v1.Pod:
return expectedErr
default:
Fail(fmt.Sprintf("unknown type: %T", typedObj))
}
return nil
})
},
))

_, err := dataCollector.Collect(ctx)
Expect(err).To(MatchError(expectedErr))
})

It("should error if the Pod's owner reference is nil", func() {
expectedErr := errors.New("expected one owner reference of the NGF Pod, got 0")
k8sClientReader.GetCalls(createGetCallsFunc(
k8sClientReader.GetCalls(mergeGetCallsWithBase(createGetCallsFunc(
&v1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: "pod1",
OwnerReferences: nil,
},
},
))
)))

_, err := dataCollector.Collect(ctx)
Expect(err).To(MatchError(expectedErr))
})

It("should error if the Pod has multiple owner references", func() {
expectedErr := errors.New("expected one owner reference of the NGF Pod, got 2")
k8sClientReader.GetCalls(createGetCallsFunc(
k8sClientReader.GetCalls(mergeGetCallsWithBase(createGetCallsFunc(
&v1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: "pod1",
Expand All @@ -475,15 +528,15 @@ var _ = Describe("Collector", Ordered, func() {
},
},
},
))
)))

_, err := dataCollector.Collect(ctx)
Expect(err).To(MatchError(expectedErr))
})

It("should error if the Pod's owner reference is not a ReplicaSet", func() {
expectedErr := errors.New("expected pod owner reference to be ReplicaSet, got Deployment")
k8sClientReader.GetCalls(createGetCallsFunc(
k8sClientReader.GetCalls(mergeGetCallsWithBase(createGetCallsFunc(
&v1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: "pod1",
Expand All @@ -495,61 +548,36 @@ var _ = Describe("Collector", Ordered, func() {
},
},
},
))
)))

_, err := dataCollector.Collect(ctx)
Expect(err).To(MatchError(expectedErr))
})

It("should error if the replica set's replicas is nil", func() {
expectedErr := errors.New("replica set replicas was nil")
k8sClientReader.GetCalls(createGetCallsFunc(
&v1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: "pod1",
OwnerReferences: []metav1.OwnerReference{
{
Kind: "ReplicaSet",
Name: "replicaset1",
},
},
},
},
k8sClientReader.GetCalls(mergeGetCallsWithBase(createGetCallsFunc(
&appsv1.ReplicaSet{
Spec: appsv1.ReplicaSetSpec{
Replicas: nil,
},
},
))
)))

_, err := dataCollector.Collect(ctx)
Expect(err).To(MatchError(expectedErr))
})

It("should error if the kubernetes client errored when getting the ReplicaSet", func() {
expectedErr := errors.New("there was an error getting the ReplicaSet")
k8sClientReader.GetCalls(
func(_ context.Context, _ client.ObjectKey, object client.Object, option ...client.GetOption) error {
Expect(option).To(BeEmpty())

switch typedObj := object.(type) {
case *v1.Pod:
typedObj.ObjectMeta = metav1.ObjectMeta{
Name: "pod1",
OwnerReferences: []metav1.OwnerReference{
{
Kind: "ReplicaSet",
Name: "replicaset1",
},
},
}
k8sClientReader.GetCalls(mergeGetCallsWithBase(
func(_ context.Context, _ client.ObjectKey, object client.Object, _ ...client.GetOption) error {
switch object.(type) {
case *appsv1.ReplicaSet:
return expectedErr
default:
Fail(fmt.Sprintf("unknown type: %T", typedObj))
}
return nil
})
}))

_, err := dataCollector.Collect(ctx)
Expect(err).To(MatchError(expectedErr))
Expand Down

0 comments on commit 501f8f9

Please sign in to comment.