From f25d38dc258cf0edaedb84487b838a18308b5665 Mon Sep 17 00:00:00 2001 From: CH3CHO Date: Sun, 23 Jun 2024 16:55:48 +0800 Subject: [PATCH] feat: Support running multiple API server instances with the same file storage --- compose/env/console.env | 3 - src/apiserver/pkg/registry/file_rest.go | 82 ++++++++++++++++++++++--- 2 files changed, 74 insertions(+), 11 deletions(-) diff --git a/compose/env/console.env b/compose/env/console.env index 01e69f4..9d6345d 100644 --- a/compose/env/console.env +++ b/compose/env/console.env @@ -1,8 +1,5 @@ JAVA_HOME=/usr/local/openjdk-18 LANG=C.UTF-8 HIGRESS_CONSOLE_KUBE_CONFIG=/home/higress/.kube/config -HIGRESS_CONSOLE_ADMIN_SECRET=higress-console -HIGRESS_CONSOLE_CONFIG_MAP_NAME=higress-console -HIGRESS_CONSOLE_CONTROLLER_INGRESS_CLASS_NAME=higress HIGRESS_CONSOLE_CONTROLLER_SERVICE_HOST=pilot HIGRESS_CONSOLE_CONTROLLER_ACCESS_TOKEN= \ No newline at end of file diff --git a/src/apiserver/pkg/registry/file_rest.go b/src/apiserver/pkg/registry/file_rest.go index 5b0414a..524a8a9 100644 --- a/src/apiserver/pkg/registry/file_rest.go +++ b/src/apiserver/pkg/registry/file_rest.go @@ -5,6 +5,7 @@ import ( "context" "errors" "fmt" + "io/fs" "os" "path/filepath" "reflect" @@ -32,8 +33,10 @@ import ( ) const fileChangeProcessInterval = 100 * time.Millisecond -const fileChangeProcessDelay = 250 * time.Millisecond const defaultNamespace = "higress-system" +const tmpFileTtl = 5 * time.Second + +var fileBeingProcessedError = errors.New("file is being processed") var _ rest.StandardStorage = &fileREST{} var _ rest.Scoper = &fileREST{} @@ -53,11 +56,9 @@ func NewFileREST( ) (REST, error) { if attrFunc == nil { if isNamespaced { - if isNamespaced { - attrFunc = storage.DefaultNamespaceScopedAttr - } else { - attrFunc = storage.DefaultClusterScopedAttr - } + attrFunc = storage.DefaultNamespaceScopedAttr + } else { + attrFunc = storage.DefaultClusterScopedAttr } } watcher, err := fsnotify.NewWatcher() @@ -154,7 +155,7 @@ func (f *fileREST) startDirWatcher() error { if !ok { return } - klog.Errorf("error received from watcher: ", err) + klog.Errorf("error received from watcher: %v", err) } } }() @@ -346,6 +347,9 @@ func (f *fileREST) Create( panic(fmt.Sprintf("unable to create data dir: %s", err)) } if err := f.write(f.codec, filename, obj); err != nil { + if errors.Is(err, fileBeingProcessedError) { + return nil, apierrors.NewConflict(f.groupResource, name, err) + } return nil, apierrors.NewInternalError(err) } @@ -402,6 +406,9 @@ func (f *fileREST) Update( updatedAccessor.SetResourceVersion("1") if err := f.write(f.codec, filename, updatedObj); err != nil { + if errors.Is(err, fileBeingProcessedError) { + return nil, false, apierrors.NewConflict(f.groupResource, name, err) + } return nil, false, apierrors.NewInternalError(err) } f.notifyWatchers(watch.Event{ @@ -441,6 +448,9 @@ func (f *fileREST) Update( updatedAccessor.SetResourceVersion(strconv.FormatUint(newResourceVersion, 10)) if err := f.write(f.codec, filename, updatedObj); err != nil { + if errors.Is(err, fileBeingProcessedError) { + return nil, false, apierrors.NewConflict(f.groupResource, name, err) + } return nil, false, apierrors.NewInternalError(err) } @@ -517,7 +527,63 @@ func (f *fileREST) write(encoder runtime.Encoder, filepath string, obj runtime.O if err := encoder.Encode(obj, buf); err != nil { return err } - return os.WriteFile(filepath, buf.Bytes(), 0600) + + tmpFilepath := filepath + ".tmp" + tmpFileWriteRetried := false + for { + writeErr := f.writeTempFile(buf, tmpFilepath) + if writeErr == nil { + break + } + if !errors.Is(writeErr, fileBeingProcessedError) { + klog.Errorf("failed to write temp file [%s]: %v", tmpFilepath, writeErr) + return writeErr + } + klog.Warningf("temp file already exists: %s", tmpFilepath) + tmpFileInfo, statErr := os.Stat(tmpFilepath) + if statErr != nil { + klog.Errorf("failed to stat temp file [%s]: %v", tmpFilepath, statErr) + return writeErr + } + if !tmpFileInfo.ModTime().Add(tmpFileTtl).Before(time.Now()) { + klog.Infof("temp file [%s] mod time: %v. still within TTL %dms. leave it there.", tmpFilepath, + tmpFileInfo.ModTime().Format(time.RFC3339), tmpFileTtl.Milliseconds()) + return writeErr + } + klog.Infof("temp file [%s] mod time: %v. already beyond TTL %dms. delete it.", tmpFilepath, + tmpFileInfo.ModTime().Format(time.RFC3339), tmpFileTtl.Milliseconds()) + if removeErr := os.Remove(tmpFilepath); removeErr != nil { + klog.Errorf("failed to remove temp file [%s]: %v", tmpFilepath, removeErr) + return writeErr + } + if tmpFileWriteRetried { + return writeErr + } + tmpFileWriteRetried = true + } + if err := os.Rename(tmpFilepath, filepath); err != nil { + _ = os.Remove(tmpFilepath) + return err + } + return nil +} + +func (f *fileREST) writeTempFile(buf *bytes.Buffer, tmpFilepath string) error { + tmpFile, err := os.OpenFile(tmpFilepath, os.O_RDWR|os.O_CREATE|os.O_EXCL, 0755) + if err != nil { + pathError := &fs.PathError{} + if ok := errors.As(err, &pathError); ok && errors.Is(pathError.Err, fs.ErrExist) { + return fileBeingProcessedError + } + return err + } + + defer func(f *os.File) { + _ = f.Close() + }(tmpFile) + + _, err = buf.WriteTo(tmpFile) + return err } func (f *fileREST) read(decoder runtime.Decoder, path string, newFunc func() runtime.Object) (runtime.Object, error) {