From 1248ca1dbfea6c7f5da42f7b2bfb2813ec1053f9 Mon Sep 17 00:00:00 2001 From: astraw99 Date: Fri, 6 Oct 2023 16:52:15 +0800 Subject: [PATCH] Add signal catch to stop the server gracefully --- .gitignore | 1 + cmd/hostpathplugin/main.go | 15 ++++++++++++--- internal/endpoint/endpoint.go | 1 - pkg/hostpath/README.md | 2 +- pkg/hostpath/hostpath.go | 6 ++++-- pkg/hostpath/nodeserver.go | 2 +- pkg/hostpath/server.go | 9 --------- 7 files changed, 19 insertions(+), 17 deletions(-) diff --git a/.gitignore b/.gitignore index 5e56e040e..0099b8346 100644 --- a/.gitignore +++ b/.gitignore @@ -1 +1,2 @@ /bin +.idea diff --git a/cmd/hostpathplugin/main.go b/cmd/hostpathplugin/main.go index 459d2a9d5..825f5cc88 100644 --- a/cmd/hostpathplugin/main.go +++ b/cmd/hostpathplugin/main.go @@ -44,7 +44,7 @@ func main() { VendorVersion: version, } - flag.StringVar(&cfg.Endpoint, "endpoint", "unix://tmp/csi.sock", "CSI endpoint") + flag.StringVar(&cfg.Endpoint, "endpoint", "unix:///tmp/csi.sock", "CSI endpoint") flag.StringVar(&cfg.DriverName, "drivername", "hostpath.csi.k8s.io", "name of the driver") flag.StringVar(&cfg.StateDir, "statedir", "/csi-data-dir", "directory for storing state information across driver restarts, volumes and snapshots") flag.StringVar(&cfg.NodeID, "nodeid", "", "node id") @@ -113,9 +113,18 @@ func main() { os.Exit(1) } - if err := driver.Run(); err != nil { + // Wait for signal + stopCh := make(chan os.Signal) + sigs := []os.Signal{ + syscall.SIGTERM, + syscall.SIGHUP, + syscall.SIGINT, + syscall.SIGQUIT, + } + signal.Notify(stopCh, sigs...) + + if err := driver.Run(stopCh); err != nil { fmt.Printf("Failed to run driver: %s", err.Error()) os.Exit(1) - } } diff --git a/internal/endpoint/endpoint.go b/internal/endpoint/endpoint.go index 4f85b5ba3..c856cb531 100644 --- a/internal/endpoint/endpoint.go +++ b/internal/endpoint/endpoint.go @@ -43,7 +43,6 @@ func Listen(endpoint string) (net.Listener, func(), error) { cleanup := func() {} if proto == "unix" { - addr = "/" + addr if err := os.Remove(addr); err != nil && !os.IsNotExist(err) { //nolint: vetshadow return nil, nil, fmt.Errorf("%s: %q", addr, err) } diff --git a/pkg/hostpath/README.md b/pkg/hostpath/README.md index 612bd5cf3..449f920bd 100644 --- a/pkg/hostpath/README.md +++ b/pkg/hostpath/README.md @@ -18,7 +18,7 @@ Get ```csc``` tool from https://github.com/rexray/gocsi/tree/master/csc #### Get plugin info ``` $ csc identity plugin-info --endpoint tcp://127.0.0.1:10000 -"csi-hostpath" "0.1.0" +"hostpath.csi.k8s.io" "v1.11.0-73-g3348ad07" ``` #### Create a volume diff --git a/pkg/hostpath/hostpath.go b/pkg/hostpath/hostpath.go index 6958e9631..5d7fd50f0 100644 --- a/pkg/hostpath/hostpath.go +++ b/pkg/hostpath/hostpath.go @@ -120,11 +120,13 @@ func NewHostPathDriver(cfg Config) (*hostPath, error) { return hp, nil } -func (hp *hostPath) Run() error { +func (hp *hostPath) Run(stopCh <-chan os.Signal) error { s := NewNonBlockingGRPCServer() // hp itself implements ControllerServer, NodeServer, and IdentityServer. s.Start(hp.config.Endpoint, hp, hp, hp) - s.Wait() + + <-stopCh + s.Stop() return nil } diff --git a/pkg/hostpath/nodeserver.go b/pkg/hostpath/nodeserver.go index c8f27fb25..e51d7821a 100644 --- a/pkg/hostpath/nodeserver.go +++ b/pkg/hostpath/nodeserver.go @@ -503,7 +503,7 @@ func (hp *hostPath) NodeExpandVolume(ctx context.Context, req *csi.NodeExpandVol switch m := info.Mode(); { case m.IsDir(): if vol.VolAccessType != state.MountAccess { - return nil, status.Errorf(codes.InvalidArgument, "Volume %s is not a directory", volID) + return nil, status.Errorf(codes.InvalidArgument, "Volume %s is not a mounted filesystem", volID) } case m&os.ModeDevice != 0: if vol.VolAccessType != state.BlockAccess { diff --git a/pkg/hostpath/server.go b/pkg/hostpath/server.go index 8687fe191..b55ef06be 100644 --- a/pkg/hostpath/server.go +++ b/pkg/hostpath/server.go @@ -18,7 +18,6 @@ package hostpath import ( "encoding/json" - "sync" "github.com/golang/glog" "golang.org/x/net/context" @@ -35,24 +34,17 @@ func NewNonBlockingGRPCServer() *nonBlockingGRPCServer { // NonBlocking server type nonBlockingGRPCServer struct { - wg sync.WaitGroup server *grpc.Server cleanup func() } func (s *nonBlockingGRPCServer) Start(endpoint string, ids csi.IdentityServer, cs csi.ControllerServer, ns csi.NodeServer) { - s.wg.Add(1) - go s.serve(endpoint, ids, cs, ns) return } -func (s *nonBlockingGRPCServer) Wait() { - s.wg.Wait() -} - func (s *nonBlockingGRPCServer) Stop() { s.server.GracefulStop() s.cleanup() @@ -89,7 +81,6 @@ func (s *nonBlockingGRPCServer) serve(ep string, ids csi.IdentityServer, cs csi. glog.Infof("Listening for connections on address: %#v", listener.Addr()) server.Serve(listener) - } func logGRPC(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) {