Skip to content

Commit

Permalink
Merge branch 'master' into plugin/metadatastore-v4
Browse files Browse the repository at this point in the history
Signed-off-by: t-kikuc <[email protected]>
  • Loading branch information
t-kikuc committed Jan 14, 2025
2 parents 94da993 + 58fda21 commit a254387
Show file tree
Hide file tree
Showing 39 changed files with 1,891 additions and 385 deletions.
18 changes: 15 additions & 3 deletions pkg/app/pipedv1/cmd/piped/piped.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ import (
"github.com/pipe-cd/pipecd/pkg/app/pipedv1/eventwatcher"
"github.com/pipe-cd/pipecd/pkg/app/pipedv1/metadatastore"
"github.com/pipe-cd/pipecd/pkg/app/pipedv1/notifier"
"github.com/pipe-cd/pipecd/pkg/app/pipedv1/plugin"
"github.com/pipe-cd/pipecd/pkg/app/pipedv1/statsreporter"
"github.com/pipe-cd/pipecd/pkg/app/pipedv1/trigger"
"github.com/pipe-cd/pipecd/pkg/app/server/service/pipedservice"
Expand Down Expand Up @@ -350,7 +351,7 @@ func (p *piped) run(ctx context.Context, input cli.Input) (runErr error) {
}

// Make grpc clients to connect to plugins.
pluginClis := make([]pluginapi.PluginClient, 0, len(cfg.Plugins))
plugins := make([]plugin.Plugin, 0, len(cfg.Plugins))
options := []rpcclient.DialOption{
rpcclient.WithBlock(),
rpcclient.WithInsecure(),
Expand All @@ -359,8 +360,19 @@ func (p *piped) run(ctx context.Context, input cli.Input) (runErr error) {
cli, err := pluginapi.NewClient(ctx, net.JoinHostPort("localhost", strconv.Itoa(plg.Port)), options...)
if err != nil {
input.Logger.Error("failed to create client to connect plugin", zap.String("plugin", plg.Name), zap.Error(err))
return err
}
pluginClis = append(pluginClis, cli)

plugins = append(plugins, plugin.Plugin{
Name: plg.Name,
Cli: cli,
})
}

pluginRegistry, err := plugin.NewPluginRegistry(ctx, plugins)
if err != nil {
input.Logger.Error("failed to create plugin registry", zap.Error(err))
return err
}

// Start running application live state reporter.
Expand All @@ -385,7 +397,7 @@ func (p *piped) run(ctx context.Context, input cli.Input) (runErr error) {
c := controller.NewController(
apiClient,
gitClient,
pluginClis,
pluginRegistry,
deploymentLister,
commandLister,
notifier,
Expand Down
36 changes: 8 additions & 28 deletions pkg/app/pipedv1/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,11 +35,10 @@ import (

"github.com/pipe-cd/pipecd/pkg/app/pipedv1/controller/controllermetrics"
"github.com/pipe-cd/pipecd/pkg/app/pipedv1/metadatastore"
"github.com/pipe-cd/pipecd/pkg/app/pipedv1/plugin"
"github.com/pipe-cd/pipecd/pkg/app/server/service/pipedservice"
"github.com/pipe-cd/pipecd/pkg/git"
"github.com/pipe-cd/pipecd/pkg/model"
pluginapi "github.com/pipe-cd/pipecd/pkg/plugin/api/v1alpha1"
"github.com/pipe-cd/pipecd/pkg/plugin/api/v1alpha1/deployment"
)

type apiClient interface {
Expand Down Expand Up @@ -98,10 +97,9 @@ type controller struct {
secretDecrypter secretDecrypter
metadataStoreRegistry metadatastore.MetadataStoreRegistry

// gRPC clients to communicate with plugins.
pluginClients []pluginapi.PluginClient
// Map from stage name to the plugin client.
stageBasedPluginsMap map[string]pluginapi.PluginClient
// The registry of all plugins.
pluginRegistry plugin.PluginRegistry

// Map from application ID to the planner
// of a pending deployment of that application.
planners map[string]*planner
Expand Down Expand Up @@ -133,7 +131,7 @@ type controller struct {
func NewController(
apiClient apiClient,
gitClient gitClient,
pluginClients []pluginapi.PluginClient,
pluginRegistry plugin.PluginRegistry,
deploymentLister deploymentLister,
commandLister commandLister,
notifier notifier,
Expand All @@ -147,7 +145,7 @@ func NewController(
return &controller{
apiClient: apiClient,
gitClient: gitClient,
pluginClients: pluginClients,
pluginRegistry: pluginRegistry,
deploymentLister: deploymentLister,
commandLister: commandLister,
notifier: notifier,
Expand Down Expand Up @@ -183,23 +181,6 @@ func (c *controller) Run(ctx context.Context) error {
c.workspaceDir = dir
c.logger.Info(fmt.Sprintf("workspace directory was configured to %s", c.workspaceDir))

// Build the list of stages that can be handled by piped's plugins.
stagesBasedPluginsMap := make(map[string]pluginapi.PluginClient)
for _, plugin := range c.pluginClients {
resp, err := plugin.FetchDefinedStages(ctx, &deployment.FetchDefinedStagesRequest{})
if err != nil {
return err
}
for _, stage := range resp.GetStages() {
if _, ok := stagesBasedPluginsMap[stage]; ok {
c.logger.Error("duplicated stage name", zap.String("stage", stage))
return fmt.Errorf("duplicated stage name %s", stage)
}
stagesBasedPluginsMap[stage] = plugin
}
}
c.stageBasedPluginsMap = stagesBasedPluginsMap

ticker := time.NewTicker(c.syncInternal)
defer ticker.Stop()
c.logger.Info("start syncing planners and schedulers")
Expand Down Expand Up @@ -452,8 +433,7 @@ func (c *controller) startNewPlanner(ctx context.Context, d *model.Deployment) (
commitHash,
configFilename,
workingDir,
c.pluginClients, // FIXME: Find a way to ensure the plugins only related to deployment.
c.stageBasedPluginsMap,
c.pluginRegistry,
c.apiClient,
c.gitClient,
c.notifier,
Expand Down Expand Up @@ -594,7 +574,7 @@ func (c *controller) startNewScheduler(ctx context.Context, d *model.Deployment)
workingDir,
c.apiClient,
c.gitClient,
c.stageBasedPluginsMap,
c.pluginRegistry,
c.notifier,
c.secretDecrypter,
c.logger,
Expand Down
62 changes: 36 additions & 26 deletions pkg/app/pipedv1/controller/planner.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (

"github.com/pipe-cd/pipecd/pkg/app/pipedv1/controller/controllermetrics"
"github.com/pipe-cd/pipecd/pkg/app/pipedv1/deploysource"
"github.com/pipe-cd/pipecd/pkg/app/pipedv1/plugin"
"github.com/pipe-cd/pipecd/pkg/app/server/service/pipedservice"
config "github.com/pipe-cd/pipecd/pkg/configv1"
"github.com/pipe-cd/pipecd/pkg/model"
Expand All @@ -56,13 +57,6 @@ type planner struct {
lastSuccessfulConfigFilename string
workingDir string

// The plugin clients are used to call plugin that actually
// performs planning deployment.
plugins []pluginapi.PluginClient
// The map used to know which plugin is incharged for a given stage
// of the current deployment.
stageBasedPluginsMap map[string]pluginapi.PluginClient

// The apiClient is used to report the deployment status.
apiClient apiClient

Expand All @@ -76,6 +70,9 @@ type planner struct {
// which encrypted using PipeCD built-in secret management.
secretDecrypter secretDecrypter

// The pluginRegistry is used to determine which plugins to be used
pluginRegistry plugin.PluginRegistry

logger *zap.Logger
tracer trace.Tracer

Expand All @@ -93,8 +90,7 @@ func newPlanner(
lastSuccessfulCommitHash string,
lastSuccessfulConfigFilename string,
workingDir string,
pluginClients []pluginapi.PluginClient,
stageBasedPluginsMap map[string]pluginapi.PluginClient,
pluginRegistry plugin.PluginRegistry,
apiClient apiClient,
gitClient gitClient,
notifier notifier,
Expand All @@ -116,8 +112,7 @@ func newPlanner(
lastSuccessfulCommitHash: lastSuccessfulCommitHash,
lastSuccessfulConfigFilename: lastSuccessfulConfigFilename,
workingDir: workingDir,
stageBasedPluginsMap: stageBasedPluginsMap,
plugins: pluginClients,
pluginRegistry: pluginRegistry,
apiClient: apiClient,
gitClient: gitClient,
notifier: notifier,
Expand Down Expand Up @@ -262,8 +257,21 @@ func (p *planner) buildPlan(ctx context.Context, runningDS, targetDS *deployment
TargetDeploymentSource: targetDS,
}

cfg, err := config.DecodeYAML[*config.GenericApplicationSpec](targetDS.GetApplicationConfig())
if err != nil {
p.logger.Error("unable to parse application config", zap.Error(err))
return nil, err
}
spec := cfg.Spec

plugins, err := p.pluginRegistry.GetPluginClientsByAppConfig(spec)
if err != nil {
p.logger.Error("unable to get plugin clients by app config", zap.Error(err))
return nil, err
}

// Build deployment target versions.
for _, plg := range p.plugins {
for _, plg := range plugins {
vRes, err := plg.DetermineVersions(ctx, &deployment.DetermineVersionsRequest{Input: input})
if err != nil {
p.logger.Warn("unable to determine versions", zap.Error(err))
Expand All @@ -280,13 +288,6 @@ func (p *planner) buildPlan(ctx context.Context, runningDS, targetDS *deployment
}
}

cfg, err := config.DecodeYAML[*config.GenericApplicationSpec](targetDS.GetApplicationConfig())
if err != nil {
p.logger.Error("unable to parse application config", zap.Error(err))
return nil, err
}
spec := cfg.Spec

// In case the strategy has been decided by trigger.
// For example: user triggered the deployment via web console.
switch p.deployment.Trigger.SyncStrategy {
Expand Down Expand Up @@ -371,7 +372,7 @@ func (p *planner) buildPlan(ctx context.Context, runningDS, targetDS *deployment
summary string
)
// Build plan based on plugins determined strategy
for _, plg := range p.plugins {
for _, plg := range plugins {
res, err := plg.DetermineStrategy(ctx, &deployment.DetermineStrategyRequest{Input: input})
if err != nil {
p.logger.Warn("Unable to determine strategy using current plugin", zap.Error(err))
Expand Down Expand Up @@ -417,10 +418,17 @@ func (p *planner) buildQuickSyncStages(ctx context.Context, cfg *config.GenericA
rollbackStages = []*model.PipelineStage{}
rollback = *cfg.Planner.AutoRollback
)
for _, plg := range p.plugins {

plugins, err := p.pluginRegistry.GetPluginClientsByAppConfig(cfg)
if err != nil {
p.logger.Error("failed to get plugin clients by app config", zap.Error(err))
return nil, err
}
for _, plg := range plugins {
res, err := plg.BuildQuickSyncStages(ctx, &deployment.BuildQuickSyncStagesRequest{Rollback: rollback})
if err != nil {
return nil, fmt.Errorf("failed to build quick sync stage deployment (%w)", err)
p.logger.Error("failed to build quick sync stages for deployment", zap.Error(err))
return nil, err
}
for i := range res.Stages {
if res.Stages[i].Rollback {
Expand Down Expand Up @@ -458,9 +466,10 @@ func (p *planner) buildPipelineSyncStages(ctx context.Context, cfg *config.Gener
// Build stages config for each plugin.
for i := range stagesCfg {
stageCfg := stagesCfg[i]
plg, ok := p.stageBasedPluginsMap[stageCfg.Name.String()]
if !ok {
return nil, fmt.Errorf("unable to find plugin for stage %q", stageCfg.Name.String())
plg, err := p.pluginRegistry.GetPluginClientByStageName(stageCfg.Name.String())
if err != nil {
p.logger.Error("failed to get plugin client by stage name", zap.Error(err))
return nil, err
}

stagesCfgPerPlugin[plg] = append(stagesCfgPerPlugin[plg], &deployment.BuildPipelineSyncStagesRequest_StageConfig{
Expand All @@ -480,7 +489,8 @@ func (p *planner) buildPipelineSyncStages(ctx context.Context, cfg *config.Gener
Rollback: rollback,
})
if err != nil {
return nil, fmt.Errorf("failed to build pipeline sync stages for deployment (%w)", err)
p.logger.Error("failed to build pipeline sync stages for deployment", zap.Error(err))
return nil, err
}
// TODO: Ensure responsed stages indexies is valid.
for i := range res.Stages {
Expand Down
Loading

0 comments on commit a254387

Please sign in to comment.