Skip to content

Commit

Permalink
Implemented the backend of Metadata APIs as MetadataStoreRegistry (#5471
Browse files Browse the repository at this point in the history
)

* Create MetadataStoreRegistry

Signed-off-by: t-kikuc <[email protected]>

* refactor: use only DeploymentID

Signed-off-by: t-kikuc <[email protected]>

* Add MetadataStore methods to PluginAPI

Signed-off-by: t-kikuc <[email protected]>

* Get Metadata directly from Deployment

Signed-off-by: t-kikuc <[email protected]>

* Handle MetadataStoreRegistry in Controller

Signed-off-by: t-kikuc <[email protected]>

* Revert "refactor: use only DeploymentID"

This reverts commit 3707f8d.

Signed-off-by: t-kikuc <[email protected]>

* refactor: rename api->service

Signed-off-by: t-kikuc <[email protected]>

* add func comments

Signed-off-by: t-kikuc <[email protected]>

* Fix import order

Signed-off-by: t-kikuc <[email protected]>

* add 'plugin' to distinguish which plugin put the metadata

Signed-off-by: t-kikuc <[email protected]>

* Add metadata_v2 to Deployment to have shared and plugins metadata

Signed-off-by: t-kikuc <[email protected]>

* Add 2 rpcs of SaveDeploymentMetadataV2

Signed-off-by: t-kikuc <[email protected]>

* Split DeploymentMetadata rpcs into plugin and shared

Signed-off-by: t-kikuc <[email protected]>

* refactor: rename to pluginName and remove an old comment

Signed-off-by: t-kikuc <[email protected]>

* Split DeploymentMetadata methods to plugin and shared in piped side

Signed-off-by: t-kikuc <[email protected]>

* refactor and fix tests

Signed-off-by: t-kikuc <[email protected]>

---------

Signed-off-by: t-kikuc <[email protected]>
  • Loading branch information
t-kikuc authored Jan 21, 2025
1 parent aa454ff commit 7f7d7ec
Show file tree
Hide file tree
Showing 23 changed files with 4,512 additions and 1,624 deletions.
45 changes: 38 additions & 7 deletions pkg/app/pipedv1/cmd/piped/grpcapi/plugin_api.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"context"
"fmt"

"github.com/pipe-cd/pipecd/pkg/app/pipedv1/metadatastore"
"github.com/pipe-cd/pipecd/pkg/app/server/service/pipedservice"
config "github.com/pipe-cd/pipecd/pkg/configv1"
service "github.com/pipe-cd/pipecd/pkg/plugin/pipedservice"
Expand All @@ -32,8 +33,9 @@ type PluginAPI struct {
cfg *config.PipedSpec
apiClient apiClient

toolRegistry *toolRegistry
Logger *zap.Logger
toolRegistry *toolRegistry
Logger *zap.Logger
metadataStoreRegistry *metadatastore.MetadataStoreRegistry
}

