Skip to content

Commit

Permalink
Merge pull request #486 from oasisprotocol/ptrus/feature/pogreb-reind…
Browse files Browse the repository at this point in the history
…ex-background

pogreb: non-blocking recovery
  • Loading branch information
ptrus authored Aug 3, 2023
2 parents ce3b751 + 27bed91 commit 803ca11
Showing 1 changed file with 79 additions and 8 deletions.
87 changes: 79 additions & 8 deletions storage/oasis/nodeapi/file/kvstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ package file

import (
"fmt"
"sync/atomic"
"time"

"github.com/akrylysov/pogreb"
"github.com/oasisprotocol/oasis-core/go/common/cbor"
Expand All @@ -27,28 +29,97 @@ type KVStore interface {
}

type pogrebKVStore struct {
*pogreb.DB
db *pogreb.DB

path string
logger *log.Logger

// Address of the atomic variable that indicates whether the store is initialized.
// Synchronisation is required because the store is opened in background goroutine.
initialized uint32
}

var _ KVStore = (*pogrebKVStore)(nil)

func (s *pogrebKVStore) isInitialized() bool {
return atomic.LoadUint32(&s.initialized) == 1
}

// Get implements KVStore.
func (s *pogrebKVStore) Get(key []byte) ([]byte, error) {
if b := s.isInitialized(); !b {
return nil, fmt.Errorf("kvstore: not initialized yet")
}
return s.db.Get(key)
}

// Has implements KVStore.
func (s *pogrebKVStore) Has(key []byte) (bool, error) {
if b := s.isInitialized(); !b {
return false, nil
}
return s.db.Has(key)
}

// Put implements KVStore.
func (s *pogrebKVStore) Put(key []byte, value []byte) error {
if b := s.isInitialized(); !b {
// If the store is not initialized yet, skip writing to it.
s.logger.Debug("skipping write to uninitialized KVStore", "key", key)
return nil
}
return s.db.Put(key, value)
}

// Close implements KVStore.
func (s pogrebKVStore) Close() error {
s.logger.Info("closing KVStore", "path", s.path)
return s.DB.Close()
return s.db.Close()
}

func OpenKVStore(logger *log.Logger, path string) (KVStore, error) {
logger.Info("(re)opening KVStore", "path", path)
db, err := pogreb.Open(path, &pogreb.Options{BackgroundSyncInterval: -1})
func (s *pogrebKVStore) init() error {
s.logger.Info("(re)opening KVStore", "path", s.path)
db, err := pogreb.Open(s.path, &pogreb.Options{BackgroundSyncInterval: -1})
if err != nil {
return nil, err
s.logger.Error("failed to initialize pogreb store", "err", err)
return err
}

s.db = db
atomic.StoreUint32(&s.initialized, 1)
s.logger.Info(fmt.Sprintf("KVStore has %d entries", db.Count()))
return nil
}

func OpenKVStore(logger *log.Logger, path string) (KVStore, error) {
store := &pogrebKVStore{
logger: logger,
path: path,
}
logger.Info(fmt.Sprintf("KVStore has %d entries", db.Count()))

return &pogrebKVStore{DB: db, logger: logger, path: path}, nil
// Open the database in background as it is possible it will do a full-reindex on startup after a crash:
// https://github.com/akrylysov/pogreb/issues/35
initErrCh := make(chan error)
go func() {
initErrCh <- store.init()
}()

select {
case err := <-initErrCh:
// Database initialized in time.
if err != nil {
return nil, err
}
return store, nil
case <-time.After(30 * time.Second):
// Database is likely doing a full-reindex after a crash which can take a long time (multiple hours).
// Continue without cache while the database is reindexing in the background. Once it's done,
// the cache will be used.
// NOTE: A failure during the database reindex will be ignored (but logged), therefore in that scenario
// the cache will never be initialized.
logger.Warn("KVStore initialization timed out, continuing without cache while the database is reindexing in the background")
return store, nil
}
}

// Pretty() returns a pretty-printed, human-readable version of the cache key.
Expand Down

0 comments on commit 803ca11

Please sign in to comment.