Skip to content

Commit

Permalink
Pass secret decrypter to pipedv1 schedulers and planners (#5433)
Browse files Browse the repository at this point in the history
Signed-off-by: khanhtc1202 <[email protected]>
  • Loading branch information
khanhtc1202 authored Dec 18, 2024
1 parent 7cf95df commit 17a5007
Show file tree
Hide file tree
Showing 4 changed files with 67 additions and 37 deletions.
9 changes: 8 additions & 1 deletion pkg/app/pipedv1/cmd/piped/piped.go
Original file line number Diff line number Diff line change
Expand Up @@ -371,6 +371,13 @@ func (p *piped) run(ctx context.Context, input cli.Input) (runErr error) {
// TODO: Implement the drift detector controller.
}

// Initialize secret decrypter.
decrypter, err := p.initializeSecretDecrypter(cfg)
if err != nil {
input.Logger.Error("failed to initialize secret decrypter", zap.Error(err))
return err
}

// Start running deployment controller.
{
c := controller.NewController(
Expand All @@ -380,6 +387,7 @@ func (p *piped) run(ctx context.Context, input cli.Input) (runErr error) {
deploymentLister,
commandLister,
notifier,
decrypter,
p.gracePeriod,
input.Logger,
tracerProvider,
Expand Down Expand Up @@ -667,7 +675,6 @@ func (p *piped) runPlugins(ctx context.Context, pluginsCfg []config.PipedPlugin,
return plugins, nil
}

// TODO: Remove this once the decryption task by plugin call to the plugin service is implemented.
func (p *piped) initializeSecretDecrypter(cfg *config.PipedSpec) (crypto.Decrypter, error) {
sm := cfg.SecretManagement
if sm == nil {
Expand Down
9 changes: 9 additions & 0 deletions pkg/app/pipedv1/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,10 @@ type notifier interface {
Notify(event model.NotificationEvent)
}

type secretDecrypter interface {
Decrypt(string) (string, error)
}

type DeploymentController interface {
Run(ctx context.Context) error
}
Expand All @@ -90,6 +94,7 @@ type controller struct {
deploymentLister deploymentLister
commandLister commandLister
notifier notifier
secretDecrypter secretDecrypter

// gRPC clients to communicate with plugins.
pluginClients []pluginapi.PluginClient
Expand Down Expand Up @@ -130,6 +135,7 @@ func NewController(
deploymentLister deploymentLister,
commandLister commandLister,
notifier notifier,
secretDecrypter secretDecrypter,
gracePeriod time.Duration,
logger *zap.Logger,
tracerProvider trace.TracerProvider,
Expand All @@ -142,6 +148,7 @@ func NewController(
deploymentLister: deploymentLister,
commandLister: commandLister,
notifier: notifier,
secretDecrypter: secretDecrypter,

planners: make(map[string]*planner),
donePlanners: make(map[string]time.Time),
Expand Down Expand Up @@ -446,6 +453,7 @@ func (c *controller) startNewPlanner(ctx context.Context, d *model.Deployment) (
c.apiClient,
c.gitClient,
c.notifier,
c.secretDecrypter,
c.logger,
c.tracerProvider,
)
Expand Down Expand Up @@ -584,6 +592,7 @@ func (c *controller) startNewScheduler(ctx context.Context, d *model.Deployment)
c.gitClient,
c.stageBasedPluginsMap,
c.notifier,
c.secretDecrypter,
c.logger,
c.tracerProvider,
)
Expand Down
44 changes: 26 additions & 18 deletions pkg/app/pipedv1/controller/planner.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,10 @@ type planner struct {
notifier notifier
metadataStore metadatastore.MetadataStore

// TODO: Find a way to show log from pluggin's planner
// The secretDecrypter is used to decrypt secrets
// which encrypted using PipeCD built-in secret management.
secretDecrypter secretDecrypter

logger *zap.Logger
tracer trace.Tracer

Expand All @@ -98,6 +101,7 @@ func newPlanner(
apiClient apiClient,
gitClient gitClient,
notifier notifier,
secretDecrypter secretDecrypter,
logger *zap.Logger,
tracerProvider trace.TracerProvider,
) *planner {
Expand All @@ -121,6 +125,7 @@ func newPlanner(
gitClient: gitClient,
metadataStore: metadatastore.NewMetadataStore(apiClient, d),
notifier: notifier,
secretDecrypter: secretDecrypter,
doneDeploymentStatus: d.Status,
cancelledCh: make(chan *model.ReportableCommand, 1),
nowFunc: time.Now,
Expand Down Expand Up @@ -193,31 +198,34 @@ func (p *planner) Run(ctx context.Context) error {
Branch: p.deployment.GitPath.Repo.Branch,
}

runningDSP := deploysource.NewProvider(
filepath.Join(p.workingDir, "running-deploysource"),
deploysource.NewGitSourceCloner(p.gitClient, repoCfg, "running", p.lastSuccessfulCommitHash),
p.deployment.GetGitPath(), nil, // TODO: pass secret decrypter?
)
rds, err := runningDSP.Get(ctx, io.Discard) // TODO: pass not io.Discard
if err != nil {
// TODO: log error
return fmt.Errorf("error while preparing deploy source data (%v)", err)
}
runningDS = rds.ToPluginDeploySource()

targetDSP := deploysource.NewProvider(
filepath.Join(p.workingDir, "target-deploysource"),
deploysource.NewGitSourceCloner(p.gitClient, repoCfg, "target", p.deployment.Trigger.Commit.Hash),
p.deployment.GetGitPath(), nil, // TODO: pass secret decrypter?
p.deployment.GetGitPath(),
p.secretDecrypter,
)
tds, err := targetDSP.Get(ctx, io.Discard) // TODO: pass not io.Discard
tds, err := targetDSP.Get(ctx, io.Discard)
if err != nil {
// TODO: log error
return fmt.Errorf("error while preparing deploy source data (%v)", err)
p.logger.Error("error while preparing target deploy source data", zap.Error(err))
return err
}
targetDS = tds.ToPluginDeploySource()

// TODO: Pass running DS as well if need?
if p.lastSuccessfulCommitHash != "" {
runningDSP := deploysource.NewProvider(
filepath.Join(p.workingDir, "running-deploysource"),
deploysource.NewGitSourceCloner(p.gitClient, repoCfg, "running", p.lastSuccessfulCommitHash),
p.deployment.GetGitPath(),
p.secretDecrypter,
)
rds, err := runningDSP.Get(ctx, io.Discard)
if err != nil {
p.logger.Error("error while preparing running deploy source data", zap.Error(err))
return err
}
runningDS = rds.ToPluginDeploySource()
}

out, err := p.buildPlan(ctx, runningDS, targetDS)

// If the deployment was already cancelled, we ignore the plan result.
Expand Down
42 changes: 24 additions & 18 deletions pkg/app/pipedv1/controller/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,10 +45,11 @@ type scheduler struct {

stageBasedPluginsMap map[string]pluginapi.PluginClient

apiClient apiClient
gitClient gitClient
metadataStore metadatastore.MetadataStore
notifier notifier
apiClient apiClient
gitClient gitClient
metadataStore metadatastore.MetadataStore
notifier notifier
secretDecrypter secretDecrypter

targetDSP deploysource.Provider
runningDSP deploysource.Provider
Expand Down Expand Up @@ -80,6 +81,7 @@ func newScheduler(
gitClient gitClient,
stageBasedPluginsMap map[string]pluginapi.PluginClient,
notifier notifier,
secretsDecrypter secretDecrypter,
logger *zap.Logger,
tracerProvider trace.TracerProvider,
) *scheduler {
Expand All @@ -99,6 +101,7 @@ func newScheduler(
gitClient: gitClient,
metadataStore: metadatastore.NewMetadataStore(apiClient, d),
notifier: notifier,
secretDecrypter: secretsDecrypter,
doneDeploymentStatus: d.Status,
cancelledCh: make(chan *model.ReportableCommand, 1),
logger: logger,
Expand Down Expand Up @@ -165,7 +168,7 @@ func (s *scheduler) Cancel(cmd model.ReportableCommand) {
}

// Run starts running the scheduler.
// It determines what stage should be executed next by which executor.
// It determines what stage should be executed next by which plugin.
// The returning error does not mean that the pipeline was failed,
// but it means that the scheduler could not finish its job normally.
func (s *scheduler) Run(ctx context.Context) error {
Expand Down Expand Up @@ -193,7 +196,7 @@ func (s *scheduler) Run(ctx context.Context) error {
}
controllermetrics.UpdateDeploymentStatus(s.deployment, model.DeploymentStatus_DEPLOYMENT_RUNNING)

// notify the deployment started event
// Notify the deployment started event
users, groups, err := s.getApplicationNotificationMentions(model.NotificationEventType_EVENT_DEPLOYMENT_STARTED)
if err != nil {
s.logger.Error("failed to get the list of users or groups", zap.Error(err))
Expand Down Expand Up @@ -223,16 +226,20 @@ func (s *scheduler) Run(ctx context.Context) error {
Branch: s.deployment.GitPath.Repo.Branch,
}

s.runningDSP = deploysource.NewProvider(
filepath.Join(s.workingDir, "running-deploysource"),
deploysource.NewGitSourceCloner(s.gitClient, repoCfg, "running", s.deployment.RunningCommitHash),
s.deployment.GetGitPath(), nil, // TODO: pass secret decrypter?
)
if s.deployment.RunningCommitHash != "" {
s.runningDSP = deploysource.NewProvider(
filepath.Join(s.workingDir, "running-deploysource"),
deploysource.NewGitSourceCloner(s.gitClient, repoCfg, "running", s.deployment.RunningCommitHash),
s.deployment.GetGitPath(),
s.secretDecrypter,
)
}

s.targetDSP = deploysource.NewProvider(
filepath.Join(s.workingDir, "target-deploysource"),
deploysource.NewGitSourceCloner(s.gitClient, repoCfg, "target", s.deployment.Trigger.Commit.Hash),
s.deployment.GetGitPath(), nil, // TODO: pass secret decrypter?
s.deployment.GetGitPath(),
s.secretDecrypter,
)

ds, err := s.targetDSP.Get(ctx, io.Discard)
Expand Down Expand Up @@ -469,13 +476,13 @@ func (s *scheduler) executeStage(sig StopSignal, ps *model.PipelineStage) (final

rds, err := s.runningDSP.Get(ctx, io.Discard)
if err != nil {
s.logger.Error("failed to get running deployment source", zap.Error(err))
s.logger.Error("failed to get running deployment source", zap.String("stage-name", ps.Name), zap.Error(err))
return model.StageStatus_STAGE_FAILURE
}

tds, err := s.targetDSP.Get(ctx, io.Discard)
if err != nil {
s.logger.Error("failed to get target deployment source", zap.Error(err))
s.logger.Error("failed to get target deployment source", zap.String("stage-name", ps.Name), zap.Error(err))
return model.StageStatus_STAGE_FAILURE
}

Expand Down Expand Up @@ -508,16 +515,15 @@ func (s *scheduler) executeStage(sig StopSignal, ps *model.PipelineStage) (final
// Find the executor plugin for this stage.
plugin, ok := s.stageBasedPluginsMap[ps.Name]
if !ok {
err := fmt.Errorf("no registered plugin that can perform for stage %s", ps.Name)
s.logger.Error(err.Error())
s.logger.Error("failed to find the plugin for the stage", zap.String("stage-name", ps.Name))
s.reportStageStatus(ctx, ps.Id, model.StageStatus_STAGE_FAILURE, ps.Requires)
return model.StageStatus_STAGE_FAILURE
}

// Load the stage configuration.
stageConfig, stageConfigFound := s.genericApplicationConfig.GetStageByte(ps.Index)
if !stageConfigFound {
s.logger.Error("Unable to find the stage configuration")
s.logger.Error("Unable to find the stage configuration", zap.String("stage-name", ps.Name))
if err := s.reportStageStatus(ctx, ps.Id, model.StageStatus_STAGE_FAILURE, ps.Requires); err != nil {
s.logger.Error("failed to report stage status", zap.Error(err))
}
Expand All @@ -535,7 +541,7 @@ func (s *scheduler) executeStage(sig StopSignal, ps *model.PipelineStage) (final
},
})
if err != nil {
s.logger.Error("failed to execute stage", zap.Error(err))
s.logger.Error("failed to execute stage", zap.String("stage-name", ps.Name), zap.Error(err))
s.reportStageStatus(ctx, ps.Id, model.StageStatus_STAGE_FAILURE, ps.Requires)
return model.StageStatus_STAGE_FAILURE
}
Expand Down

0 comments on commit 17a5007

Please sign in to comment.