Skip to content

Commit

Permalink
feat: Support running multiple API server instances with the same fil…
Browse files Browse the repository at this point in the history
…e storage (#93)
  • Loading branch information
CH3CHO authored Jul 4, 2024
1 parent ae7ef2e commit 364e5a1
Show file tree
Hide file tree
Showing 2 changed files with 74 additions and 11 deletions.
3 changes: 0 additions & 3 deletions compose/env/console.env
Original file line number Diff line number Diff line change
@@ -1,8 +1,5 @@
JAVA_HOME=/usr/local/openjdk-18
LANG=C.UTF-8
HIGRESS_CONSOLE_KUBE_CONFIG=/home/higress/.kube/config
HIGRESS_CONSOLE_ADMIN_SECRET=higress-console
HIGRESS_CONSOLE_CONFIG_MAP_NAME=higress-console
HIGRESS_CONSOLE_CONTROLLER_INGRESS_CLASS_NAME=higress
HIGRESS_CONSOLE_CONTROLLER_SERVICE_HOST=pilot
HIGRESS_CONSOLE_CONTROLLER_ACCESS_TOKEN=
82 changes: 74 additions & 8 deletions src/apiserver/pkg/registry/file_rest.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"context"
"errors"
"fmt"
"io/fs"
"os"
"path/filepath"
"reflect"
Expand Down Expand Up @@ -32,8 +33,10 @@ import (
)

const fileChangeProcessInterval = 100 * time.Millisecond
const fileChangeProcessDelay = 250 * time.Millisecond
const defaultNamespace = "higress-system"
const tmpFileTtl = 5 * time.Second

var fileBeingProcessedError = errors.New("file is being processed")

var _ rest.StandardStorage = &fileREST{}
var _ rest.Scoper = &fileREST{}
Expand All @@ -53,11 +56,9 @@ func NewFileREST(
) (REST, error) {
if attrFunc == nil {
if isNamespaced {
if isNamespaced {
attrFunc = storage.DefaultNamespaceScopedAttr
} else {
attrFunc = storage.DefaultClusterScopedAttr
}
attrFunc = storage.DefaultNamespaceScopedAttr
} else {
attrFunc = storage.DefaultClusterScopedAttr
}
}
watcher, err := fsnotify.NewWatcher()
Expand Down Expand Up @@ -154,7 +155,7 @@ func (f *fileREST) startDirWatcher() error {
if !ok {
return
}
klog.Errorf("error received from watcher: ", err)
klog.Errorf("error received from watcher: %v", err)
}
}
}()
Expand Down Expand Up @@ -346,6 +347,9 @@ func (f *fileREST) Create(
panic(fmt.Sprintf("unable to create data dir: %s", err))
}
if err := f.write(f.codec, filename, obj); err != nil {
if errors.Is(err, fileBeingProcessedError) {
return nil, apierrors.NewConflict(f.groupResource, name, err)
}
return nil, apierrors.NewInternalError(err)
}

Expand Down Expand Up @@ -402,6 +406,9 @@ func (f *fileREST) Update(
updatedAccessor.SetResourceVersion("1")

if err := f.write(f.codec, filename, updatedObj); err != nil {
if errors.Is(err, fileBeingProcessedError) {
return nil, false, apierrors.NewConflict(f.groupResource, name, err)
}
return nil, false, apierrors.NewInternalError(err)
}
f.notifyWatchers(watch.Event{
Expand Down Expand Up @@ -441,6 +448,9 @@ func (f *fileREST) Update(
updatedAccessor.SetResourceVersion(strconv.FormatUint(newResourceVersion, 10))

if err := f.write(f.codec, filename, updatedObj); err != nil {
if errors.Is(err, fileBeingProcessedError) {
return nil, false, apierrors.NewConflict(f.groupResource, name, err)
}
return nil, false, apierrors.NewInternalError(err)
}

Expand Down Expand Up @@ -517,7 +527,63 @@ func (f *fileREST) write(encoder runtime.Encoder, filepath string, obj runtime.O
if err := encoder.Encode(obj, buf); err != nil {
return err
}
return os.WriteFile(filepath, buf.Bytes(), 0600)

tmpFilepath := filepath + ".tmp"
tmpFileWriteRetried := false
for {
writeErr := f.writeTempFile(buf, tmpFilepath)
if writeErr == nil {
break
}
if !errors.Is(writeErr, fileBeingProcessedError) {
klog.Errorf("failed to write temp file [%s]: %v", tmpFilepath, writeErr)
return writeErr
}
klog.Warningf("temp file already exists: %s", tmpFilepath)
tmpFileInfo, statErr := os.Stat(tmpFilepath)
if statErr != nil {
klog.Errorf("failed to stat temp file [%s]: %v", tmpFilepath, statErr)
return writeErr
}
if !tmpFileInfo.ModTime().Add(tmpFileTtl).Before(time.Now()) {
klog.Infof("temp file [%s] mod time: %v. still within TTL %dms. leave it there.", tmpFilepath,
tmpFileInfo.ModTime().Format(time.RFC3339), tmpFileTtl.Milliseconds())
return writeErr
}
klog.Infof("temp file [%s] mod time: %v. already beyond TTL %dms. delete it.", tmpFilepath,
tmpFileInfo.ModTime().Format(time.RFC3339), tmpFileTtl.Milliseconds())
if removeErr := os.Remove(tmpFilepath); removeErr != nil {
klog.Errorf("failed to remove temp file [%s]: %v", tmpFilepath, removeErr)
return writeErr
}
if tmpFileWriteRetried {
return writeErr
}
tmpFileWriteRetried = true
}
if err := os.Rename(tmpFilepath, filepath); err != nil {
_ = os.Remove(tmpFilepath)
return err
}
return nil
}

func (f *fileREST) writeTempFile(buf *bytes.Buffer, tmpFilepath string) error {
tmpFile, err := os.OpenFile(tmpFilepath, os.O_RDWR|os.O_CREATE|os.O_EXCL, 0755)
if err != nil {
pathError := &fs.PathError{}
if ok := errors.As(err, &pathError); ok && errors.Is(pathError.Err, fs.ErrExist) {
return fileBeingProcessedError
}
return err
}

defer func(f *os.File) {
_ = f.Close()
}(tmpFile)

_, err = buf.WriteTo(tmpFile)
return err
}

func (f *fileREST) read(decoder runtime.Decoder, path string, newFunc func() runtime.Object) (runtime.Object, error) {
Expand Down

0 comments on commit 364e5a1

Please sign in to comment.