type apiClient interface {
Expand All @@ -46,17 +48,18 @@ func (a *PluginAPI) Register(server *grpc.Server) {
service.RegisterPluginServiceServer(server, a)
}

func NewPluginAPI(cfg *config.PipedSpec, apiClient apiClient, toolsDir string, logger *zap.Logger) (*PluginAPI, error) {
func NewPluginAPI(cfg *config.PipedSpec, apiClient apiClient, toolsDir string, logger *zap.Logger, metadataStoreRegistry *metadatastore.MetadataStoreRegistry) (*PluginAPI, error) {
toolRegistry, err := newToolRegistry(toolsDir)
if err != nil {
return nil, fmt.Errorf("failed to create tool registry: %w", err)
}

return &PluginAPI{
cfg: cfg,
apiClient: apiClient,
toolRegistry: toolRegistry,
Logger: logger.Named("plugin-api"),
cfg: cfg,
apiClient: apiClient,
toolRegistry: toolRegistry,
Logger: logger.Named("plugin-api"),
metadataStoreRegistry: metadataStoreRegistry,
}, nil
}

Expand Down Expand Up @@ -112,3 +115,31 @@ func (a *PluginAPI) ReportStageLogsFromLastCheckpoint(ctx context.Context, req *

return &service.ReportStageLogsFromLastCheckpointResponse{}, nil
}

func (a *PluginAPI) GetStageMetadata(ctx context.Context, req *service.GetStageMetadataRequest) (*service.GetStageMetadataResponse, error) {
return a.metadataStoreRegistry.GetStageMetadata(ctx, req)
}

func (a *PluginAPI) PutStageMetadata(ctx context.Context, req *service.PutStageMetadataRequest) (*service.PutStageMetadataResponse, error) {
return a.metadataStoreRegistry.PutStageMetadata(ctx, req)
}

func (a *PluginAPI) PutStageMetadataMulti(ctx context.Context, req *service.PutStageMetadataMultiRequest) (*service.PutStageMetadataMultiResponse, error) {
return a.metadataStoreRegistry.PutStageMetadataMulti(ctx, req)
}

func (a *PluginAPI) GetDeploymentPluginMetadata(ctx context.Context, req *service.GetDeploymentPluginMetadataRequest) (*service.GetDeploymentPluginMetadataResponse, error) {
return a.metadataStoreRegistry.GetDeploymentPluginMetadata(ctx, req)
}

func (a *PluginAPI) PutDeploymentPluginMetadata(ctx context.Context, req *service.PutDeploymentPluginMetadataRequest) (*service.PutDeploymentPluginMetadataResponse, error) {
return a.metadataStoreRegistry.PutDeploymentPluginMetadata(ctx, req)
}

func (a *PluginAPI) PutDeploymentPluginMetadataMulti(ctx context.Context, req *service.PutDeploymentPluginMetadataMultiRequest) (*service.PutDeploymentPluginMetadataMultiResponse, error) {
return a.metadataStoreRegistry.PutDeploymentPluginMetadataMulti(ctx, req)
}

func (a *PluginAPI) GetDeploymentSharedMetadata(ctx context.Context, req *service.GetDeploymentSharedMetadataRequest) (*service.GetDeploymentSharedMetadataResponse, error) {
return a.metadataStoreRegistry.GetDeploymentSharedMetadata(ctx, req)
}
5 changes: 4 additions & 1 deletion pkg/app/pipedv1/cmd/piped/piped.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ import (
"github.com/pipe-cd/pipecd/pkg/app/pipedv1/controller"
"github.com/pipe-cd/pipecd/pkg/app/pipedv1/controller/controllermetrics"
"github.com/pipe-cd/pipecd/pkg/app/pipedv1/eventwatcher"
"github.com/pipe-cd/pipecd/pkg/app/pipedv1/metadatastore"
"github.com/pipe-cd/pipecd/pkg/app/pipedv1/notifier"
"github.com/pipe-cd/pipecd/pkg/app/pipedv1/plugin"
"github.com/pipe-cd/pipecd/pkg/app/pipedv1/statsreporter"
Expand Down Expand Up @@ -299,10 +300,11 @@ func (p *piped) run(ctx context.Context, input cli.Input) (runErr error) {
eventLister = store.Lister()
}

metadataStoreRegistry := metadatastore.NewMetadataStoreRegistry(apiClient)
// Start running plugin service server.
{
var (
service, err = grpcapi.NewPluginAPI(cfg, apiClient, p.toolsDir, input.Logger)
service, err = grpcapi.NewPluginAPI(cfg, apiClient, p.toolsDir, input.Logger, metadataStoreRegistry)
opts = []rpc.Option{
rpc.WithPort(p.pluginServicePort),
rpc.WithGracePeriod(p.gracePeriod),
Expand Down Expand Up @@ -400,6 +402,7 @@ func (p *piped) run(ctx context.Context, input cli.Input) (runErr error) {
commandLister,
notifier,
decrypter,
*metadataStoreRegistry,
p.gracePeriod,
input.Logger,
tracerProvider,
Expand Down
33 changes: 20 additions & 13 deletions pkg/app/pipedv1/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import (
"google.golang.org/grpc/status"

"github.com/pipe-cd/pipecd/pkg/app/pipedv1/controller/controllermetrics"
"github.com/pipe-cd/pipecd/pkg/app/pipedv1/metadatastore"
"github.com/pipe-cd/pipecd/pkg/app/pipedv1/plugin"
"github.com/pipe-cd/pipecd/pkg/app/server/service/pipedservice"
"github.com/pipe-cd/pipecd/pkg/git"
Expand Down Expand Up @@ -88,12 +89,13 @@ var (
)

type controller struct {
apiClient apiClient
gitClient gitClient
deploymentLister deploymentLister
commandLister commandLister
notifier notifier
secretDecrypter secretDecrypter
apiClient apiClient
gitClient gitClient
deploymentLister deploymentLister
commandLister commandLister
notifier notifier
secretDecrypter secretDecrypter
metadataStoreRegistry metadatastore.MetadataStoreRegistry

// The registry of all plugins.
pluginRegistry plugin.PluginRegistry
Expand Down Expand Up @@ -134,19 +136,21 @@ func NewController(
commandLister commandLister,
notifier notifier,
secretDecrypter secretDecrypter,
metadataStoreRegistry metadatastore.MetadataStoreRegistry,
gracePeriod time.Duration,
logger *zap.Logger,
tracerProvider trace.TracerProvider,
) DeploymentController {

return &controller{
apiClient: apiClient,
gitClient: gitClient,
pluginRegistry: pluginRegistry,
deploymentLister: deploymentLister,
commandLister: commandLister,
notifier: notifier,
secretDecrypter: secretDecrypter,
apiClient: apiClient,
gitClient: gitClient,
pluginRegistry: pluginRegistry,
deploymentLister: deploymentLister,
commandLister: commandLister,
notifier: notifier,
secretDecrypter: secretDecrypter,
metadataStoreRegistry: metadataStoreRegistry,

planners: make(map[string]*planner),
donePlanners: make(map[string]time.Time),
Expand Down Expand Up @@ -577,6 +581,8 @@ func (c *controller) startNewScheduler(ctx context.Context, d *model.Deployment)
c.tracerProvider,
)

c.metadataStoreRegistry.Register(d)

cleanup := func() {
logger.Info("cleaning up working directory for scheduler", zap.String("working-dir", workingDir))
err := os.RemoveAll(workingDir)
Expand All @@ -594,6 +600,7 @@ func (c *controller) startNewScheduler(ctx context.Context, d *model.Deployment)
go func() {
defer c.wg.Done()
defer cleanup()
defer c.metadataStoreRegistry.Delete(d.Id)
if err := scheduler.Run(ctx); err != nil {
logger.Error("failed to run scheduler", zap.Error(err))
}
Expand Down
10 changes: 3 additions & 7 deletions pkg/app/pipedv1/controller/planner.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@ import (

"github.com/pipe-cd/pipecd/pkg/app/pipedv1/controller/controllermetrics"
"github.com/pipe-cd/pipecd/pkg/app/pipedv1/deploysource"
"github.com/pipe-cd/pipecd/pkg/app/pipedv1/metadatastore"
"github.com/pipe-cd/pipecd/pkg/app/pipedv1/plugin"
"github.com/pipe-cd/pipecd/pkg/app/server/service/pipedservice"
config "github.com/pipe-cd/pipecd/pkg/configv1"
Expand Down Expand Up @@ -64,10 +63,8 @@ type planner struct {
// The gitClient is used to perform git commands.
gitClient gitClient

// The notifier and metadataStore are used for
// notification features.
notifier notifier
metadataStore metadatastore.MetadataStore
// The notifier is used for notification features.
notifier notifier

// The secretDecrypter is used to decrypt secrets
// which encrypted using PipeCD built-in secret management.
Expand Down Expand Up @@ -118,7 +115,6 @@ func newPlanner(
pluginRegistry: pluginRegistry,
apiClient: apiClient,
gitClient: gitClient,
metadataStore: metadatastore.NewMetadataStore(apiClient, d),
notifier: notifier,
secretDecrypter: secretDecrypter,
doneDeploymentStatus: d.Status,
Expand Down Expand Up @@ -657,7 +653,7 @@ func (p *planner) reportDeploymentCancelled(ctx context.Context, commander, reas

// getApplicationNotificationMentions returns the list of users groups who should be mentioned in the notification.
func (p *planner) getApplicationNotificationMentions(event model.NotificationEventType) ([]string, []string, error) {
n, ok := p.metadataStore.Shared().Get(model.MetadataKeyDeploymentNotification)
n, ok := p.deployment.Metadata[model.MetadataKeyDeploymentNotification]
if !ok {
return []string{}, []string{}, nil
}
Expand Down
5 changes: 1 addition & 4 deletions pkg/app/pipedv1/controller/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@ import (

"github.com/pipe-cd/pipecd/pkg/app/pipedv1/controller/controllermetrics"
"github.com/pipe-cd/pipecd/pkg/app/pipedv1/deploysource"
"github.com/pipe-cd/pipecd/pkg/app/pipedv1/metadatastore"
"github.com/pipe-cd/pipecd/pkg/app/pipedv1/plugin"
"github.com/pipe-cd/pipecd/pkg/app/server/service/pipedservice"
config "github.com/pipe-cd/pipecd/pkg/configv1"
Expand All @@ -47,7 +46,6 @@ type scheduler struct {

apiClient apiClient
gitClient gitClient
metadataStore metadatastore.MetadataStore
notifier notifier
secretDecrypter secretDecrypter

Expand Down Expand Up @@ -99,7 +97,6 @@ func newScheduler(
apiClient: apiClient,
gitClient: gitClient,
pluginRegistry: pluginRegistry,
metadataStore: metadatastore.NewMetadataStore(apiClient, d),
notifier: notifier,
secretDecrypter: secretsDecrypter,
doneDeploymentStatus: d.Status,
Expand Down Expand Up @@ -717,7 +714,7 @@ func (s *scheduler) reportDeploymentCompleted(ctx context.Context, status model.

// getApplicationNotificationMentions returns the list of users groups who should be mentioned in the notification.
func (s *scheduler) getApplicationNotificationMentions(event model.NotificationEventType) ([]string, []string, error) {
n, ok := s.metadataStore.Shared().Get(model.MetadataKeyDeploymentNotification)
n, ok := s.deployment.Metadata[model.MetadataKeyDeploymentNotification]
if !ok {
return []string{}, []string{}, nil
}
Expand Down
Loading

0 comments on commit 7f7d7ec

Please sign in to comment.