diff --git a/storage/oasis/nodeapi/file/kvstore.go b/storage/oasis/nodeapi/file/kvstore.go index 88d1cbf6b..9aa072f43 100644 --- a/storage/oasis/nodeapi/file/kvstore.go +++ b/storage/oasis/nodeapi/file/kvstore.go @@ -2,6 +2,8 @@ package file import ( "fmt" + "sync/atomic" + "time" "github.com/akrylysov/pogreb" "github.com/oasisprotocol/oasis-core/go/common/cbor" @@ -27,28 +29,97 @@ type KVStore interface { } type pogrebKVStore struct { - *pogreb.DB + db *pogreb.DB path string logger *log.Logger + + // Address of the atomic variable that indicates whether the store is initialized. + // Synchronisation is required because the store is opened in background goroutine. + initialized uint32 } var _ KVStore = (*pogrebKVStore)(nil) +func (s *pogrebKVStore) isInitialized() bool { + return atomic.LoadUint32(&s.initialized) == 1 +} + +// Get implements KVStore. +func (s *pogrebKVStore) Get(key []byte) ([]byte, error) { + if b := s.isInitialized(); !b { + return nil, fmt.Errorf("kvstore: not initialized yet") + } + return s.db.Get(key) +} + +// Has implements KVStore. +func (s *pogrebKVStore) Has(key []byte) (bool, error) { + if b := s.isInitialized(); !b { + return false, nil + } + return s.db.Has(key) +} + +// Put implements KVStore. +func (s *pogrebKVStore) Put(key []byte, value []byte) error { + if b := s.isInitialized(); !b { + // If the store is not initialized yet, skip writing to it. + s.logger.Debug("skipping write to uninitialized KVStore", "key", key) + return nil + } + return s.db.Put(key, value) +} + +// Close implements KVStore. func (s pogrebKVStore) Close() error { s.logger.Info("closing KVStore", "path", s.path) - return s.DB.Close() + return s.db.Close() } -func OpenKVStore(logger *log.Logger, path string) (KVStore, error) { - logger.Info("(re)opening KVStore", "path", path) - db, err := pogreb.Open(path, &pogreb.Options{BackgroundSyncInterval: -1}) +func (s *pogrebKVStore) init() error { + s.logger.Info("(re)opening KVStore", "path", s.path) + db, err := pogreb.Open(s.path, &pogreb.Options{BackgroundSyncInterval: -1}) if err != nil { - return nil, err + s.logger.Error("failed to initialize pogreb store", "err", err) + return err + } + + s.db = db + atomic.StoreUint32(&s.initialized, 1) + s.logger.Info(fmt.Sprintf("KVStore has %d entries", db.Count())) + return nil +} + +func OpenKVStore(logger *log.Logger, path string) (KVStore, error) { + store := &pogrebKVStore{ + logger: logger, + path: path, } - logger.Info(fmt.Sprintf("KVStore has %d entries", db.Count())) - return &pogrebKVStore{DB: db, logger: logger, path: path}, nil + // Open the database in background as it is possible it will do a full-reindex on startup after a crash: + // https://github.com/akrylysov/pogreb/issues/35 + initErrCh := make(chan error) + go func() { + initErrCh <- store.init() + }() + + select { + case err := <-initErrCh: + // Database initialized in time. + if err != nil { + return nil, err + } + return store, nil + case <-time.After(30 * time.Second): + // Database is likely doing a full-reindex after a crash which can take a long time (multiple hours). + // Continue without cache while the database is reindexing in the background. Once it's done, + // the cache will be used. + // NOTE: A failure during the database reindex will be ignored (but logged), therefore in that scenario + // the cache will never be initialized. + logger.Warn("KVStore initialization timed out, continuing without cache while the database is reindexing in the background") + return store, nil + } } // Pretty() returns a pretty-printed, human-readable version of the cache key.