diff --git a/pkg/app/pipedv1/cmd/piped/piped.go b/pkg/app/pipedv1/cmd/piped/piped.go index 408a0a595f..62ac99db63 100644 --- a/pkg/app/pipedv1/cmd/piped/piped.go +++ b/pkg/app/pipedv1/cmd/piped/piped.go @@ -62,6 +62,7 @@ import ( "github.com/pipe-cd/pipecd/pkg/app/pipedv1/eventwatcher" "github.com/pipe-cd/pipecd/pkg/app/pipedv1/metadatastore" "github.com/pipe-cd/pipecd/pkg/app/pipedv1/notifier" + "github.com/pipe-cd/pipecd/pkg/app/pipedv1/plugin" "github.com/pipe-cd/pipecd/pkg/app/pipedv1/statsreporter" "github.com/pipe-cd/pipecd/pkg/app/pipedv1/trigger" "github.com/pipe-cd/pipecd/pkg/app/server/service/pipedservice" @@ -350,7 +351,7 @@ func (p *piped) run(ctx context.Context, input cli.Input) (runErr error) { } // Make grpc clients to connect to plugins. - pluginClis := make([]pluginapi.PluginClient, 0, len(cfg.Plugins)) + plugins := make([]plugin.Plugin, 0, len(cfg.Plugins)) options := []rpcclient.DialOption{ rpcclient.WithBlock(), rpcclient.WithInsecure(), @@ -359,8 +360,19 @@ func (p *piped) run(ctx context.Context, input cli.Input) (runErr error) { cli, err := pluginapi.NewClient(ctx, net.JoinHostPort("localhost", strconv.Itoa(plg.Port)), options...) if err != nil { input.Logger.Error("failed to create client to connect plugin", zap.String("plugin", plg.Name), zap.Error(err)) + return err } - pluginClis = append(pluginClis, cli) + + plugins = append(plugins, plugin.Plugin{ + Name: plg.Name, + Cli: cli, + }) + } + + pluginRegistry, err := plugin.NewPluginRegistry(ctx, plugins) + if err != nil { + input.Logger.Error("failed to create plugin registry", zap.Error(err)) + return err } // Start running application live state reporter. 
@@ -385,7 +397,7 @@ func (p *piped) run(ctx context.Context, input cli.Input) (runErr error) { c := controller.NewController( apiClient, gitClient, - pluginClis, + pluginRegistry, deploymentLister, commandLister, notifier, diff --git a/pkg/app/pipedv1/controller/controller.go b/pkg/app/pipedv1/controller/controller.go index 4e4b12f2fd..11d554856a 100644 --- a/pkg/app/pipedv1/controller/controller.go +++ b/pkg/app/pipedv1/controller/controller.go @@ -35,11 +35,10 @@ import ( "github.com/pipe-cd/pipecd/pkg/app/pipedv1/controller/controllermetrics" "github.com/pipe-cd/pipecd/pkg/app/pipedv1/metadatastore" + "github.com/pipe-cd/pipecd/pkg/app/pipedv1/plugin" "github.com/pipe-cd/pipecd/pkg/app/server/service/pipedservice" "github.com/pipe-cd/pipecd/pkg/git" "github.com/pipe-cd/pipecd/pkg/model" - pluginapi "github.com/pipe-cd/pipecd/pkg/plugin/api/v1alpha1" - "github.com/pipe-cd/pipecd/pkg/plugin/api/v1alpha1/deployment" ) type apiClient interface { @@ -98,10 +97,9 @@ type controller struct { secretDecrypter secretDecrypter metadataStoreRegistry metadatastore.MetadataStoreRegistry - // gRPC clients to communicate with plugins. - pluginClients []pluginapi.PluginClient - // Map from stage name to the plugin client. - stageBasedPluginsMap map[string]pluginapi.PluginClient + // The registry of all plugins. + pluginRegistry plugin.PluginRegistry + // Map from application ID to the planner // of a pending deployment of that application. 
planners map[string]*planner @@ -133,7 +131,7 @@ type controller struct { func NewController( apiClient apiClient, gitClient gitClient, - pluginClients []pluginapi.PluginClient, + pluginRegistry plugin.PluginRegistry, deploymentLister deploymentLister, commandLister commandLister, notifier notifier, @@ -147,7 +145,7 @@ func NewController( return &controller{ apiClient: apiClient, gitClient: gitClient, - pluginClients: pluginClients, + pluginRegistry: pluginRegistry, deploymentLister: deploymentLister, commandLister: commandLister, notifier: notifier, @@ -183,23 +181,6 @@ func (c *controller) Run(ctx context.Context) error { c.workspaceDir = dir c.logger.Info(fmt.Sprintf("workspace directory was configured to %s", c.workspaceDir)) - // Build the list of stages that can be handled by piped's plugins. - stagesBasedPluginsMap := make(map[string]pluginapi.PluginClient) - for _, plugin := range c.pluginClients { - resp, err := plugin.FetchDefinedStages(ctx, &deployment.FetchDefinedStagesRequest{}) - if err != nil { - return err - } - for _, stage := range resp.GetStages() { - if _, ok := stagesBasedPluginsMap[stage]; ok { - c.logger.Error("duplicated stage name", zap.String("stage", stage)) - return fmt.Errorf("duplicated stage name %s", stage) - } - stagesBasedPluginsMap[stage] = plugin - } - } - c.stageBasedPluginsMap = stagesBasedPluginsMap - ticker := time.NewTicker(c.syncInternal) defer ticker.Stop() c.logger.Info("start syncing planners and schedulers") @@ -452,8 +433,7 @@ func (c *controller) startNewPlanner(ctx context.Context, d *model.Deployment) ( commitHash, configFilename, workingDir, - c.pluginClients, // FIXME: Find a way to ensure the plugins only related to deployment. 
- c.stageBasedPluginsMap, + c.pluginRegistry, c.apiClient, c.gitClient, c.notifier, @@ -594,7 +574,7 @@ func (c *controller) startNewScheduler(ctx context.Context, d *model.Deployment) workingDir, c.apiClient, c.gitClient, - c.stageBasedPluginsMap, + c.pluginRegistry, c.notifier, c.secretDecrypter, c.logger, diff --git a/pkg/app/pipedv1/controller/planner.go b/pkg/app/pipedv1/controller/planner.go index d03d6cac20..1e274eb1d8 100644 --- a/pkg/app/pipedv1/controller/planner.go +++ b/pkg/app/pipedv1/controller/planner.go @@ -30,6 +30,7 @@ import ( "github.com/pipe-cd/pipecd/pkg/app/pipedv1/controller/controllermetrics" "github.com/pipe-cd/pipecd/pkg/app/pipedv1/deploysource" + "github.com/pipe-cd/pipecd/pkg/app/pipedv1/plugin" "github.com/pipe-cd/pipecd/pkg/app/server/service/pipedservice" config "github.com/pipe-cd/pipecd/pkg/configv1" "github.com/pipe-cd/pipecd/pkg/model" @@ -56,13 +57,6 @@ type planner struct { lastSuccessfulConfigFilename string workingDir string - // The plugin clients are used to call plugin that actually - // performs planning deployment. - plugins []pluginapi.PluginClient - // The map used to know which plugin is incharged for a given stage - // of the current deployment. - stageBasedPluginsMap map[string]pluginapi.PluginClient - // The apiClient is used to report the deployment status. apiClient apiClient @@ -76,6 +70,9 @@ type planner struct { // which encrypted using PipeCD built-in secret management. 
secretDecrypter secretDecrypter + // The pluginRegistry is used to determine which plugins to be used + pluginRegistry plugin.PluginRegistry + logger *zap.Logger tracer trace.Tracer @@ -93,8 +90,7 @@ func newPlanner( lastSuccessfulCommitHash string, lastSuccessfulConfigFilename string, workingDir string, - pluginClients []pluginapi.PluginClient, - stageBasedPluginsMap map[string]pluginapi.PluginClient, + pluginRegistry plugin.PluginRegistry, apiClient apiClient, gitClient gitClient, notifier notifier, @@ -116,8 +112,7 @@ func newPlanner( lastSuccessfulCommitHash: lastSuccessfulCommitHash, lastSuccessfulConfigFilename: lastSuccessfulConfigFilename, workingDir: workingDir, - stageBasedPluginsMap: stageBasedPluginsMap, - plugins: pluginClients, + pluginRegistry: pluginRegistry, apiClient: apiClient, gitClient: gitClient, notifier: notifier, @@ -262,8 +257,21 @@ func (p *planner) buildPlan(ctx context.Context, runningDS, targetDS *deployment TargetDeploymentSource: targetDS, } + cfg, err := config.DecodeYAML[*config.GenericApplicationSpec](targetDS.GetApplicationConfig()) + if err != nil { + p.logger.Error("unable to parse application config", zap.Error(err)) + return nil, err + } + spec := cfg.Spec + + plugins, err := p.pluginRegistry.GetPluginClientsByAppConfig(spec) + if err != nil { + p.logger.Error("unable to get plugin clients by app config", zap.Error(err)) + return nil, err + } + // Build deployment target versions. 
- for _, plg := range p.plugins { + for _, plg := range plugins { vRes, err := plg.DetermineVersions(ctx, &deployment.DetermineVersionsRequest{Input: input}) if err != nil { p.logger.Warn("unable to determine versions", zap.Error(err)) @@ -280,13 +288,6 @@ func (p *planner) buildPlan(ctx context.Context, runningDS, targetDS *deployment } } - cfg, err := config.DecodeYAML[*config.GenericApplicationSpec](targetDS.GetApplicationConfig()) - if err != nil { - p.logger.Error("unable to parse application config", zap.Error(err)) - return nil, err - } - spec := cfg.Spec - // In case the strategy has been decided by trigger. // For example: user triggered the deployment via web console. switch p.deployment.Trigger.SyncStrategy { @@ -371,7 +372,7 @@ func (p *planner) buildPlan(ctx context.Context, runningDS, targetDS *deployment summary string ) // Build plan based on plugins determined strategy - for _, plg := range p.plugins { + for _, plg := range plugins { res, err := plg.DetermineStrategy(ctx, &deployment.DetermineStrategyRequest{Input: input}) if err != nil { p.logger.Warn("Unable to determine strategy using current plugin", zap.Error(err)) @@ -417,10 +418,17 @@ func (p *planner) buildQuickSyncStages(ctx context.Context, cfg *config.GenericA rollbackStages = []*model.PipelineStage{} rollback = *cfg.Planner.AutoRollback ) - for _, plg := range p.plugins { + + plugins, err := p.pluginRegistry.GetPluginClientsByAppConfig(cfg) + if err != nil { + p.logger.Error("failed to get plugin clients by app config", zap.Error(err)) + return nil, err + } + for _, plg := range plugins { res, err := plg.BuildQuickSyncStages(ctx, &deployment.BuildQuickSyncStagesRequest{Rollback: rollback}) if err != nil { - return nil, fmt.Errorf("failed to build quick sync stage deployment (%w)", err) + p.logger.Error("failed to build quick sync stages for deployment", zap.Error(err)) + return nil, err } for i := range res.Stages { if res.Stages[i].Rollback { @@ -458,9 +466,10 @@ func (p *planner) 
buildPipelineSyncStages(ctx context.Context, cfg *config.Gener // Build stages config for each plugin. for i := range stagesCfg { stageCfg := stagesCfg[i] - plg, ok := p.stageBasedPluginsMap[stageCfg.Name.String()] - if !ok { - return nil, fmt.Errorf("unable to find plugin for stage %q", stageCfg.Name.String()) + plg, err := p.pluginRegistry.GetPluginClientByStageName(stageCfg.Name.String()) + if err != nil { + p.logger.Error("failed to get plugin client by stage name", zap.Error(err)) + return nil, err } stagesCfgPerPlugin[plg] = append(stagesCfgPerPlugin[plg], &deployment.BuildPipelineSyncStagesRequest_StageConfig{ @@ -480,7 +489,8 @@ func (p *planner) buildPipelineSyncStages(ctx context.Context, cfg *config.Gener Rollback: rollback, }) if err != nil { - return nil, fmt.Errorf("failed to build pipeline sync stages for deployment (%w)", err) + p.logger.Error("failed to build pipeline sync stages for deployment", zap.Error(err)) + return nil, err } // TODO: Ensure responsed stages indexies is valid. 
for i := range res.Stages { diff --git a/pkg/app/pipedv1/controller/planner_test.go b/pkg/app/pipedv1/controller/planner_test.go index 9fb2e97eb6..86720d9273 100644 --- a/pkg/app/pipedv1/controller/planner_test.go +++ b/pkg/app/pipedv1/controller/planner_test.go @@ -25,6 +25,7 @@ import ( "go.uber.org/zap" "google.golang.org/grpc" + "github.com/pipe-cd/pipecd/pkg/app/pipedv1/plugin" config "github.com/pipe-cd/pipecd/pkg/configv1" "github.com/pipe-cd/pipecd/pkg/model" pluginapi "github.com/pipe-cd/pipecd/pkg/plugin/api/v1alpha1" @@ -37,6 +38,7 @@ type fakePlugin struct { quickStages []*model.PipelineStage pipelineStages []*model.PipelineStage rollbackStages []*model.PipelineStage + stageStatusMap map[string]model.StageStatus } func (p *fakePlugin) Close() error { return nil } @@ -97,7 +99,18 @@ func (p *fakePlugin) FetchDefinedStages(ctx context.Context, req *deployment.Fet Stages: stages, }, nil } +func (p *fakePlugin) ExecuteStage(ctx context.Context, req *deployment.ExecuteStageRequest, opts ...grpc.CallOption) (*deployment.ExecuteStageResponse, error) { + status, ok := p.stageStatusMap[req.Input.Stage.Id] + if !ok { + return &deployment.ExecuteStageResponse{ + Status: model.StageStatus_STAGE_NOT_STARTED_YET, + }, nil + } + return &deployment.ExecuteStageResponse{ + Status: status, + }, nil +} func pointerBool(b bool) *bool { return &b } @@ -107,139 +120,190 @@ func TestBuildQuickSyncStages(t *testing.T) { testcases := []struct { name string - plugins []pluginapi.PluginClient + pluginRegistry plugin.PluginRegistry cfg *config.GenericApplicationSpec wantErr bool expectedStages []*model.PipelineStage }{ { name: "only one plugin", - plugins: []pluginapi.PluginClient{ - &fakePlugin{ - quickStages: []*model.PipelineStage{ - { - Id: "plugin-1-stage-1", - }, - }, - rollbackStages: []*model.PipelineStage{ - { - Id: "plugin-1-rollback", - Rollback: true, + pluginRegistry: func() plugin.PluginRegistry { + pr, err := plugin.NewPluginRegistry(context.TODO(), []plugin.Plugin{ 
+ { + Name: "plugin-1", + Cli: &fakePlugin{ + quickStages: []*model.PipelineStage{ + { + Id: "plugin-1-stage-1", + Name: "plugin-1-stage-1", + }, + }, + rollbackStages: []*model.PipelineStage{ + { + Id: "plugin-1-rollback", + Name: "plugin-1-rollback", + Rollback: true, + }, + }, }, }, - }, - }, + }) + require.NoError(t, err) + + return pr + }(), cfg: &config.GenericApplicationSpec{ Planner: config.DeploymentPlanner{ AutoRollback: pointerBool(true), }, + Plugins: []string{"plugin-1"}, }, wantErr: false, expectedStages: []*model.PipelineStage{ { - Id: "plugin-1-stage-1", + Id: "plugin-1-stage-1", + Name: "plugin-1-stage-1", }, { Id: "plugin-1-rollback", + Name: "plugin-1-rollback", Rollback: true, }, }, }, { name: "multi plugins", - plugins: []pluginapi.PluginClient{ - &fakePlugin{ - quickStages: []*model.PipelineStage{ - { - Id: "plugin-1-stage-1", - }, - }, - rollbackStages: []*model.PipelineStage{ - { - Id: "plugin-1-rollback", - Rollback: true, - }, - }, - }, - &fakePlugin{ - quickStages: []*model.PipelineStage{ - { - Id: "plugin-2-stage-1", + pluginRegistry: func() plugin.PluginRegistry { + pr, err := plugin.NewPluginRegistry(context.TODO(), []plugin.Plugin{ + { + Name: "plugin-1", + Cli: &fakePlugin{ + quickStages: []*model.PipelineStage{ + { + Id: "plugin-1-stage-1", + Name: "plugin-1-stage-1", + }, + }, + rollbackStages: []*model.PipelineStage{ + { + Id: "plugin-1-rollback", + Name: "plugin-1-rollback", + Rollback: true, + }, + }, }, }, - rollbackStages: []*model.PipelineStage{ - { - Id: "plugin-2-rollback", - Rollback: true, + { + Name: "plugin-2", + Cli: &fakePlugin{ + quickStages: []*model.PipelineStage{ + { + Id: "plugin-2-stage-1", + Name: "plugin-2-stage-1", + }, + }, + rollbackStages: []*model.PipelineStage{ + { + Id: "plugin-2-rollback", + Name: "plugin-2-rollback", + Rollback: true, + }, + }, }, }, - }, - }, + }) + require.NoError(t, err) + + return pr + }(), cfg: &config.GenericApplicationSpec{ Planner: config.DeploymentPlanner{ AutoRollback: 
pointerBool(true), }, + Plugins: []string{"plugin-1", "plugin-2"}, }, wantErr: false, expectedStages: []*model.PipelineStage{ { - Id: "plugin-1-stage-1", + Id: "plugin-1-stage-1", + Name: "plugin-1-stage-1", }, { - Id: "plugin-2-stage-1", + Id: "plugin-2-stage-1", + Name: "plugin-2-stage-1", }, { Id: "plugin-1-rollback", + Name: "plugin-1-rollback", Rollback: true, }, { Id: "plugin-2-rollback", + Name: "plugin-2-rollback", Rollback: true, }, }, }, { name: "multi plugins - no rollback", - plugins: []pluginapi.PluginClient{ - &fakePlugin{ - quickStages: []*model.PipelineStage{ - { - Id: "plugin-1-stage-1", - }, - }, - rollbackStages: []*model.PipelineStage{ - { - Id: "plugin-1-rollback", - Rollback: true, - }, - }, - }, - &fakePlugin{ - quickStages: []*model.PipelineStage{ - { - Id: "plugin-2-stage-1", + pluginRegistry: func() plugin.PluginRegistry { + pr, err := plugin.NewPluginRegistry(context.TODO(), []plugin.Plugin{ + { + Name: "plugin-1", + Cli: &fakePlugin{ + quickStages: []*model.PipelineStage{ + { + Id: "plugin-1-stage-1", + Name: "plugin-1-stage-1", + }, + }, + rollbackStages: []*model.PipelineStage{ + { + Id: "plugin-1-rollback", + Name: "plugin-1-rollback", + Rollback: true, + }, + }, }, }, - rollbackStages: []*model.PipelineStage{ - { - Id: "plugin-2-rollback", - Rollback: true, + { + Name: "plugin-2", + Cli: &fakePlugin{ + quickStages: []*model.PipelineStage{ + { + Id: "plugin-2-stage-1", + Name: "plugin-2-stage-1", + }, + }, + rollbackStages: []*model.PipelineStage{ + { + Id: "plugin-2-rollback", + Name: "plugin-2-rollback", + Rollback: true, + }, + }, }, }, - }, - }, + }) + require.NoError(t, err) + + return pr + }(), cfg: &config.GenericApplicationSpec{ Planner: config.DeploymentPlanner{ AutoRollback: pointerBool(false), }, + Plugins: []string{"plugin-1", "plugin-2"}, }, wantErr: false, expectedStages: []*model.PipelineStage{ { - Id: "plugin-1-stage-1", + Id: "plugin-1-stage-1", + Name: "plugin-1-stage-1", }, { - Id: "plugin-2-stage-1", + Id: 
"plugin-2-stage-1", + Name: "plugin-2-stage-1", }, }, }, @@ -248,7 +312,7 @@ func TestBuildQuickSyncStages(t *testing.T) { for _, tc := range testcases { t.Run(tc.name, func(t *testing.T) { planner := &planner{ - plugins: tc.plugins, + pluginRegistry: tc.pluginRegistry, } stages, err := planner.buildQuickSyncStages(context.TODO(), tc.cfg) require.Equal(t, tc.wantErr, err != nil) @@ -262,37 +326,45 @@ func TestBuildPipelineSyncStages(t *testing.T) { testcases := []struct { name string - plugins []pluginapi.PluginClient + pluginRegistry plugin.PluginRegistry cfg *config.GenericApplicationSpec wantErr bool expectedStages []*model.PipelineStage }{ { name: "only one plugin", - plugins: []pluginapi.PluginClient{ - &fakePlugin{ - pipelineStages: []*model.PipelineStage{ - { - Id: "plugin-1-stage-1", - Index: 0, - Name: "plugin-1-stage-1", - }, - { - Id: "plugin-1-stage-2", - Index: 1, - Name: "plugin-1-stage-2", - }, - }, - rollbackStages: []*model.PipelineStage{ - { - Id: "plugin-1-rollback", - Index: 0, - Name: "plugin-1-rollback", - Rollback: true, + pluginRegistry: func() plugin.PluginRegistry { + pr, err := plugin.NewPluginRegistry(context.TODO(), []plugin.Plugin{ + { + Name: "plugin-1", + Cli: &fakePlugin{ + pipelineStages: []*model.PipelineStage{ + { + Id: "plugin-1-stage-1", + Index: 0, + Name: "plugin-1-stage-1", + }, + { + Id: "plugin-1-stage-2", + Index: 1, + Name: "plugin-1-stage-2", + }, + }, + rollbackStages: []*model.PipelineStage{ + { + Id: "plugin-1-rollback", + Index: 0, + Name: "plugin-1-rollback", + Rollback: true, + }, + }, }, }, - }, - }, + }) + require.NoError(t, err) + + return pr + }(), cfg: &config.GenericApplicationSpec{ Planner: config.DeploymentPlanner{ AutoRollback: pointerBool(true), @@ -333,44 +405,60 @@ func TestBuildPipelineSyncStages(t *testing.T) { }, { name: "multi plugins single rollback", - plugins: []pluginapi.PluginClient{ - &fakePlugin{ - pipelineStages: []*model.PipelineStage{ - { - Id: "plugin-1-stage-1", - Name: 
"plugin-1-stage-1", - }, - { - Id: "plugin-1-stage-2", - Name: "plugin-1-stage-2", - }, - { - Id: "plugin-1-stage-3", - Name: "plugin-1-stage-3", - }, - }, - rollbackStages: []*model.PipelineStage{ - { - Id: "plugin-1-rollback", - Index: 0, - Name: "plugin-1-rollback", - Rollback: true, + pluginRegistry: func() plugin.PluginRegistry { + pr, err := plugin.NewPluginRegistry(context.TODO(), []plugin.Plugin{ + { + Name: "plugin-1", + Cli: &fakePlugin{ + pipelineStages: []*model.PipelineStage{ + { + Id: "plugin-1-stage-1", + Index: 0, + Name: "plugin-1-stage-1", + }, + { + Id: "plugin-1-stage-2", + Index: 1, + Name: "plugin-1-stage-2", + }, + { + Id: "plugin-1-stage-3", + Index: 2, + Name: "plugin-1-stage-3", + }, + }, + rollbackStages: []*model.PipelineStage{ + { + Id: "plugin-1-rollback", + Index: 0, + Name: "plugin-1-rollback", + Rollback: true, + }, + }, }, }, - }, - &fakePlugin{ - pipelineStages: []*model.PipelineStage{ - { - Id: "plugin-2-stage-1", - Name: "plugin-2-stage-1", - }, - { - Id: "plugin-2-stage-2", - Name: "plugin-2-stage-2", + { + Name: "plugin-2", + Cli: &fakePlugin{ + pipelineStages: []*model.PipelineStage{ + { + Id: "plugin-2-stage-1", + Index: 0, + Name: "plugin-2-stage-1", + }, + { + Id: "plugin-2-stage-2", + Index: 1, + Name: "plugin-2-stage-2", + }, + }, }, }, - }, - }, + }) + require.NoError(t, err) + + return pr + }(), cfg: &config.GenericApplicationSpec{ Planner: config.DeploymentPlanner{ AutoRollback: pointerBool(true), @@ -441,48 +529,63 @@ func TestBuildPipelineSyncStages(t *testing.T) { }, { name: "multi plugins multi rollback", - plugins: []pluginapi.PluginClient{ - &fakePlugin{ - pipelineStages: []*model.PipelineStage{ - { - Id: "plugin-1-stage-1", - Name: "plugin-1-stage-1", - }, - { - Id: "plugin-1-stage-2", - Name: "plugin-1-stage-2", - }, - { - Id: "plugin-1-stage-3", - Name: "plugin-1-stage-3", - }, - }, - rollbackStages: []*model.PipelineStage{ - { - Id: "plugin-1-rollback", - Index: 0, - Name: "plugin-1-rollback", - Rollback: 
true, - }, - }, - }, - &fakePlugin{ - pipelineStages: []*model.PipelineStage{ - { - Id: "plugin-2-stage-1", - Name: "plugin-2-stage-1", + pluginRegistry: func() plugin.PluginRegistry { + pr, err := plugin.NewPluginRegistry(context.TODO(), []plugin.Plugin{ + { + Name: "plugin-1", + Cli: &fakePlugin{ + pipelineStages: []*model.PipelineStage{ + { + Id: "plugin-1-stage-1", + Index: 0, + Name: "plugin-1-stage-1", + }, + { + Id: "plugin-1-stage-2", + Index: 1, + Name: "plugin-1-stage-2", + }, + { + Id: "plugin-1-stage-3", + Index: 2, + Name: "plugin-1-stage-3", + }, + }, + rollbackStages: []*model.PipelineStage{ + { + Id: "plugin-1-rollback", + Index: 0, + Name: "plugin-1-rollback", + Rollback: true, + }, + }, }, }, - rollbackStages: []*model.PipelineStage{ - { - Id: "plugin-2-rollback", - Index: 2, - Name: "plugin-2-rollback", - Rollback: true, + { + Name: "plugin-2", + Cli: &fakePlugin{ + pipelineStages: []*model.PipelineStage{ + { + Id: "plugin-2-stage-1", + Index: 0, + Name: "plugin-2-stage-1", + }, + }, + rollbackStages: []*model.PipelineStage{ + { + Id: "plugin-2-rollback", + Index: 2, + Name: "plugin-2-rollback", + Rollback: true, + }, + }, }, }, - }, - }, + }) + require.NoError(t, err) + + return pr + }(), cfg: &config.GenericApplicationSpec{ Planner: config.DeploymentPlanner{ AutoRollback: pointerBool(true), @@ -551,16 +654,8 @@ func TestBuildPipelineSyncStages(t *testing.T) { for _, tc := range testcases { t.Run(tc.name, func(t *testing.T) { - stageBasedPluginMap := make(map[string]pluginapi.PluginClient) - for _, p := range tc.plugins { - stages, _ := p.FetchDefinedStages(context.TODO(), &deployment.FetchDefinedStagesRequest{}) - for _, s := range stages.Stages { - stageBasedPluginMap[s] = p - } - } planner := &planner{ - plugins: tc.plugins, - stageBasedPluginsMap: stageBasedPluginMap, + pluginRegistry: tc.pluginRegistry, } stages, err := planner.buildPipelineSyncStages(context.TODO(), tc.cfg) require.Equal(t, tc.wantErr, err != nil) @@ -576,6 +671,7 @@ func 
TestPlanner_BuildPlan(t *testing.T) { name string isFirstDeploy bool plugins []pluginapi.PluginClient + pluginRegistry plugin.PluginRegistry cfg *config.GenericApplicationSpec deployment *model.Deployment wantErr bool @@ -584,20 +680,30 @@ func TestPlanner_BuildPlan(t *testing.T) { { name: "quick sync strategy triggered by web console", isFirstDeploy: false, - plugins: []pluginapi.PluginClient{ - &fakePlugin{ - quickStages: []*model.PipelineStage{ - { - Id: "plugin-1-stage-1", - Visible: true, + pluginRegistry: func() plugin.PluginRegistry { + pr, err := plugin.NewPluginRegistry(context.TODO(), []plugin.Plugin{ + { + Name: "plugin-1", + Cli: &fakePlugin{ + quickStages: []*model.PipelineStage{ + { + Id: "plugin-1-stage-1", + Name: "plugin-1-stage-1", + Visible: true, + }, + }, }, }, - }, - }, + }) + require.NoError(t, err) + + return pr + }(), cfg: &config.GenericApplicationSpec{ Planner: config.DeploymentPlanner{ AutoRollback: pointerBool(true), }, + Plugins: []string{"plugin-1"}, }, deployment: &model.Deployment{ Trigger: &model.DeploymentTrigger{ @@ -612,6 +718,7 @@ func TestPlanner_BuildPlan(t *testing.T) { Stages: []*model.PipelineStage{ { Id: "plugin-1-stage-1", + Name: "plugin-1-stage-1", Visible: true, }, }, @@ -626,17 +733,26 @@ func TestPlanner_BuildPlan(t *testing.T) { { name: "pipeline sync strategy triggered by web console", isFirstDeploy: false, - plugins: []pluginapi.PluginClient{ - &fakePlugin{ - pipelineStages: []*model.PipelineStage{ - { - Id: "plugin-1-stage-1", - Name: "plugin-1-stage-1", - Visible: true, + pluginRegistry: func() plugin.PluginRegistry { + pr, err := plugin.NewPluginRegistry(context.TODO(), []plugin.Plugin{ + { + Name: "plugin-1", + Cli: &fakePlugin{ + pipelineStages: []*model.PipelineStage{ + { + Id: "plugin-1-stage-1", + Index: 0, + Name: "plugin-1-stage-1", + Visible: true, + }, + }, }, }, - }, - }, + }) + require.NoError(t, err) + + return pr + }(), cfg: &config.GenericApplicationSpec{ Planner: config.DeploymentPlanner{ 
AutoRollback: pointerBool(true), @@ -679,20 +795,30 @@ func TestPlanner_BuildPlan(t *testing.T) { { name: "quick sync due to no pipeline configured", isFirstDeploy: false, - plugins: []pluginapi.PluginClient{ - &fakePlugin{ - quickStages: []*model.PipelineStage{ - { - Id: "plugin-1-stage-1", - Visible: true, + pluginRegistry: func() plugin.PluginRegistry { + pr, err := plugin.NewPluginRegistry(context.TODO(), []plugin.Plugin{ + { + Name: "plugin-1", + Cli: &fakePlugin{ + quickStages: []*model.PipelineStage{ + { + Id: "plugin-1-stage-1", + Name: "plugin-1-stage-1", + Visible: true, + }, + }, }, }, - }, - }, + }) + require.NoError(t, err) + + return pr + }(), cfg: &config.GenericApplicationSpec{ Planner: config.DeploymentPlanner{ AutoRollback: pointerBool(true), }, + Plugins: []string{"plugin-1"}, }, deployment: &model.Deployment{ Trigger: &model.DeploymentTrigger{}, @@ -704,6 +830,7 @@ func TestPlanner_BuildPlan(t *testing.T) { Stages: []*model.PipelineStage{ { Id: "plugin-1-stage-1", + Name: "plugin-1-stage-1", Visible: true, }, }, @@ -718,17 +845,26 @@ func TestPlanner_BuildPlan(t *testing.T) { { name: "pipeline sync due to alwaysUsePipeline", isFirstDeploy: false, - plugins: []pluginapi.PluginClient{ - &fakePlugin{ - pipelineStages: []*model.PipelineStage{ - { - Id: "plugin-1-stage-1", - Name: "plugin-1-stage-1", - Visible: true, + pluginRegistry: func() plugin.PluginRegistry { + pr, err := plugin.NewPluginRegistry(context.TODO(), []plugin.Plugin{ + { + Name: "plugin-1", + Cli: &fakePlugin{ + pipelineStages: []*model.PipelineStage{ + { + Id: "plugin-1-stage-1", + Index: 0, + Name: "plugin-1-stage-1", + Visible: true, + }, + }, }, }, - }, - }, + }) + require.NoError(t, err) + + return pr + }(), cfg: &config.GenericApplicationSpec{ Planner: config.DeploymentPlanner{ AlwaysUsePipeline: true, @@ -769,16 +905,25 @@ func TestPlanner_BuildPlan(t *testing.T) { { name: "quick sync due to first deployment", isFirstDeploy: true, - plugins: []pluginapi.PluginClient{ - 
&fakePlugin{ - quickStages: []*model.PipelineStage{ - { - Id: "plugin-1-stage-1", - Visible: true, + pluginRegistry: func() plugin.PluginRegistry { + pr, err := plugin.NewPluginRegistry(context.TODO(), []plugin.Plugin{ + { + Name: "plugin-1", + Cli: &fakePlugin{ + quickStages: []*model.PipelineStage{ + { + Id: "plugin-1-stage-1", + Name: "plugin-1-stage-1", + Visible: true, + }, + }, }, }, - }, - }, + }) + require.NoError(t, err) + + return pr + }(), cfg: &config.GenericApplicationSpec{ Planner: config.DeploymentPlanner{ AutoRollback: pointerBool(true), @@ -802,6 +947,7 @@ func TestPlanner_BuildPlan(t *testing.T) { Stages: []*model.PipelineStage{ { Id: "plugin-1-stage-1", + Name: "plugin-1-stage-1", Visible: true, }, }, @@ -837,6 +983,36 @@ func TestPlanner_BuildPlan(t *testing.T) { }, }, }, + pluginRegistry: func() plugin.PluginRegistry { + pr, err := plugin.NewPluginRegistry(context.TODO(), []plugin.Plugin{ + { + Name: "plugin-1", + Cli: &fakePlugin{ + syncStrategy: &deployment.DetermineStrategyResponse{ + SyncStrategy: model.SyncStrategy_PIPELINE, + Summary: "determined by plugin", + }, + pipelineStages: []*model.PipelineStage{ + { + Id: "plugin-1-stage-1", + Index: 0, + Name: "plugin-1-stage-1", + Visible: true, + }, + }, + quickStages: []*model.PipelineStage{ + { + Id: "plugin-1-quick-stage-1", + Visible: true, + }, + }, + }, + }, + }) + require.NoError(t, err) + + return pr + }(), cfg: &config.GenericApplicationSpec{ Planner: config.DeploymentPlanner{ AutoRollback: pointerBool(true), @@ -877,16 +1053,8 @@ func TestPlanner_BuildPlan(t *testing.T) { for _, tc := range testcases { t.Run(tc.name, func(t *testing.T) { - stageBasedPluginMap := make(map[string]pluginapi.PluginClient) - for _, p := range tc.plugins { - stages, _ := p.FetchDefinedStages(context.TODO(), &deployment.FetchDefinedStagesRequest{}) - for _, s := range stages.Stages { - stageBasedPluginMap[s] = p - } - } planner := &planner{ - plugins: tc.plugins, - stageBasedPluginsMap: stageBasedPluginMap, 
+ pluginRegistry: tc.pluginRegistry, deployment: tc.deployment, lastSuccessfulCommitHash: "", lastSuccessfulConfigFilename: "", diff --git a/pkg/app/pipedv1/controller/scheduler.go b/pkg/app/pipedv1/controller/scheduler.go index 6d58da570e..2ec97c6096 100644 --- a/pkg/app/pipedv1/controller/scheduler.go +++ b/pkg/app/pipedv1/controller/scheduler.go @@ -30,10 +30,10 @@ import ( "github.com/pipe-cd/pipecd/pkg/app/pipedv1/controller/controllermetrics" "github.com/pipe-cd/pipecd/pkg/app/pipedv1/deploysource" + "github.com/pipe-cd/pipecd/pkg/app/pipedv1/plugin" "github.com/pipe-cd/pipecd/pkg/app/server/service/pipedservice" config "github.com/pipe-cd/pipecd/pkg/configv1" "github.com/pipe-cd/pipecd/pkg/model" - pluginapi "github.com/pipe-cd/pipecd/pkg/plugin/api/v1alpha1" "github.com/pipe-cd/pipecd/pkg/plugin/api/v1alpha1/deployment" ) @@ -42,7 +42,7 @@ type scheduler struct { deployment *model.Deployment workingDir string - stageBasedPluginsMap map[string]pluginapi.PluginClient + pluginRegistry plugin.PluginRegistry apiClient apiClient gitClient gitClient @@ -77,7 +77,7 @@ func newScheduler( workingDir string, apiClient apiClient, gitClient gitClient, - stageBasedPluginsMap map[string]pluginapi.PluginClient, + pluginRegistry plugin.PluginRegistry, notifier notifier, secretsDecrypter secretDecrypter, logger *zap.Logger, @@ -94,9 +94,9 @@ func newScheduler( s := &scheduler{ deployment: d, workingDir: workingDir, - stageBasedPluginsMap: stageBasedPluginsMap, apiClient: apiClient, gitClient: gitClient, + pluginRegistry: pluginRegistry, notifier: notifier, secretDecrypter: secretsDecrypter, doneDeploymentStatus: d.Status, @@ -510,9 +510,9 @@ func (s *scheduler) executeStage(sig StopSignal, ps *model.PipelineStage) (final } // Find the executor plugin for this stage. 
- plugin, ok := s.stageBasedPluginsMap[ps.Name] - if !ok { - s.logger.Error("failed to find the plugin for the stage", zap.String("stage-name", ps.Name)) + plugin, err := s.pluginRegistry.GetPluginClientByStageName(ps.Name) + if err != nil { + s.logger.Error("failed to find the plugin for the stage", zap.String("stage-name", ps.Name), zap.Error(err)) s.reportStageStatus(ctx, ps.Id, model.StageStatus_STAGE_FAILURE, ps.Requires) return model.StageStatus_STAGE_FAILURE } diff --git a/pkg/app/pipedv1/controller/scheduler_test.go b/pkg/app/pipedv1/controller/scheduler_test.go index b73bd92f7e..5d8fed4acf 100644 --- a/pkg/app/pipedv1/controller/scheduler_test.go +++ b/pkg/app/pipedv1/controller/scheduler_test.go @@ -21,10 +21,12 @@ import ( "time" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" "go.uber.org/zap/zaptest" "google.golang.org/grpc" "github.com/pipe-cd/pipecd/pkg/app/pipedv1/deploysource" + "github.com/pipe-cd/pipecd/pkg/app/pipedv1/plugin" "github.com/pipe-cd/pipecd/pkg/app/server/service/pipedservice" config "github.com/pipe-cd/pipecd/pkg/configv1" "github.com/pipe-cd/pipecd/pkg/model" @@ -215,9 +217,27 @@ func TestExecuteStage(t *testing.T) { apiClient: &fakeAPIClient{}, targetDSP: &fakeDeploySourceProvider{}, runningDSP: &fakeDeploySourceProvider{}, - stageBasedPluginsMap: map[string]pluginapi.PluginClient{ - "stage-name": &fakeExecutorPluginClient{}, - }, + pluginRegistry: func() plugin.PluginRegistry { + pr, err := plugin.NewPluginRegistry(context.TODO(), []plugin.Plugin{ + { + Name: "stage-name", + Cli: &fakePlugin{ + pipelineStages: []*model.PipelineStage{ + { + Id: "stage-id", + Name: "stage-name", + }, + }, + stageStatusMap: map[string]model.StageStatus{ + "stage-id": model.StageStatus_STAGE_SUCCESS, + }, + }, + }, + }) + require.NoError(t, err) + + return pr + }(), genericApplicationConfig: &config.GenericApplicationSpec{ Pipeline: &config.DeploymentPipeline{ Stages: []config.PipelineStage{ @@ -257,9 +277,27 @@ func 
TestExecuteStage_SignalTerminated(t *testing.T) { apiClient: &fakeAPIClient{}, targetDSP: &fakeDeploySourceProvider{}, runningDSP: &fakeDeploySourceProvider{}, - stageBasedPluginsMap: map[string]pluginapi.PluginClient{ - "stage-name": &fakeExecutorPluginClient{}, - }, + pluginRegistry: func() plugin.PluginRegistry { + pr, err := plugin.NewPluginRegistry(context.TODO(), []plugin.Plugin{ + { + Name: "stage-name", + Cli: &fakePlugin{ + pipelineStages: []*model.PipelineStage{ + { + Id: "stage-id", + Name: "stage-name", + }, + }, + stageStatusMap: map[string]model.StageStatus{ + "stage-id": model.StageStatus_STAGE_SUCCESS, + }, + }, + }, + }) + require.NoError(t, err) + + return pr + }(), genericApplicationConfig: &config.GenericApplicationSpec{ Pipeline: &config.DeploymentPipeline{ Stages: []config.PipelineStage{ @@ -295,9 +333,27 @@ func TestExecuteStage_SignalCancelled(t *testing.T) { apiClient: &fakeAPIClient{}, targetDSP: &fakeDeploySourceProvider{}, runningDSP: &fakeDeploySourceProvider{}, - stageBasedPluginsMap: map[string]pluginapi.PluginClient{ - "stage-name": &fakeExecutorPluginClient{}, - }, + pluginRegistry: func() plugin.PluginRegistry { + pr, err := plugin.NewPluginRegistry(context.TODO(), []plugin.Plugin{ + { + Name: "stage-name", + Cli: &fakePlugin{ + pipelineStages: []*model.PipelineStage{ + { + Id: "stage-id", + Name: "stage-name", + }, + }, + stageStatusMap: map[string]model.StageStatus{ + "stage-id": model.StageStatus_STAGE_SUCCESS, + }, + }, + }, + }) + require.NoError(t, err) + + return pr + }(), genericApplicationConfig: &config.GenericApplicationSpec{ Pipeline: &config.DeploymentPipeline{ Stages: []config.PipelineStage{ diff --git a/pkg/app/pipedv1/plugin/kubernetes/config/application.go b/pkg/app/pipedv1/plugin/kubernetes/config/application.go index c2c3acaad9..794d5e18b3 100644 --- a/pkg/app/pipedv1/plugin/kubernetes/config/application.go +++ b/pkg/app/pipedv1/plugin/kubernetes/config/application.go @@ -36,6 +36,9 @@ type 
KubernetesApplicationSpec struct { // Input for Kubernetes deployment such as kubectl version, helm version, manifests filter... Input KubernetesDeploymentInput `json:"input"` + // Configuration for quick sync. + QuickSync K8sSyncStageOptions `json:"quickSync"` + // Which resources should be considered as the Workload of application. // Empty means all Deployments. // e.g. @@ -100,6 +103,14 @@ type KubernetesDeployTargetConfig struct { KubectlVersion string `json:"kubectlVersion"` } +// K8sSyncStageOptions contains all configurable values for a K8S_SYNC stage. +type K8sSyncStageOptions struct { + // Whether the PRIMARY variant label should be added to manifests if they were missing. + AddVariantLabelToSelector bool `json:"addVariantLabelToSelector"` + // Whether the resources that are no longer defined in Git should be removed or not. + Prune bool `json:"prune"` +} + // FindDeployTarget finds the deploy target configuration by the given name. func FindDeployTarget(cfg *config.PipedPlugin, name string) (KubernetesDeployTargetConfig, error) { if cfg == nil { diff --git a/pkg/app/pipedv1/plugin/kubernetes/deployment/annotate.go b/pkg/app/pipedv1/plugin/kubernetes/deployment/annotate.go index b9357ea2c7..007c843db9 100644 --- a/pkg/app/pipedv1/plugin/kubernetes/deployment/annotate.go +++ b/pkg/app/pipedv1/plugin/kubernetes/deployment/annotate.go @@ -32,11 +32,11 @@ func annotateConfigHash(manifests []provider.Manifest) error { secrets := make(map[string]provider.Manifest) for _, m := range manifests { if m.IsConfigMap() { - configMaps[m.Key().Name()] = m + configMaps[m.Name()] = m continue } if m.IsSecret() { - secrets[m.Key().Name()] = m + secrets[m.Name()] = m } } diff --git a/pkg/app/pipedv1/plugin/kubernetes/deployment/determine.go b/pkg/app/pipedv1/plugin/kubernetes/deployment/determine.go index fd34ed65a0..9833c30ff4 100644 --- a/pkg/app/pipedv1/plugin/kubernetes/deployment/determine.go +++ b/pkg/app/pipedv1/plugin/kubernetes/deployment/determine.go @@ -73,10 
+73,10 @@ func determineVersions(manifests []provider.Manifest) ([]*model.ArtifactVersion, func findManifests(kind, name string, manifests []provider.Manifest) []provider.Manifest { out := make([]provider.Manifest, 0, len(manifests)) for _, m := range manifests { - if m.Key().Kind() != kind { + if m.Kind() != kind { continue } - if name != "" && m.Key().Name() != name { + if name != "" && m.Name() != name { continue } out = append(out, m) @@ -186,7 +186,7 @@ func determineStrategy(olds, news []provider.Manifest, workloadRefs []config.K8s if msg, changed := checkImageChange(templateDiffs); changed { return model.SyncStrategy_PIPELINE, msg } - return model.SyncStrategy_PIPELINE, fmt.Sprintf("Sync progressively because pod template of workload %s was changed", w.New.Key().Name()) + return model.SyncStrategy_PIPELINE, fmt.Sprintf("Sync progressively because pod template of workload %s was changed", w.New.Name()) } } @@ -203,14 +203,14 @@ func determineStrategy(olds, news []provider.Manifest, workloadRefs []config.K8s for k, oc := range oldConfigs { nc, ok := newConfigs[k] if !ok { - return model.SyncStrategy_PIPELINE, fmt.Sprintf("Sync progressively because %s %s was deleted", oc.Key().Kind(), oc.Key().Name()) + return model.SyncStrategy_PIPELINE, fmt.Sprintf("Sync progressively because %s %s was deleted", oc.Kind(), oc.Name()) } result, err := provider.Diff(oc, nc, logger) if err != nil { return model.SyncStrategy_PIPELINE, fmt.Sprintf("Sync progressively due to an error while calculating the diff (%v)", err) } if result.HasDiff() { - return model.SyncStrategy_PIPELINE, fmt.Sprintf("Sync progressively because %s %s was updated", oc.Key().Kind(), oc.Key().Name()) + return model.SyncStrategy_PIPELINE, fmt.Sprintf("Sync progressively because %s %s was updated", oc.Kind(), oc.Name()) } } diff --git a/pkg/app/pipedv1/plugin/kubernetes/deployment/server.go b/pkg/app/pipedv1/plugin/kubernetes/deployment/server.go index 02bbdb0173..11a6f51324 100644 --- 
a/pkg/app/pipedv1/plugin/kubernetes/deployment/server.go +++ b/pkg/app/pipedv1/plugin/kubernetes/deployment/server.go @@ -17,6 +17,8 @@ package deployment import ( "cmp" "context" + "errors" + "fmt" "time" kubeconfig "github.com/pipe-cd/pipecd/pkg/app/pipedv1/plugin/kubernetes/config" @@ -266,6 +268,9 @@ func (a *DeploymentService) executeK8sSyncStage(ctx context.Context, lp logpersi // Add variant annotations to all manifests. for i := range manifests { + manifests[i].AddLabels(map[string]string{ + variantLabel: primaryVariant, + }) manifests[i].AddAnnotations(map[string]string{ variantLabel: primaryVariant, }) @@ -290,8 +295,11 @@ func (a *DeploymentService) executeK8sSyncStage(ctx context.Context, lp logpersi return model.StageStatus_STAGE_FAILURE } + // Create the kubectl wrapper for the target cluster. + kubectl := provider.NewKubectl(kubectlPath) + // Create the applier for the target cluster. - applier := provider.NewApplier(provider.NewKubectl(kubectlPath), cfg.Spec.Input, deployTargetConfig, a.logger) + applier := provider.NewApplier(kubectl, cfg.Spec.Input, deployTargetConfig, a.logger) // Start applying all manifests to add or update running resources. if err := applyManifests(ctx, applier, manifests, cfg.Spec.Input.Namespace, lp); err != nil { @@ -299,8 +307,73 @@ func (a *DeploymentService) executeK8sSyncStage(ctx context.Context, lp logpersi return model.StageStatus_STAGE_FAILURE } - // TODO: implement prune resources + // TODO: treat the stage options specified under "with" + if !cfg.Spec.QuickSync.Prune { + lp.Info("Resource GC was skipped because sync.prune was not configured") + return model.StageStatus_STAGE_SUCCESS + } + + // Wait for all applied manifests to be stable. + // In theory, we don't need to wait for them to be stable before going to the next step + // but waiting for a while reduces the number of Kubernetes changes in a short time. 
+ lp.Info("Waiting for the applied manifests to be stable") + select { + case <-time.After(15 * time.Second): + break + case <-ctx.Done(): + break + } + + lp.Info("Start finding all running resources but no longer defined in Git") + + namespacedLiveResources, err := kubectl.GetAll(ctx, deployTargetConfig.KubeConfigPath, + "", + fmt.Sprintf("%s=%s", provider.LabelManagedBy, provider.ManagedByPiped), + fmt.Sprintf("%s=%s", provider.LabelApplication, input.GetDeployment().GetApplicationId()), + ) + if err != nil { + lp.Errorf("Failed while listing all resources (%v)", err) + return model.StageStatus_STAGE_FAILURE + } + + clusterScopedLiveResources, err := kubectl.GetAllClusterScoped(ctx, deployTargetConfig.KubeConfigPath, + fmt.Sprintf("%s=%s", provider.LabelManagedBy, provider.ManagedByPiped), + fmt.Sprintf("%s=%s", provider.LabelApplication, input.GetDeployment().GetApplicationId()), + ) + if err != nil { + lp.Errorf("Failed while listing all cluster-scoped resources (%v)", err) + return model.StageStatus_STAGE_FAILURE + } + + if len(namespacedLiveResources)+len(clusterScopedLiveResources) == 0 { + lp.Info("There is no data about live resource so no resource will be removed") + return model.StageStatus_STAGE_SUCCESS + } + + lp.Successf("Successfully loaded %d live resources", len(namespacedLiveResources)+len(clusterScopedLiveResources)) + + removeKeys := provider.FindRemoveResources(manifests, namespacedLiveResources, clusterScopedLiveResources) + if len(removeKeys) == 0 { + lp.Info("There are no live resources should be removed") + return model.StageStatus_STAGE_SUCCESS + } + + lp.Infof("Start pruning %d resources", len(removeKeys)) + var deletedCount int + for _, key := range removeKeys { + if err := kubectl.Delete(ctx, deployTargetConfig.KubeConfigPath, key.Namespace(), key); err != nil { + if errors.Is(err, provider.ErrNotFound) { + lp.Infof("Specified resource does not exist, so skip deleting the resource: %s (%v)", key.ReadableString(), err) + continue + } + 
lp.Errorf("Failed while deleting resource %s (%v)", key.ReadableString(), err) + continue // continue to delete other resources + } + deletedCount++ + lp.Successf("- deleted resource: %s", key.ReadableString()) + } + lp.Successf("Successfully deleted %d resources", deletedCount) return model.StageStatus_STAGE_SUCCESS } diff --git a/pkg/app/pipedv1/plugin/kubernetes/deployment/server_test.go b/pkg/app/pipedv1/plugin/kubernetes/deployment/server_test.go index 4eb49d17f7..fa4af44c92 100644 --- a/pkg/app/pipedv1/plugin/kubernetes/deployment/server_test.go +++ b/pkg/app/pipedv1/plugin/kubernetes/deployment/server_test.go @@ -25,6 +25,7 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "go.uber.org/zap/zaptest" + apierrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/client-go/dynamic" @@ -136,6 +137,8 @@ func setupTestPluginConfigAndDynamicClient(t *testing.T) (*config.PipedPlugin, d } func TestDeploymentService_executeK8sSyncStage(t *testing.T) { + t.Parallel() + ctx := context.Background() // read the application config from the example file @@ -183,16 +186,24 @@ func TestDeploymentService_executeK8sSyncStage(t *testing.T) { assert.Equal(t, "simple", deployment.GetName()) assert.Equal(t, "simple", deployment.GetLabels()["app"]) + + assert.Equal(t, "piped", deployment.GetLabels()["pipecd.dev/managed-by"]) + assert.Equal(t, "piped-id", deployment.GetLabels()["pipecd.dev/piped"]) + assert.Equal(t, "app-id", deployment.GetLabels()["pipecd.dev/application"]) + assert.Equal(t, "0123456789", deployment.GetLabels()["pipecd.dev/commit-hash"]) + assert.Equal(t, "piped", deployment.GetAnnotations()["pipecd.dev/managed-by"]) assert.Equal(t, "piped-id", deployment.GetAnnotations()["pipecd.dev/piped"]) assert.Equal(t, "app-id", deployment.GetAnnotations()["pipecd.dev/application"]) assert.Equal(t, "apps/v1", 
deployment.GetAnnotations()["pipecd.dev/original-api-version"]) - assert.Equal(t, "apps/v1:Deployment::simple", deployment.GetAnnotations()["pipecd.dev/resource-key"]) // This assertion differs from the non-plugin-arched piped's Kubernetes platform provider, but we decided to change this behavior. + assert.Equal(t, "apps:Deployment::simple", deployment.GetAnnotations()["pipecd.dev/resource-key"]) // This assertion differs from the non-plugin-arched piped's Kubernetes platform provider, but we decided to change this behavior. assert.Equal(t, "0123456789", deployment.GetAnnotations()["pipecd.dev/commit-hash"]) } func TestDeploymentService_executeK8sSyncStage_withInputNamespace(t *testing.T) { + t.Parallel() + ctx := context.Background() // read the application config from the example file @@ -246,12 +257,411 @@ func TestDeploymentService_executeK8sSyncStage_withInputNamespace(t *testing.T) deployment, err := dynamicClient.Resource(schema.GroupVersionResource{Group: "apps", Version: "v1", Resource: "deployments"}).Namespace("test-namespace").Get(context.Background(), "simple", metav1.GetOptions{}) require.NoError(t, err) + assert.Equal(t, "piped", deployment.GetLabels()["pipecd.dev/managed-by"]) + assert.Equal(t, "piped-id", deployment.GetLabels()["pipecd.dev/piped"]) + assert.Equal(t, "app-id", deployment.GetLabels()["pipecd.dev/application"]) + assert.Equal(t, "0123456789", deployment.GetLabels()["pipecd.dev/commit-hash"]) + assert.Equal(t, "simple", deployment.GetName()) assert.Equal(t, "simple", deployment.GetLabels()["app"]) assert.Equal(t, "piped", deployment.GetAnnotations()["pipecd.dev/managed-by"]) assert.Equal(t, "piped-id", deployment.GetAnnotations()["pipecd.dev/piped"]) assert.Equal(t, "app-id", deployment.GetAnnotations()["pipecd.dev/application"]) assert.Equal(t, "apps/v1", deployment.GetAnnotations()["pipecd.dev/original-api-version"]) - assert.Equal(t, "apps/v1:Deployment::simple", deployment.GetAnnotations()["pipecd.dev/resource-key"]) // This 
assertion differs from the non-plugin-arched piped's Kubernetes platform provider, but we decided to change this behavior. + assert.Equal(t, "apps:Deployment:test-namespace:simple", deployment.GetAnnotations()["pipecd.dev/resource-key"]) // This assertion differs from the non-plugin-arched piped's Kubernetes platform provider, but we decided to change this behavior. assert.Equal(t, "0123456789", deployment.GetAnnotations()["pipecd.dev/commit-hash"]) } + +func TestDeploymentService_executeK8sSyncStage_withPrune(t *testing.T) { + t.Parallel() + + ctx := context.Background() + + // initialize tool registry + testRegistry, err := toolregistrytest.NewToolRegistry(t) + require.NoError(t, err) + + // initialize plugin config and dynamic client for assertions with envtest + pluginCfg, dynamicClient := setupTestPluginConfigAndDynamicClient(t) + + svc := NewDeploymentService(pluginCfg, zaptest.NewLogger(t), testRegistry, logpersistertest.NewTestLogPersister(t)) + + running := filepath.Join("./", "testdata", "prune", "running") + + // read the running application config from the testdata file + runningCfg, err := os.ReadFile(filepath.Join(running, "app.pipecd.yaml")) + require.NoError(t, err) + + ok := t.Run("prepare", func(t *testing.T) { + runningRequest := &deployment.ExecuteStageRequest{ + Input: &deployment.ExecutePluginInput{ + Deployment: &model.Deployment{ + PipedId: "piped-id", + ApplicationId: "app-id", + DeployTargets: []string{"default"}, + }, + Stage: &model.PipelineStage{ + Id: "stage-id", + Name: "K8S_SYNC", + }, + StageConfig: []byte(``), + RunningDeploymentSource: nil, + TargetDeploymentSource: &deployment.DeploymentSource{ + ApplicationDirectory: running, + CommitHash: "0123456789", + ApplicationConfig: runningCfg, + ApplicationConfigFilename: "app.pipecd.yaml", + }, + }, + } + + resp, err := svc.ExecuteStage(ctx, runningRequest) + + require.NoError(t, err) + require.Equal(t, model.StageStatus_STAGE_SUCCESS.String(), resp.GetStatus().String()) + + service, 
err := dynamicClient.Resource(schema.GroupVersionResource{Group: "", Version: "v1", Resource: "services"}).Namespace("default").Get(context.Background(), "simple", metav1.GetOptions{}) + require.NoError(t, err) + + require.Equal(t, "piped", service.GetLabels()["pipecd.dev/managed-by"]) + require.Equal(t, "piped-id", service.GetLabels()["pipecd.dev/piped"]) + require.Equal(t, "app-id", service.GetLabels()["pipecd.dev/application"]) + require.Equal(t, "0123456789", service.GetLabels()["pipecd.dev/commit-hash"]) + + require.Equal(t, "simple", service.GetName()) + require.Equal(t, "piped", service.GetAnnotations()["pipecd.dev/managed-by"]) + require.Equal(t, "piped-id", service.GetAnnotations()["pipecd.dev/piped"]) + require.Equal(t, "app-id", service.GetAnnotations()["pipecd.dev/application"]) + require.Equal(t, "v1", service.GetAnnotations()["pipecd.dev/original-api-version"]) + require.Equal(t, ":Service::simple", service.GetAnnotations()["pipecd.dev/resource-key"]) // This assertion differs from the non-plugin-arched piped's Kubernetes platform provider, but we decided to change this behavior. 
+ require.Equal(t, "0123456789", service.GetAnnotations()["pipecd.dev/commit-hash"]) + }) + require.Truef(t, ok, "expected prepare to succeed") + + t.Run("run with prune", func(t *testing.T) { + + // prepare the request to ensure the running deployment exists + + target := filepath.Join("./", "testdata", "prune", "target") + + // read the running application config from the testdata file + targetCfg, err := os.ReadFile(filepath.Join(target, "app.pipecd.yaml")) + require.NoError(t, err) + + // prepare the request to ensure the running deployment exists + targetRequest := &deployment.ExecuteStageRequest{ + Input: &deployment.ExecutePluginInput{ + Deployment: &model.Deployment{ + PipedId: "piped-id", + ApplicationId: "app-id", + DeployTargets: []string{"default"}, + }, + Stage: &model.PipelineStage{ + Id: "stage-id", + Name: "K8S_SYNC", + }, + StageConfig: []byte(``), + RunningDeploymentSource: &deployment.DeploymentSource{ + ApplicationDirectory: running, + CommitHash: "0123456789", + ApplicationConfig: runningCfg, + ApplicationConfigFilename: "app.pipecd.yaml", + }, + TargetDeploymentSource: &deployment.DeploymentSource{ + ApplicationDirectory: target, + CommitHash: "0012345678", + ApplicationConfig: targetCfg, + ApplicationConfigFilename: "app.pipecd.yaml", + }, + }, + } + + resp, err := svc.ExecuteStage(ctx, targetRequest) + require.NoError(t, err) + require.Equal(t, model.StageStatus_STAGE_SUCCESS.String(), resp.GetStatus().String()) + + _, err = dynamicClient.Resource(schema.GroupVersionResource{Group: "", Version: "v1", Resource: "services"}).Namespace("default").Get(context.Background(), "simple", metav1.GetOptions{}) + require.Error(t, err) + require.Truef(t, apierrors.IsNotFound(err), "expected error to be NotFound, but got %v", err) + }) +} + +func TestDeploymentService_executeK8sSyncStage_withPrune_changesNamespace(t *testing.T) { + t.Parallel() + + ctx := context.Background() + + // initialize tool registry + testRegistry, err := 
toolregistrytest.NewToolRegistry(t) + require.NoError(t, err) + + // initialize plugin config and dynamic client for assertions with envtest + pluginCfg, dynamicClient := setupTestPluginConfigAndDynamicClient(t) + + svc := NewDeploymentService(pluginCfg, zaptest.NewLogger(t), testRegistry, logpersistertest.NewTestLogPersister(t)) + + running := filepath.Join("./", "testdata", "prune_with_change_namespace", "running") + + // read the running application config from the example file + runningCfg, err := os.ReadFile(filepath.Join(running, "app.pipecd.yaml")) + require.NoError(t, err) + + ok := t.Run("prepare", func(t *testing.T) { + // prepare the request to ensure the running deployment exists + runningRequest := &deployment.ExecuteStageRequest{ + Input: &deployment.ExecutePluginInput{ + Deployment: &model.Deployment{ + PipedId: "piped-id", + ApplicationId: "app-id", + DeployTargets: []string{"default"}, + }, + Stage: &model.PipelineStage{ + Id: "stage-id", + Name: "K8S_SYNC", + }, + StageConfig: []byte(``), + RunningDeploymentSource: nil, + TargetDeploymentSource: &deployment.DeploymentSource{ + ApplicationDirectory: running, + CommitHash: "0123456789", + ApplicationConfig: runningCfg, + ApplicationConfigFilename: "app.pipecd.yaml", + }, + }, + } + + resp, err := svc.ExecuteStage(ctx, runningRequest) + + require.NoError(t, err) + require.Equal(t, model.StageStatus_STAGE_SUCCESS.String(), resp.GetStatus().String()) + + service, err := dynamicClient.Resource(schema.GroupVersionResource{Group: "", Version: "v1", Resource: "services"}).Namespace("test-1").Get(context.Background(), "simple", metav1.GetOptions{}) + require.NoError(t, err) + + require.Equal(t, "piped", service.GetLabels()["pipecd.dev/managed-by"]) + require.Equal(t, "piped-id", service.GetLabels()["pipecd.dev/piped"]) + require.Equal(t, "app-id", service.GetLabels()["pipecd.dev/application"]) + require.Equal(t, "0123456789", service.GetLabels()["pipecd.dev/commit-hash"]) + + require.Equal(t, "simple", 
service.GetName()) + require.Equal(t, "piped", service.GetAnnotations()["pipecd.dev/managed-by"]) + require.Equal(t, "piped-id", service.GetAnnotations()["pipecd.dev/piped"]) + require.Equal(t, "app-id", service.GetAnnotations()["pipecd.dev/application"]) + require.Equal(t, "v1", service.GetAnnotations()["pipecd.dev/original-api-version"]) + require.Equal(t, "0123456789", service.GetAnnotations()["pipecd.dev/commit-hash"]) + require.Equal(t, ":Service:test-1:simple", service.GetAnnotations()["pipecd.dev/resource-key"]) + }) + require.Truef(t, ok, "expected prepare to succeed") + + t.Run("run with prune", func(t *testing.T) { + target := filepath.Join("./", "testdata", "prune_with_change_namespace", "target") + + // read the running application config from the example file + targetCfg, err := os.ReadFile(filepath.Join(target, "app.pipecd.yaml")) + require.NoError(t, err) + + // prepare the request to ensure the running deployment exists + targetRequest := &deployment.ExecuteStageRequest{ + Input: &deployment.ExecutePluginInput{ + Deployment: &model.Deployment{ + PipedId: "piped-id", + ApplicationId: "app-id", + DeployTargets: []string{"default"}, + }, + Stage: &model.PipelineStage{ + Id: "stage-id", + Name: "K8S_SYNC", + }, + StageConfig: []byte(``), + RunningDeploymentSource: &deployment.DeploymentSource{ + ApplicationDirectory: running, + CommitHash: "0123456789", + ApplicationConfig: runningCfg, + ApplicationConfigFilename: "app.pipecd.yaml", + }, + TargetDeploymentSource: &deployment.DeploymentSource{ + ApplicationDirectory: target, + CommitHash: "0012345678", + ApplicationConfig: targetCfg, + ApplicationConfigFilename: "app.pipecd.yaml", + }, + }, + } + + resp, err := svc.ExecuteStage(ctx, targetRequest) + require.NoError(t, err) + require.Equal(t, model.StageStatus_STAGE_SUCCESS.String(), resp.GetStatus().String()) + + // The service should be removed from the previous namespace + _, err = dynamicClient.Resource(schema.GroupVersionResource{Group: "", Version: 
"v1", Resource: "services"}).Namespace("test-1").Get(context.Background(), "simple", metav1.GetOptions{}) + require.Error(t, err) + require.Truef(t, apierrors.IsNotFound(err), "expected error to be NotFound, but got %v", err) + + // The service should be created in the new namespace + service, err := dynamicClient.Resource(schema.GroupVersionResource{Group: "", Version: "v1", Resource: "services"}).Namespace("test-2").Get(context.Background(), "simple", metav1.GetOptions{}) + require.NoError(t, err) + + require.Equal(t, "piped", service.GetLabels()["pipecd.dev/managed-by"]) + require.Equal(t, "piped-id", service.GetLabels()["pipecd.dev/piped"]) + require.Equal(t, "app-id", service.GetLabels()["pipecd.dev/application"]) + require.Equal(t, "0012345678", service.GetLabels()["pipecd.dev/commit-hash"]) + + require.Equal(t, "simple", service.GetName()) + require.Equal(t, "piped", service.GetAnnotations()["pipecd.dev/managed-by"]) + require.Equal(t, "piped-id", service.GetAnnotations()["pipecd.dev/piped"]) + require.Equal(t, "app-id", service.GetAnnotations()["pipecd.dev/application"]) + require.Equal(t, "v1", service.GetAnnotations()["pipecd.dev/original-api-version"]) + require.Equal(t, "0012345678", service.GetAnnotations()["pipecd.dev/commit-hash"]) + require.Equal(t, ":Service:test-2:simple", service.GetAnnotations()["pipecd.dev/resource-key"]) + }) +} + +func TestDeploymentService_executeK8sSyncStage_withPrune_clusterScoped(t *testing.T) { + t.Parallel() + + ctx := context.Background() + + // initialize tool registry + testRegistry, err := toolregistrytest.NewToolRegistry(t) + require.NoError(t, err) + + // initialize plugin config and dynamic client for assertions with envtest + pluginCfg, dynamicClient := setupTestPluginConfigAndDynamicClient(t) + + svc := NewDeploymentService(pluginCfg, zaptest.NewLogger(t), testRegistry, logpersistertest.NewTestLogPersister(t)) + + // prepare the custom resource definition + prepare := filepath.Join("./", "testdata", 
"prune_cluster_scoped_resource", "prepare") + + prepareCfg, err := os.ReadFile(filepath.Join(prepare, "app.pipecd.yaml")) + require.NoError(t, err) + + ok := t.Run("prepare crd", func(t *testing.T) { + prepareRequest := &deployment.ExecuteStageRequest{ + Input: &deployment.ExecutePluginInput{ + Deployment: &model.Deployment{ + PipedId: "piped-id", + ApplicationId: "prepare-app-id", + DeployTargets: []string{"default"}, + }, + Stage: &model.PipelineStage{ + Id: "stage-id", + Name: "K8S_SYNC", + }, + StageConfig: []byte(``), + RunningDeploymentSource: nil, + TargetDeploymentSource: &deployment.DeploymentSource{ + ApplicationDirectory: prepare, + CommitHash: "0123456789", + ApplicationConfig: prepareCfg, + ApplicationConfigFilename: "app.pipecd.yaml", + }, + }, + } + + resp, err := svc.ExecuteStage(ctx, prepareRequest) + + require.NoError(t, err) + require.Equal(t, model.StageStatus_STAGE_SUCCESS.String(), resp.GetStatus().String()) + }) + require.Truef(t, ok, "expected prepare to succeed") + + // prepare the running resources + running := filepath.Join("./", "testdata", "prune_cluster_scoped_resource", "running") + + // read the running application config from the example file + runningCfg, err := os.ReadFile(filepath.Join(running, "app.pipecd.yaml")) + require.NoError(t, err) + + ok = t.Run("prepare running", func(t *testing.T) { + // prepare the request to ensure the running deployment exists + runningRequest := &deployment.ExecuteStageRequest{ + Input: &deployment.ExecutePluginInput{ + Deployment: &model.Deployment{ + PipedId: "piped-id", + ApplicationId: "app-id", + DeployTargets: []string{"default"}, + }, + Stage: &model.PipelineStage{ + Id: "stage-id", + Name: "K8S_SYNC", + }, + StageConfig: []byte(``), + RunningDeploymentSource: nil, + TargetDeploymentSource: &deployment.DeploymentSource{ + ApplicationDirectory: running, + CommitHash: "0123456789", + ApplicationConfig: runningCfg, + ApplicationConfigFilename: "app.pipecd.yaml", + }, + }, + } + + resp, err := 
svc.ExecuteStage(ctx, runningRequest) + + require.NoError(t, err) + require.Equal(t, model.StageStatus_STAGE_SUCCESS.String(), resp.GetStatus().String()) + + // The my-new-cron-object/my-new-cron-object-2/my-new-cron-object-v1beta1 should be created + _, err = dynamicClient.Resource(schema.GroupVersionResource{Group: "stable.example.com", Version: "v1", Resource: "crontabs"}).Get(context.Background(), "my-new-cron-object", metav1.GetOptions{}) + require.NoError(t, err) + _, err = dynamicClient.Resource(schema.GroupVersionResource{Group: "stable.example.com", Version: "v1", Resource: "crontabs"}).Get(context.Background(), "my-new-cron-object-2", metav1.GetOptions{}) + require.NoError(t, err) + _, err = dynamicClient.Resource(schema.GroupVersionResource{Group: "stable.example.com", Version: "v1", Resource: "crontabs"}).Get(context.Background(), "my-new-cron-object-v1beta1", metav1.GetOptions{}) + require.NoError(t, err) + }) + require.Truef(t, ok, "expected prepare to succeed") + + t.Run("sync", func(t *testing.T) { + // sync the target resources and assert the prune behavior + target := filepath.Join("./", "testdata", "prune_cluster_scoped_resource", "target") + + // read the running application config from the example file + targetCfg, err := os.ReadFile(filepath.Join(target, "app.pipecd.yaml")) + require.NoError(t, err) + + // prepare the request to ensure the running deployment exists + targetRequest := &deployment.ExecuteStageRequest{ + Input: &deployment.ExecutePluginInput{ + Deployment: &model.Deployment{ + PipedId: "piped-id", + ApplicationId: "app-id", + DeployTargets: []string{"default"}, + }, + Stage: &model.PipelineStage{ + Id: "stage-id", + Name: "K8S_SYNC", + }, + StageConfig: []byte(``), + RunningDeploymentSource: &deployment.DeploymentSource{ + ApplicationDirectory: running, + CommitHash: "0123456789", + ApplicationConfig: runningCfg, + ApplicationConfigFilename: "app.pipecd.yaml", + }, + TargetDeploymentSource: &deployment.DeploymentSource{ + 
ApplicationDirectory: target, + CommitHash: "0012345678", + ApplicationConfig: targetCfg, + ApplicationConfigFilename: "app.pipecd.yaml", + }, + }, + } + + resp, err := svc.ExecuteStage(ctx, targetRequest) + require.NoError(t, err) + require.Equal(t, model.StageStatus_STAGE_SUCCESS.String(), resp.GetStatus().String()) + + // The my-new-cron-object should not be removed + _, err = dynamicClient.Resource(schema.GroupVersionResource{Group: "stable.example.com", Version: "v1", Resource: "crontabs"}).Get(context.Background(), "my-new-cron-object", metav1.GetOptions{}) + require.NoError(t, err) + // The my-new-cron-object-2 should be removed + _, err = dynamicClient.Resource(schema.GroupVersionResource{Group: "stable.example.com", Version: "v1", Resource: "crontabs"}).Get(context.Background(), "my-new-cron-object-2", metav1.GetOptions{}) + require.Error(t, err) + require.Truef(t, apierrors.IsNotFound(err), "expected error to be NotFound, but got %v", err) + // The my-new-cron-object-v1beta1 should be removed + _, err = dynamicClient.Resource(schema.GroupVersionResource{Group: "stable.example.com", Version: "v1", Resource: "crontabs"}).Get(context.Background(), "my-new-cron-object-v1beta1", metav1.GetOptions{}) + require.Error(t, err) + require.Truef(t, apierrors.IsNotFound(err), "expected error to be NotFound, but got %v", err) + }) +} diff --git a/pkg/app/pipedv1/plugin/kubernetes/deployment/testdata/prune/running/app.pipecd.yaml b/pkg/app/pipedv1/plugin/kubernetes/deployment/testdata/prune/running/app.pipecd.yaml new file mode 100644 index 0000000000..257d9c46d7 --- /dev/null +++ b/pkg/app/pipedv1/plugin/kubernetes/deployment/testdata/prune/running/app.pipecd.yaml @@ -0,0 +1,18 @@ +apiVersion: pipecd.dev/v1beta1 +kind: KubernetesApp +spec: + name: simple + labels: + env: example + team: product + quickSync: + prune: true + input: + manifests: + - deployment.yaml + - service.yaml + kubectlVersion: 1.31.0 + description: | + This app demonstrates how to deploy a 
Kubernetes application with [Quick Sync](https://pipecd.dev/docs/concepts/#sync-strategy) strategy.\ + No pipeline is specified then in each deployment PipeCD will roll out the new version and switch all traffic to it immediately.\ + References: [adding a new app](https://pipecd.dev/docs/user-guide/managing-application/adding-an-application/), [app configuration](https://pipecd.dev/docs/user-guide/configuration-reference/) diff --git a/pkg/app/pipedv1/plugin/kubernetes/deployment/testdata/prune/running/deployment.yaml b/pkg/app/pipedv1/plugin/kubernetes/deployment/testdata/prune/running/deployment.yaml new file mode 100644 index 0000000000..f1e0d4b29a --- /dev/null +++ b/pkg/app/pipedv1/plugin/kubernetes/deployment/testdata/prune/running/deployment.yaml @@ -0,0 +1,27 @@ +apiVersion: apps/v1 +kind: Deployment +metadata: + name: simple + labels: + app: simple +spec: + replicas: 2 + selector: + matchLabels: + app: simple + pipecd.dev/variant: primary + template: + metadata: + labels: + app: simple + pipecd.dev/variant: primary + annotations: + sidecar.istio.io/inject: "false" + spec: + containers: + - name: helloworld + image: ghcr.io/pipe-cd/helloworld:v0.32.0 + args: + - server + ports: + - containerPort: 9085 diff --git a/pkg/app/pipedv1/plugin/kubernetes/deployment/testdata/prune/running/service.yaml b/pkg/app/pipedv1/plugin/kubernetes/deployment/testdata/prune/running/service.yaml new file mode 100644 index 0000000000..52ca9d1f59 --- /dev/null +++ b/pkg/app/pipedv1/plugin/kubernetes/deployment/testdata/prune/running/service.yaml @@ -0,0 +1,11 @@ +apiVersion: v1 +kind: Service +metadata: + name: simple +spec: + selector: + app: simple + ports: + - protocol: TCP + port: 9085 + targetPort: 9085 diff --git a/pkg/app/pipedv1/plugin/kubernetes/deployment/testdata/prune/target/app.pipecd.yaml b/pkg/app/pipedv1/plugin/kubernetes/deployment/testdata/prune/target/app.pipecd.yaml new file mode 100644 index 0000000000..d9140b2e81 --- /dev/null +++ 
b/pkg/app/pipedv1/plugin/kubernetes/deployment/testdata/prune/target/app.pipecd.yaml @@ -0,0 +1,17 @@ +apiVersion: pipecd.dev/v1beta1 +kind: KubernetesApp +spec: + name: simple + labels: + env: example + team: product + quickSync: + prune: true + input: + manifests: + - deployment.yaml + kubectlVersion: 1.31.0 + description: | + This app demonstrates how to deploy a Kubernetes application with [Quick Sync](https://pipecd.dev/docs/concepts/#sync-strategy) strategy.\ + No pipeline is specified then in each deployment PipeCD will roll out the new version and switch all traffic to it immediately.\ + References: [adding a new app](https://pipecd.dev/docs/user-guide/managing-application/adding-an-application/), [app configuration](https://pipecd.dev/docs/user-guide/configuration-reference/) diff --git a/pkg/app/pipedv1/plugin/kubernetes/deployment/testdata/prune/target/deployment.yaml b/pkg/app/pipedv1/plugin/kubernetes/deployment/testdata/prune/target/deployment.yaml new file mode 100644 index 0000000000..f1e0d4b29a --- /dev/null +++ b/pkg/app/pipedv1/plugin/kubernetes/deployment/testdata/prune/target/deployment.yaml @@ -0,0 +1,27 @@ +apiVersion: apps/v1 +kind: Deployment +metadata: + name: simple + labels: + app: simple +spec: + replicas: 2 + selector: + matchLabels: + app: simple + pipecd.dev/variant: primary + template: + metadata: + labels: + app: simple + pipecd.dev/variant: primary + annotations: + sidecar.istio.io/inject: "false" + spec: + containers: + - name: helloworld + image: ghcr.io/pipe-cd/helloworld:v0.32.0 + args: + - server + ports: + - containerPort: 9085 diff --git a/pkg/app/pipedv1/plugin/kubernetes/deployment/testdata/prune_cluster_scoped_resource/prepare/app.pipecd.yaml b/pkg/app/pipedv1/plugin/kubernetes/deployment/testdata/prune_cluster_scoped_resource/prepare/app.pipecd.yaml new file mode 100644 index 0000000000..1a151ec49b --- /dev/null +++ 
b/pkg/app/pipedv1/plugin/kubernetes/deployment/testdata/prune_cluster_scoped_resource/prepare/app.pipecd.yaml @@ -0,0 +1,10 @@ +apiVersion: pipecd.dev/v1beta1 +kind: KubernetesApp +spec: + name: crd + quickSync: + prune: false + input: + manifests: + - crd.yaml + kubectlVersion: 1.31.0 diff --git a/pkg/app/pipedv1/plugin/kubernetes/deployment/testdata/prune_cluster_scoped_resource/prepare/crd.yaml b/pkg/app/pipedv1/plugin/kubernetes/deployment/testdata/prune_cluster_scoped_resource/prepare/crd.yaml new file mode 100644 index 0000000000..32f7df0ce0 --- /dev/null +++ b/pkg/app/pipedv1/plugin/kubernetes/deployment/testdata/prune_cluster_scoped_resource/prepare/crd.yaml @@ -0,0 +1,58 @@ +apiVersion: apiextensions.k8s.io/v1 +kind: CustomResourceDefinition +metadata: + # name must match the spec fields below, and be in the form: <plural>.<group> + name: crontabs.stable.example.com +spec: + # group name to use for REST API: /apis/<group>/<version> + group: stable.example.com + # list of versions supported by this CustomResourceDefinition + versions: + - name: v1 + # Each version can be enabled/disabled by Served flag. + served: true + # One and only one version must be marked as the storage version. + storage: true + schema: + openAPIV3Schema: + type: object + properties: + spec: + type: object + properties: + cronSpec: + type: string + image: + type: string + replicas: + type: integer + - name: v1beta1 + # Each version can be enabled/disabled by Served flag. + served: true + # One and only one version must be marked as the storage version. + storage: false + schema: + openAPIV3Schema: + type: object + properties: + spec: + type: object + properties: + cronSpec: + type: string + image: + type: string + replicas: + type: integer + # either Namespaced or Cluster + scope: Cluster + names: + # plural name to be used in the URL: /apis/<group>/<version>/<plural> + plural: crontabs + # singular name to be used as an alias on the CLI and for display + singular: crontab + # kind is normally the CamelCased singular type. 
Your resource manifests use this. + kind: CronTab + # shortNames allow shorter string to match your resource on the CLI + shortNames: + - ct diff --git a/pkg/app/pipedv1/plugin/kubernetes/deployment/testdata/prune_cluster_scoped_resource/running/app.pipecd.yaml b/pkg/app/pipedv1/plugin/kubernetes/deployment/testdata/prune_cluster_scoped_resource/running/app.pipecd.yaml new file mode 100644 index 0000000000..f93b4e51b9 --- /dev/null +++ b/pkg/app/pipedv1/plugin/kubernetes/deployment/testdata/prune_cluster_scoped_resource/running/app.pipecd.yaml @@ -0,0 +1,14 @@ +apiVersion: pipecd.dev/v1beta1 +kind: KubernetesApp +spec: + name: namespace + quickSync: + prune: true + input: + autoCreateNamespace: true + namespace: test-1 + manifests: + - crontab.yaml + - crontab-2.yaml + - crontab-v1beta1.yaml + kubectlVersion: 1.31.0 diff --git a/pkg/app/pipedv1/plugin/kubernetes/deployment/testdata/prune_cluster_scoped_resource/running/crontab-2.yaml b/pkg/app/pipedv1/plugin/kubernetes/deployment/testdata/prune_cluster_scoped_resource/running/crontab-2.yaml new file mode 100644 index 0000000000..8bc6589555 --- /dev/null +++ b/pkg/app/pipedv1/plugin/kubernetes/deployment/testdata/prune_cluster_scoped_resource/running/crontab-2.yaml @@ -0,0 +1,7 @@ +apiVersion: "stable.example.com/v1" +kind: CronTab +metadata: + name: my-new-cron-object-2 +spec: + cronSpec: "* * * * */5" + image: my-awesome-cron-image diff --git a/pkg/app/pipedv1/plugin/kubernetes/deployment/testdata/prune_cluster_scoped_resource/running/crontab-v1beta1.yaml b/pkg/app/pipedv1/plugin/kubernetes/deployment/testdata/prune_cluster_scoped_resource/running/crontab-v1beta1.yaml new file mode 100644 index 0000000000..4aec6c08df --- /dev/null +++ b/pkg/app/pipedv1/plugin/kubernetes/deployment/testdata/prune_cluster_scoped_resource/running/crontab-v1beta1.yaml @@ -0,0 +1,7 @@ +apiVersion: "stable.example.com/v1beta1" +kind: CronTab +metadata: + name: my-new-cron-object-v1beta1 +spec: + cronSpec: "* * * * */5" + image: 
my-awesome-cron-image diff --git a/pkg/app/pipedv1/plugin/kubernetes/deployment/testdata/prune_cluster_scoped_resource/running/crontab.yaml b/pkg/app/pipedv1/plugin/kubernetes/deployment/testdata/prune_cluster_scoped_resource/running/crontab.yaml new file mode 100644 index 0000000000..6e66452e55 --- /dev/null +++ b/pkg/app/pipedv1/plugin/kubernetes/deployment/testdata/prune_cluster_scoped_resource/running/crontab.yaml @@ -0,0 +1,7 @@ +apiVersion: "stable.example.com/v1" +kind: CronTab +metadata: + name: my-new-cron-object +spec: + cronSpec: "* * * * */5" + image: my-awesome-cron-image diff --git a/pkg/app/pipedv1/plugin/kubernetes/deployment/testdata/prune_cluster_scoped_resource/target/app.pipecd.yaml b/pkg/app/pipedv1/plugin/kubernetes/deployment/testdata/prune_cluster_scoped_resource/target/app.pipecd.yaml new file mode 100644 index 0000000000..a867e2dd23 --- /dev/null +++ b/pkg/app/pipedv1/plugin/kubernetes/deployment/testdata/prune_cluster_scoped_resource/target/app.pipecd.yaml @@ -0,0 +1,12 @@ +apiVersion: pipecd.dev/v1beta1 +kind: KubernetesApp +spec: + name: namespace + quickSync: + prune: true + input: + autoCreateNamespace: true + namespace: test-1 + manifests: + - crontab.yaml + kubectlVersion: 1.31.0 diff --git a/pkg/app/pipedv1/plugin/kubernetes/deployment/testdata/prune_cluster_scoped_resource/target/crontab.yaml b/pkg/app/pipedv1/plugin/kubernetes/deployment/testdata/prune_cluster_scoped_resource/target/crontab.yaml new file mode 100644 index 0000000000..6e66452e55 --- /dev/null +++ b/pkg/app/pipedv1/plugin/kubernetes/deployment/testdata/prune_cluster_scoped_resource/target/crontab.yaml @@ -0,0 +1,7 @@ +apiVersion: "stable.example.com/v1" +kind: CronTab +metadata: + name: my-new-cron-object +spec: + cronSpec: "* * * * */5" + image: my-awesome-cron-image diff --git a/pkg/app/pipedv1/plugin/kubernetes/deployment/testdata/prune_with_change_namespace/running/app.pipecd.yaml 
b/pkg/app/pipedv1/plugin/kubernetes/deployment/testdata/prune_with_change_namespace/running/app.pipecd.yaml new file mode 100644 index 0000000000..092b75bc56 --- /dev/null +++ b/pkg/app/pipedv1/plugin/kubernetes/deployment/testdata/prune_with_change_namespace/running/app.pipecd.yaml @@ -0,0 +1,20 @@ +apiVersion: pipecd.dev/v1beta1 +kind: KubernetesApp +spec: + name: simple + labels: + env: example + team: product + quickSync: + prune: true + input: + autoCreateNamespace: true + namespace: test-1 + manifests: + - deployment.yaml + - service.yaml + kubectlVersion: 1.31.0 + description: | + This app demonstrates how to deploy a Kubernetes application with [Quick Sync](https://pipecd.dev/docs/concepts/#sync-strategy) strategy.\ + No pipeline is specified then in each deployment PipeCD will roll out the new version and switch all traffic to it immediately.\ + References: [adding a new app](https://pipecd.dev/docs/user-guide/managing-application/adding-an-application/), [app configuration](https://pipecd.dev/docs/user-guide/configuration-reference/) diff --git a/pkg/app/pipedv1/plugin/kubernetes/deployment/testdata/prune_with_change_namespace/running/deployment.yaml b/pkg/app/pipedv1/plugin/kubernetes/deployment/testdata/prune_with_change_namespace/running/deployment.yaml new file mode 100644 index 0000000000..f1e0d4b29a --- /dev/null +++ b/pkg/app/pipedv1/plugin/kubernetes/deployment/testdata/prune_with_change_namespace/running/deployment.yaml @@ -0,0 +1,27 @@ +apiVersion: apps/v1 +kind: Deployment +metadata: + name: simple + labels: + app: simple +spec: + replicas: 2 + selector: + matchLabels: + app: simple + pipecd.dev/variant: primary + template: + metadata: + labels: + app: simple + pipecd.dev/variant: primary + annotations: + sidecar.istio.io/inject: "false" + spec: + containers: + - name: helloworld + image: ghcr.io/pipe-cd/helloworld:v0.32.0 + args: + - server + ports: + - containerPort: 9085 diff --git 
a/pkg/app/pipedv1/plugin/kubernetes/deployment/testdata/prune_with_change_namespace/running/service.yaml b/pkg/app/pipedv1/plugin/kubernetes/deployment/testdata/prune_with_change_namespace/running/service.yaml new file mode 100644 index 0000000000..52ca9d1f59 --- /dev/null +++ b/pkg/app/pipedv1/plugin/kubernetes/deployment/testdata/prune_with_change_namespace/running/service.yaml @@ -0,0 +1,11 @@ +apiVersion: v1 +kind: Service +metadata: + name: simple +spec: + selector: + app: simple + ports: + - protocol: TCP + port: 9085 + targetPort: 9085 diff --git a/pkg/app/pipedv1/plugin/kubernetes/deployment/testdata/prune_with_change_namespace/target/app.pipecd.yaml b/pkg/app/pipedv1/plugin/kubernetes/deployment/testdata/prune_with_change_namespace/target/app.pipecd.yaml new file mode 100644 index 0000000000..c25555fb66 --- /dev/null +++ b/pkg/app/pipedv1/plugin/kubernetes/deployment/testdata/prune_with_change_namespace/target/app.pipecd.yaml @@ -0,0 +1,20 @@ +apiVersion: pipecd.dev/v1beta1 +kind: KubernetesApp +spec: + name: simple + labels: + env: example + team: product + quickSync: + prune: true + input: + autoCreateNamespace: true + namespace: test-2 + manifests: + - deployment.yaml + - service.yaml + kubectlVersion: 1.31.0 + description: | + This app demonstrates how to deploy a Kubernetes application with [Quick Sync](https://pipecd.dev/docs/concepts/#sync-strategy) strategy.\ + No pipeline is specified then in each deployment PipeCD will roll out the new version and switch all traffic to it immediately.\ + References: [adding a new app](https://pipecd.dev/docs/user-guide/managing-application/adding-an-application/), [app configuration](https://pipecd.dev/docs/user-guide/configuration-reference/) diff --git a/pkg/app/pipedv1/plugin/kubernetes/deployment/testdata/prune_with_change_namespace/target/deployment.yaml b/pkg/app/pipedv1/plugin/kubernetes/deployment/testdata/prune_with_change_namespace/target/deployment.yaml new file mode 100644 index 0000000000..f1e0d4b29a 
--- /dev/null +++ b/pkg/app/pipedv1/plugin/kubernetes/deployment/testdata/prune_with_change_namespace/target/deployment.yaml @@ -0,0 +1,27 @@ +apiVersion: apps/v1 +kind: Deployment +metadata: + name: simple + labels: + app: simple +spec: + replicas: 2 + selector: + matchLabels: + app: simple + pipecd.dev/variant: primary + template: + metadata: + labels: + app: simple + pipecd.dev/variant: primary + annotations: + sidecar.istio.io/inject: "false" + spec: + containers: + - name: helloworld + image: ghcr.io/pipe-cd/helloworld:v0.32.0 + args: + - server + ports: + - containerPort: 9085 diff --git a/pkg/app/pipedv1/plugin/kubernetes/deployment/testdata/prune_with_change_namespace/target/service.yaml b/pkg/app/pipedv1/plugin/kubernetes/deployment/testdata/prune_with_change_namespace/target/service.yaml new file mode 100644 index 0000000000..52ca9d1f59 --- /dev/null +++ b/pkg/app/pipedv1/plugin/kubernetes/deployment/testdata/prune_with_change_namespace/target/service.yaml @@ -0,0 +1,11 @@ +apiVersion: v1 +kind: Service +metadata: + name: simple +spec: + selector: + app: simple + ports: + - protocol: TCP + port: 9085 + targetPort: 9085 diff --git a/pkg/app/pipedv1/plugin/kubernetes/provider/applier.go b/pkg/app/pipedv1/plugin/kubernetes/provider/applier.go index 7f1bee6100..9f80c9edfc 100644 --- a/pkg/app/pipedv1/plugin/kubernetes/provider/applier.go +++ b/pkg/app/pipedv1/plugin/kubernetes/provider/applier.go @@ -144,7 +144,7 @@ func (a *Applier) Delete(ctx context.Context, k ResourceKey) (err error) { return err } - if k.String() != m.body.GetAnnotations()[LabelResourceKey] { + if k.String() != m.Key().String() { return ErrNotFound } diff --git a/pkg/app/pipedv1/plugin/kubernetes/provider/applier_test.go b/pkg/app/pipedv1/plugin/kubernetes/provider/applier_test.go index d095b15e9c..d05e4bb7fc 100644 --- a/pkg/app/pipedv1/plugin/kubernetes/provider/applier_test.go +++ b/pkg/app/pipedv1/plugin/kubernetes/provider/applier_test.go @@ -21,6 +21,7 @@ import ( "go.uber.org/zap" 
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" + "k8s.io/apimachinery/pkg/runtime/schema" "github.com/pipe-cd/pipecd/pkg/app/pipedv1/plugin/kubernetes/config" ) @@ -430,13 +431,12 @@ kind: ConfigMap metadata: name: test-config annotations: - pipecd.dev/resource-key: "v1:ConfigMap::test-config" + pipecd.dev/resource-key: ":ConfigMap::test-config" `, resourceKey: ResourceKey{ - apiVersion: "v1", - kind: "ConfigMap", - namespace: "", - name: "test-config", + groupKind: schema.ParseGroupKind("ConfigMap"), + namespace: "", + name: "test-config", }, expectedErr: nil, }, @@ -449,13 +449,12 @@ kind: ConfigMap metadata: name: test-config annotations: - pipecd.dev/resource-key: "v1:ConfigMap::test-config" + pipecd.dev/resource-key: ":ConfigMap::test-config" `, resourceKey: ResourceKey{ - apiVersion: "v1", - kind: "ConfigMap", - namespace: "", - name: "test-config", + groupKind: schema.ParseGroupKind("ConfigMap"), + namespace: "", + name: "test-config", }, expectedErr: errGet, }, @@ -468,13 +467,12 @@ kind: ConfigMap metadata: name: test-config annotations: - pipecd.dev/resource-key: "v1:ConfigMap::test-config" + pipecd.dev/resource-key: ":ConfigMap::test-config" `, resourceKey: ResourceKey{ - apiVersion: "v1", - kind: "ConfigMap", - namespace: "", - name: "test-config", + groupKind: schema.ParseGroupKind("ConfigMap"), + namespace: "", + name: "test-config", }, expectedErr: errDelete, }, @@ -486,13 +484,12 @@ kind: ConfigMap metadata: name: test-config annotations: - pipecd.dev/resource-key: "v1:ConfigMap::test-config" + pipecd.dev/resource-key: ":ConfigMap::test-config" `, resourceKey: ResourceKey{ - apiVersion: "v1", - kind: "ConfigMap", - namespace: "", - name: "another-config", + groupKind: schema.ParseGroupKind("ConfigMap"), + namespace: "", + name: "another-config", }, expectedErr: ErrNotFound, }, @@ -501,6 +498,41 @@ metadata: manifest: ` apiVersion: v1 kind: ConfigMap +metadata: + name: test-config + namespace: test-namespace + annotations: + 
pipecd.dev/resource-key: ":ConfigMap:test-namespace:test-config" +`, + resourceKey: ResourceKey{ + groupKind: schema.ParseGroupKind("ConfigMap"), + namespace: "test-namespace", + name: "test-config", + }, + expectedErr: nil, + }, + { + name: "successful delete with old format of resource key", + manifest: ` +apiVersion: v1 +kind: ConfigMap +metadata: + name: test-config + annotations: + pipecd.dev/resource-key: "v1:ConfigMap::test-config" +`, + resourceKey: ResourceKey{ + groupKind: schema.ParseGroupKind("ConfigMap"), + namespace: "", + name: "test-config", + }, + expectedErr: nil, + }, + { + name: "successful delete with namespace with old format of resource key", + manifest: ` +apiVersion: v1 +kind: ConfigMap metadata: name: test-config namespace: test-namespace @@ -508,10 +540,9 @@ metadata: pipecd.dev/resource-key: "v1:ConfigMap:test-namespace:test-config" `, resourceKey: ResourceKey{ - apiVersion: "v1", - kind: "ConfigMap", - namespace: "test-namespace", - name: "test-config", + groupKind: schema.ParseGroupKind("ConfigMap"), + namespace: "test-namespace", + name: "test-config", }, expectedErr: nil, }, diff --git a/pkg/app/pipedv1/plugin/kubernetes/provider/helm_test.go b/pkg/app/pipedv1/plugin/kubernetes/provider/helm_test.go index a6b4528638..476a5bfa2e 100644 --- a/pkg/app/pipedv1/plugin/kubernetes/provider/helm_test.go +++ b/pkg/app/pipedv1/plugin/kubernetes/provider/helm_test.go @@ -84,7 +84,6 @@ func TestTemplateLocalChart_WithNamespace(t *testing.T) { metadata, _, err := manifest.NestedMap("metadata") require.NoError(t, err) require.Equal(t, namespace, metadata["namespace"]) - require.Equal(t, namespace, manifest.Key().Namespace()) } } diff --git a/pkg/app/pipedv1/plugin/kubernetes/provider/kubectl.go b/pkg/app/pipedv1/plugin/kubernetes/provider/kubectl.go index f3b16f0941..76b9efb236 100644 --- a/pkg/app/pipedv1/plugin/kubernetes/provider/kubectl.go +++ b/pkg/app/pipedv1/plugin/kubernetes/provider/kubectl.go @@ -21,6 +21,9 @@ import ( "fmt" "os/exec" 
"strings" + + v1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "sigs.k8s.io/yaml" ) var ( @@ -230,6 +233,111 @@ func (c *Kubectl) Get(ctx context.Context, kubeconfig, namespace string, r Resou return ms[0], nil } +// getClusterScopedAPIResources retrieves the list of available API resources from the Kubernetes cluster. +// It runs the `kubectl api-resources` command with the specified kubeconfig and returns the +// names of the resources that support the "list", "get", and "delete" verbs, and are cluster-scoped. +func (c *Kubectl) getClusterScopedAPIResources(ctx context.Context, kubeconfig string) ([]string, error) { + args := []string{"api-resources", "--namespaced=false", "--verbs=list,get,delete", "--output=name"} + if kubeconfig != "" { + args = append(args, "--kubeconfig", kubeconfig) + } + cmd := exec.CommandContext(ctx, c.execPath, args...) + out, err := cmd.CombinedOutput() + if err != nil { + return nil, fmt.Errorf("failed to get API resources: %s, %v", string(out), err) + } + lines := strings.Split(string(out), "\n") + resources := make([]string, 0, len(lines)) + for _, line := range lines { + if line != "" { + resources = append(resources, line) + } + } + + return resources, nil +} + +// GetAllClusterScoped retrieves all cluster-scoped resources from the Kubernetes cluster +// using the provided kubeconfig and optional selectors. It returns a slice of Manifests +// representing the resources or an error if the operation fails. +func (c *Kubectl) GetAllClusterScoped(ctx context.Context, kubeconfig string, selector ...string) ([]Manifest, error) { + resources, err := c.getClusterScopedAPIResources(ctx, kubeconfig) + if err != nil { + return nil, err + } + + args := make([]string, 0, 7) + if kubeconfig != "" { + args = append(args, "--kubeconfig", kubeconfig) + } + args = append(args, "get", strings.Join(resources, ","), "-o", "yaml", "--selector", strings.Join(selector, ",")) + cmd := exec.CommandContext(ctx, c.execPath, args...) 
+ out, err := cmd.CombinedOutput() + if err != nil { + return nil, fmt.Errorf("failed to get cluster-scoped resources: %s, %v", string(out), err) + } + + // Unmarshal the output to the list of manifests. + var list v1.List + if err := yaml.Unmarshal(out, &list); err != nil { + return nil, fmt.Errorf("failed to unmarshal the output: %w", err) + } + + ms := make([]Manifest, 0, len(list.Items)) + for _, item := range list.Items { + m, err := ParseManifests(string(item.Raw)) + if err != nil { + return nil, fmt.Errorf("failed to parse the manifest: %w", err) + } + ms = append(ms, m...) + } + + return ms, nil +} + +// GetAll retrieves all Kubernetes resources in the specified namespace and matching the given selector. +// It returns a list of manifests or an error if the retrieval or unmarshalling fails. +// If no resources are found, it returns nil without an error. +func (c *Kubectl) GetAll(ctx context.Context, kubeconfig, namespace string, selector ...string) (ms []Manifest, err error) { + args := make([]string, 0, 7) + args = append(args, "get", "all", "-o", "yaml", "--selector", strings.Join(selector, ",")) + if kubeconfig != "" { + args = append(args, "--kubeconfig", kubeconfig) + } + if namespace == "" { + args = append(args, "--all-namespaces") + } else { + args = append(args, "--namespace", namespace) + } + cmd := exec.CommandContext(ctx, c.execPath, args...) + out, err := cmd.CombinedOutput() + + if err != nil { + return nil, fmt.Errorf("failed to get: %s, %w", string(out), err) + } + if strings.Contains(string(out), "(NotFound)") { + // No resources found. Return nil. This is not an error. + return nil, nil + } + + // Unmarshal the output to the list of manifests. 
+ var list v1.List + if err := yaml.Unmarshal(out, &list); err != nil { + return nil, fmt.Errorf("failed to unmarshal the output: %w", err) + } + + ms = make([]Manifest, 0, len(list.Items)) + for _, item := range list.Items { + m, err := ParseManifests(string(item.Raw)) + if err != nil { + return nil, fmt.Errorf("failed to parse the manifest: %w", err) + } + ms = append(ms, m...) + } + + return ms, nil +} + // CreateNamespace runs kubectl create namespace with the given namespace. func (c *Kubectl) CreateNamespace(ctx context.Context, kubeconfig, namespace string) (err error) { // TODO: record the metrics for the kubectl create namespace command. diff --git a/pkg/app/pipedv1/plugin/kubernetes/provider/loader.go b/pkg/app/pipedv1/plugin/kubernetes/provider/loader.go index 8cac2b9965..9b1134cbb8 100644 --- a/pkg/app/pipedv1/plugin/kubernetes/provider/loader.go +++ b/pkg/app/pipedv1/plugin/kubernetes/provider/loader.go @@ -85,13 +85,26 @@ func NewLoader(registry ToolRegistry) *Loader { func (l *Loader) LoadManifests(ctx context.Context, input LoaderInput) (manifests []Manifest, err error) { defer func() { - // Add builtin annotations for tracking application live state. for i := range manifests { + // Set the namespace for all manifests if the namespace is not specified in the manifest, + // we have to do this to ensure that the namespace of loaded manifests are consistent with the applied resources. + if input.Namespace != "" { + manifests[i].body.SetNamespace(input.Namespace) + } + + // Add builtin labels and annotations for tracking application live state. 
+ manifests[i].AddLabels(map[string]string{ + LabelManagedBy: ManagedByPiped, + LabelPiped: input.PipedID, + LabelApplication: input.AppID, + LabelCommitHash: input.CommitHash, + }) + manifests[i].AddAnnotations(map[string]string{ LabelManagedBy: ManagedByPiped, LabelPiped: input.PipedID, LabelApplication: input.AppID, - LabelOriginalAPIVersion: manifests[i].Key().APIVersion(), + LabelOriginalAPIVersion: manifests[i].body.GetAPIVersion(), LabelResourceKey: manifests[i].Key().String(), LabelCommitHash: input.CommitHash, }) diff --git a/pkg/app/pipedv1/plugin/kubernetes/provider/manifest.go b/pkg/app/pipedv1/plugin/kubernetes/provider/manifest.go index d964c332eb..bff04fbc07 100644 --- a/pkg/app/pipedv1/plugin/kubernetes/provider/manifest.go +++ b/pkg/app/pipedv1/plugin/kubernetes/provider/manifest.go @@ -59,6 +59,14 @@ func (m Manifest) Key() ResourceKey { return makeResourceKey(m.body) } +func (m Manifest) Kind() string { + return m.body.GetKind() +} + +func (m Manifest) Name() string { + return m.body.GetName() +} + // IsDeployment returns true if the manifest is a Deployment. // It checks the API group and the kind of the manifest. func (m Manifest) IsDeployment() bool { @@ -115,6 +123,22 @@ func (m Manifest) NestedMap(fields ...string) (map[string]any, bool, error) { return unstructured.NestedMap(m.body.Object, fields...) } +func (m Manifest) AddLabels(labels map[string]string) { + if len(labels) == 0 { + return + } + + lbs := m.body.GetLabels() + if lbs == nil { + m.body.SetLabels(labels) + return + } + for k, v := range labels { + lbs[k] = v + } + m.body.SetLabels(lbs) +} + func (m Manifest) AddAnnotations(annotations map[string]string) { if len(annotations) == 0 { return @@ -131,6 +155,11 @@ func (m Manifest) AddAnnotations(annotations map[string]string) { m.body.SetAnnotations(annos) } +// IsManagedByPiped returns true if the manifest is managed by Piped. 
+func (m Manifest) IsManagedByPiped() bool { + return len(m.body.GetOwnerReferences()) == 0 && m.body.GetAnnotations()[LabelManagedBy] == ManagedByPiped +} + // AddStringMapValues adds or overrides the given key-values into the string map // that can be found at the specified fields. func (m Manifest) AddStringMapValues(values map[string]string, fields ...string) error { @@ -170,20 +199,12 @@ type WorkloadPair struct { func FindSameManifests(olds, news []Manifest) []WorkloadPair { pairs := make([]WorkloadPair, 0) oldMap := make(map[ResourceKey]Manifest, len(olds)) - nomalizeKey := func(k ResourceKey) ResourceKey { - // Ignoring APIVersion because user can upgrade to the new APIVersion for the same workload. - k.apiVersion = "" - if k.namespace == DefaultNamespace { - k.namespace = "" - } - return k - } for _, m := range olds { - key := nomalizeKey(m.Key()) + key := m.Key().normalize() oldMap[key] = m } for _, n := range news { - key := nomalizeKey(n.Key()) + key := n.Key().normalize() if o, ok := oldMap[key]; ok { pairs = append(pairs, WorkloadPair{ Old: o, diff --git a/pkg/app/pipedv1/plugin/kubernetes/provider/manifest_test.go b/pkg/app/pipedv1/plugin/kubernetes/provider/manifest_test.go index fa01c1261c..a1de84bbd7 100644 --- a/pkg/app/pipedv1/plugin/kubernetes/provider/manifest_test.go +++ b/pkg/app/pipedv1/plugin/kubernetes/provider/manifest_test.go @@ -21,6 +21,7 @@ import ( "github.com/google/go-cmp/cmp" "github.com/stretchr/testify/assert" "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" + "k8s.io/apimachinery/pkg/runtime/schema" ) func TestManifest_AddStringMapValues(t *testing.T) { @@ -155,10 +156,9 @@ spec: }, want: map[ResourceKey]Manifest{ { - apiVersion: "v1", - kind: "ConfigMap", - name: "my-config", - namespace: "default", + groupKind: schema.ParseGroupKind("ConfigMap"), + name: "my-config", + namespace: "default", }: mustParseManifests(t, ` apiVersion: v1 kind: ConfigMap @@ -169,10 +169,9 @@ data: key: value `)[0], { - apiVersion: "v1", - kind: 
"Secret", - name: "my-secret", - namespace: "default", + groupKind: schema.ParseGroupKind("Secret"), + name: "my-secret", + namespace: "default", }: mustParseManifests(t, ` apiVersion: v1 kind: Secret @@ -217,10 +216,9 @@ data: }, want: map[ResourceKey]Manifest{ { - apiVersion: "v1", - kind: "ConfigMap", - name: "my-config", - namespace: "default", + groupKind: schema.ParseGroupKind("ConfigMap"), + name: "my-config", + namespace: "default", }: mustParseManifests(t, ` apiVersion: v1 kind: ConfigMap @@ -247,10 +245,9 @@ data: }, want: map[ResourceKey]Manifest{ { - apiVersion: "v1", - kind: "Secret", - name: "my-secret", - namespace: "default", + groupKind: schema.ParseGroupKind("Secret"), + name: "my-secret", + namespace: "default", }: mustParseManifests(t, ` apiVersion: v1 kind: Secret @@ -286,10 +283,9 @@ data: }, want: map[ResourceKey]Manifest{ { - apiVersion: "v1", - kind: "ConfigMap", - name: "my-config", - namespace: "custom-namespace", + groupKind: schema.ParseGroupKind("ConfigMap"), + name: "my-config", + namespace: "custom-namespace", }: mustParseManifests(t, ` apiVersion: v1 kind: ConfigMap @@ -300,10 +296,9 @@ data: key: value `)[0], { - apiVersion: "v1", - kind: "Secret", - name: "my-secret", - namespace: "custom-namespace", + groupKind: schema.ParseGroupKind("Secret"), + name: "my-secret", + namespace: "custom-namespace", }: mustParseManifests(t, ` apiVersion: v1 kind: Secret @@ -723,3 +718,71 @@ data: }) } } + +func TestIsManagedByPiped(t *testing.T) { + testcases := []struct { + name string + manifest Manifest + wantResult bool + }{ + { + name: "managed by Piped", + manifest: Manifest{ + body: &unstructured.Unstructured{ + Object: map[string]interface{}{ + "metadata": map[string]interface{}{ + "annotations": map[string]interface{}{ + LabelManagedBy: ManagedByPiped, + }, + }, + }, + }, + }, + wantResult: true, + }, + { + name: "not managed by Piped", + manifest: Manifest{ + body: &unstructured.Unstructured{ + Object: map[string]interface{}{ + 
"metadata": map[string]interface{}{ + "annotations": map[string]interface{}{ + "some-other-label": "some-value", + }, + }, + }, + }, + }, + wantResult: false, + }, + { + name: "has owner references", + manifest: Manifest{ + body: &unstructured.Unstructured{ + Object: map[string]interface{}{ + "metadata": map[string]interface{}{ + "annotations": map[string]interface{}{ + LabelManagedBy: ManagedByPiped, + }, + "ownerReferences": []interface{}{ + map[string]interface{}{ + "apiVersion": "v1", + "kind": "ReplicaSet", + "name": "example-replicaset", + }, + }, + }, + }, + }, + }, + wantResult: false, + }, + } + + for _, tc := range testcases { + t.Run(tc.name, func(t *testing.T) { + gotResult := tc.manifest.IsManagedByPiped() + assert.Equal(t, tc.wantResult, gotResult) + }) + } +} diff --git a/pkg/app/pipedv1/plugin/kubernetes/provider/resource.go b/pkg/app/pipedv1/plugin/kubernetes/provider/resource.go index 05780ee75f..b11164f52e 100644 --- a/pkg/app/pipedv1/plugin/kubernetes/provider/resource.go +++ b/pkg/app/pipedv1/plugin/kubernetes/provider/resource.go @@ -16,8 +16,10 @@ package provider import ( "fmt" + "strings" "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" + "k8s.io/apimachinery/pkg/runtime/schema" ) const ( @@ -28,43 +30,102 @@ const ( DefaultNamespace = "default" ) +// ResourceKey represents a unique key of a Kubernetes resource. +// We use GroupKind, namespace, and name to identify a resource. type ResourceKey struct { - apiVersion string - kind string - namespace string - name string + // We use GroupKind instead of GroupVersionKind because we don't care about the version. + groupKind schema.GroupKind + // The namespace of the resource. + // We use namespace as a part of the key to identify a resource + // We have to distinguish the namespaces to prune the old resource when users change the namespace of a resource. + // If the resource is cluster-scoped, this field should be empty. + namespace string + // The name of the resource. 
+ name string } -func (k ResourceKey) APIVersion() string { - return k.apiVersion +func (k ResourceKey) Kind() string { + return k.groupKind.Kind } -func (k ResourceKey) Kind() string { - return k.kind +func (k ResourceKey) Name() string { + return k.name } func (k ResourceKey) Namespace() string { return k.namespace } -func (k ResourceKey) Name() string { - return k.name +// normalize converts the group and kind to lower case. +// It also converts the default namespace to an empty string. +func (k ResourceKey) normalize() ResourceKey { + k.groupKind.Group = strings.ToLower(k.groupKind.Group) + k.groupKind.Kind = strings.ToLower(k.groupKind.Kind) + return k.normalizeNamespace() +} + +// normalizeNamespace converts the default namespace to an empty string. +func (k ResourceKey) normalizeNamespace() ResourceKey { + if k.namespace == DefaultNamespace { + return k.withoutNamespace() + } + return k +} + +func (k ResourceKey) withoutNamespace() ResourceKey { + k.namespace = "" + return k } func (k ResourceKey) String() string { - return fmt.Sprintf("%s:%s:%s:%s", k.apiVersion, k.kind, k.namespace, k.name) + return fmt.Sprintf("%s:%s:%s:%s", k.groupKind.Group, k.groupKind.Kind, k.namespace, k.name) } func (k ResourceKey) ReadableString() string { - return fmt.Sprintf("name=%q, kind=%q, namespace=%q, apiVersion=%q", k.name, k.kind, k.namespace, k.apiVersion) + return fmt.Sprintf("name=%q, kind=%q, namespace=%q, apiGroup=%q", k.name, k.groupKind.Kind, k.namespace, k.groupKind.Group) } func makeResourceKey(obj *unstructured.Unstructured) ResourceKey { k := ResourceKey{ - apiVersion: obj.GetAPIVersion(), - kind: obj.GetKind(), - namespace: obj.GetNamespace(), - name: obj.GetName(), + groupKind: obj.GroupVersionKind().GroupKind(), + namespace: obj.GetNamespace(), + name: obj.GetName(), } return k } + +// FindRemoveResources identifies resources that are present in the live state but not in the desired manifests. 
+func FindRemoveResources(manifests, namespacedLiveResources, clusterScopedLiveResources []Manifest) []ResourceKey { + var ( + removeKeys = make([]ResourceKey, 0, len(namespacedLiveResources)+len(clusterScopedLiveResources)) + ) + + { + normalizedKeys := make(map[ResourceKey]struct{}, len(manifests)) + for _, m := range manifests { + normalizedKeys[m.Key().normalize()] = struct{}{} + } + + for _, r := range namespacedLiveResources { + if _, ok := normalizedKeys[r.Key().normalize()]; !ok { + removeKeys = append(removeKeys, r.Key()) + } + } + } + + { + normalizedKeys := make(map[ResourceKey]struct{}, len(manifests)) + for _, m := range manifests { + // We don't care about the namespace of the cluster-scoped resources. + normalizedKeys[m.Key().normalize().withoutNamespace()] = struct{}{} + } + for _, r := range clusterScopedLiveResources { + // We don't care about the namespace of the cluster-scoped resources. + if _, ok := normalizedKeys[r.Key().normalize().withoutNamespace()]; !ok { + removeKeys = append(removeKeys, r.Key()) + } + } + } + + return removeKeys +} diff --git a/pkg/app/pipedv1/plugin/kubernetes/provider/resource_test.go b/pkg/app/pipedv1/plugin/kubernetes/provider/resource_test.go new file mode 100644 index 0000000000..3bc45f7e73 --- /dev/null +++ b/pkg/app/pipedv1/plugin/kubernetes/provider/resource_test.go @@ -0,0 +1,153 @@ +// Copyright 2024 The PipeCD Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +package provider + +import ( + "testing" + + "github.com/stretchr/testify/assert" + "k8s.io/apimachinery/pkg/runtime/schema" +) + +func TestResourceKey_normalizeNamespace(t *testing.T) { + tests := []struct { + name string + resourceKey ResourceKey + expected ResourceKey + }{ + { + name: "default namespace", + resourceKey: ResourceKey{ + groupKind: schema.GroupKind{Group: "apps", Kind: "Deployment"}, + namespace: DefaultNamespace, + name: "test-deployment", + }, + expected: ResourceKey{ + groupKind: schema.GroupKind{Group: "apps", Kind: "Deployment"}, + namespace: "", + name: "test-deployment", + }, + }, + { + name: "non-default namespace", + resourceKey: ResourceKey{ + groupKind: schema.GroupKind{Group: "apps", Kind: "Deployment"}, + namespace: "custom-namespace", + name: "test-deployment", + }, + expected: ResourceKey{ + groupKind: schema.GroupKind{Group: "apps", Kind: "Deployment"}, + namespace: "custom-namespace", + name: "test-deployment", + }, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + actual := tt.resourceKey.normalizeNamespace() + assert.Equal(t, tt.expected, actual) + }) + } +} + +func TestFindRemoveResources(t *testing.T) { + tests := []struct { + name string + manifestsYAML string + namespacedLiveResourcesYAML string + clusterScopedLiveResourcesYAML string + expectedRemoveKeys []ResourceKey + }{ + { + name: "find remove resources", + manifestsYAML: ` +apiVersion: v1 +kind: ConfigMap +metadata: + name: test-configmap + namespace: default + annotations: + "pipecd.dev/managed-by": "piped" +--- +apiVersion: v1 +kind: Secret +metadata: + name: test-secret + namespace: default + annotations: + "pipecd.dev/managed-by": "piped" +`, + namespacedLiveResourcesYAML: ` +apiVersion: v1 +kind: ConfigMap +metadata: + name: test-configmap + namespace: default + annotations: + "pipecd.dev/managed-by": "piped" +--- +apiVersion: v1 +kind: Secret +metadata: + name: test-secret + namespace: default + annotations: + 
"pipecd.dev/managed-by": "piped" +--- +apiVersion: v1 +kind: Secret +metadata: + name: old-secret + namespace: default + annotations: + "pipecd.dev/managed-by": "piped" +`, + clusterScopedLiveResourcesYAML: ` +apiVersion: v1 +kind: Namespace +metadata: + name: test-namespace + annotations: + "pipecd.dev/managed-by": "piped" +`, + expectedRemoveKeys: []ResourceKey{ + { + groupKind: schema.GroupKind{Group: "", Kind: "Secret"}, + namespace: "default", + name: "old-secret", + }, + { + groupKind: schema.GroupKind{Group: "", Kind: "Namespace"}, + namespace: "", + name: "test-namespace", + }, + }, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + manifests := mustParseManifests(t, tt.manifestsYAML) + + namespacedLiveResources := mustParseManifests(t, tt.namespacedLiveResourcesYAML) + + clusterScopedLiveResources := mustParseManifests(t, tt.clusterScopedLiveResourcesYAML) + + removeKeys := FindRemoveResources(manifests, namespacedLiveResources, clusterScopedLiveResources) + assert.ElementsMatch(t, tt.expectedRemoveKeys, removeKeys) + }) + } +}