Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Init grpc server of wait stage plugin #5445

Merged
merged 7 commits into from
Dec 25, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
108 changes: 108 additions & 0 deletions pkg/app/pipedv1/plugin/wait/execute/server.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
// Copyright 2024 The PipeCD Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package execute

import (
"context"
"time"

"go.uber.org/zap"
"google.golang.org/grpc"

config "github.com/pipe-cd/pipecd/pkg/configv1"
"github.com/pipe-cd/pipecd/pkg/plugin/api/v1alpha1/deployment"
"github.com/pipe-cd/pipecd/pkg/plugin/logpersister"
"github.com/pipe-cd/pipecd/pkg/plugin/signalhandler"
)

type deploymentServiceServer struct {
deployment.UnimplementedDeploymentServiceServer

pluginConfig *config.PipedPlugin

logger *zap.Logger
logPersister logPersister
}

type logPersister interface {
StageLogPersister(deploymentID, stageID string) logpersister.StageLogPersister
}

// NewDeploymentService creates a new deploymentServiceServer of Wait Stage plugin.
func NewDeploymentService(
config *config.PipedPlugin,
logger *zap.Logger,
logPersister logPersister,
) *deploymentServiceServer {
return &deploymentServiceServer{
pluginConfig: config,
logger: logger.Named("wait-stage-plugin"),
logPersister: logPersister,
}

Check warning on line 53 in pkg/app/pipedv1/plugin/wait/execute/server.go

View check run for this annotation

Codecov / codecov/patch

pkg/app/pipedv1/plugin/wait/execute/server.go#L48-L53

Added lines #L48 - L53 were not covered by tests
}

// Register registers all handling of this service into the specified gRPC server.
func (s *deploymentServiceServer) Register(server *grpc.Server) {
deployment.RegisterDeploymentServiceServer(server, s)

Check warning on line 58 in pkg/app/pipedv1/plugin/wait/execute/server.go

View check run for this annotation

Codecov / codecov/patch

pkg/app/pipedv1/plugin/wait/execute/server.go#L57-L58

Added lines #L57 - L58 were not covered by tests
}

// ExecuteStage implements deployment.ExecuteStage.
func (s *deploymentServiceServer) ExecuteStage(ctx context.Context, request *deployment.ExecuteStageRequest) (response *deployment.ExecuteStageResponse, err error) {
slp := s.logPersister.StageLogPersister(request.Input.GetDeployment().GetId(), request.Input.GetStage().GetId())
defer func() {
// When termination signal received and the stage is not completed yet, we should not mark the log persister as completed.
// This can occur when the piped is shutting down while the stage is still running.
if !response.GetStatus().IsCompleted() && signalhandler.Terminated() {
return
}
slp.Complete(time.Minute)

Check warning on line 70 in pkg/app/pipedv1/plugin/wait/execute/server.go

View check run for this annotation

Codecov / codecov/patch

pkg/app/pipedv1/plugin/wait/execute/server.go#L62-L70

Added lines #L62 - L70 were not covered by tests
}()

status := s.execute(ctx, request.Input, slp)
return &deployment.ExecuteStageResponse{
Status: status,
}, nil

Check warning on line 76 in pkg/app/pipedv1/plugin/wait/execute/server.go

View check run for this annotation

Codecov / codecov/patch

pkg/app/pipedv1/plugin/wait/execute/server.go#L73-L76

Added lines #L73 - L76 were not covered by tests
}

// FetchDefinedStages implements deployment.FetchDefinedStages.
func (s *deploymentServiceServer) FetchDefinedStages(ctx context.Context, request *deployment.FetchDefinedStagesRequest) (*deployment.FetchDefinedStagesResponse, error) {
return &deployment.FetchDefinedStagesResponse{
Stages: []string{string(stageWait)},
}, nil

Check warning on line 83 in pkg/app/pipedv1/plugin/wait/execute/server.go

View check run for this annotation

Codecov / codecov/patch

pkg/app/pipedv1/plugin/wait/execute/server.go#L80-L83

Added lines #L80 - L83 were not covered by tests
}

