Skip to content

Commit

Permalink
carstore.CarStore is an interface, carstore.FileCarStore implements it
Browse files Browse the repository at this point in the history
  • Loading branch information
brianolson committed Sep 20, 2024
1 parent daa3810 commit 5c2cb48
Show file tree
Hide file tree
Showing 9 changed files with 60 additions and 47 deletions.
75 changes: 44 additions & 31 deletions carstore/bs.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,15 +48,28 @@ const MaxSliceLength = 2 << 20

const BigShardThreshold = 2 << 20

type CarStore struct {
type CarStore interface {
CompactUserShards(ctx context.Context, user models.Uid, skipBigShards bool) (*CompactionStats, error)
GetCompactionTargets(ctx context.Context, shardCount int) ([]CompactionTarget, error)
GetUserRepoHead(ctx context.Context, user models.Uid) (cid.Cid, error)
GetUserRepoRev(ctx context.Context, user models.Uid) (string, error)
ImportSlice(ctx context.Context, uid models.Uid, since *string, carslice []byte) (cid.Cid, *DeltaSession, error)
NewDeltaSession(ctx context.Context, user models.Uid, since *string) (*DeltaSession, error)
ReadOnlySession(user models.Uid) (*DeltaSession, error)
ReadUserCar(ctx context.Context, user models.Uid, sinceRev string, incremental bool, w io.Writer) error
Stat(ctx context.Context, usr models.Uid) ([]UserStat, error)
WipeUserData(ctx context.Context, user models.Uid) error
}

type FileCarStore struct {
meta *CarStoreGormMeta
rootDir string

lscLk sync.Mutex
lastShardCache map[models.Uid]*CarShard
}

func NewCarStore(meta *gorm.DB, root string) (*CarStore, error) {
func NewCarStore(meta *gorm.DB, root string) (CarStore, error) {
if _, err := os.Stat(root); err != nil {
if !os.IsNotExist(err) {
return nil, err
Expand All @@ -73,15 +86,15 @@ func NewCarStore(meta *gorm.DB, root string) (*CarStore, error) {
return nil, err
}

return &CarStore{
return &FileCarStore{
meta: &CarStoreGormMeta{meta: meta},
rootDir: root,
lastShardCache: make(map[models.Uid]*CarShard),
}, nil
}

type userView struct {
cs *CarStore
cs *FileCarStore
user models.Uid

cache map[cid.Cid]blockformat.Block
Expand Down Expand Up @@ -256,11 +269,11 @@ type DeltaSession struct {
baseCid cid.Cid
seq int
readonly bool
cs *CarStore
cs *FileCarStore
lastRev string
}

func (cs *CarStore) checkLastShardCache(user models.Uid) *CarShard {
func (cs *FileCarStore) checkLastShardCache(user models.Uid) *CarShard {
cs.lscLk.Lock()
defer cs.lscLk.Unlock()

Expand All @@ -272,21 +285,21 @@ func (cs *CarStore) checkLastShardCache(user models.Uid) *CarShard {
return nil
}

func (cs *CarStore) removeLastShardCache(user models.Uid) {
func (cs *FileCarStore) removeLastShardCache(user models.Uid) {
cs.lscLk.Lock()
defer cs.lscLk.Unlock()

delete(cs.lastShardCache, user)
}

func (cs *CarStore) putLastShardCache(ls *CarShard) {
func (cs *FileCarStore) putLastShardCache(ls *CarShard) {
cs.lscLk.Lock()
defer cs.lscLk.Unlock()

cs.lastShardCache[ls.Usr] = ls
}

func (cs *CarStore) getLastShard(ctx context.Context, user models.Uid) (*CarShard, error) {
func (cs *FileCarStore) getLastShard(ctx context.Context, user models.Uid) (*CarShard, error) {
ctx, span := otel.Tracer("carstore").Start(ctx, "getLastShard")
defer span.End()

Expand All @@ -306,7 +319,7 @@ func (cs *CarStore) getLastShard(ctx context.Context, user models.Uid) (*CarShar

var ErrRepoBaseMismatch = fmt.Errorf("attempted a delta session on top of the wrong previous head")

func (cs *CarStore) NewDeltaSession(ctx context.Context, user models.Uid, since *string) (*DeltaSession, error) {
func (cs *FileCarStore) NewDeltaSession(ctx context.Context, user models.Uid, since *string) (*DeltaSession, error) {
ctx, span := otel.Tracer("carstore").Start(ctx, "NewSession")
defer span.End()

Expand Down Expand Up @@ -338,7 +351,7 @@ func (cs *CarStore) NewDeltaSession(ctx context.Context, user models.Uid, since
}, nil
}

func (cs *CarStore) ReadOnlySession(user models.Uid) (*DeltaSession, error) {
func (cs *FileCarStore) ReadOnlySession(user models.Uid) (*DeltaSession, error) {
return &DeltaSession{
base: &userView{
user: user,
Expand All @@ -353,7 +366,7 @@ func (cs *CarStore) ReadOnlySession(user models.Uid) (*DeltaSession, error) {
}

// TODO: incremental is only ever called true, remove the param
func (cs *CarStore) ReadUserCar(ctx context.Context, user models.Uid, sinceRev string, incremental bool, w io.Writer) error {
func (cs *FileCarStore) ReadUserCar(ctx context.Context, user models.Uid, sinceRev string, incremental bool, w io.Writer) error {
ctx, span := otel.Tracer("carstore").Start(ctx, "ReadUserCar")
defer span.End()

Expand Down Expand Up @@ -401,7 +414,7 @@ func (cs *CarStore) ReadUserCar(ctx context.Context, user models.Uid, sinceRev s

// inner loop part of ReadUserCar
// copy shard blocks from disk to Writer
func (cs *CarStore) writeShardBlocks(ctx context.Context, sh *CarShard, w io.Writer) error {
func (cs *FileCarStore) writeShardBlocks(ctx context.Context, sh *CarShard, w io.Writer) error {
ctx, span := otel.Tracer("carstore").Start(ctx, "writeShardBlocks")
defer span.End()

Expand All @@ -425,7 +438,7 @@ func (cs *CarStore) writeShardBlocks(ctx context.Context, sh *CarShard, w io.Wri
}

// inner loop part of compactBucket
func (cs *CarStore) iterateShardBlocks(ctx context.Context, sh *CarShard, cb func(blk blockformat.Block) error) error {
func (cs *FileCarStore) iterateShardBlocks(ctx context.Context, sh *CarShard, cb func(blk blockformat.Block) error) error {
fi, err := os.Open(sh.Path)
if err != nil {
return err
Expand Down Expand Up @@ -528,7 +541,7 @@ func (ds *DeltaSession) GetSize(ctx context.Context, c cid.Cid) (int, error) {
func fnameForShard(user models.Uid, seq int) string {
return fmt.Sprintf("sh-%d-%d", user, seq)
}
func (cs *CarStore) openNewShardFile(ctx context.Context, user models.Uid, seq int) (*os.File, string, error) {
func (cs *FileCarStore) openNewShardFile(ctx context.Context, user models.Uid, seq int) (*os.File, string, error) {
// TODO: some overwrite protections
fname := filepath.Join(cs.rootDir, fnameForShard(user, seq))
fi, err := os.Create(fname)
Expand All @@ -539,7 +552,7 @@ func (cs *CarStore) openNewShardFile(ctx context.Context, user models.Uid, seq i
return fi, fname, nil
}

func (cs *CarStore) writeNewShardFile(ctx context.Context, user models.Uid, seq int, data []byte) (string, error) {
func (cs *FileCarStore) writeNewShardFile(ctx context.Context, user models.Uid, seq int, data []byte) (string, error) {
_, span := otel.Tracer("carstore").Start(ctx, "writeNewShardFile")
defer span.End()

Expand All @@ -552,7 +565,7 @@ func (cs *CarStore) writeNewShardFile(ctx context.Context, user models.Uid, seq
return fname, nil
}

func (cs *CarStore) deleteShardFile(ctx context.Context, sh *CarShard) error {
func (cs *FileCarStore) deleteShardFile(ctx context.Context, sh *CarShard) error {
return os.Remove(sh.Path)
}

Expand Down Expand Up @@ -587,7 +600,7 @@ func WriteCarHeader(w io.Writer, root cid.Cid) (int64, error) {
return hnw, nil
}

func (cs *CarStore) writeNewShard(ctx context.Context, root cid.Cid, rev string, user models.Uid, seq int, blks map[cid.Cid]blockformat.Block, rmcids map[cid.Cid]bool) ([]byte, error) {
func (cs *FileCarStore) writeNewShard(ctx context.Context, root cid.Cid, rev string, user models.Uid, seq int, blks map[cid.Cid]blockformat.Block, rmcids map[cid.Cid]bool) ([]byte, error) {

buf := new(bytes.Buffer)
hnw, err := WriteCarHeader(buf, root)
Expand Down Expand Up @@ -646,7 +659,7 @@ func (cs *CarStore) writeNewShard(ctx context.Context, root cid.Cid, rev string,
return buf.Bytes(), nil
}

func (cs *CarStore) putShard(ctx context.Context, shard *CarShard, brefs []map[string]any, rmcids map[cid.Cid]bool, nocache bool) error {
func (cs *FileCarStore) putShard(ctx context.Context, shard *CarShard, brefs []map[string]any, rmcids map[cid.Cid]bool, nocache bool) error {
ctx, span := otel.Tracer("carstore").Start(ctx, "putShard")
defer span.End()

Expand Down Expand Up @@ -726,7 +739,7 @@ func BlockDiff(ctx context.Context, bs blockstore.Blockstore, oldroot cid.Cid, n
return dropset, nil
}

func (cs *CarStore) ImportSlice(ctx context.Context, uid models.Uid, since *string, carslice []byte) (cid.Cid, *DeltaSession, error) {
func (cs *FileCarStore) ImportSlice(ctx context.Context, uid models.Uid, since *string, carslice []byte) (cid.Cid, *DeltaSession, error) {
ctx, span := otel.Tracer("carstore").Start(ctx, "ImportSlice")
defer span.End()

Expand Down Expand Up @@ -774,7 +787,7 @@ func (ds *DeltaSession) CalcDiff(ctx context.Context, skipcids map[cid.Cid]bool)
return nil
}

func (cs *CarStore) GetUserRepoHead(ctx context.Context, user models.Uid) (cid.Cid, error) {
func (cs *FileCarStore) GetUserRepoHead(ctx context.Context, user models.Uid) (cid.Cid, error) {
lastShard, err := cs.getLastShard(ctx, user)
if err != nil {
return cid.Undef, err
Expand All @@ -786,7 +799,7 @@ func (cs *CarStore) GetUserRepoHead(ctx context.Context, user models.Uid) (cid.C
return lastShard.Root.CID, nil
}

func (cs *CarStore) GetUserRepoRev(ctx context.Context, user models.Uid) (string, error) {
func (cs *FileCarStore) GetUserRepoRev(ctx context.Context, user models.Uid) (string, error) {
lastShard, err := cs.getLastShard(ctx, user)
if err != nil {
return "", err
Expand All @@ -804,7 +817,7 @@ type UserStat struct {
Created time.Time
}

func (cs *CarStore) Stat(ctx context.Context, usr models.Uid) ([]UserStat, error) {
func (cs *FileCarStore) Stat(ctx context.Context, usr models.Uid) ([]UserStat, error) {
shards, err := cs.meta.GetUserShards(ctx, usr)
if err != nil {
return nil, err
Expand All @@ -822,7 +835,7 @@ func (cs *CarStore) Stat(ctx context.Context, usr models.Uid) ([]UserStat, error
return out, nil
}

func (cs *CarStore) WipeUserData(ctx context.Context, user models.Uid) error {
func (cs *FileCarStore) WipeUserData(ctx context.Context, user models.Uid) error {
shards, err := cs.meta.GetUserShards(ctx, user)
if err != nil {
return err
Expand All @@ -839,7 +852,7 @@ func (cs *CarStore) WipeUserData(ctx context.Context, user models.Uid) error {
return nil
}

func (cs *CarStore) deleteShards(ctx context.Context, shs []CarShard) error {
func (cs *FileCarStore) deleteShards(ctx context.Context, shs []CarShard) error {
ctx, span := otel.Tracer("carstore").Start(ctx, "deleteShards")
defer span.End()

Expand Down Expand Up @@ -965,7 +978,7 @@ func (cb *compBucket) isEmpty() bool {
return len(cb.shards) == 0
}

func (cs *CarStore) openNewCompactedShardFile(ctx context.Context, user models.Uid, seq int) (*os.File, string, error) {
func (cs *FileCarStore) openNewCompactedShardFile(ctx context.Context, user models.Uid, seq int) (*os.File, string, error) {
// TODO: some overwrite protections
// NOTE CreateTemp is used for creating a non-colliding file, but we keep it and don't delete it so don't think of it as "temporary".
// This creates "sh-%d-%d%s" with some random stuff in the last position
Expand All @@ -982,15 +995,15 @@ type CompactionTarget struct {
NumShards int
}

func (cs *CarStore) GetCompactionTargets(ctx context.Context, shardCount int) ([]CompactionTarget, error) {
func (cs *FileCarStore) GetCompactionTargets(ctx context.Context, shardCount int) ([]CompactionTarget, error) {
ctx, span := otel.Tracer("carstore").Start(ctx, "GetCompactionTargets")
defer span.End()

return cs.meta.GetCompactionTargets(ctx, shardCount)
}

// getBlockRefsForShards is a prep function for CompactUserShards
func (cs *CarStore) getBlockRefsForShards(ctx context.Context, shardIds []uint) ([]blockRef, error) {
func (cs *FileCarStore) getBlockRefsForShards(ctx context.Context, shardIds []uint) ([]blockRef, error) {
ctx, span := otel.Tracer("carstore").Start(ctx, "getBlockRefsForShards")
defer span.End()

Expand Down Expand Up @@ -1028,7 +1041,7 @@ type CompactionStats struct {
DupeCount int `json:"dupeCount"`
}

func (cs *CarStore) CompactUserShards(ctx context.Context, user models.Uid, skipBigShards bool) (*CompactionStats, error) {
func (cs *FileCarStore) CompactUserShards(ctx context.Context, user models.Uid, skipBigShards bool) (*CompactionStats, error) {
ctx, span := otel.Tracer("carstore").Start(ctx, "CompactUserShards")
defer span.End()

Expand Down Expand Up @@ -1242,7 +1255,7 @@ func (cs *CarStore) CompactUserShards(ctx context.Context, user models.Uid, skip
return stats, nil
}

func (cs *CarStore) deleteStaleRefs(ctx context.Context, uid models.Uid, brefs []blockRef, staleRefs []staleRef, removedShards map[uint]bool) error {
func (cs *FileCarStore) deleteStaleRefs(ctx context.Context, uid models.Uid, brefs []blockRef, staleRefs []staleRef, removedShards map[uint]bool) error {
ctx, span := otel.Tracer("carstore").Start(ctx, "deleteStaleRefs")
defer span.End()

Expand Down Expand Up @@ -1277,7 +1290,7 @@ func (cs *CarStore) deleteStaleRefs(ctx context.Context, uid models.Uid, brefs [
return cs.meta.SetStaleRef(ctx, uid, staleToKeep)
}

func (cs *CarStore) compactBucket(ctx context.Context, user models.Uid, b *compBucket, shardsById map[uint]CarShard, keep map[cid.Cid]bool) error {
func (cs *FileCarStore) compactBucket(ctx context.Context, user models.Uid, b *compBucket, shardsById map[uint]CarShard, keep map[cid.Cid]bool) error {
ctx, span := otel.Tracer("carstore").Start(ctx, "compactBucket")
defer span.End()

Expand Down
4 changes: 2 additions & 2 deletions carstore/repo_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ import (
"gorm.io/gorm"
)

func testCarStore() (*CarStore, func(), error) {
func testCarStore() (CarStore, func(), error) {
tempdir, err := os.MkdirTemp("", "msttest-")
if err != nil {
return nil, nil, err
Expand Down Expand Up @@ -250,7 +250,7 @@ func TestRepeatedCompactions(t *testing.T) {
checkRepo(t, cs, buf, recs)
}

func checkRepo(t *testing.T, cs *CarStore, r io.Reader, expRecs []cid.Cid) {
func checkRepo(t *testing.T, cs CarStore, r io.Reader, expRecs []cid.Cid) {
t.Helper()
rep, err := repo.ReadRepoFromCar(context.TODO(), r)
if err != nil {
Expand Down
4 changes: 2 additions & 2 deletions events/dbpersist.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ func DefaultOptions() *Options {
type DbPersistence struct {
db *gorm.DB

cs *carstore.CarStore
cs carstore.CarStore

lk sync.Mutex

Expand Down Expand Up @@ -86,7 +86,7 @@ type RepoEventRecord struct {
Ops []byte
}

func NewDbPersistence(db *gorm.DB, cs *carstore.CarStore, options *Options) (*DbPersistence, error) {
func NewDbPersistence(db *gorm.DB, cs carstore.CarStore, options *Options) (*DbPersistence, error) {
if err := db.AutoMigrate(&RepoEventRecord{}); err != nil {
return nil, err
}
Expand Down
2 changes: 1 addition & 1 deletion events/dbpersist_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -268,7 +268,7 @@ func BenchmarkPlayback(b *testing.B) {
}
}

func setupDBs(t testing.TB) (*gorm.DB, *gorm.DB, *carstore.CarStore, string, error) {
func setupDBs(t testing.TB) (*gorm.DB, *gorm.DB, carstore.CarStore, string, error) {
dir, err := os.MkdirTemp("", "integtest")
if err != nil {
return nil, nil, nil, "", err
Expand Down
6 changes: 3 additions & 3 deletions events/diskpersist_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -187,7 +187,7 @@ func BenchmarkDiskPersist(b *testing.B) {

}

func runPersisterBenchmark(b *testing.B, cs *carstore.CarStore, db *gorm.DB, p events.EventPersistence) {
func runPersisterBenchmark(b *testing.B, cs carstore.CarStore, db *gorm.DB, p events.EventPersistence) {
ctx := context.Background()

db.AutoMigrate(&pds.User{})
Expand Down Expand Up @@ -302,7 +302,7 @@ func TestDiskPersister(t *testing.T) {
runEventManagerTest(t, cs, db, dp)
}

func runEventManagerTest(t *testing.T, cs *carstore.CarStore, db *gorm.DB, p events.EventPersistence) {
func runEventManagerTest(t *testing.T, cs carstore.CarStore, db *gorm.DB, p events.EventPersistence) {
ctx := context.Background()

db.AutoMigrate(&pds.User{})
Expand Down Expand Up @@ -409,7 +409,7 @@ func TestDiskPersisterTakedowns(t *testing.T) {
runTakedownTest(t, cs, db, dp)
}

func runTakedownTest(t *testing.T, cs *carstore.CarStore, db *gorm.DB, p events.EventPersistence) {
func runTakedownTest(t *testing.T, cs carstore.CarStore, db *gorm.DB, p events.EventPersistence) {
ctx := context.TODO()

db.AutoMigrate(&pds.User{})
Expand Down
2 changes: 1 addition & 1 deletion pds/handlers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ import (
"gorm.io/gorm"
)

func testCarStore(t *testing.T, db *gorm.DB) (*carstore.CarStore, func()) {
func testCarStore(t *testing.T, db *gorm.DB) (carstore.CarStore, func()) {
t.Helper()
tempdir, err := os.MkdirTemp("", "msttest-")
if err != nil {
Expand Down
4 changes: 2 additions & 2 deletions pds/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ var log = logging.Logger("pds")

type Server struct {
db *gorm.DB
cs *carstore.CarStore
cs carstore.CarStore
repoman *repomgr.RepoManager
feedgen *FeedGenerator
notifman notifs.NotificationManager
Expand All @@ -65,7 +65,7 @@ type Server struct {
// NewServer.
const serverListenerBootTimeout = 5 * time.Second

func NewServer(db *gorm.DB, cs *carstore.CarStore, serkey *did.PrivKey, handleSuffix, serviceUrl string, didr plc.PLCClient, jwtkey []byte) (*Server, error) {
func NewServer(db *gorm.DB, cs carstore.CarStore, serkey *did.PrivKey, handleSuffix, serviceUrl string, didr plc.PLCClient, jwtkey []byte) (*Server, error) {
db.AutoMigrate(&User{})
db.AutoMigrate(&Peering{})

Expand Down
Loading

0 comments on commit 5c2cb48

Please sign in to comment.