Skip to content

Commit

Permalink
Introducing common integration helper and change package name (#8327)
Browse files Browse the repository at this point in the history
* create new integration/source folder and factor common code into helper.go

Signed-off-by: Matthias Wessendorf <[email protected]>

* 💄 improve formatting...

Signed-off-by: Matthias Wessendorf <[email protected]>

* fixing import

Signed-off-by: Matthias Wessendorf <[email protected]>

---------

Signed-off-by: Matthias Wessendorf <[email protected]>
  • Loading branch information
matzew authored Nov 19, 2024
1 parent 201e096 commit ebe99e6
Show file tree
Hide file tree
Showing 11 changed files with 343 additions and 296 deletions.
2 changes: 1 addition & 1 deletion cmd/controller/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ import (
"knative.dev/eventing/pkg/reconciler/channel"
"knative.dev/eventing/pkg/reconciler/containersource"
"knative.dev/eventing/pkg/reconciler/eventtype"
"knative.dev/eventing/pkg/reconciler/integrationsource"
integrationsource "knative.dev/eventing/pkg/reconciler/integration/source"
"knative.dev/eventing/pkg/reconciler/parallel"
"knative.dev/eventing/pkg/reconciler/pingsource"
"knative.dev/eventing/pkg/reconciler/sequence"
Expand Down
126 changes: 126 additions & 0 deletions pkg/reconciler/integration/helper.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,126 @@
/*
Copyright 2024 The Knative Authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package integration

import (
"fmt"
"reflect"
"strconv"
"strings"

corev1 "k8s.io/api/core/v1"
)

func GenerateEnvVarsFromStruct(prefix string, s interface{}) []corev1.EnvVar {
var envVars []corev1.EnvVar

// Use reflection to inspect the struct fields
v := reflect.ValueOf(s)
if v.Kind() == reflect.Ptr {
v = v.Elem()
}

t := v.Type()

for i := 0; i < v.NumField(); i++ {
field := v.Field(i)
fieldType := t.Field(i)

// Skip unexported fields
if !field.CanInterface() {
continue
}

// Handle embedded/anonymous structs recursively
if fieldType.Anonymous && field.Kind() == reflect.Struct {
// Recursively handle embedded structs with the same prefix
envVars = append(envVars, GenerateEnvVarsFromStruct(prefix, field.Interface())...)
continue
}

// First, check for the custom 'camel' tag
envVarName := fieldType.Tag.Get("camel")
if envVarName == "" {
// If 'camel' tag is not present, fall back to the 'json' tag or Go field name
jsonTag := fieldType.Tag.Get("json")
tagName := strings.Split(jsonTag, ",")[0]
if tagName == "" || tagName == "-" {
tagName = fieldType.Name
}
envVarName = fmt.Sprintf("%s_%s", prefix, strings.ToUpper(tagName))
}

if field.Kind() == reflect.Ptr {
if field.IsNil() {
continue
}
field = field.Elem()
}

var value string
switch field.Kind() {
case reflect.Int, reflect.Int32, reflect.Int64:
value = strconv.FormatInt(field.Int(), 10)
case reflect.Bool:
value = strconv.FormatBool(field.Bool())
case reflect.String:
value = field.String()
default:
// Skip unsupported types
continue
}

// Skip zero/empty values
if value == "" {
continue
}

envVars = append(envVars, corev1.EnvVar{
Name: envVarName,
Value: value,
})
}

return envVars
}

func MakeSecretEnvVar(name, key, secretName string) corev1.EnvVar {
return corev1.EnvVar{
Name: name,
ValueFrom: &corev1.EnvVarSource{
SecretKeyRef: &corev1.SecretKeySelector{
Key: key,
LocalObjectReference: corev1.LocalObjectReference{
Name: secretName,
},
},
},
}
}

func MakeSSLEnvVar() []corev1.EnvVar {
return []corev1.EnvVar{
{
Name: "CAMEL_KNATIVE_CLIENT_SSL_ENABLED",
Value: "true",
},
{
Name: "CAMEL_KNATIVE_CLIENT_SSL_CERT_PATH",
Value: "/knative-custom-certs/knative-eventing-bundle.pem",
},
}
}
77 changes: 77 additions & 0 deletions pkg/reconciler/integration/helper_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
/*
Copyright 2024 The Knative Authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package integration

import (
"testing"

"github.com/google/go-cmp/cmp"
corev1 "k8s.io/api/core/v1"
)

func TestGenerateEnvVarsFromStruct(t *testing.T) {
type TestStruct struct {
Field1 int `json:"field1"`
Field2 bool `json:"field2"`
Field3 string `json:"field3"`
}

prefix := "TEST_PREFIX"
input := &TestStruct{
Field1: 123,
Field2: true,
Field3: "hello",
}

// Expected environment variables including SSL settings
want := []corev1.EnvVar{
{Name: "TEST_PREFIX_FIELD1", Value: "123"},
{Name: "TEST_PREFIX_FIELD2", Value: "true"},
{Name: "TEST_PREFIX_FIELD3", Value: "hello"},
}

got := GenerateEnvVarsFromStruct(prefix, input)

if diff := cmp.Diff(want, got); diff != "" {
t.Errorf("generateEnvVarsFromStruct() mismatch (-want +got):\n%s", diff)
}
}

func TestGenerateEnvVarsFromStruct_S3WithCamelTag(t *testing.T) {
type AWSS3 struct {
Arn string `json:"arn,omitempty" camel:"CAMEL_KAMELET_AWS_S3_SOURCE_BUCKETNAMEORARN"`
Region string `json:"region,omitempty"`
}

prefix := "CAMEL_KAMELET_AWS_S3_SOURCE"
input := AWSS3{
Arn: "arn:aws:s3:::example-bucket",
Region: "us-west-2",
}

// Expected environment variables including SSL settings and camel tag for Arn
want := []corev1.EnvVar{
{Name: "CAMEL_KAMELET_AWS_S3_SOURCE_BUCKETNAMEORARN", Value: "arn:aws:s3:::example-bucket"},
{Name: "CAMEL_KAMELET_AWS_S3_SOURCE_REGION", Value: "us-west-2"},
}

got := GenerateEnvVarsFromStruct(prefix, input)

if diff := cmp.Diff(want, got); diff != "" {
t.Errorf("generateEnvVarsFromStruct_S3WithCamelTag() mismatch (-want +got):\n%s", diff)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ See the License for the specific language governing permissions and
limitations under the License.
*/

package integrationsource
package source

import (
"context"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ See the License for the specific language governing permissions and
limitations under the License.
*/

package integrationsource
package source

import (
"context"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,14 @@ See the License for the specific language governing permissions and
limitations under the License.
*/

package integrationsource
package source

import (
"context"
"fmt"

"knative.dev/eventing/pkg/reconciler/integration/source/resources"

"go.uber.org/zap"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/equality"
Expand All @@ -32,7 +34,6 @@ import (
"knative.dev/eventing/pkg/client/injection/reconciler/sources/v1alpha1/integrationsource"
v1listers "knative.dev/eventing/pkg/client/listers/sources/v1"
listers "knative.dev/eventing/pkg/client/listers/sources/v1alpha1"
"knative.dev/eventing/pkg/reconciler/integrationsource/resources"
"knative.dev/pkg/controller"
"knative.dev/pkg/logging"
pkgreconciler "knative.dev/pkg/reconciler"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ See the License for the specific language governing permissions and
limitations under the License.
*/

package integrationsource
package source

import (
"fmt"
Expand Down
133 changes: 133 additions & 0 deletions pkg/reconciler/integration/source/resources/containersource.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,133 @@
/*
Copyright 2024 The Knative Authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package resources

import (
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
sourcesv1 "knative.dev/eventing/pkg/apis/sources/v1"
"knative.dev/eventing/pkg/apis/sources/v1alpha1"
"knative.dev/eventing/pkg/reconciler/integration"
"knative.dev/pkg/kmeta"
)

const (
awsAccessKey = "aws.accessKey"
awsSecretKey = "aws.secretKey"
)

func NewContainerSource(source *v1alpha1.IntegrationSource) *sourcesv1.ContainerSource {
return &sourcesv1.ContainerSource{
ObjectMeta: metav1.ObjectMeta{
OwnerReferences: []metav1.OwnerReference{
*kmeta.NewControllerRef(source),
},
Name: ContainerSourceName(source),
Namespace: source.Namespace,
},
Spec: sourcesv1.ContainerSourceSpec{

Template: corev1.PodTemplateSpec{
Spec: corev1.PodSpec{
Containers: []corev1.Container{
{
Name: "source",
Image: selectImage(source),
ImagePullPolicy: corev1.PullIfNotPresent,
Env: makeEnv(source),
},
},
},
},
SourceSpec: source.Spec.SourceSpec,
},
}
}

// Function to create environment variables for Timer or AWS configurations dynamically
func makeEnv(source *v1alpha1.IntegrationSource) []corev1.EnvVar {
var envVars = integration.MakeSSLEnvVar()

// Timer environment variables
if source.Spec.Timer != nil {
envVars = append(envVars, integration.GenerateEnvVarsFromStruct("CAMEL_KAMELET_TIMER_SOURCE", *source.Spec.Timer)...)
return envVars
}

// Handle secret name only if AWS is configured
var secretName string
if source.Spec.Aws != nil && source.Spec.Aws.Auth != nil && source.Spec.Aws.Auth.Secret != nil && source.Spec.Aws.Auth.Secret.Ref != nil {
secretName = source.Spec.Aws.Auth.Secret.Ref.Name
}

// AWS S3 environment variables
if source.Spec.Aws != nil && source.Spec.Aws.S3 != nil {
envVars = append(envVars, integration.GenerateEnvVarsFromStruct("CAMEL_KAMELET_AWS_S3_SOURCE", *source.Spec.Aws.S3)...)
if secretName != "" {
envVars = append(envVars, []corev1.EnvVar{
integration.MakeSecretEnvVar("CAMEL_KAMELET_AWS_S3_SOURCE_ACCESSKEY", awsAccessKey, secretName),
integration.MakeSecretEnvVar("CAMEL_KAMELET_AWS_S3_SOURCE_SECRETKEY", awsSecretKey, secretName),
}...)
}
return envVars
}

// AWS SQS environment variables
if source.Spec.Aws != nil && source.Spec.Aws.SQS != nil {
envVars = append(envVars, integration.GenerateEnvVarsFromStruct("CAMEL_KAMELET_AWS_SQS_SOURCE", *source.Spec.Aws.SQS)...)
if secretName != "" {
envVars = append(envVars, []corev1.EnvVar{
integration.MakeSecretEnvVar("CAMEL_KAMELET_AWS_SQS_SOURCE_ACCESSKEY", awsAccessKey, secretName),
integration.MakeSecretEnvVar("CAMEL_KAMELET_AWS_SQS_SOURCE_SECRETKEY", awsSecretKey, secretName),
}...)
}
return envVars
}

// AWS DynamoDB Streams environment variables
if source.Spec.Aws != nil && source.Spec.Aws.DDBStreams != nil {
envVars = append(envVars, integration.GenerateEnvVarsFromStruct("CAMEL_KAMELET_AWS_DDB_STREAMS_SOURCE", *source.Spec.Aws.DDBStreams)...)
if secretName != "" {
envVars = append(envVars, []corev1.EnvVar{
integration.MakeSecretEnvVar("CAMEL_KAMELET_AWS_DDB_STREAMS_SOURCE_ACCESSKEY", awsAccessKey, secretName),
integration.MakeSecretEnvVar("CAMEL_KAMELET_AWS_DDB_STREAMS_SOURCE_SECRETKEY", awsSecretKey, secretName),
}...)
}
return envVars
}

// If no valid configuration is found, return empty envVars
return envVars
}

func selectImage(source *v1alpha1.IntegrationSource) string {
if source.Spec.Timer != nil {
return "gcr.io/knative-nightly/timer-source:latest"
}
if source.Spec.Aws != nil {
if source.Spec.Aws.S3 != nil {
return "gcr.io/knative-nightly/aws-s3-source:latest"
}
if source.Spec.Aws.SQS != nil {
return "gcr.io/knative-nightly/aws-sqs-source:latest"
}
if source.Spec.Aws.DDBStreams != nil {
return "gcr.io/knative-nightly/aws-ddb-streams-source:latest"
}
}
return ""
}
Loading

0 comments on commit ebe99e6

Please sign in to comment.