// DetermineVersions implements deployment.DeploymentServiceServer.
func (s *deploymentServiceServer) DetermineVersions(ctx context.Context, request *deployment.DetermineVersionsRequest) (*deployment.DetermineVersionsResponse, error) {
// TODO: Implement this func
return &deployment.DetermineVersionsResponse{}, nil

Check warning on line 89 in pkg/app/pipedv1/plugin/wait/execute/server.go

View check run for this annotation

Codecov / codecov/patch

pkg/app/pipedv1/plugin/wait/execute/server.go#L87-L89

Added lines #L87 - L89 were not covered by tests
}

// DetermineStrategy implements deployment.DeploymentServiceServer.
func (s *deploymentServiceServer) DetermineStrategy(ctx context.Context, request *deployment.DetermineStrategyRequest) (*deployment.DetermineStrategyResponse, error) {
// TODO: Implement this func
return &deployment.DetermineStrategyResponse{}, nil

Check warning on line 95 in pkg/app/pipedv1/plugin/wait/execute/server.go

View check run for this annotation

Codecov / codecov/patch

pkg/app/pipedv1/plugin/wait/execute/server.go#L93-L95

Added lines #L93 - L95 were not covered by tests
}

// BuildPipelineSyncStages implements deployment.BuildPipelineSyncStages.
func (s *deploymentServiceServer) BuildPipelineSyncStages(ctx context.Context, request *deployment.BuildPipelineSyncStagesRequest) (*deployment.BuildPipelineSyncStagesResponse, error) {
// TODO: Implement this func
return &deployment.BuildPipelineSyncStagesResponse{}, nil

Check warning on line 101 in pkg/app/pipedv1/plugin/wait/execute/server.go

View check run for this annotation

Codecov / codecov/patch

pkg/app/pipedv1/plugin/wait/execute/server.go#L99-L101

Added lines #L99 - L101 were not covered by tests
}

// BuildQuickSyncStages implements deployment.BuildQuickSyncStages.
func (s *deploymentServiceServer) BuildQuickSyncStages(ctx context.Context, request *deployment.BuildQuickSyncStagesRequest) (*deployment.BuildQuickSyncStagesResponse, error) {
// TODO: Implement this func
return &deployment.BuildQuickSyncStagesResponse{}, nil

Check warning on line 107 in pkg/app/pipedv1/plugin/wait/execute/server.go

View check run for this annotation

Codecov / codecov/patch

pkg/app/pipedv1/plugin/wait/execute/server.go#L105-L107

Added lines #L105 - L107 were not covered by tests
}
35 changes: 35 additions & 0 deletions pkg/app/pipedv1/plugin/wait/execute/wait.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
// Copyright 2024 The PipeCD Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package execute

import (
"context"

"github.com/pipe-cd/pipecd/pkg/app/piped/logpersister"
"github.com/pipe-cd/pipecd/pkg/model"
"github.com/pipe-cd/pipecd/pkg/plugin/api/v1alpha1/deployment"
)

type Stage string

const (
stageWait Stage = "WAIT"
)

// Execute starts waiting for the specified duration.
func (s *deploymentServiceServer) execute(ctx context.Context, in *deployment.ExecutePluginInput, slp logpersister.StageLogPersister) model.StageStatus {
// TOD: implement the logic of waiting
return model.StageStatus_STAGE_FAILURE

Check warning on line 34 in pkg/app/pipedv1/plugin/wait/execute/wait.go

View check run for this annotation

Codecov / codecov/patch

pkg/app/pipedv1/plugin/wait/execute/wait.go#L32-L34

Added lines #L32 - L34 were not covered by tests
}
34 changes: 34 additions & 0 deletions pkg/app/pipedv1/plugin/wait/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
// Copyright 2024 The PipeCD Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package main

