diff --git a/pkg/defaults/defaults.go b/pkg/defaults/defaults.go index 87ceffb..65b0773 100644 --- a/pkg/defaults/defaults.go +++ b/pkg/defaults/defaults.go @@ -2,14 +2,12 @@ package defaults import ( "encoding/json" - "fmt" "os" "sync" "github.com/port-labs/port-k8s-exporter/pkg/port" "github.com/port-labs/port-k8s-exporter/pkg/port/blueprint" "github.com/port-labs/port-k8s-exporter/pkg/port/cli" - "github.com/port-labs/port-k8s-exporter/pkg/port/integration" "github.com/port-labs/port-k8s-exporter/pkg/port/page" "github.com/port-labs/port-k8s-exporter/pkg/port/scorecards" "gopkg.in/yaml.v3" @@ -132,81 +130,48 @@ func (e *AbortDefaultCreationError) Error() string { return "AbortDefaultCreationError" } -func validateResourcesErrors(createdBlueprints []string, createdPages []string, resourceErrors []error) *AbortDefaultCreationError { - if len(resourceErrors) > 0 { - for _, err := range resourceErrors { - klog.Infof("Failed to create resources: %v.", err.Error()) - } - return &AbortDefaultCreationError{BlueprintsToRollback: createdBlueprints, PagesToRollback: createdPages, Errors: resourceErrors} - } - return nil -} - -func validateResourcesDoesNotExist(portClient *cli.PortClient, defaults *Defaults, config *port.Config) *AbortDefaultCreationError { - var errors []error - if _, err := integration.GetIntegration(portClient, config.StateKey); err == nil { - klog.Warningf("Integration with state key %s already exists", config.StateKey) - return &AbortDefaultCreationError{Errors: []error{ - fmt.Errorf("integration with state key %s already exists", config.StateKey), - }} - } - +func createResources(portClient *cli.PortClient, defaults *Defaults) error { + existingBlueprints := []string{} for _, bp := range defaults.Blueprints { if _, err := blueprint.GetBlueprint(portClient, bp.Identifier); err == nil { - klog.Warningf("Blueprint with identifier %s already exists", bp.Identifier) - errors = append(errors, fmt.Errorf("blueprint with identifier %s already exists", bp.Identifier)) + existingBlueprints = append(existingBlueprints, bp.Identifier) } } - for _, p := range defaults.Pages { - if _, err := page.GetPage(portClient, p.Identifier); err == nil { - klog.Warningf("Page with identifier %s already exists", p.Identifier) - errors = append(errors, fmt.Errorf("page with identifier %s already exists", p.Identifier)) - } - } - - if errors != nil { - return &AbortDefaultCreationError{Errors: errors} - } - return nil -} - -func createResources(portClient *cli.PortClient, defaults *Defaults, config *port.Config) *AbortDefaultCreationError { - if err := validateResourcesDoesNotExist(portClient, defaults, config); err != nil { - klog.Warningf("Failed to create resources: %v.", err.Errors) - return err + if len(existingBlueprints) > 0 { + klog.Infof("Found existing blueprints: %v, skipping default resources creation", existingBlueprints) + return nil } bareBlueprints, patchStages := deconstructBlueprintsToCreationSteps(defaults.Blueprints) - waitGroup := sync.WaitGroup{} - var resourceErrors []error - var createdBlueprints []string - var createdPages []string mutex := sync.Mutex{} + createdBlueprints := []string{} for _, bp := range bareBlueprints { waitGroup.Add(1) go func(bp port.Blueprint) { defer waitGroup.Done() result, err := blueprint.NewBlueprint(portClient, bp) - mutex.Lock() if err != nil { klog.Warningf("Failed to create blueprint %s: %v", bp.Identifier, err.Error()) resourceErrors = append(resourceErrors, err) } else { klog.Infof("Created blueprint %s", result.Identifier) - createdBlueprints = append(createdBlueprints, result.Identifier) + createdBlueprints = append(createdBlueprints, bp.Identifier) } mutex.Unlock() }(bp) } waitGroup.Wait() - if err := validateResourcesErrors(createdBlueprints, createdPages, resourceErrors); err != nil { - return err + if len(resourceErrors) > 0 { + return &AbortDefaultCreationError{ + BlueprintsToRollback: createdBlueprints, + Errors: resourceErrors, + } } for _, patchStage := range patchStages { @@ -215,16 +180,21 @@ func createResources(portClient *cli.PortClient, defaults *Defaults, config *por go func(bp port.Blueprint) { defer waitGroup.Done() if _, err := blueprint.PatchBlueprint(portClient, bp); err != nil { + mutex.Lock() klog.Warningf("Failed to patch blueprint %s: %v", bp.Identifier, err.Error()) resourceErrors = append(resourceErrors, err) + mutex.Unlock() } }(bp) } waitGroup.Wait() - } - if err := validateResourcesErrors(createdBlueprints, createdPages, resourceErrors); err != nil { - return err + if len(resourceErrors) > 0 { + return &AbortDefaultCreationError{ + BlueprintsToRollback: createdBlueprints, + Errors: resourceErrors, + } + } } for _, blueprintScorecards := range defaults.Scorecards { @@ -234,76 +204,42 @@ func createResources(portClient *cli.PortClient, defaults *Defaults, config *por defer waitGroup.Done() if err := scorecards.CreateScorecard(portClient, blueprintIdentifier, scorecard); err != nil { klog.Warningf("Failed to create scorecard for blueprint %s: %v", blueprintIdentifier, err.Error()) - resourceErrors = append(resourceErrors, err) } }(blueprintScorecards.Blueprint, scorecard) } } waitGroup.Wait() - if err := validateResourcesErrors(createdBlueprints, createdPages, resourceErrors); err != nil { - return err - } - for _, pageToCreate := range defaults.Pages { waitGroup.Add(1) go func(p port.Page) { defer waitGroup.Done() if err := page.CreatePage(portClient, p); err != nil { klog.Warningf("Failed to create page %s: %v", p.Identifier, err.Error()) - resourceErrors = append(resourceErrors, err) } else { klog.Infof("Created page %s", p.Identifier) - createdPages = append(createdPages, p.Identifier) } }(pageToCreate) } waitGroup.Wait() - if err := validateResourcesErrors(createdBlueprints, createdPages, resourceErrors); err != nil { - return err - } - - if err := integration.CreateIntegration(portClient, config.StateKey, config.EventListenerType, defaults.AppConfig); err != nil { - klog.Warningf("Failed to create integration with default configuration. state key %s: %v", config.StateKey, err.Error()) - return &AbortDefaultCreationError{BlueprintsToRollback: createdBlueprints, PagesToRollback: createdPages, Errors: []error{err}} - } - return nil } -func initializeDefaults(portClient *cli.PortClient, config *port.Config) error { - defaults, err := getDefaults() - if err != nil { - return err - } - - if err := createResources(portClient, defaults, config); err != nil { - klog.Infof("Failed to create resources. Rolling back blueprints: %v", err.BlueprintsToRollback) - var rollbackWg sync.WaitGroup - for _, identifier := range err.BlueprintsToRollback { - rollbackWg.Add(1) - go func(identifier string) { - defer rollbackWg.Done() - if err := blueprint.DeleteBlueprint(portClient, identifier); err != nil { - klog.Warningf("Failed to rollback blueprint %s creation: %v", identifier, err) +func initializeDefaults(portClient *cli.PortClient, defaults *Defaults) error { + if err := createResources(portClient, defaults); err != nil { + if abortErr, ok := err.(*AbortDefaultCreationError); ok { + klog.Warningf("Rolling back blueprints due to creation error") + for _, blueprintID := range abortErr.BlueprintsToRollback { + if err := blueprint.DeleteBlueprint(portClient, blueprintID); err != nil { + klog.Warningf("Failed to rollback blueprint %s: %v", blueprintID, err) + } else { + klog.Infof("Successfully rolled back blueprint %s", blueprintID) } - }(identifier) - } - rollbackWg.Wait() - - for _, identifier := range err.PagesToRollback { - rollbackWg.Add(1) - go func(identifier string) { - defer rollbackWg.Done() - if err := page.DeletePage(portClient, identifier); err != nil { - klog.Warningf("Failed to rollback page %s creation: %v", identifier, err) - } - }(identifier) + } } - rollbackWg.Wait() - - return &ExceptionGroup{Message: err.Error(), Errors: err.Errors} + klog.Warningf("Error creating default resources: %v", err) + return err } return nil diff --git a/pkg/defaults/defaults_test.go b/pkg/defaults/defaults_test.go index 6b0262c..e3513b6 100644 --- a/pkg/defaults/defaults_test.go +++ b/pkg/defaults/defaults_test.go @@ -131,7 +131,7 @@ func Test_InitIntegration_BlueprintExists(t *testing.T) { assert.Nil(t, e) i, err := integration.GetIntegration(f.portClient, f.stateKey) - assert.Nil(t, i.Config.Resources) + assert.NotNil(t, i.Config.Resources) assert.Nil(t, err) _, err = blueprint.GetBlueprint(f.portClient, "workload") @@ -157,13 +157,13 @@ func Test_InitIntegration_PageExists(t *testing.T) { assert.Nil(t, e) i, err := integration.GetIntegration(f.portClient, f.stateKey) - assert.Nil(t, i.Config.Resources) + assert.NotNil(t, i.Config.Resources) assert.Nil(t, err) _, err = page.GetPage(f.portClient, "workload_overview_dashboard") assert.Nil(t, err) - testUtils.CheckResourcesExistence(false, false, f.portClient, f.t, []string{"workload", "namespace", "cluster"}, []string{"availability_scorecard_dashboard"}, []string{}) + testUtils.CheckResourcesExistence(true, false, f.portClient, f.t, []string{"workload", "namespace", "cluster"}, []string{"availability_scorecard_dashboard"}, []string{}) } func Test_InitIntegration_ExistingIntegration(t *testing.T) { @@ -183,7 +183,7 @@ func Test_InitIntegration_ExistingIntegration(t *testing.T) { _, err = integration.GetIntegration(f.portClient, f.stateKey) assert.Nil(t, err) - testUtils.CheckResourcesExistence(false, false, f.portClient, f.t, []string{"workload", "namespace", "cluster"}, []string{"workload_overview_dashboard", "availability_scorecard_dashboard"}, []string{}) + testUtils.CheckResourcesExistence(true, false, f.portClient, f.t, []string{"workload", "namespace", "cluster"}, []string{"workload_overview_dashboard", "availability_scorecard_dashboard"}, []string{}) } func Test_InitIntegration_LocalResourcesConfiguration(t *testing.T) { @@ -225,7 +225,7 @@ func Test_InitIntegration_LocalResourcesConfiguration(t *testing.T) { assert.Equal(t, expectedResources, i.Config.Resources) assert.Nil(t, err) - testUtils.CheckResourcesExistence(false, false, f.portClient, f.t, []string{"workload", "namespace", "cluster"}, []string{"workload_overview_dashboard", "availability_scorecard_dashboard"}, []string{}) + testUtils.CheckResourcesExistence(true, false, f.portClient, f.t, []string{"workload", "namespace", "cluster"}, []string{"workload_overview_dashboard", "availability_scorecard_dashboard"}, []string{}) } func Test_InitIntegration_LocalResourcesConfiguration_ExistingIntegration_EmptyConfiguration(t *testing.T) { @@ -247,11 +247,12 @@ func Test_InitIntegration_LocalResourcesConfiguration_ExistingIntegration_EmptyC assert.Nil(t, err) assert.Equal(t, "KAFKA", i.EventListener.Type) - testUtils.CheckResourcesExistence(false, false, f.portClient, f.t, []string{"workload", "namespace", "cluster"}, []string{"workload_overview_dashboard", "availability_scorecard_dashboard"}, []string{}) + testUtils.CheckResourcesExistence(true, false, f.portClient, f.t, []string{"workload", "namespace", "cluster"}, []string{"workload_overview_dashboard", "availability_scorecard_dashboard"}, []string{}) } func Test_InitIntegration_LocalResourcesConfiguration_ExistingIntegration_WithConfiguration_WithOverwriteConfigurationOnRestartFlag(t *testing.T) { f := NewFixture(t) + defer tearDownFixture(t, f) expectedConfig := &port.IntegrationAppConfig{ Resources: []port.Resource{ @@ -294,7 +295,5 @@ func Test_InitIntegration_LocalResourcesConfiguration_ExistingIntegration_WithCo assert.Nil(t, err) assert.Equal(t, expectedConfig.Resources, i.Config.Resources) - testUtils.CheckResourcesExistence(false, false, f.portClient, f.t, []string{"workload", "namespace", "cluster"}, []string{"workload_overview_dashboard", "availability_scorecard_dashboard"}, []string{}) - defer tearDownFixture(t, f) - + testUtils.CheckResourcesExistence(true, false, f.portClient, f.t, []string{"workload", "namespace", "cluster"}, []string{"workload_overview_dashboard", "availability_scorecard_dashboard"}, []string{}) } diff --git a/pkg/defaults/init.go b/pkg/defaults/init.go index 1f5f284..7c3a2c1 100644 --- a/pkg/defaults/init.go +++ b/pkg/defaults/init.go @@ -18,6 +18,11 @@ func getEventListenerConfig(eventListenerType string) *port.EventListenerSetting func InitIntegration(portClient *cli.PortClient, applicationConfig *port.Config) error { klog.Infof("Initializing Port integration") + defaults, err := getDefaults() + if err != nil { + return err + } + existingIntegration, err := integration.GetIntegration(portClient, applicationConfig.StateKey) defaultIntegrationConfig := &port.IntegrationAppConfig{ Resources: applicationConfig.Resources, @@ -28,39 +33,36 @@ func InitIntegration(portClient *cli.PortClient, applicationConfig *port.Config) } if err != nil { - klog.Infof("Could not get integration with state key %s, error: %s", applicationConfig.StateKey, err.Error()) - klog.Infof("Creating integration") - // The exporter supports a deprecated case where resources are provided in config file and integration does not - // exist. If this is not the case, we support the new way of creating the integration with the default resources. - // Only one of the two cases can be true. - if defaultIntegrationConfig.Resources == nil && applicationConfig.CreateDefaultResources { - klog.Infof("Creating default resources") - if err := initializeDefaults(portClient, applicationConfig); err != nil { - klog.Warningf("Error initializing defaults: %s", err.Error()) - klog.Warningf("The integration will start without default integration mapping and other default resources. Please create them manually in Port. ") - } else { - klog.Infof("Default resources created successfully") - return nil - } + if applicationConfig.CreateDefaultResources { + defaultIntegrationConfig = defaults.AppConfig } - klog.Infof("Could not create default resources, creating integration with no resources") - klog.Infof("Creating integration with config: %v", defaultIntegrationConfig) - // Handle a deprecated case where resources are provided in config file - return integration.CreateIntegration(portClient, applicationConfig.StateKey, applicationConfig.EventListenerType, defaultIntegrationConfig) + klog.Warningf("Could not get integration with state key %s, error: %s", applicationConfig.StateKey, err.Error()) + if err := integration.CreateIntegration(portClient, applicationConfig.StateKey, applicationConfig.EventListenerType, defaultIntegrationConfig); err != nil { + return err + } } else { klog.Infof("Integration with state key %s already exists, patching it", applicationConfig.StateKey) integrationPatch := &port.Integration{ EventListener: getEventListenerConfig(applicationConfig.EventListenerType), } - // Handle a deprecated case where resources are provided in config file and integration exists from previous - //versions without a config if existingIntegration.Config == nil || applicationConfig.OverwriteConfigurationOnRestart { - klog.Infof("Integration exists without a config, patching it with default config: %v", defaultIntegrationConfig) integrationPatch.Config = defaultIntegrationConfig } - return integration.PatchIntegration(portClient, applicationConfig.StateKey, integrationPatch) + if err := integration.PatchIntegration(portClient, applicationConfig.StateKey, integrationPatch); err != nil { + return err + } } + + if applicationConfig.CreateDefaultResources { + klog.Infof("Creating default resources (blueprints, pages, etc..)") + if err := initializeDefaults(portClient, defaults); err != nil { + klog.Warningf("Error initializing defaults: %s", err.Error()) + klog.Warningf("Some default resources may not have been created. The integration will continue running.") + } + } + + return nil }