From 692eafe546e502debb742e9c7836e44ea4a74c91 Mon Sep 17 00:00:00 2001 From: t-kikuc Date: Mon, 23 Dec 2024 18:24:03 +0900 Subject: [PATCH 1/7] init wait plugin Signed-off-by: t-kikuc --- .../plugin/wait/config/stage_options.go | 46 +++++ .../plugin/wait/config/stage_options_test.go | 82 +++++++++ .../pipedv1/plugin/wait/execute/options.go | 31 ++++ pkg/app/pipedv1/plugin/wait/execute/server.go | 97 +++++++++++ pkg/app/pipedv1/plugin/wait/execute/wait.go | 132 ++++++++++++++ pkg/app/pipedv1/plugin/wait/main.go | 34 ++++ pkg/app/pipedv1/plugin/wait/server.go | 161 ++++++++++++++++++ 7 files changed, 583 insertions(+) create mode 100644 pkg/app/pipedv1/plugin/wait/config/stage_options.go create mode 100644 pkg/app/pipedv1/plugin/wait/config/stage_options_test.go create mode 100644 pkg/app/pipedv1/plugin/wait/execute/options.go create mode 100644 pkg/app/pipedv1/plugin/wait/execute/server.go create mode 100644 pkg/app/pipedv1/plugin/wait/execute/wait.go create mode 100644 pkg/app/pipedv1/plugin/wait/main.go create mode 100644 pkg/app/pipedv1/plugin/wait/server.go diff --git a/pkg/app/pipedv1/plugin/wait/config/stage_options.go b/pkg/app/pipedv1/plugin/wait/config/stage_options.go new file mode 100644 index 0000000000..2632129e4d --- /dev/null +++ b/pkg/app/pipedv1/plugin/wait/config/stage_options.go @@ -0,0 +1,46 @@ +// Copyright 2024 The PipeCD Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package config + +import ( + "encoding/json" + + "github.com/creasty/defaults" + "sigs.k8s.io/yaml" +) + +type StageOptions interface { + Validate() error +} + +// DecodeYAML unmarshals stageOptions YAML data to specified StageOptions type and validates the result. +func DecodeStageOptionsYAML[T StageOptions](data []byte) (*T, error) { + js, err := yaml.YAMLToJSON(data) + if err != nil { + return nil, err + } + + var o T + if err := json.Unmarshal(js, &o); err != nil { + return nil, err + } + if err := defaults.Set(&o); err != nil { + return nil, err + } + if err := o.Validate(); err != nil { + return nil, err + } + return &o, nil +} diff --git a/pkg/app/pipedv1/plugin/wait/config/stage_options_test.go b/pkg/app/pipedv1/plugin/wait/config/stage_options_test.go new file mode 100644 index 0000000000..65ac905d2c --- /dev/null +++ b/pkg/app/pipedv1/plugin/wait/config/stage_options_test.go @@ -0,0 +1,82 @@ +// Copyright 2024 The PipeCD Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package config + +import ( + "errors" + "testing" + + "github.com/stretchr/testify/assert" +) + +type testStageOptions struct { + Number int `json:"number"` + Text string `json:"text"` +} + +func (t testStageOptions) Validate() error { + if t.Number > 0 { + return nil + } else { + return errors.New("number must be greater than 0") + } +} + +func TestDecodeStageOptionsYAML(t *testing.T) { + t.Parallel() + + testcases := []struct { + title string + data []byte + expected testStageOptions + expectErr bool + }{ + { + title: "valid data", + data: []byte(` +number: 123 +text: "test" +`), + expected: testStageOptions{ + Number: 123, + Text: "test", + }, + expectErr: false, + }, + { + title: "invalid format", + data: []byte(`invalid`), + expected: testStageOptions{}, + expectErr: true, + }, + { + title: "validation failed", + data: []byte(`number: -1`), + expected: testStageOptions{}, + expectErr: true, + }, + } + + for _, tc := range testcases { + t.Run(tc.title, func(t *testing.T) { + t.Parallel() + got, err := DecodeStageOptionsYAML[testStageOptions](tc.data) + assert.Equal(t, tc.expectErr, err != nil) + if err == nil { + assert.Equal(t, tc.expected, *got) + } + }) + } +} diff --git a/pkg/app/pipedv1/plugin/wait/execute/options.go b/pkg/app/pipedv1/plugin/wait/execute/options.go new file mode 100644 index 0000000000..9c50cd3073 --- /dev/null +++ b/pkg/app/pipedv1/plugin/wait/execute/options.go @@ -0,0 +1,31 @@ +// Copyright 2024 The PipeCD Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package execute + +import ( + config "github.com/pipe-cd/pipecd/pkg/configv1" +) + +// waitStageOptions contains all configurable values for a WAIT stage. +type waitStageOptions struct { + Duration config.Duration `json:"duration"` + // TODO: Handle SkipOn options. + // SkipOn config.SkipOptions `json:"skipOn,omitempty"` +} + +func (o waitStageOptions) Validate() error { + // TODO: Implement validation logic. + return nil +} diff --git a/pkg/app/pipedv1/plugin/wait/execute/server.go b/pkg/app/pipedv1/plugin/wait/execute/server.go new file mode 100644 index 0000000000..e109052445 --- /dev/null +++ b/pkg/app/pipedv1/plugin/wait/execute/server.go @@ -0,0 +1,97 @@ +// Copyright 2024 The PipeCD Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package execute + +import ( + "context" + + "go.uber.org/zap" + "google.golang.org/grpc" + + "github.com/pipe-cd/pipecd/pkg/app/pipedv1/metadatastore" + config "github.com/pipe-cd/pipecd/pkg/configv1" + "github.com/pipe-cd/pipecd/pkg/plugin/api/v1alpha1/deployment" + "github.com/pipe-cd/pipecd/pkg/plugin/logpersister" +) + +type deploymentServiceServer struct { + deployment.UnimplementedDeploymentServiceServer + + pluginConfig *config.PipedPlugin + + metadataStore metadatastore.MetadataStore + logger *zap.Logger + logPersister logPersister +} + +type logPersister interface { + StageLogPersister(deploymentID, stageID string) logpersister.StageLogPersister +} + +// NewDeploymentService creates a new planService. +func NewDeploymentService( + config *config.PipedPlugin, + logger *zap.Logger, + logPersister logPersister, +) *deploymentServiceServer { + return &deploymentServiceServer{ + pluginConfig: config, + // TODO: Add metadataStore? or not? + logger: logger.Named("planner"), // TODO: Is this 'planner'? + logPersister: logPersister, + } +} + +// Register registers all handling of this service into the specified gRPC server. +func (a *deploymentServiceServer) Register(server *grpc.Server) { + deployment.RegisterDeploymentServiceServer(server, a) +} + +// ExecuteStage implements deployment.ExecuteStage. +func (s *deploymentServiceServer) ExecuteStage(ctx context.Context, request *deployment.ExecuteStageRequest) (*deployment.ExecuteStageResponse, error) { + slp := s.logPersister.StageLogPersister(request.Input.GetDeployment().GetId(), request.Input.GetStage().GetId()) + return s.execute(ctx, request.Input, slp) +} + +// FetchDefinedStages implements deployment.FetchDefinedStages. +func (s *deploymentServiceServer) FetchDefinedStages(ctx context.Context, request *deployment.FetchDefinedStagesRequest) (*deployment.FetchDefinedStagesResponse, error) { + return &deployment.FetchDefinedStagesResponse{ + Stages: []string{string(stageWait)}, + }, nil +} + +// DetermineVersions implements deployment.DeploymentServiceServer. +func (s *deploymentServiceServer) DetermineVersions(ctx context.Context, request *deployment.DetermineVersionsRequest) (*deployment.DetermineVersionsResponse, error) { + // TODO: Implement this func + return &deployment.DetermineVersionsResponse{}, nil +} + +// DetermineStrategy implements deployment.DeploymentServiceServer. +func (s *deploymentServiceServer) DetermineStrategy(ctx context.Context, request *deployment.DetermineStrategyRequest) (*deployment.DetermineStrategyResponse, error) { + // TODO: Implement this func + return &deployment.DetermineStrategyResponse{}, nil +} + +// BuildPipelineSyncStages implements deployment.BuildPipelineSyncStages. +func (s *deploymentServiceServer) BuildPipelineSyncStages(ctx context.Context, request *deployment.BuildPipelineSyncStagesRequest) (*deployment.BuildPipelineSyncStagesResponse, error) { + // TODO: Implement this func + return &deployment.BuildPipelineSyncStagesResponse{}, nil +} + +// BuildQuickSyncStages implements deployment.BuildQuickSyncStages. +func (s *deploymentServiceServer) BuildQuickSyncStages(ctx context.Context, request *deployment.BuildQuickSyncStagesRequest) (*deployment.BuildQuickSyncStagesResponse, error) { + // TODO: Implement this func + return &deployment.BuildQuickSyncStagesResponse{}, nil +} diff --git a/pkg/app/pipedv1/plugin/wait/execute/wait.go b/pkg/app/pipedv1/plugin/wait/execute/wait.go new file mode 100644 index 0000000000..668434686e --- /dev/null +++ b/pkg/app/pipedv1/plugin/wait/execute/wait.go @@ -0,0 +1,132 @@ +// Copyright 2024 The PipeCD Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package execute + +import ( + "context" + "strconv" + "time" + + "go.uber.org/zap" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" + + "github.com/pipe-cd/pipecd/pkg/app/piped/logpersister" + "github.com/pipe-cd/pipecd/pkg/app/pipedv1/plugin/wait/config" + "github.com/pipe-cd/pipecd/pkg/model" + "github.com/pipe-cd/pipecd/pkg/plugin/api/v1alpha1/deployment" +) + +type Stage string + +const ( + defaultDuration = time.Minute + logInterval = 10 * time.Second + startTimeKey = "startTime" + stageWait Stage = "WAIT" +) + +// Execute starts waiting for the specified duration. +func (s *deploymentServiceServer) execute(ctx context.Context, in *deployment.ExecutePluginInput, slp logpersister.StageLogPersister) (response *deployment.ExecuteStageResponse, err error) { + var ( + // originalStatus = in.Stage.Status + duration = defaultDuration + ) + + opts, err := config.DecodeStageOptionsYAML[waitStageOptions](in.StageConfig) + if err != nil { + return nil, status.Error(codes.InvalidArgument, err.Error()) + } + + // Apply the stage configurations. + if opts != nil { + if opts.Duration > 0 { + duration = opts.Duration.Duration() + } + } + totalDuration := duration + + // Retrieve the saved startTime from the previous run. + startTime := s.retrieveStartTime(in.Stage.Id) + if !startTime.IsZero() { + duration -= time.Since(startTime) + if duration < 0 { + duration = 0 + } + } else { + startTime = time.Now() + } + defer s.saveStartTime(ctx, startTime, in.Stage.Id) + + timer := time.NewTimer(duration) + defer timer.Stop() + + ticker := time.NewTicker(logInterval) + defer ticker.Stop() + + slp.Infof("Waiting for %v...", duration) + for { + select { + case <-timer.C: + slp.Infof("Waited for %v", totalDuration) + return &deployment.ExecuteStageResponse{ + Status: model.StageStatus_STAGE_SUCCESS, + }, nil + + case <-ticker.C: + slp.Infof("%v elapsed...", time.Since(startTime)) + + /** TODO: handle StopSignal + case s := <-sig.Ch(): + switch s { + case executor.StopSignalCancel: + return &deployment.ExecuteStageResponse{ + Status: model.StageStatus_STAGE_CANCELLED, + }, nil + case executor.StopSignalTerminate: + return &deployment.ExecuteStageResponse{ + Status: originalStatus, + }, nil + default: + return &deployment.ExecuteStageResponse{ + Status: model.StageStatus_STAGE_FAILURE, + }, nil // TODO: Return an error message like "received an unknown signal". + } + } + */ + } + } +} + +func (s *deploymentServiceServer) retrieveStartTime(stageId string) (t time.Time) { + sec, ok := s.metadataStore.Stage(stageId).Get(startTimeKey) + if !ok { + return + } + ut, err := strconv.ParseInt(sec, 10, 64) + if err != nil { + return + } + return time.Unix(ut, 0) +} + +func (s *deploymentServiceServer) saveStartTime(ctx context.Context, t time.Time, stageId string) { + metadata := map[string]string{ + startTimeKey: strconv.FormatInt(t.Unix(), 10), + } + if err := s.metadataStore.Stage(stageId).PutMulti(ctx, metadata); err != nil { + s.logger.Error("failed to store metadata", zap.Error(err)) + } +} diff --git a/pkg/app/pipedv1/plugin/wait/main.go b/pkg/app/pipedv1/plugin/wait/main.go new file mode 100644 index 0000000000..84fe7b1f06 --- /dev/null +++ b/pkg/app/pipedv1/plugin/wait/main.go @@ -0,0 +1,34 @@ +// Copyright 2024 The PipeCD Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package main + +import ( + "log" + + "github.com/pipe-cd/pipecd/pkg/cli" +) + +func main() { + app := cli.NewApp( + "pipecd-plugin-stage-wait", + "Plugin component to execute Wait Stage.", + ) + app.AddCommands( + newPluginCommand(), + ) + if err := app.Run(); err != nil { + log.Fatal(err) + } +} diff --git a/pkg/app/pipedv1/plugin/wait/server.go b/pkg/app/pipedv1/plugin/wait/server.go new file mode 100644 index 0000000000..3dd57649d1 --- /dev/null +++ b/pkg/app/pipedv1/plugin/wait/server.go @@ -0,0 +1,161 @@ +// Copyright 2024 The PipeCD Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package main + +import ( + "context" + "net/http" + "net/http/pprof" + "time" + + "github.com/pipe-cd/pipecd/pkg/admin" + "github.com/pipe-cd/pipecd/pkg/app/pipedv1/plugin/wait/execute" + "github.com/pipe-cd/pipecd/pkg/cli" + config "github.com/pipe-cd/pipecd/pkg/configv1" + "github.com/pipe-cd/pipecd/pkg/plugin/logpersister" + "github.com/pipe-cd/pipecd/pkg/plugin/pipedapi" + "github.com/pipe-cd/pipecd/pkg/rpc" + "github.com/pipe-cd/pipecd/pkg/version" + "github.com/spf13/cobra" + "go.uber.org/zap" + "golang.org/x/sync/errgroup" +) + +type plugin struct { + pipedPluginService string + gracePeriod time.Duration + tls bool + certFile string + keyFile string + config string + + enableGRPCReflection bool +} + +// newPluginCommand creates a new cobra command for executing api server. +func newPluginCommand() *cobra.Command { + s := &plugin{ + gracePeriod: 30 * time.Second, + } + cmd := &cobra.Command{ + Use: "start", + Short: "Start running the wait-stage-plugin.", + RunE: cli.WithContext(s.run), + } + + cmd.Flags().StringVar(&s.pipedPluginService, "piped-plugin-service", s.pipedPluginService, "The port number used to connect to the piped plugin service.") // TODO: we should discuss about the name of this flag, or we should use environment variable instead. + cmd.Flags().StringVar(&s.config, "config", s.config, "The configuration for the plugin.") + cmd.Flags().DurationVar(&s.gracePeriod, "grace-period", s.gracePeriod, "How long to wait for graceful shutdown.") + + cmd.Flags().BoolVar(&s.tls, "tls", s.tls, "Whether running the gRPC server with TLS or not.") + cmd.Flags().StringVar(&s.certFile, "cert-file", s.certFile, "The path to the TLS certificate file.") + cmd.Flags().StringVar(&s.keyFile, "key-file", s.keyFile, "The path to the TLS key file.") + + // For debugging early in development + cmd.Flags().BoolVar(&s.enableGRPCReflection, "enable-grpc-reflection", s.enableGRPCReflection, "Whether to enable the reflection service or not.") + + cmd.MarkFlagRequired("piped-plugin-service") + cmd.MarkFlagRequired("config") + + return cmd +} + +func (s *plugin) run(ctx context.Context, input cli.Input) (runErr error) { + // Make a cancellable context. + ctx, cancel := context.WithCancel(ctx) + defer cancel() + + group, ctx := errgroup.WithContext(ctx) + + pipedapiClient, err := pipedapi.NewClient(ctx, s.pipedPluginService) + if err != nil { + input.Logger.Error("failed to create piped plugin service client", zap.Error(err)) + return err + } + + // Load the configuration. + cfg, err := config.ParsePluginConfig(s.config) + if err != nil { + input.Logger.Error("failed to parse the configuration", zap.Error(err)) + return err + } + + // Start running admin server. + { + var ( + ver = []byte(version.Get().Version) // TODO: get the plugin's version + admin = admin.NewAdmin(0, s.gracePeriod, input.Logger) // TODO: add config for admin port + ) + + admin.HandleFunc("/version", func(w http.ResponseWriter, r *http.Request) { + w.Write(ver) + }) + admin.HandleFunc("/healthz", func(w http.ResponseWriter, r *http.Request) { + w.Write([]byte("ok")) + }) + admin.HandleFunc("/debug/pprof/", pprof.Index) + admin.HandleFunc("/debug/pprof/profile", pprof.Profile) + admin.HandleFunc("/debug/pprof/trace", pprof.Trace) + + group.Go(func() error { + return admin.Run(ctx) + }) + } + + // Start log persister + persister := logpersister.NewPersister(pipedapiClient, input.Logger) + group.Go(func() error { + return persister.Run(ctx) + }) + + // Start a gRPC server for handling external API requests. + { + var ( + service = execute.NewDeploymentService( + cfg, + input.Logger, + persister, + ) + opts = []rpc.Option{ + rpc.WithPort(cfg.Port), + rpc.WithGracePeriod(s.gracePeriod), + rpc.WithLogger(input.Logger), + rpc.WithLogUnaryInterceptor(input.Logger), + rpc.WithRequestValidationUnaryInterceptor(), + rpc.WithSignalHandlingUnaryInterceptor(), + } + ) + if s.tls { + opts = append(opts, rpc.WithTLS(s.certFile, s.keyFile)) + } + if s.enableGRPCReflection { + opts = append(opts, rpc.WithGRPCReflection()) + } + if input.Flags.Metrics { + opts = append(opts, rpc.WithPrometheusUnaryInterceptor()) + } + + server := rpc.NewServer(service, opts...) + group.Go(func() error { + return server.Run(ctx) + }) + } + + if err := group.Wait(); err != nil { + input.Logger.Error("failed while running", zap.Error(err)) + return err + } + return nil +} From d5b435904451c0bde8bd65b9a8ee84d4a03dd4e2 Mon Sep 17 00:00:00 2001 From: t-kikuc Date: Mon, 23 Dec 2024 18:45:18 +0900 Subject: [PATCH 2/7] Improve signal handling and logging in wait plugin execution Signed-off-by: t-kikuc --- pkg/app/pipedv1/plugin/wait/execute/server.go | 18 ++++++++++++++++-- pkg/app/pipedv1/plugin/wait/execute/wait.go | 11 ++++------- 2 files changed, 20 insertions(+), 9 deletions(-) diff --git a/pkg/app/pipedv1/plugin/wait/execute/server.go b/pkg/app/pipedv1/plugin/wait/execute/server.go index e109052445..3df9cdf426 100644 --- a/pkg/app/pipedv1/plugin/wait/execute/server.go +++ b/pkg/app/pipedv1/plugin/wait/execute/server.go @@ -16,6 +16,7 @@ package execute import ( "context" + "time" "go.uber.org/zap" "google.golang.org/grpc" @@ -24,6 +25,7 @@ import ( config "github.com/pipe-cd/pipecd/pkg/configv1" "github.com/pipe-cd/pipecd/pkg/plugin/api/v1alpha1/deployment" "github.com/pipe-cd/pipecd/pkg/plugin/logpersister" + "github.com/pipe-cd/pipecd/pkg/plugin/signalhandler" ) type deploymentServiceServer struct { @@ -60,9 +62,21 @@ func (a *deploymentServiceServer) Register(server *grpc.Server) { } // ExecuteStage implements deployment.ExecuteStage. -func (s *deploymentServiceServer) ExecuteStage(ctx context.Context, request *deployment.ExecuteStageRequest) (*deployment.ExecuteStageResponse, error) { +func (s *deploymentServiceServer) ExecuteStage(ctx context.Context, request *deployment.ExecuteStageRequest) (response *deployment.ExecuteStageResponse, err error) { slp := s.logPersister.StageLogPersister(request.Input.GetDeployment().GetId(), request.Input.GetStage().GetId()) - return s.execute(ctx, request.Input, slp) + defer func() { + // When termination signal received and the stage is not completed yet, we should not mark the log persister as completed. + // This can occur when the piped is shutting down while the stage is still running. + if !response.GetStatus().IsCompleted() && signalhandler.Terminated() { + return + } + slp.Complete(time.Minute) + }() + + status := s.execute(ctx, request.Input, slp) + return &deployment.ExecuteStageResponse{ + Status: status, + }, nil } // FetchDefinedStages implements deployment.FetchDefinedStages. diff --git a/pkg/app/pipedv1/plugin/wait/execute/wait.go b/pkg/app/pipedv1/plugin/wait/execute/wait.go index 668434686e..3406bcf29c 100644 --- a/pkg/app/pipedv1/plugin/wait/execute/wait.go +++ b/pkg/app/pipedv1/plugin/wait/execute/wait.go @@ -20,8 +20,6 @@ import ( "time" "go.uber.org/zap" - "google.golang.org/grpc/codes" - "google.golang.org/grpc/status" "github.com/pipe-cd/pipecd/pkg/app/piped/logpersister" "github.com/pipe-cd/pipecd/pkg/app/pipedv1/plugin/wait/config" @@ -39,7 +37,7 @@ const ( ) // Execute starts waiting for the specified duration. -func (s *deploymentServiceServer) execute(ctx context.Context, in *deployment.ExecutePluginInput, slp logpersister.StageLogPersister) (response *deployment.ExecuteStageResponse, err error) { +func (s *deploymentServiceServer) execute(ctx context.Context, in *deployment.ExecutePluginInput, slp logpersister.StageLogPersister) model.StageStatus { var ( // originalStatus = in.Stage.Status duration = defaultDuration @@ -47,7 +45,8 @@ func (s *deploymentServiceServer) execute(ctx context.Context, in *deployment.Ex opts, err := config.DecodeStageOptionsYAML[waitStageOptions](in.StageConfig) if err != nil { - return nil, status.Error(codes.InvalidArgument, err.Error()) + slp.Errorf("failed to decode the stage configuration: %v", err) + return model.StageStatus_STAGE_FAILURE } // Apply the stage configurations. @@ -81,9 +80,7 @@ func (s *deploymentServiceServer) execute(ctx context.Context, in *deployment.Ex select { case <-timer.C: slp.Infof("Waited for %v", totalDuration) - return &deployment.ExecuteStageResponse{ - Status: model.StageStatus_STAGE_SUCCESS, - }, nil + return model.StageStatus_STAGE_SUCCESS case <-ticker.C: slp.Infof("%v elapsed...", time.Since(startTime)) From 8d882c5be1455e9cfbf86cd69f61890e67e16742 Mon Sep 17 00:00:00 2001 From: t-kikuc Date: Mon, 23 Dec 2024 18:50:39 +0900 Subject: [PATCH 3/7] refactor wait.go Signed-off-by: t-kikuc --- pkg/app/pipedv1/plugin/wait/execute/wait.go | 16 +++++++++++----- 1 file changed, 11 insertions(+), 5 deletions(-) diff --git a/pkg/app/pipedv1/plugin/wait/execute/wait.go b/pkg/app/pipedv1/plugin/wait/execute/wait.go index 3406bcf29c..1b763c2ded 100644 --- a/pkg/app/pipedv1/plugin/wait/execute/wait.go +++ b/pkg/app/pipedv1/plugin/wait/execute/wait.go @@ -50,10 +50,8 @@ func (s *deploymentServiceServer) execute(ctx context.Context, in *deployment.Ex } // Apply the stage configurations. - if opts != nil { - if opts.Duration > 0 { - duration = opts.Duration.Duration() - } + if opts != nil && opts.Duration > 0 { + duration = opts.Duration.Duration() } totalDuration := duration @@ -69,6 +67,14 @@ func (s *deploymentServiceServer) execute(ctx context.Context, in *deployment.Ex } defer s.saveStartTime(ctx, startTime, in.Stage.Id) + if err := s.wait(ctx, duration, totalDuration, startTime, slp); err != nil { + slp.Errorf("failed to wait: %v", err) + return model.StageStatus_STAGE_FAILURE + } + return model.StageStatus_STAGE_SUCCESS +} + +func (s *deploymentServiceServer) wait(ctx context.Context, duration, totalDuration time.Duration, startTime time.Time, slp logpersister.StageLogPersister) error { timer := time.NewTimer(duration) defer timer.Stop() @@ -80,7 +86,7 @@ func (s *deploymentServiceServer) execute(ctx context.Context, in *deployment.Ex select { case <-timer.C: slp.Infof("Waited for %v", totalDuration) - return model.StageStatus_STAGE_SUCCESS + return nil case <-ticker.C: slp.Infof("%v elapsed...", time.Since(startTime)) From 8541943e78925d663abe963390ccc52228687014 Mon Sep 17 00:00:00 2001 From: t-kikuc Date: Mon, 23 Dec 2024 19:45:05 +0900 Subject: [PATCH 4/7] Handle signals Signed-off-by: t-kikuc --- pkg/app/pipedv1/plugin/wait/execute/wait.go | 34 +++++---------------- 1 file changed, 7 insertions(+), 27 deletions(-) diff --git a/pkg/app/pipedv1/plugin/wait/execute/wait.go b/pkg/app/pipedv1/plugin/wait/execute/wait.go index 1b763c2ded..5361bd2792 100644 --- a/pkg/app/pipedv1/plugin/wait/execute/wait.go +++ b/pkg/app/pipedv1/plugin/wait/execute/wait.go @@ -67,14 +67,10 @@ func (s *deploymentServiceServer) execute(ctx context.Context, in *deployment.Ex } defer s.saveStartTime(ctx, startTime, in.Stage.Id) - if err := s.wait(ctx, duration, totalDuration, startTime, slp); err != nil { - slp.Errorf("failed to wait: %v", err) - return model.StageStatus_STAGE_FAILURE - } - return model.StageStatus_STAGE_SUCCESS + return s.wait(ctx, duration, totalDuration, startTime, slp) } -func (s *deploymentServiceServer) wait(ctx context.Context, duration, totalDuration time.Duration, startTime time.Time, slp logpersister.StageLogPersister) error { +func (s *deploymentServiceServer) wait(ctx context.Context, duration, totalDuration time.Duration, startTime time.Time, slp logpersister.StageLogPersister) model.StageStatus { timer := time.NewTimer(duration) defer timer.Stop() @@ -84,31 +80,15 @@ func (s *deploymentServiceServer) wait(ctx context.Context, duration, totalDurat slp.Infof("Waiting for %v...", duration) for { select { - case <-timer.C: + case <-timer.C: // on completed slp.Infof("Waited for %v", totalDuration) - return nil + return model.StageStatus_STAGE_SUCCESS - case <-ticker.C: + case <-ticker.C: // on interval elapsed slp.Infof("%v elapsed...", time.Since(startTime)) - /** TODO: handle StopSignal - case s := <-sig.Ch(): - switch s { - case executor.StopSignalCancel: - return &deployment.ExecuteStageResponse{ - Status: model.StageStatus_STAGE_CANCELLED, - }, nil - case executor.StopSignalTerminate: - return &deployment.ExecuteStageResponse{ - Status: originalStatus, - }, nil - default: - return &deployment.ExecuteStageResponse{ - Status: model.StageStatus_STAGE_FAILURE, - }, nil // TODO: Return an error message like "received an unknown signal". - } - } - */ + case <-ctx.Done(): // on cancelled or terminated + return model.StageStatus_STAGE_CANCELLED // TODO: Is it correct when terminated? } } } From 226f48b7e7589d3547eec92c7eb9941f451a5008 Mon Sep 17 00:00:00 2001 From: t-kikuc Date: Mon, 23 Dec 2024 19:49:34 +0900 Subject: [PATCH 5/7] TODO: add metadataStore Signed-off-by: t-kikuc --- pkg/app/pipedv1/plugin/wait/execute/server.go | 6 ++++-- pkg/app/pipedv1/plugin/wait/server.go | 1 + 2 files changed, 5 insertions(+), 2 deletions(-) diff --git a/pkg/app/pipedv1/plugin/wait/execute/server.go b/pkg/app/pipedv1/plugin/wait/execute/server.go index 3df9cdf426..4541571743 100644 --- a/pkg/app/pipedv1/plugin/wait/execute/server.go +++ b/pkg/app/pipedv1/plugin/wait/execute/server.go @@ -45,14 +45,16 @@ type logPersister interface { // NewDeploymentService creates a new planService. func NewDeploymentService( config *config.PipedPlugin, + metadataStore metadatastore.MetadataStore, logger *zap.Logger, logPersister logPersister, ) *deploymentServiceServer { return &deploymentServiceServer{ pluginConfig: config, // TODO: Add metadataStore? or not? - logger: logger.Named("planner"), // TODO: Is this 'planner'? - logPersister: logPersister, + metadataStore: metadataStore, + logger: logger.Named("planner"), // TODO: Is this 'planner'? + logPersister: logPersister, } } diff --git a/pkg/app/pipedv1/plugin/wait/server.go b/pkg/app/pipedv1/plugin/wait/server.go index 3dd57649d1..956f59834b 100644 --- a/pkg/app/pipedv1/plugin/wait/server.go +++ b/pkg/app/pipedv1/plugin/wait/server.go @@ -125,6 +125,7 @@ func (s *plugin) run(ctx context.Context, input cli.Input) (runErr error) { var ( service = execute.NewDeploymentService( cfg, + nil, // TODO: Pass metadataStore input.Logger, persister, ) From df6bced1a0c7749145f2b884be8a9d07c666276b Mon Sep 17 00:00:00 2001 From: t-kikuc Date: Tue, 24 Dec 2024 10:14:50 +0900 Subject: [PATCH 6/7] Concentrate on Init and server funcs Signed-off-by: t-kikuc --- .../plugin/wait/config/stage_options.go | 46 ---------- .../plugin/wait/config/stage_options_test.go | 82 ------------------ .../pipedv1/plugin/wait/execute/options.go | 31 ------- pkg/app/pipedv1/plugin/wait/execute/server.go | 15 ++-- pkg/app/pipedv1/plugin/wait/execute/wait.go | 86 +------------------ pkg/app/pipedv1/plugin/wait/server.go | 1 - 6 files changed, 8 insertions(+), 253 deletions(-) delete mode 100644 pkg/app/pipedv1/plugin/wait/config/stage_options.go delete mode 100644 pkg/app/pipedv1/plugin/wait/config/stage_options_test.go delete mode 100644 pkg/app/pipedv1/plugin/wait/execute/options.go diff --git a/pkg/app/pipedv1/plugin/wait/config/stage_options.go b/pkg/app/pipedv1/plugin/wait/config/stage_options.go deleted file mode 100644 index 2632129e4d..0000000000 --- a/pkg/app/pipedv1/plugin/wait/config/stage_options.go +++ /dev/null @@ -1,46 +0,0 @@ -// Copyright 2024 The PipeCD Authors. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package config - -import ( - "encoding/json" - - "github.com/creasty/defaults" - "sigs.k8s.io/yaml" -) - -type StageOptions interface { - Validate() error -} - -// DecodeYAML unmarshals stageOptions YAML data to specified StageOptions type and validates the result. -func DecodeStageOptionsYAML[T StageOptions](data []byte) (*T, error) { - js, err := yaml.YAMLToJSON(data) - if err != nil { - return nil, err - } - - var o T - if err := json.Unmarshal(js, &o); err != nil { - return nil, err - } - if err := defaults.Set(&o); err != nil { - return nil, err - } - if err := o.Validate(); err != nil { - return nil, err - } - return &o, nil -} diff --git a/pkg/app/pipedv1/plugin/wait/config/stage_options_test.go b/pkg/app/pipedv1/plugin/wait/config/stage_options_test.go deleted file mode 100644 index 65ac905d2c..0000000000 --- a/pkg/app/pipedv1/plugin/wait/config/stage_options_test.go +++ /dev/null @@ -1,82 +0,0 @@ -// Copyright 2024 The PipeCD Authors. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package config - -import ( - "errors" - "testing" - - "github.com/stretchr/testify/assert" -) - -type testStageOptions struct { - Number int `json:"number"` - Text string `json:"text"` -} - -func (t testStageOptions) Validate() error { - if t.Number > 0 { - return nil - } else { - return errors.New("number must be greater than 0") - } -} - -func TestDecodeStageOptionsYAML(t *testing.T) { - t.Parallel() - - testcases := []struct { - title string - data []byte - expected testStageOptions - expectErr bool - }{ - { - title: "valid data", - data: []byte(` -number: 123 -text: "test" -`), - expected: testStageOptions{ - Number: 123, - Text: "test", - }, - expectErr: false, - }, - { - title: "invalid format", - data: []byte(`invalid`), - expected: testStageOptions{}, - expectErr: true, - }, - { - title: "validation failed", - data: []byte(`number: -1`), - expected: testStageOptions{}, - expectErr: true, - }, - } - - for _, tc := range testcases { - t.Run(tc.title, func(t *testing.T) { - t.Parallel() - got, err := DecodeStageOptionsYAML[testStageOptions](tc.data) - assert.Equal(t, tc.expectErr, err != nil) - if err == nil { - assert.Equal(t, tc.expected, *got) - } - }) - } -} diff --git a/pkg/app/pipedv1/plugin/wait/execute/options.go b/pkg/app/pipedv1/plugin/wait/execute/options.go deleted file mode 100644 index 9c50cd3073..0000000000 --- a/pkg/app/pipedv1/plugin/wait/execute/options.go +++ /dev/null @@ -1,31 +0,0 @@ -// Copyright 2024 The PipeCD Authors. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package execute - -import ( - config "github.com/pipe-cd/pipecd/pkg/configv1" -) - -// waitStageOptions contains all configurable values for a WAIT stage. -type waitStageOptions struct { - Duration config.Duration `json:"duration"` - // TODO: Handle SkipOn options. - // SkipOn config.SkipOptions `json:"skipOn,omitempty"` -} - -func (o waitStageOptions) Validate() error { - // TODO: Implement validation logic. - return nil -} diff --git a/pkg/app/pipedv1/plugin/wait/execute/server.go b/pkg/app/pipedv1/plugin/wait/execute/server.go index 4541571743..59862e7ffa 100644 --- a/pkg/app/pipedv1/plugin/wait/execute/server.go +++ b/pkg/app/pipedv1/plugin/wait/execute/server.go @@ -21,7 +21,6 @@ import ( "go.uber.org/zap" "google.golang.org/grpc" - "github.com/pipe-cd/pipecd/pkg/app/pipedv1/metadatastore" config "github.com/pipe-cd/pipecd/pkg/configv1" "github.com/pipe-cd/pipecd/pkg/plugin/api/v1alpha1/deployment" "github.com/pipe-cd/pipecd/pkg/plugin/logpersister" @@ -33,28 +32,24 @@ type deploymentServiceServer struct { pluginConfig *config.PipedPlugin - metadataStore metadatastore.MetadataStore - logger *zap.Logger - logPersister logPersister + logger *zap.Logger + logPersister logPersister } type logPersister interface { StageLogPersister(deploymentID, stageID string) logpersister.StageLogPersister } -// NewDeploymentService creates a new planService. +// NewDeploymentService creates a new deploymentServiceServer of Wait Stage plugin. func NewDeploymentService( config *config.PipedPlugin, - metadataStore metadatastore.MetadataStore, logger *zap.Logger, logPersister logPersister, ) *deploymentServiceServer { return &deploymentServiceServer{ pluginConfig: config, - // TODO: Add metadataStore? or not? - metadataStore: metadataStore, - logger: logger.Named("planner"), // TODO: Is this 'planner'? - logPersister: logPersister, + logger: logger.Named("wait-stage-plugin"), + logPersister: logPersister, } } diff --git a/pkg/app/pipedv1/plugin/wait/execute/wait.go b/pkg/app/pipedv1/plugin/wait/execute/wait.go index 5361bd2792..e1f59c4b70 100644 --- a/pkg/app/pipedv1/plugin/wait/execute/wait.go +++ b/pkg/app/pipedv1/plugin/wait/execute/wait.go @@ -16,13 +16,8 @@ package execute import ( "context" - "strconv" - "time" - - "go.uber.org/zap" "github.com/pipe-cd/pipecd/pkg/app/piped/logpersister" - "github.com/pipe-cd/pipecd/pkg/app/pipedv1/plugin/wait/config" "github.com/pipe-cd/pipecd/pkg/model" "github.com/pipe-cd/pipecd/pkg/plugin/api/v1alpha1/deployment" ) @@ -30,86 +25,11 @@ import ( type Stage string const ( - defaultDuration = time.Minute - logInterval = 10 * time.Second - startTimeKey = "startTime" - stageWait Stage = "WAIT" + stageWait Stage = "WAIT" ) // Execute starts waiting for the specified duration. func (s *deploymentServiceServer) execute(ctx context.Context, in *deployment.ExecutePluginInput, slp logpersister.StageLogPersister) model.StageStatus { - var ( - // originalStatus = in.Stage.Status - duration = defaultDuration - ) - - opts, err := config.DecodeStageOptionsYAML[waitStageOptions](in.StageConfig) - if err != nil { - slp.Errorf("failed to decode the stage configuration: %v", err) - return model.StageStatus_STAGE_FAILURE - } - - // Apply the stage configurations. - if opts != nil && opts.Duration > 0 { - duration = opts.Duration.Duration() - } - totalDuration := duration - - // Retrieve the saved startTime from the previous run. - startTime := s.retrieveStartTime(in.Stage.Id) - if !startTime.IsZero() { - duration -= time.Since(startTime) - if duration < 0 { - duration = 0 - } - } else { - startTime = time.Now() - } - defer s.saveStartTime(ctx, startTime, in.Stage.Id) - - return s.wait(ctx, duration, totalDuration, startTime, slp) -} - -func (s *deploymentServiceServer) wait(ctx context.Context, duration, totalDuration time.Duration, startTime time.Time, slp logpersister.StageLogPersister) model.StageStatus { - timer := time.NewTimer(duration) - defer timer.Stop() - - ticker := time.NewTicker(logInterval) - defer ticker.Stop() - - slp.Infof("Waiting for %v...", duration) - for { - select { - case <-timer.C: // on completed - slp.Infof("Waited for %v", totalDuration) - return model.StageStatus_STAGE_SUCCESS - - case <-ticker.C: // on interval elapsed - slp.Infof("%v elapsed...", time.Since(startTime)) - - case <-ctx.Done(): // on cancelled or terminated - return model.StageStatus_STAGE_CANCELLED // TODO: Is it correct when terminated? - } - } -} - -func (s *deploymentServiceServer) retrieveStartTime(stageId string) (t time.Time) { - sec, ok := s.metadataStore.Stage(stageId).Get(startTimeKey) - if !ok { - return - } - ut, err := strconv.ParseInt(sec, 10, 64) - if err != nil { - return - } - return time.Unix(ut, 0) -} - -func (s *deploymentServiceServer) saveStartTime(ctx context.Context, t time.Time, stageId string) { - metadata := map[string]string{ - startTimeKey: strconv.FormatInt(t.Unix(), 10), - } - if err := s.metadataStore.Stage(stageId).PutMulti(ctx, metadata); err != nil { - s.logger.Error("failed to store metadata", zap.Error(err)) - } + // TOD: implement the logic of waiting + return model.StageStatus_STAGE_FAILURE } diff --git a/pkg/app/pipedv1/plugin/wait/server.go b/pkg/app/pipedv1/plugin/wait/server.go index 956f59834b..3dd57649d1 100644 --- a/pkg/app/pipedv1/plugin/wait/server.go +++ b/pkg/app/pipedv1/plugin/wait/server.go @@ -125,7 +125,6 @@ func (s *plugin) run(ctx context.Context, input cli.Input) (runErr error) { var ( service = execute.NewDeploymentService( cfg, - nil, // TODO: Pass metadataStore input.Logger, persister, ) From ce4ebfbeede53a6651f70ab01123ebaa10ed67bf Mon Sep 17 00:00:00 2001 From: t-kikuc Date: Tue, 24 Dec 2024 10:39:34 +0900 Subject: [PATCH 7/7] fix golangci errors Signed-off-by: t-kikuc --- pkg/app/pipedv1/plugin/wait/execute/server.go | 4 ++-- pkg/app/pipedv1/plugin/wait/server.go | 7 ++++--- 2 files changed, 6 insertions(+), 5 deletions(-) diff --git a/pkg/app/pipedv1/plugin/wait/execute/server.go b/pkg/app/pipedv1/plugin/wait/execute/server.go index 59862e7ffa..ecbea7d920 100644 --- a/pkg/app/pipedv1/plugin/wait/execute/server.go +++ b/pkg/app/pipedv1/plugin/wait/execute/server.go @@ -54,8 +54,8 @@ func NewDeploymentService( } // Register registers all handling of this service into the specified gRPC server. -func (a *deploymentServiceServer) Register(server *grpc.Server) { - deployment.RegisterDeploymentServiceServer(server, a) +func (s *deploymentServiceServer) Register(server *grpc.Server) { + deployment.RegisterDeploymentServiceServer(server, s) } // ExecuteStage implements deployment.ExecuteStage. diff --git a/pkg/app/pipedv1/plugin/wait/server.go b/pkg/app/pipedv1/plugin/wait/server.go index 3dd57649d1..ffc932f8bb 100644 --- a/pkg/app/pipedv1/plugin/wait/server.go +++ b/pkg/app/pipedv1/plugin/wait/server.go @@ -20,6 +20,10 @@ import ( "net/http/pprof" "time" + "github.com/spf13/cobra" + "go.uber.org/zap" + "golang.org/x/sync/errgroup" + "github.com/pipe-cd/pipecd/pkg/admin" "github.com/pipe-cd/pipecd/pkg/app/pipedv1/plugin/wait/execute" "github.com/pipe-cd/pipecd/pkg/cli" @@ -28,9 +32,6 @@ import ( "github.com/pipe-cd/pipecd/pkg/plugin/pipedapi" "github.com/pipe-cd/pipecd/pkg/rpc" "github.com/pipe-cd/pipecd/pkg/version" - "github.com/spf13/cobra" - "go.uber.org/zap" - "golang.org/x/sync/errgroup" ) type plugin struct {