import (
"log"

"github.com/pipe-cd/pipecd/pkg/cli"
)

func main() {
app := cli.NewApp(
"pipecd-plugin-stage-wait",
"Plugin component to execute Wait Stage.",
)
app.AddCommands(
newPluginCommand(),
)
if err := app.Run(); err != nil {
log.Fatal(err)
}

Check warning on line 33 in pkg/app/pipedv1/plugin/wait/main.go

View check run for this annotation

Codecov / codecov/patch

pkg/app/pipedv1/plugin/wait/main.go#L23-L33

Added lines #L23 - L33 were not covered by tests
}
162 changes: 162 additions & 0 deletions pkg/app/pipedv1/plugin/wait/server.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,162 @@
// Copyright 2024 The PipeCD Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package main

import (
"context"
"net/http"
"net/http/pprof"
"time"

"github.com/spf13/cobra"
"go.uber.org/zap"
"golang.org/x/sync/errgroup"

"github.com/pipe-cd/pipecd/pkg/admin"
"github.com/pipe-cd/pipecd/pkg/app/pipedv1/plugin/wait/execute"
"github.com/pipe-cd/pipecd/pkg/cli"
config "github.com/pipe-cd/pipecd/pkg/configv1"
"github.com/pipe-cd/pipecd/pkg/plugin/logpersister"
"github.com/pipe-cd/pipecd/pkg/plugin/pipedapi"
"github.com/pipe-cd/pipecd/pkg/rpc"
"github.com/pipe-cd/pipecd/pkg/version"
)

type plugin struct {
pipedPluginService string
gracePeriod time.Duration
tls bool
certFile string
keyFile string
config string

enableGRPCReflection bool
}

// newPluginCommand creates a new cobra command for executing api server.
func newPluginCommand() *cobra.Command {
s := &plugin{
gracePeriod: 30 * time.Second,
}
cmd := &cobra.Command{
Use: "start",
Short: "Start running the wait-stage-plugin.",
RunE: cli.WithContext(s.run),
}

cmd.Flags().StringVar(&s.pipedPluginService, "piped-plugin-service", s.pipedPluginService, "The port number used to connect to the piped plugin service.") // TODO: we should discuss about the name of this flag, or we should use environment variable instead.
cmd.Flags().StringVar(&s.config, "config", s.config, "The configuration for the plugin.")
cmd.Flags().DurationVar(&s.gracePeriod, "grace-period", s.gracePeriod, "How long to wait for graceful shutdown.")

cmd.Flags().BoolVar(&s.tls, "tls", s.tls, "Whether running the gRPC server with TLS or not.")
cmd.Flags().StringVar(&s.certFile, "cert-file", s.certFile, "The path to the TLS certificate file.")
cmd.Flags().StringVar(&s.keyFile, "key-file", s.keyFile, "The path to the TLS key file.")

// For debugging early in development
cmd.Flags().BoolVar(&s.enableGRPCReflection, "enable-grpc-reflection", s.enableGRPCReflection, "Whether to enable the reflection service or not.")

cmd.MarkFlagRequired("piped-plugin-service")
cmd.MarkFlagRequired("config")

return cmd

Check warning on line 73 in pkg/app/pipedv1/plugin/wait/server.go

View check run for this annotation

Codecov / codecov/patch

pkg/app/pipedv1/plugin/wait/server.go#L49-L73

Added lines #L49 - L73 were not covered by tests
}

