Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add flag to skip big shards during compaction #380

Merged
merged 2 commits into from
Oct 10, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 12 additions & 2 deletions bgs/admin.go
Original file line number Diff line number Diff line change
Expand Up @@ -406,12 +406,17 @@ func (bgs *BGS) handleAdminCompactRepo(e echo.Context) error {
return fmt.Errorf("must pass a did")
}

var fast bool
if strings.ToLower(e.QueryParam("fast")) == "true" {
fast = true
}

u, err := bgs.lookupUserByDid(ctx, did)
if err != nil {
return fmt.Errorf("no such user: %w", err)
}

stats, err := bgs.repoman.CarStore().CompactUserShards(ctx, u.ID)
stats, err := bgs.repoman.CarStore().CompactUserShards(ctx, u.ID, fast)
if err != nil {
return fmt.Errorf("compaction failed: %w", err)
}
Expand All @@ -431,6 +436,11 @@ func (bgs *BGS) handleAdminCompactAllRepos(e echo.Context) error {
dry = true
}

var fast bool
if strings.ToLower(e.QueryParam("fast")) == "true" {
fast = true
}

lim := 50
if limstr := e.QueryParam("limit"); limstr != "" {
v, err := strconv.Atoi(limstr)
Expand All @@ -441,7 +451,7 @@ func (bgs *BGS) handleAdminCompactAllRepos(e echo.Context) error {
lim = v
}

stats, err := bgs.runRepoCompaction(ctx, lim, dry)
stats, err := bgs.runRepoCompaction(ctx, lim, dry, fast)
if err != nil {
return fmt.Errorf("compaction run failed: %w", err)
}
Expand Down
4 changes: 2 additions & 2 deletions bgs/bgs.go
Original file line number Diff line number Diff line change
Expand Up @@ -1207,7 +1207,7 @@ type compactionStats struct {
Targets []carstore.CompactionTarget
}

func (bgs *BGS) runRepoCompaction(ctx context.Context, lim int, dry bool) (*compactionStats, error) {
func (bgs *BGS) runRepoCompaction(ctx context.Context, lim int, dry bool, fast bool) (*compactionStats, error) {
ctx, span := otel.Tracer("bgs").Start(ctx, "runRepoCompaction")
defer span.End()

Expand Down Expand Up @@ -1242,7 +1242,7 @@ func (bgs *BGS) runRepoCompaction(ctx context.Context, lim int, dry bool) (*comp
}

repostart := time.Now()
st, err := bgs.repoman.CarStore().CompactUserShards(context.Background(), r.Usr)
st, err := bgs.repoman.CarStore().CompactUserShards(context.Background(), r.Usr, fast)
if err != nil {
log.Errorf("failed to compact shards for user %d: %s", r.Usr, err)
continue
Expand Down
43 changes: 42 additions & 1 deletion carstore/bs.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@ var log = logging.Logger("carstore")

const MaxSliceLength = 2 << 20

const BigShardThreshold = 2 << 20

type CarStore struct {
meta *gorm.DB
rootDir string
Expand Down Expand Up @@ -1207,6 +1209,15 @@ func (cs *CarStore) getBlockRefsForShards(ctx context.Context, shardIds []uint)
return out, nil
}

func shardSize(sh *CarShard) (int64, error) {
st, err := os.Stat(sh.Path)
if err != nil {
return 0, fmt.Errorf("stat %q: %w", sh.Path, err)
}

return st.Size(), nil
}

type CompactionStats struct {
TotalRefs int `json:"totalRefs"`
StartShards int `json:"startShards"`
Expand All @@ -1217,7 +1228,7 @@ type CompactionStats struct {
DupeCount int `json:"dupeCount"`
}

func (cs *CarStore) CompactUserShards(ctx context.Context, user models.Uid) (*CompactionStats, error) {
func (cs *CarStore) CompactUserShards(ctx context.Context, user models.Uid, skipBigShards bool) (*CompactionStats, error) {
ctx, span := otel.Tracer("carstore").Start(ctx, "CompactUserShards")
defer span.End()

Expand All @@ -1228,6 +1239,36 @@ func (cs *CarStore) CompactUserShards(ctx context.Context, user models.Uid) (*Co
return nil, err
}

sort.Slice(shards, func(i, j int) bool {
return shards[i].Seq < shards[j].Seq
})

if skipBigShards {
// Since we generally expect shards to start bigger and get smaller,
// and because we want to avoid compacting non-adjacent shards
// together, and because we want to avoid running a stat on every
// single shard (can be expensive for repos that havent been compacted
// in a while) we only skip a prefix of shard files that are over the
// threshold. this may end up not skipping some shards that are over
// the threshold if a below-threshold shard occurs before them, but
// since this is a heuristic and imperfect optimization, that is
// acceptable.
var skip int
for i, sh := range shards {
size, err := shardSize(&sh)
if err != nil {
return nil, fmt.Errorf("could not check size of shard file: %w", err)
}

if size > BigShardThreshold {
skip = i + 1
} else {
break
}
}
shards = shards[skip:]
}

span.SetAttributes(attribute.Int("shards", len(shards)))

var shardIds []uint
Expand Down
4 changes: 2 additions & 2 deletions carstore/repo_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ func TestBasicOperation(t *testing.T) {
}
checkRepo(t, cs, buf, recs)

if _, err := cs.CompactUserShards(ctx, 1); err != nil {
if _, err := cs.CompactUserShards(ctx, 1, false); err != nil {
t.Fatal(err)
}

Expand Down Expand Up @@ -228,7 +228,7 @@ func TestRepeatedCompactions(t *testing.T) {
head = nroot
}
fmt.Println("Run compaction", loop)
st, err := cs.CompactUserShards(ctx, 1)
st, err := cs.CompactUserShards(ctx, 1, false)
if err != nil {
t.Fatal(err)
}
Expand Down
33 changes: 28 additions & 5 deletions cmd/gosky/bgs.go
Original file line number Diff line number Diff line change
Expand Up @@ -305,14 +305,28 @@ var bgsSetNewSubsEnabledCmd = &cli.Command{

var bgsCompactRepo = &cli.Command{
Name: "compact-repo",
Flags: []cli.Flag{
&cli.BoolFlag{
Name: "fast",
},
},
Action: func(cctx *cli.Context) error {
url := cctx.String("bgs") + "/admin/repo/compact"
uu, err := url.Parse(cctx.String("bgs") + "/admin/repo/compact")
if err != nil {
return err
}

q := uu.Query()
did := cctx.Args().First()
q.Add("did", did)

url += fmt.Sprintf("?did=%s", did)
if cctx.Bool("fast") {
q.Add("fast", "true")
}

req, err := http.NewRequest("POST", url, nil)
uu.RawQuery = q.Encode()

req, err := http.NewRequest("POST", uu.String(), nil)
if err != nil {
return err
}
Expand Down Expand Up @@ -354,20 +368,29 @@ var bgsCompactAll = &cli.Command{
&cli.IntFlag{
Name: "limit",
},
&cli.BoolFlag{
Name: "fast",
},
},
Action: func(cctx *cli.Context) error {
uu, err := url.Parse(cctx.String("bgs") + "/admin/repo/compactAll")
if err != nil {
return err
}

q := uu.Query()
if cctx.Bool("dry") {
uu.Query().Add("dry", "true")
q.Add("dry", "true")
}

if cctx.Bool("fast") {
q.Add("fast", "true")
}

if cctx.IsSet("limit") {
uu.Query().Add("limit", fmt.Sprint(cctx.Int("limit")))
q.Add("limit", fmt.Sprint(cctx.Int("limit")))
}
uu.RawQuery = q.Encode()

req, err := http.NewRequest("POST", uu.String(), nil)
if err != nil {
Expand Down
Loading