Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Enable multiple installations by not failing if all existing default resources exists #89

Merged
merged 9 commits into from
Jan 16, 2025
130 changes: 33 additions & 97 deletions pkg/defaults/defaults.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,12 @@ package defaults

import (
"encoding/json"
"fmt"
"os"
"sync"

"github.com/port-labs/port-k8s-exporter/pkg/port"
"github.com/port-labs/port-k8s-exporter/pkg/port/blueprint"
"github.com/port-labs/port-k8s-exporter/pkg/port/cli"
"github.com/port-labs/port-k8s-exporter/pkg/port/integration"
"github.com/port-labs/port-k8s-exporter/pkg/port/page"
"github.com/port-labs/port-k8s-exporter/pkg/port/scorecards"
"gopkg.in/yaml.v3"
Expand Down Expand Up @@ -132,81 +130,48 @@ func (e *AbortDefaultCreationError) Error() string {
return "AbortDefaultCreationError"
}

func validateResourcesErrors(createdBlueprints []string, createdPages []string, resourceErrors []error) *AbortDefaultCreationError {
if len(resourceErrors) > 0 {
for _, err := range resourceErrors {
klog.Infof("Failed to create resources: %v.", err.Error())
}
return &AbortDefaultCreationError{BlueprintsToRollback: createdBlueprints, PagesToRollback: createdPages, Errors: resourceErrors}
}
return nil
}

func validateResourcesDoesNotExist(portClient *cli.PortClient, defaults *Defaults, config *port.Config) *AbortDefaultCreationError {
var errors []error
if _, err := integration.GetIntegration(portClient, config.StateKey); err == nil {
klog.Warningf("Integration with state key %s already exists", config.StateKey)
return &AbortDefaultCreationError{Errors: []error{
fmt.Errorf("integration with state key %s already exists", config.StateKey),
}}
}

func createResources(portClient *cli.PortClient, defaults *Defaults) error {
existingBlueprints := []string{}
for _, bp := range defaults.Blueprints {
if _, err := blueprint.GetBlueprint(portClient, bp.Identifier); err == nil {
klog.Warningf("Blueprint with identifier %s already exists", bp.Identifier)
errors = append(errors, fmt.Errorf("blueprint with identifier %s already exists", bp.Identifier))
existingBlueprints = append(existingBlueprints, bp.Identifier)
}
}

for _, p := range defaults.Pages {
if _, err := page.GetPage(portClient, p.Identifier); err == nil {
klog.Warningf("Page with identifier %s already exists", p.Identifier)
errors = append(errors, fmt.Errorf("page with identifier %s already exists", p.Identifier))
}
}

if errors != nil {
return &AbortDefaultCreationError{Errors: errors}
}
return nil
}

func createResources(portClient *cli.PortClient, defaults *Defaults, config *port.Config) *AbortDefaultCreationError {
if err := validateResourcesDoesNotExist(portClient, defaults, config); err != nil {
klog.Warningf("Failed to create resources: %v.", err.Errors)
return err
if len(existingBlueprints) > 0 {
klog.Infof("Found existing blueprints: %v, skipping default resources creation", existingBlueprints)
return nil
}

bareBlueprints, patchStages := deconstructBlueprintsToCreationSteps(defaults.Blueprints)

waitGroup := sync.WaitGroup{}

var resourceErrors []error
var createdBlueprints []string
var createdPages []string
mutex := sync.Mutex{}
createdBlueprints := []string{}

for _, bp := range bareBlueprints {
waitGroup.Add(1)
go func(bp port.Blueprint) {
defer waitGroup.Done()
result, err := blueprint.NewBlueprint(portClient, bp)

mutex.Lock()
if err != nil {
klog.Warningf("Failed to create blueprint %s: %v", bp.Identifier, err.Error())
resourceErrors = append(resourceErrors, err)
} else {
klog.Infof("Created blueprint %s", result.Identifier)
createdBlueprints = append(createdBlueprints, result.Identifier)
createdBlueprints = append(createdBlueprints, bp.Identifier)
}
mutex.Unlock()
}(bp)
}
waitGroup.Wait()

if err := validateResourcesErrors(createdBlueprints, createdPages, resourceErrors); err != nil {
return err
if len(resourceErrors) > 0 {
return &AbortDefaultCreationError{
BlueprintsToRollback: createdBlueprints,
Errors: resourceErrors,
}
}

for _, patchStage := range patchStages {
Expand All @@ -215,16 +180,21 @@ func createResources(portClient *cli.PortClient, defaults *Defaults, config *por
go func(bp port.Blueprint) {
defer waitGroup.Done()
if _, err := blueprint.PatchBlueprint(portClient, bp); err != nil {
mutex.Lock()
matan84 marked this conversation as resolved.
Show resolved Hide resolved
klog.Warningf("Failed to patch blueprint %s: %v", bp.Identifier, err.Error())
resourceErrors = append(resourceErrors, err)
mutex.Unlock()
}
}(bp)
}
waitGroup.Wait()
}

if err := validateResourcesErrors(createdBlueprints, createdPages, resourceErrors); err != nil {
return err
if len(resourceErrors) > 0 {
return &AbortDefaultCreationError{
BlueprintsToRollback: createdBlueprints,
Errors: resourceErrors,
}
}
}

for _, blueprintScorecards := range defaults.Scorecards {
Expand All @@ -234,76 +204,42 @@ func createResources(portClient *cli.PortClient, defaults *Defaults, config *por
defer waitGroup.Done()
if err := scorecards.CreateScorecard(portClient, blueprintIdentifier, scorecard); err != nil {
klog.Warningf("Failed to create scorecard for blueprint %s: %v", blueprintIdentifier, err.Error())
resourceErrors = append(resourceErrors, err)
}
}(blueprintScorecards.Blueprint, scorecard)
}
}
waitGroup.Wait()

if err := validateResourcesErrors(createdBlueprints, createdPages, resourceErrors); err != nil {
return err
}

for _, pageToCreate := range defaults.Pages {
waitGroup.Add(1)
go func(p port.Page) {
defer waitGroup.Done()
if err := page.CreatePage(portClient, p); err != nil {
klog.Warningf("Failed to create page %s: %v", p.Identifier, err.Error())
resourceErrors = append(resourceErrors, err)
} else {
klog.Infof("Created page %s", p.Identifier)
createdPages = append(createdPages, p.Identifier)
}
}(pageToCreate)
}
waitGroup.Wait()

if err := validateResourcesErrors(createdBlueprints, createdPages, resourceErrors); err != nil {
return err
}

if err := integration.CreateIntegration(portClient, config.StateKey, config.EventListenerType, defaults.AppConfig); err != nil {
klog.Warningf("Failed to create integration with default configuration. state key %s: %v", config.StateKey, err.Error())
return &AbortDefaultCreationError{BlueprintsToRollback: createdBlueprints, PagesToRollback: createdPages, Errors: []error{err}}
omby8888 marked this conversation as resolved.
Show resolved Hide resolved
}

return nil
}

func initializeDefaults(portClient *cli.PortClient, config *port.Config) error {
defaults, err := getDefaults()
if err != nil {
return err
}

if err := createResources(portClient, defaults, config); err != nil {
klog.Infof("Failed to create resources. Rolling back blueprints: %v", err.BlueprintsToRollback)
var rollbackWg sync.WaitGroup
for _, identifier := range err.BlueprintsToRollback {
rollbackWg.Add(1)
go func(identifier string) {
defer rollbackWg.Done()
if err := blueprint.DeleteBlueprint(portClient, identifier); err != nil {
klog.Warningf("Failed to rollback blueprint %s creation: %v", identifier, err)
func initializeDefaults(portClient *cli.PortClient, defaults *Defaults) error {
if err := createResources(portClient, defaults); err != nil {
if abortErr, ok := err.(*AbortDefaultCreationError); ok {
klog.Warningf("Rolling back blueprints due to creation error")
for _, blueprintID := range abortErr.BlueprintsToRollback {
if err := blueprint.DeleteBlueprint(portClient, blueprintID); err != nil {
klog.Warningf("Failed to rollback blueprint %s: %v", blueprintID, err)
} else {
klog.Infof("Successfully rolled back blueprint %s", blueprintID)
}
}(identifier)
}
rollbackWg.Wait()

for _, identifier := range err.PagesToRollback {
rollbackWg.Add(1)
go func(identifier string) {
defer rollbackWg.Done()
if err := page.DeletePage(portClient, identifier); err != nil {
klog.Warningf("Failed to rollback page %s creation: %v", identifier, err)
}
}(identifier)
}
}
rollbackWg.Wait()

return &ExceptionGroup{Message: err.Error(), Errors: err.Errors}
klog.Warningf("Error creating default resources: %v", err)
return err
}

return nil
Expand Down
17 changes: 8 additions & 9 deletions pkg/defaults/defaults_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ func Test_InitIntegration_BlueprintExists(t *testing.T) {
assert.Nil(t, e)

i, err := integration.GetIntegration(f.portClient, f.stateKey)
assert.Nil(t, i.Config.Resources)
assert.NotNil(t, i.Config.Resources)
assert.Nil(t, err)

_, err = blueprint.GetBlueprint(f.portClient, "workload")
Expand All @@ -157,13 +157,13 @@ func Test_InitIntegration_PageExists(t *testing.T) {
assert.Nil(t, e)

i, err := integration.GetIntegration(f.portClient, f.stateKey)
assert.Nil(t, i.Config.Resources)
assert.NotNil(t, i.Config.Resources)
assert.Nil(t, err)

_, err = page.GetPage(f.portClient, "workload_overview_dashboard")
assert.Nil(t, err)

testUtils.CheckResourcesExistence(false, false, f.portClient, f.t, []string{"workload", "namespace", "cluster"}, []string{"availability_scorecard_dashboard"}, []string{})
testUtils.CheckResourcesExistence(true, false, f.portClient, f.t, []string{"workload", "namespace", "cluster"}, []string{"availability_scorecard_dashboard"}, []string{})
}

func Test_InitIntegration_ExistingIntegration(t *testing.T) {
Expand All @@ -183,7 +183,7 @@ func Test_InitIntegration_ExistingIntegration(t *testing.T) {
_, err = integration.GetIntegration(f.portClient, f.stateKey)
assert.Nil(t, err)

testUtils.CheckResourcesExistence(false, false, f.portClient, f.t, []string{"workload", "namespace", "cluster"}, []string{"workload_overview_dashboard", "availability_scorecard_dashboard"}, []string{})
testUtils.CheckResourcesExistence(true, false, f.portClient, f.t, []string{"workload", "namespace", "cluster"}, []string{"workload_overview_dashboard", "availability_scorecard_dashboard"}, []string{})
}

func Test_InitIntegration_LocalResourcesConfiguration(t *testing.T) {
Expand Down Expand Up @@ -225,7 +225,7 @@ func Test_InitIntegration_LocalResourcesConfiguration(t *testing.T) {
assert.Equal(t, expectedResources, i.Config.Resources)
assert.Nil(t, err)

testUtils.CheckResourcesExistence(false, false, f.portClient, f.t, []string{"workload", "namespace", "cluster"}, []string{"workload_overview_dashboard", "availability_scorecard_dashboard"}, []string{})
testUtils.CheckResourcesExistence(true, false, f.portClient, f.t, []string{"workload", "namespace", "cluster"}, []string{"workload_overview_dashboard", "availability_scorecard_dashboard"}, []string{})
}

func Test_InitIntegration_LocalResourcesConfiguration_ExistingIntegration_EmptyConfiguration(t *testing.T) {
Expand All @@ -247,11 +247,12 @@ func Test_InitIntegration_LocalResourcesConfiguration_ExistingIntegration_EmptyC
assert.Nil(t, err)
assert.Equal(t, "KAFKA", i.EventListener.Type)

testUtils.CheckResourcesExistence(false, false, f.portClient, f.t, []string{"workload", "namespace", "cluster"}, []string{"workload_overview_dashboard", "availability_scorecard_dashboard"}, []string{})
testUtils.CheckResourcesExistence(true, false, f.portClient, f.t, []string{"workload", "namespace", "cluster"}, []string{"workload_overview_dashboard", "availability_scorecard_dashboard"}, []string{})
}

func Test_InitIntegration_LocalResourcesConfiguration_ExistingIntegration_WithConfiguration_WithOverwriteConfigurationOnRestartFlag(t *testing.T) {
f := NewFixture(t)
defer tearDownFixture(t, f)

expectedConfig := &port.IntegrationAppConfig{
Resources: []port.Resource{
Expand Down Expand Up @@ -294,7 +295,5 @@ func Test_InitIntegration_LocalResourcesConfiguration_ExistingIntegration_WithCo
assert.Nil(t, err)
assert.Equal(t, expectedConfig.Resources, i.Config.Resources)

testUtils.CheckResourcesExistence(false, false, f.portClient, f.t, []string{"workload", "namespace", "cluster"}, []string{"workload_overview_dashboard", "availability_scorecard_dashboard"}, []string{})
defer tearDownFixture(t, f)

testUtils.CheckResourcesExistence(true, false, f.portClient, f.t, []string{"workload", "namespace", "cluster"}, []string{"workload_overview_dashboard", "availability_scorecard_dashboard"}, []string{})
}
46 changes: 24 additions & 22 deletions pkg/defaults/init.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,11 @@ func getEventListenerConfig(eventListenerType string) *port.EventListenerSetting

func InitIntegration(portClient *cli.PortClient, applicationConfig *port.Config) error {
klog.Infof("Initializing Port integration")
defaults, err := getDefaults()
if err != nil {
return err
}

existingIntegration, err := integration.GetIntegration(portClient, applicationConfig.StateKey)
defaultIntegrationConfig := &port.IntegrationAppConfig{
Resources: applicationConfig.Resources,
Expand All @@ -28,39 +33,36 @@ func InitIntegration(portClient *cli.PortClient, applicationConfig *port.Config)
}

if err != nil {
klog.Infof("Could not get integration with state key %s, error: %s", applicationConfig.StateKey, err.Error())
klog.Infof("Creating integration")
// The exporter supports a deprecated case where resources are provided in config file and integration does not
// exist. If this is not the case, we support the new way of creating the integration with the default resources.
// Only one of the two cases can be true.
if defaultIntegrationConfig.Resources == nil && applicationConfig.CreateDefaultResources {
klog.Infof("Creating default resources")
if err := initializeDefaults(portClient, applicationConfig); err != nil {
klog.Warningf("Error initializing defaults: %s", err.Error())
klog.Warningf("The integration will start without default integration mapping and other default resources. Please create them manually in Port. ")
} else {
klog.Infof("Default resources created successfully")
return nil
}
if applicationConfig.CreateDefaultResources {
defaultIntegrationConfig = defaults.AppConfig
}

klog.Infof("Could not create default resources, creating integration with no resources")
klog.Infof("Creating integration with config: %v", defaultIntegrationConfig)
// Handle a deprecated case where resources are provided in config file
return integration.CreateIntegration(portClient, applicationConfig.StateKey, applicationConfig.EventListenerType, defaultIntegrationConfig)
klog.Warningf("Could not get integration with state key %s, error: %s", applicationConfig.StateKey, err.Error())
if err := integration.CreateIntegration(portClient, applicationConfig.StateKey, applicationConfig.EventListenerType, defaultIntegrationConfig); err != nil {
return err
}
} else {
klog.Infof("Integration with state key %s already exists, patching it", applicationConfig.StateKey)
integrationPatch := &port.Integration{
EventListener: getEventListenerConfig(applicationConfig.EventListenerType),
}

// Handle a deprecated case where resources are provided in config file and integration exists from previous
//versions without a config
if existingIntegration.Config == nil || applicationConfig.OverwriteConfigurationOnRestart {
klog.Infof("Integration exists without a config, patching it with default config: %v", defaultIntegrationConfig)
integrationPatch.Config = defaultIntegrationConfig
}

return integration.PatchIntegration(portClient, applicationConfig.StateKey, integrationPatch)
if err := integration.PatchIntegration(portClient, applicationConfig.StateKey, integrationPatch); err != nil {
return err
}
}

if applicationConfig.CreateDefaultResources {
klog.Infof("Creating default resources (blueprints, pages, etc..)")
if err := initializeDefaults(portClient, defaults); err != nil {
klog.Warningf("Error initializing defaults: %s", err.Error())
klog.Warningf("Some default resources may not have been created. The integration will continue running.")
}
}

return nil
}
Loading