func (s *plugin) run(ctx context.Context, input cli.Input) (runErr error) {
// Make a cancellable context.
ctx, cancel := context.WithCancel(ctx)
defer cancel()

group, ctx := errgroup.WithContext(ctx)

pipedapiClient, err := pipedapi.NewClient(ctx, s.pipedPluginService)
if err != nil {
input.Logger.Error("failed to create piped plugin service client", zap.Error(err))
return err
}

Check warning on line 87 in pkg/app/pipedv1/plugin/wait/server.go

View check run for this annotation

Codecov / codecov/patch

pkg/app/pipedv1/plugin/wait/server.go#L76-L87

Added lines #L76 - L87 were not covered by tests

// Load the configuration.
cfg, err := config.ParsePluginConfig(s.config)
if err != nil {
input.Logger.Error("failed to parse the configuration", zap.Error(err))
return err
}

Check warning on line 94 in pkg/app/pipedv1/plugin/wait/server.go

View check run for this annotation

Codecov / codecov/patch

pkg/app/pipedv1/plugin/wait/server.go#L90-L94

Added lines #L90 - L94 were not covered by tests

// Start running admin server.
{
var (
ver = []byte(version.Get().Version) // TODO: get the plugin's version
admin = admin.NewAdmin(0, s.gracePeriod, input.Logger) // TODO: add config for admin port
)

admin.HandleFunc("/version", func(w http.ResponseWriter, r *http.Request) {
w.Write(ver)
})
admin.HandleFunc("/healthz", func(w http.ResponseWriter, r *http.Request) {
w.Write([]byte("ok"))
})
admin.HandleFunc("/debug/pprof/", pprof.Index)
admin.HandleFunc("/debug/pprof/profile", pprof.Profile)
admin.HandleFunc("/debug/pprof/trace", pprof.Trace)

group.Go(func() error {
return admin.Run(ctx)
})

Check warning on line 115 in pkg/app/pipedv1/plugin/wait/server.go

View check run for this annotation

Codecov / codecov/patch

pkg/app/pipedv1/plugin/wait/server.go#L97-L115

Added lines #L97 - L115 were not covered by tests
}

// Start log persister
persister := logpersister.NewPersister(pipedapiClient, input.Logger)
group.Go(func() error {
return persister.Run(ctx)
})

Check warning on line 122 in pkg/app/pipedv1/plugin/wait/server.go

View check run for this annotation

Codecov / codecov/patch

pkg/app/pipedv1/plugin/wait/server.go#L119-L122

Added lines #L119 - L122 were not covered by tests

// Start a gRPC server for handling external API requests.
{
var (
service = execute.NewDeploymentService(
cfg,
input.Logger,
persister,
)
opts = []rpc.Option{
rpc.WithPort(cfg.Port),
rpc.WithGracePeriod(s.gracePeriod),
rpc.WithLogger(input.Logger),
rpc.WithLogUnaryInterceptor(input.Logger),
rpc.WithRequestValidationUnaryInterceptor(),
rpc.WithSignalHandlingUnaryInterceptor(),
}
)
if s.tls {
opts = append(opts, rpc.WithTLS(s.certFile, s.keyFile))
}
if s.enableGRPCReflection {
opts = append(opts, rpc.WithGRPCReflection())
}
if input.Flags.Metrics {
opts = append(opts, rpc.WithPrometheusUnaryInterceptor())
}

Check warning on line 149 in pkg/app/pipedv1/plugin/wait/server.go

View check run for this annotation

Codecov / codecov/patch

pkg/app/pipedv1/plugin/wait/server.go#L125-L149

Added lines #L125 - L149 were not covered by tests

server := rpc.NewServer(service, opts...)
group.Go(func() error {
return server.Run(ctx)
})

Check warning on line 154 in pkg/app/pipedv1/plugin/wait/server.go

View check run for this annotation

Codecov / codecov/patch

pkg/app/pipedv1/plugin/wait/server.go#L151-L154

Added lines #L151 - L154 were not covered by tests
}

if err := group.Wait(); err != nil {
input.Logger.Error("failed while running", zap.Error(err))
return err
}
return nil

Check warning on line 161 in pkg/app/pipedv1/plugin/wait/server.go

View check run for this annotation

Codecov / codecov/patch

pkg/app/pipedv1/plugin/wait/server.go#L157-L161

Added lines #L157 - L161 were not covered by tests
}
Loading