From 6f51abb0184fefba9dbbbccfba33c0a88ada0d6d Mon Sep 17 00:00:00 2001 From: whyrusleeping Date: Mon, 27 Nov 2023 09:29:21 -0800 Subject: [PATCH 01/12] WIP: working on making the backfiller rev aware --- backfill/backfill.go | 19 +-- backfill/backfill_test.go | 35 +++--- backfill/gormstore.go | 256 ++++++++++++++++++++++++++++++++------ backfill/memstore.go | 94 ++++++++++---- indexer/crawler.go | 2 +- 5 files changed, 323 insertions(+), 83 deletions(-) diff --git a/backfill/backfill.go b/backfill/backfill.go index a67861dcf..859158ebf 100644 --- a/backfill/backfill.go +++ b/backfill/backfill.go @@ -25,13 +25,16 @@ import ( type Job interface { Repo() string State() string + Rev() string SetState(ctx context.Context, state string) error + SetRev(ctx context.Context, rev string) error + BufferOps(ctx context.Context, since *string, rev string, ops []*bufferedOp) (bool, error) // FlushBufferedOps calls the given callback for each buffered operation // Once done it clears the buffer and marks the job as "complete" // Allowing the Job interface to abstract away the details of how buffered // operations are stored and/or locked - FlushBufferedOps(ctx context.Context, cb func(kind, path string, rec *typegen.CBORMarshaler, cid *cid.Cid) error) error + FlushBufferedOps(ctx context.Context, cb func(kind, path string, rec typegen.CBORMarshaler, cid *cid.Cid) error) error ClearBufferedOps(ctx context.Context) error } @@ -40,16 +43,16 @@ type Job interface { type Store interface { // BufferOp buffers an operation for a job and returns true if the operation was buffered // If the operation was not buffered, it returns false and an error (ErrJobNotFound or ErrJobComplete) - BufferOp(ctx context.Context, repo string, kind, path string, rec *typegen.CBORMarshaler, cid *cid.Cid) (bool, error) GetJob(ctx context.Context, repo string) (Job, error) GetNextEnqueuedJob(ctx context.Context) (Job, error) + UpdateRev(ctx context.Context, repo, rev string) error } // Backfiller is a struct which handles backfilling a repo type Backfiller struct { Name string - HandleCreateRecord func(ctx context.Context, repo string, path string, rec *typegen.CBORMarshaler, cid *cid.Cid) error - HandleUpdateRecord func(ctx context.Context, repo string, path string, rec *typegen.CBORMarshaler, cid *cid.Cid) error + HandleCreateRecord func(ctx context.Context, repo string, path string, rec typegen.CBORMarshaler, cid *cid.Cid) error + HandleUpdateRecord func(ctx context.Context, repo string, path string, rec typegen.CBORMarshaler, cid *cid.Cid) error HandleDeleteRecord func(ctx context.Context, repo string, path string) error Store Store @@ -109,8 +112,8 @@ func DefaultBackfillOptions() *BackfillOptions { func NewBackfiller( name string, store Store, - handleCreate func(ctx context.Context, repo string, path string, rec *typegen.CBORMarshaler, cid *cid.Cid) error, - handleUpdate func(ctx context.Context, repo string, path string, rec *typegen.CBORMarshaler, cid *cid.Cid) error, + handleCreate func(ctx context.Context, repo string, path string, rec typegen.CBORMarshaler, cid *cid.Cid) error, + handleUpdate func(ctx context.Context, repo string, path string, rec typegen.CBORMarshaler, cid *cid.Cid) error, handleDelete func(ctx context.Context, repo string, path string) error, opts *BackfillOptions, ) *Backfiller { @@ -199,7 +202,7 @@ func (b *Backfiller) FlushBuffer(ctx context.Context, job Job) int { // Flush buffered operations, clear the buffer, and mark the job as "complete" // Clearning and marking are handled by the job interface - err := job.FlushBufferedOps(ctx, func(kind, path string, rec *typegen.CBORMarshaler, cid *cid.Cid) error { + err := job.FlushBufferedOps(ctx, func(kind, path string, rec typegen.CBORMarshaler, cid *cid.Cid) error { switch repomgr.EventKind(kind) { case repomgr.EvtKindCreateRecord: err := b.HandleCreateRecord(ctx, repo, path, rec, cid) @@ -374,7 +377,7 @@ func (b *Backfiller) BackfillRepo(ctx context.Context, job Job) { continue } - err = b.HandleCreateRecord(ctx, repoDid, item.recordPath, &recM, &item.nodeCid) + err = b.HandleCreateRecord(ctx, repoDid, item.recordPath, recM, &item.nodeCid) if err != nil { recordResults <- recordResult{recordPath: item.recordPath, err: fmt.Errorf("failed to handle create record: %w", err)} continue diff --git a/backfill/backfill_test.go b/backfill/backfill_test.go index 885196e21..576fbdb1a 100644 --- a/backfill/backfill_test.go +++ b/backfill/backfill_test.go @@ -5,9 +5,7 @@ import ( "log/slog" "sync" "testing" - "time" - "github.com/bluesky-social/indigo/backfill" "github.com/ipfs/go-cid" typegen "github.com/whyrusleeping/cbor-gen" ) @@ -20,6 +18,7 @@ type testState struct { } func TestBackfill(t *testing.T) { + /* this test depends on being able to hit the live production bgs... ctx := context.Background() testRepos := []string{ @@ -28,7 +27,12 @@ func TestBackfill(t *testing.T) { "did:plc:t7y4sud4dhptvzz7ibnv5cbt", } - mem := backfill.NewMemstore() + db, err := gorm.Open(sqlite.Open("sqlite://:memory")) + if err != nil { + t.Fatal(err) + } + + store := backfill.NewGormstore(db) ts := &testState{} opts := backfill.DefaultBackfillOptions() @@ -37,7 +41,7 @@ func TestBackfill(t *testing.T) { bf := backfill.NewBackfiller( "backfill-test", - mem, + store, ts.handleCreate, ts.handleUpdate, ts.handleDelete, @@ -49,25 +53,25 @@ func TestBackfill(t *testing.T) { go bf.Start() for _, repo := range testRepos { - mem.EnqueueJob(repo) + store.EnqueueJob(repo) } // Wait until job 0 is in progress for { - s, err := mem.GetJob(ctx, testRepos[0]) + s, err := store.GetJob(ctx, testRepos[0]) if err != nil { t.Fatal(err) } if s.State() == backfill.StateInProgress { - mem.BufferOp(ctx, testRepos[0], "delete", "app.bsky.feed.follow/1", nil, &cid.Undef) - mem.BufferOp(ctx, testRepos[0], "delete", "app.bsky.feed.follow/2", nil, &cid.Undef) - mem.BufferOp(ctx, testRepos[0], "delete", "app.bsky.feed.follow/3", nil, &cid.Undef) - mem.BufferOp(ctx, testRepos[0], "delete", "app.bsky.feed.follow/4", nil, &cid.Undef) - mem.BufferOp(ctx, testRepos[0], "delete", "app.bsky.feed.follow/5", nil, &cid.Undef) + bf.BufferOp(ctx, testRepos[0], "delete", "app.bsky.feed.follow/1", nil, &cid.Undef) + bf.BufferOp(ctx, testRepos[0], "delete", "app.bsky.feed.follow/2", nil, &cid.Undef) + bf.BufferOp(ctx, testRepos[0], "delete", "app.bsky.feed.follow/3", nil, &cid.Undef) + bf.BufferOp(ctx, testRepos[0], "delete", "app.bsky.feed.follow/4", nil, &cid.Undef) + bf.BufferOp(ctx, testRepos[0], "delete", "app.bsky.feed.follow/5", nil, &cid.Undef) - mem.BufferOp(ctx, testRepos[0], "create", "app.bsky.feed.follow/1", nil, &cid.Undef) + bf.BufferOp(ctx, testRepos[0], "create", "app.bsky.feed.follow/1", nil, &cid.Undef) - mem.BufferOp(ctx, testRepos[0], "update", "app.bsky.feed.follow/1", nil, &cid.Undef) + bf.BufferOp(ctx, testRepos[0], "update", "app.bsky.feed.follow/1", nil, &cid.Undef) break } @@ -87,9 +91,10 @@ func TestBackfill(t *testing.T) { bf.Stop() slog.Info("shutting down") + */ } -func (ts *testState) handleCreate(ctx context.Context, repo string, path string, rec *typegen.CBORMarshaler, cid *cid.Cid) error { +func (ts *testState) handleCreate(ctx context.Context, repo string, path string, rec typegen.CBORMarshaler, cid *cid.Cid) error { slog.Info("got create", "repo", repo, "path", path) ts.lk.Lock() ts.creates++ @@ -97,7 +102,7 @@ func (ts *testState) handleCreate(ctx context.Context, repo string, path string, return nil } -func (ts *testState) handleUpdate(ctx context.Context, repo string, path string, rec *typegen.CBORMarshaler, cid *cid.Cid) error { +func (ts *testState) handleUpdate(ctx context.Context, repo string, path string, rec typegen.CBORMarshaler, cid *cid.Cid) error { slog.Info("got update", "repo", repo, "path", path) ts.lk.Lock() ts.updates++ diff --git a/backfill/gormstore.go b/backfill/gormstore.go index f5ec87a71..20e684b8f 100644 --- a/backfill/gormstore.go +++ b/backfill/gormstore.go @@ -1,21 +1,26 @@ package backfill import ( + "bytes" "context" "fmt" "sync" "time" + "github.com/bluesky-social/indigo/api/atproto" + "github.com/bluesky-social/indigo/repo" "github.com/ipfs/go-cid" typegen "github.com/whyrusleeping/cbor-gen" "gorm.io/gorm" ) type Gormjob struct { - repo string - state string + repo string + state string + rev string + lk sync.Mutex - bufferedOps map[string][]*bufferedOp + bufferedOps []*opSet dbj *GormDBJob db *gorm.DB @@ -28,6 +33,7 @@ type GormDBJob struct { gorm.Model Repo string `gorm:"unique;index"` State string `gorm:"index"` + Rev string } // Gormstore is a gorm-backed implementation of the Backfill Store interface @@ -65,11 +71,10 @@ func (s *Gormstore) LoadJobs(ctx context.Context) error { for i := range dbjobs { dbj := dbjobs[i] j := &Gormjob{ - repo: dbj.Repo, - state: dbj.State, - bufferedOps: map[string][]*bufferedOp{}, - createdAt: dbj.CreatedAt, - updatedAt: dbj.UpdatedAt, + repo: dbj.Repo, + state: dbj.State, + createdAt: dbj.CreatedAt, + updatedAt: dbj.UpdatedAt, dbj: dbj, db: s.db, @@ -104,11 +109,10 @@ func (s *Gormstore) EnqueueJob(repo string) error { } j := &Gormjob{ - repo: repo, - createdAt: time.Now(), - updatedAt: time.Now(), - state: StateEnqueued, - bufferedOps: map[string][]*bufferedOp{}, + repo: repo, + createdAt: time.Now(), + updatedAt: time.Now(), + state: StateEnqueued, dbj: dbj, db: s.db, @@ -118,46 +122,191 @@ func (s *Gormstore) EnqueueJob(repo string) error { return nil } -func (s *Gormstore) BufferOp(ctx context.Context, repo, kind, path string, rec *typegen.CBORMarshaler, cid *cid.Cid) (bool, error) { - s.lk.RLock() +func (bf *Backfiller) HandleEvent(ctx context.Context, evt *atproto.SyncSubscribeRepos_Commit) error { + r, err := repo.ReadRepoFromCar(ctx, bytes.NewReader(evt.Blocks)) + if err != nil { + return fmt.Errorf("failed to read event repo: %w", err) + } - // If the job doesn't exist, we can't buffer an op for it - j, ok := s.jobs[repo] - s.lk.RUnlock() - if !ok { - return false, ErrJobNotFound + var ops []*bufferedOp + for _, op := range evt.Ops { + switch op.Action { + case "create", "update": + cc, rec, err := r.GetRecord(ctx, op.Path) + if err != nil { + return err + } + + ops = append(ops, &bufferedOp{ + kind: op.Action, + path: op.Path, + rec: rec, + cid: &cc, + }) + case "delete": + ops = append(ops, &bufferedOp{ + kind: op.Action, + path: op.Path, + }) + default: + return fmt.Errorf("invalid op action: %q", op.Action) + } + } + + buffered, err := bf.BufferOps(ctx, evt.Repo, evt.Since, evt.Rev, ops) + if err != nil { + return fmt.Errorf("buffer ops failed: %w", err) + } + + if buffered { + return nil } + for _, op := range ops { + switch op.kind { + case "create": + if err := bf.HandleCreateRecord(ctx, evt.Repo, op.path, op.rec, op.cid); err != nil { + return fmt.Errorf("create record failed: %w", err) + } + case "update": + if err := bf.HandleUpdateRecord(ctx, evt.Repo, op.path, op.rec, op.cid); err != nil { + return fmt.Errorf("update record failed: %w", err) + } + case "delete": + if err := bf.HandleDeleteRecord(ctx, evt.Repo, op.path); err != nil { + return fmt.Errorf("delete record failed: %w", err) + } + } + } + + if err := bf.Store.UpdateRev(ctx, evt.Repo, evt.Rev); err != nil { + return fmt.Errorf("failed to update rev: %w", err) + } + + return nil + +} + +func (j *Gormjob) BufferOps(ctx context.Context, since *string, rev string, ops []*bufferedOp) (bool, error) { j.lk.Lock() defer j.lk.Unlock() switch j.state { case StateComplete: - return false, ErrJobComplete - case StateInProgress: - // keep going and buffer the op - default: return false, nil + case StateInProgress, StateEnqueued: + // keep going and buffer the op + default: + return false, fmt.Errorf("invalid job state: %q", j.state) } - j.bufferedOps[path] = append(j.bufferedOps[path], &bufferedOp{ + j.bufferOps(&opSet{since: since, rev: rev, ops: ops}) + return true, nil +} + +func (bf *Backfiller) BufferOp(ctx context.Context, repo string, since *string, rev, kind, path string, rec typegen.CBORMarshaler, cid *cid.Cid) (bool, error) { + return bf.BufferOps(ctx, repo, since, rev, []*bufferedOp{&bufferedOp{ + path: path, kind: kind, rec: rec, cid: cid, - }) + }}) +} + +func (bf *Backfiller) BufferOps(ctx context.Context, repo string, since *string, rev string, ops []*bufferedOp) (bool, error) { + j, err := bf.Store.GetJob(ctx, repo) + if err != nil { + return false, err + } + + return j.BufferOps(ctx, since, rev, ops) +} + +func (j *Gormjob) bufferOps(ops *opSet) { + j.bufferedOps = append(j.bufferedOps, ops) j.updatedAt = time.Now() - return true, nil } func (s *Gormstore) GetJob(ctx context.Context, repo string) (Job, error) { + return s.getJob(ctx, repo) +} + +func (s *Gormstore) getJob(ctx context.Context, repo string) (*Gormjob, error) { + cj := s.checkJobCache(ctx, repo) + if cj != nil { + return cj, nil + } + + return s.loadJob(ctx, repo) +} + +func (s *Gormstore) loadJob(ctx context.Context, repo string) (*Gormjob, error) { + var dbj GormDBJob + if err := s.db.Find(&dbj, "repo = ?").Error; err != nil { + return nil, err + } + + if dbj.ID > 0 { + j := &Gormjob{ + repo: dbj.Repo, + state: dbj.State, + createdAt: dbj.CreatedAt, + updatedAt: dbj.UpdatedAt, + + dbj: &dbj, + db: s.db, + } + s.lk.Lock() + defer s.lk.Unlock() + // would imply a race condition + exist, ok := s.jobs[repo] + if ok { + return exist, nil + } + s.jobs[repo] = j + return j, nil + } + + dbj = GormDBJob{ + Repo: repo, + State: StateEnqueued, + } + + if err := s.db.Create(&dbj).Error; err != nil { + // TODO: check for unique-constraint error vs other + jc := s.checkJobCache(ctx, repo) + if jc == nil { + return nil, fmt.Errorf("job cache missing after failed create: %w", err) + } + + return jc, nil + } + + j := &Gormjob{ + repo: dbj.Repo, + state: dbj.State, + createdAt: dbj.CreatedAt, + updatedAt: dbj.UpdatedAt, + + dbj: &dbj, + db: s.db, + } + s.lk.Lock() + defer s.lk.Unlock() + s.jobs[repo] = j + return j, nil + +} + +func (s *Gormstore) checkJobCache(ctx context.Context, repo string) *Gormjob { s.lk.RLock() defer s.lk.RUnlock() j, ok := s.jobs[repo] if !ok || j == nil { - return nil, nil + return nil } - return j, nil + return j } func (s *Gormstore) GetNextEnqueuedJob(ctx context.Context) (Job, error) { @@ -183,6 +332,21 @@ func (j *Gormjob) State() string { return j.state } +func (j *Gormjob) SetRev(ctx context.Context, r string) error { + j.lk.Lock() + defer j.lk.Unlock() + + j.rev = r + return nil +} + +func (j *Gormjob) Rev() string { + j.lk.Lock() + defer j.lk.Unlock() + + return j.rev +} + func (j *Gormjob) SetState(ctx context.Context, state string) error { j.lk.Lock() defer j.lk.Unlock() @@ -195,19 +359,37 @@ func (j *Gormjob) SetState(ctx context.Context, state string) error { return j.db.Save(j.dbj).Error } -func (j *Gormjob) FlushBufferedOps(ctx context.Context, fn func(kind, path string, rec *typegen.CBORMarshaler, cid *cid.Cid) error) error { +func (j *Gormjob) FlushBufferedOps(ctx context.Context, fn func(kind, path string, rec typegen.CBORMarshaler, cid *cid.Cid) error) error { + // TODO: this will block any events for this repo while this flush is ongoing, is that okay? j.lk.Lock() defer j.lk.Unlock() - for path, ops := range j.bufferedOps { - for _, op := range ops { - if err := fn(op.kind, path, op.rec, op.cid); err != nil { + for _, opset := range j.bufferedOps { + if opset.rev < j.rev { + // stale events, skip + continue + } + + if opset.since == nil { + // TODO: what does this mean? + panic("TODO") + } + + if j.rev != *opset.since { + // we've got a discontinuity + panic("TODO") + } + + for _, op := range opset.ops { + if err := fn(op.kind, op.path, op.rec, op.cid); err != nil { return err } } + + j.rev = opset.rev } - j.bufferedOps = map[string][]*bufferedOp{} + j.bufferedOps = []*opSet{} j.state = StateComplete return nil @@ -217,7 +399,11 @@ func (j *Gormjob) ClearBufferedOps(ctx context.Context) error { j.lk.Lock() defer j.lk.Unlock() - j.bufferedOps = map[string][]*bufferedOp{} + j.bufferedOps = []*opSet{} j.updatedAt = time.Now() return nil } + +func (s *Gormstore) UpdateRev(ctx context.Context, repo, rev string) error { + panic("NYI") +} diff --git a/backfill/memstore.go b/backfill/memstore.go index 85e552efa..0614cee96 100644 --- a/backfill/memstore.go +++ b/backfill/memstore.go @@ -12,15 +12,23 @@ import ( type bufferedOp struct { kind string - rec *typegen.CBORMarshaler + path string + rec typegen.CBORMarshaler cid *cid.Cid } +type opSet struct { + since *string + rev string + ops []*bufferedOp +} + type Memjob struct { repo string state string + rev string lk sync.Mutex - bufferedOps map[string][]*bufferedOp + bufferedOps []*opSet createdAt time.Time updatedAt time.Time @@ -47,17 +55,16 @@ func (s *Memstore) EnqueueJob(repo string) error { } j := &Memjob{ - repo: repo, - createdAt: time.Now(), - updatedAt: time.Now(), - state: StateEnqueued, - bufferedOps: map[string][]*bufferedOp{}, + repo: repo, + createdAt: time.Now(), + updatedAt: time.Now(), + state: StateEnqueued, } s.jobs[repo] = j return nil } -func (s *Memstore) BufferOp(ctx context.Context, repo, kind, path string, rec *typegen.CBORMarshaler, cid *cid.Cid) (bool, error) { +func (s *Memstore) BufferOp(ctx context.Context, repo string, since *string, rev, kind, path string, rec typegen.CBORMarshaler, cid *cid.Cid) (bool, error) { s.lk.Lock() // If the job doesn't exist, we can't buffer an op for it @@ -79,10 +86,37 @@ func (s *Memstore) BufferOp(ctx context.Context, repo, kind, path string, rec *t return false, nil } - j.bufferedOps[path] = append(j.bufferedOps[path], &bufferedOp{ - kind: kind, - rec: rec, - cid: cid, + j.bufferedOps = append(j.bufferedOps, &opSet{ + since: since, + rev: rev, + ops: []*bufferedOp{&bufferedOp{ + path: path, + kind: kind, + rec: rec, + cid: cid, + }}, + }) + j.updatedAt = time.Now() + return true, nil +} + +func (j *Memjob) BufferOps(ctx context.Context, since *string, rev string, ops []*bufferedOp) (bool, error) { + j.lk.Lock() + defer j.lk.Unlock() + + switch j.state { + case StateComplete: + return false, ErrJobComplete + case StateInProgress: + // keep going and buffer the op + default: + return false, nil + } + + j.bufferedOps = append(j.bufferedOps, &opSet{ + since: since, + rev: rev, + ops: ops, }) j.updatedAt = time.Now() return true, nil @@ -131,29 +165,41 @@ func (j *Memjob) SetState(ctx context.Context, state string) error { return nil } -func (j *Memjob) FlushBufferedOps(ctx context.Context, fn func(kind, path string, rec *typegen.CBORMarshaler, cid *cid.Cid) error) error { - j.lk.Lock() - defer j.lk.Unlock() +func (j *Memjob) Rev() string { + return j.rev +} + +func (j *Memjob) SetRev(ctx context.Context, rev string) error { + j.rev = rev + return nil +} - for path, ops := range j.bufferedOps { - for _, op := range ops { - if err := fn(op.kind, path, op.rec, op.cid); err != nil { - return err +func (j *Memjob) FlushBufferedOps(ctx context.Context, fn func(kind, path string, rec typegen.CBORMarshaler, cid *cid.Cid) error) error { + panic("TODO: copy what we end up doing from the gormstore") + /* + j.lk.Lock() + defer j.lk.Unlock() + + for _, opset := range j.bufferedOps { + for _, op := range opset.ops { + if err := fn(op.kind, op.path, op.rec, op.cid); err != nil { + return err + } } } - } - j.bufferedOps = map[string][]*bufferedOp{} - j.state = StateComplete + j.bufferedOps = map[string][]*bufferedOp{} + j.state = StateComplete - return nil + return nil + */ } func (j *Memjob) ClearBufferedOps(ctx context.Context) error { j.lk.Lock() defer j.lk.Unlock() - j.bufferedOps = map[string][]*bufferedOp{} + j.bufferedOps = []*opSet{} j.updatedAt = time.Now() return nil } diff --git a/indexer/crawler.go b/indexer/crawler.go index c5ed5e552..3970bfd7d 100644 --- a/indexer/crawler.go +++ b/indexer/crawler.go @@ -256,7 +256,7 @@ func (c *CrawlDispatcher) AddToCatchupQueue(ctx context.Context, host *models.PD } } -func (c *CrawlDispatcher) RepoInSlowPath(ctx context.Context, host *models.PDS, uid models.Uid) bool { +func (c *CrawlDispatcher) RepoInSlowPath(ctx context.Context, uid models.Uid) bool { c.maplk.Lock() defer c.maplk.Unlock() if _, ok := c.todo[uid]; ok { From 745a735c3c41b4a6f85eb95a26a9434596fee6da Mon Sep 17 00:00:00 2001 From: whyrusleeping Date: Mon, 27 Nov 2023 11:55:04 -0800 Subject: [PATCH 02/12] get it running --- backfill/backfill.go | 88 ++++++++++++++++++++++++++++++++++++ backfill/gormstore.go | 101 +++++------------------------------------- 2 files changed, 99 insertions(+), 90 deletions(-) diff --git a/backfill/backfill.go b/backfill/backfill.go index 859158ebf..b48f3076b 100644 --- a/backfill/backfill.go +++ b/backfill/backfill.go @@ -1,6 +1,7 @@ package backfill import ( + "bytes" "context" "errors" "fmt" @@ -10,6 +11,7 @@ import ( "time" // Blank import to register types for CBORGEN + "github.com/bluesky-social/indigo/api/atproto" _ "github.com/bluesky-social/indigo/api/bsky" lexutil "github.com/bluesky-social/indigo/lex/util" "github.com/bluesky-social/indigo/repo" @@ -405,6 +407,10 @@ func (b *Backfiller) BackfillRepo(ctx context.Context, job Job) { close(recordResults) resultWG.Wait() + if err := job.SetRev(ctx, r.SignedCommit().Rev); err != nil { + log.Error("failed to update rev after backfilling repo", "err", err) + } + // Process buffered operations, marking the job as "complete" when done numProcessed := b.FlushBuffer(ctx, job) @@ -414,3 +420,85 @@ func (b *Backfiller) BackfillRepo(ctx context.Context, job Job) { "duration", time.Since(start), ) } + +func (bf *Backfiller) HandleEvent(ctx context.Context, evt *atproto.SyncSubscribeRepos_Commit) error { + r, err := repo.ReadRepoFromCar(ctx, bytes.NewReader(evt.Blocks)) + if err != nil { + return fmt.Errorf("failed to read event repo: %w", err) + } + + var ops []*bufferedOp + for _, op := range evt.Ops { + switch op.Action { + case "create", "update": + cc, rec, err := r.GetRecord(ctx, op.Path) + if err != nil { + return err + } + + ops = append(ops, &bufferedOp{ + kind: op.Action, + path: op.Path, + rec: rec, + cid: &cc, + }) + case "delete": + ops = append(ops, &bufferedOp{ + kind: op.Action, + path: op.Path, + }) + default: + return fmt.Errorf("invalid op action: %q", op.Action) + } + } + + buffered, err := bf.BufferOps(ctx, evt.Repo, evt.Since, evt.Rev, ops) + if err != nil { + return fmt.Errorf("buffer ops failed: %w", err) + } + + if buffered { + return nil + } + + for _, op := range ops { + switch op.kind { + case "create": + if err := bf.HandleCreateRecord(ctx, evt.Repo, op.path, op.rec, op.cid); err != nil { + return fmt.Errorf("create record failed: %w", err) + } + case "update": + if err := bf.HandleUpdateRecord(ctx, evt.Repo, op.path, op.rec, op.cid); err != nil { + return fmt.Errorf("update record failed: %w", err) + } + case "delete": + if err := bf.HandleDeleteRecord(ctx, evt.Repo, op.path); err != nil { + return fmt.Errorf("delete record failed: %w", err) + } + } + } + + if err := bf.Store.UpdateRev(ctx, evt.Repo, evt.Rev); err != nil { + return fmt.Errorf("failed to update rev: %w", err) + } + + return nil +} + +func (bf *Backfiller) BufferOp(ctx context.Context, repo string, since *string, rev, kind, path string, rec typegen.CBORMarshaler, cid *cid.Cid) (bool, error) { + return bf.BufferOps(ctx, repo, since, rev, []*bufferedOp{&bufferedOp{ + path: path, + kind: kind, + rec: rec, + cid: cid, + }}) +} + +func (bf *Backfiller) BufferOps(ctx context.Context, repo string, since *string, rev string, ops []*bufferedOp) (bool, error) { + j, err := bf.Store.GetJob(ctx, repo) + if err != nil { + return false, err + } + + return j.BufferOps(ctx, since, rev, ops) +} diff --git a/backfill/gormstore.go b/backfill/gormstore.go index 20e684b8f..6f43fc69d 100644 --- a/backfill/gormstore.go +++ b/backfill/gormstore.go @@ -1,14 +1,11 @@ package backfill import ( - "bytes" "context" "fmt" "sync" "time" - "github.com/bluesky-social/indigo/api/atproto" - "github.com/bluesky-social/indigo/repo" "github.com/ipfs/go-cid" typegen "github.com/whyrusleeping/cbor-gen" "gorm.io/gorm" @@ -122,71 +119,6 @@ func (s *Gormstore) EnqueueJob(repo string) error { return nil } -func (bf *Backfiller) HandleEvent(ctx context.Context, evt *atproto.SyncSubscribeRepos_Commit) error { - r, err := repo.ReadRepoFromCar(ctx, bytes.NewReader(evt.Blocks)) - if err != nil { - return fmt.Errorf("failed to read event repo: %w", err) - } - - var ops []*bufferedOp - for _, op := range evt.Ops { - switch op.Action { - case "create", "update": - cc, rec, err := r.GetRecord(ctx, op.Path) - if err != nil { - return err - } - - ops = append(ops, &bufferedOp{ - kind: op.Action, - path: op.Path, - rec: rec, - cid: &cc, - }) - case "delete": - ops = append(ops, &bufferedOp{ - kind: op.Action, - path: op.Path, - }) - default: - return fmt.Errorf("invalid op action: %q", op.Action) - } - } - - buffered, err := bf.BufferOps(ctx, evt.Repo, evt.Since, evt.Rev, ops) - if err != nil { - return fmt.Errorf("buffer ops failed: %w", err) - } - - if buffered { - return nil - } - - for _, op := range ops { - switch op.kind { - case "create": - if err := bf.HandleCreateRecord(ctx, evt.Repo, op.path, op.rec, op.cid); err != nil { - return fmt.Errorf("create record failed: %w", err) - } - case "update": - if err := bf.HandleUpdateRecord(ctx, evt.Repo, op.path, op.rec, op.cid); err != nil { - return fmt.Errorf("update record failed: %w", err) - } - case "delete": - if err := bf.HandleDeleteRecord(ctx, evt.Repo, op.path); err != nil { - return fmt.Errorf("delete record failed: %w", err) - } - } - } - - if err := bf.Store.UpdateRev(ctx, evt.Repo, evt.Rev); err != nil { - return fmt.Errorf("failed to update rev: %w", err) - } - - return nil - -} - func (j *Gormjob) BufferOps(ctx context.Context, since *string, rev string, ops []*bufferedOp) (bool, error) { j.lk.Lock() defer j.lk.Unlock() @@ -204,24 +136,6 @@ func (j *Gormjob) BufferOps(ctx context.Context, since *string, rev string, ops return true, nil } -func (bf *Backfiller) BufferOp(ctx context.Context, repo string, since *string, rev, kind, path string, rec typegen.CBORMarshaler, cid *cid.Cid) (bool, error) { - return bf.BufferOps(ctx, repo, since, rev, []*bufferedOp{&bufferedOp{ - path: path, - kind: kind, - rec: rec, - cid: cid, - }}) -} - -func (bf *Backfiller) BufferOps(ctx context.Context, repo string, since *string, rev string, ops []*bufferedOp) (bool, error) { - j, err := bf.Store.GetJob(ctx, repo) - if err != nil { - return false, err - } - - return j.BufferOps(ctx, since, rev, ops) -} - func (j *Gormjob) bufferOps(ops *opSet) { j.bufferedOps = append(j.bufferedOps, ops) j.updatedAt = time.Now() @@ -242,7 +156,7 @@ func (s *Gormstore) getJob(ctx context.Context, repo string) (*Gormjob, error) { func (s *Gormstore) loadJob(ctx context.Context, repo string) (*Gormjob, error) { var dbj GormDBJob - if err := s.db.Find(&dbj, "repo = ?").Error; err != nil { + if err := s.db.Find(&dbj, "repo = ?", repo).Error; err != nil { return nil, err } @@ -359,6 +273,8 @@ func (j *Gormjob) SetState(ctx context.Context, state string) error { return j.db.Save(j.dbj).Error } +var ErrEventGap = fmt.Errorf("buffered event revs did not line up") + func (j *Gormjob) FlushBufferedOps(ctx context.Context, fn func(kind, path string, rec typegen.CBORMarshaler, cid *cid.Cid) error) error { // TODO: this will block any events for this repo while this flush is ongoing, is that okay? j.lk.Lock() @@ -372,12 +288,12 @@ func (j *Gormjob) FlushBufferedOps(ctx context.Context, fn func(kind, path strin if opset.since == nil { // TODO: what does this mean? - panic("TODO") + return fmt.Errorf("nil since in event after backfill: %w", ErrEventGap) } if j.rev != *opset.since { // we've got a discontinuity - panic("TODO") + return fmt.Errorf("event since did not match current rev (%s != %s): %w", *opset.since, j.rev, ErrEventGap) } for _, op := range opset.ops { @@ -405,5 +321,10 @@ func (j *Gormjob) ClearBufferedOps(ctx context.Context) error { } func (s *Gormstore) UpdateRev(ctx context.Context, repo, rev string) error { - panic("NYI") + j, err := s.GetJob(ctx, repo) + if err != nil { + return err + } + + return j.SetRev(ctx, rev) } From e4bc8d0e370c1ac040e49f10816af43921750aa8 Mon Sep 17 00:00:00 2001 From: whyrusleeping Date: Mon, 27 Nov 2023 14:24:56 -0800 Subject: [PATCH 03/12] check for event gap error, reset state to enqueued --- backfill/backfill.go | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/backfill/backfill.go b/backfill/backfill.go index b48f3076b..4b93501ed 100644 --- a/backfill/backfill.go +++ b/backfill/backfill.go @@ -228,6 +228,13 @@ func (b *Backfiller) FlushBuffer(ctx context.Context, job Job) int { }) if err != nil { log.Error("failed to flush buffered ops", "error", err) + if errors.Is(err, ErrEventGap) { + if sserr := job.SetState(ctx, StateEnqueued); sserr != nil { + log.Error("failed to reset job state after failed buffer flush", "error", sserr) + } + // TODO: need to re-queue this job for later + return processed + } } // Mark the job as "complete" From efe202ed4003334200f51e792e22d7987f08b163 Mon Sep 17 00:00:00 2001 From: Jaz Volpert Date: Tue, 28 Nov 2023 19:09:25 +0000 Subject: [PATCH 04/12] Retry logic, still need to go update search to use the new patterns --- backfill/backfill.go | 16 +++++++++++++++- backfill/gormstore.go | 40 ++++++++++++++++++++++++++++++++++------ backfill/memstore.go | 6 ++++++ bgs/bgs.go | 2 +- 4 files changed, 56 insertions(+), 8 deletions(-) diff --git a/backfill/backfill.go b/backfill/backfill.go index 4b93501ed..243868670 100644 --- a/backfill/backfill.go +++ b/backfill/backfill.go @@ -10,8 +10,8 @@ import ( "sync" "time" - // Blank import to register types for CBORGEN "github.com/bluesky-social/indigo/api/atproto" + // Blank import to register types for CBORGEN _ "github.com/bluesky-social/indigo/api/bsky" lexutil "github.com/bluesky-social/indigo/lex/util" "github.com/bluesky-social/indigo/repo" @@ -30,6 +30,7 @@ type Job interface { Rev() string SetState(ctx context.Context, state string) error SetRev(ctx context.Context, rev string) error + RetryCount() int BufferOps(ctx context.Context, since *string, rev string, ops []*bufferedOp) (bool, error) // FlushBufferedOps calls the given callback for each buffered operation @@ -90,6 +91,9 @@ var ErrJobComplete = errors.New("job is complete") // ErrJobNotFound is returned when trying to buffer an op for a job that doesn't exist var ErrJobNotFound = errors.New("job not found") +// ErrEventGap is returned when an event is received with a since that doesn't match the current rev +var ErrEventGap = fmt.Errorf("buffered event revs did not line up") + var tracer = otel.Tracer("backfiller") type BackfillOptions struct { @@ -266,6 +270,9 @@ func (b *Backfiller) BackfillRepo(ctx context.Context, job Job) { repoDid := job.Repo() log := slog.With("source", "backfiller_backfill_repo", "repo", repoDid) + if job.RetryCount() > 0 { + log = log.With("retry_count", job.RetryCount()) + } log.Info(fmt.Sprintf("processing backfill for %s", repoDid)) url := fmt.Sprintf("%s?did=%s", b.CheckoutPath, repoDid) @@ -509,3 +516,10 @@ func (bf *Backfiller) BufferOps(ctx context.Context, repo string, since *string, return j.BufferOps(ctx, since, rev, ops) } + +// MaxRetries is the maximum number of times to retry a backfill job +var MaxRetries = 10 + +func computeExponentialBackoff(attempt int) time.Duration { + return time.Duration(1< Date: Wed, 29 Nov 2023 23:22:14 +0000 Subject: [PATCH 05/12] Update search to use new backfill model maybe --- backfill/backfill.go | 2 +- search/firehose.go | 119 ++----------------------------------------- 2 files changed, 5 insertions(+), 116 deletions(-) diff --git a/backfill/backfill.go b/backfill/backfill.go index 243868670..2c2a65728 100644 --- a/backfill/backfill.go +++ b/backfill/backfill.go @@ -500,7 +500,7 @@ func (bf *Backfiller) HandleEvent(ctx context.Context, evt *atproto.SyncSubscrib } func (bf *Backfiller) BufferOp(ctx context.Context, repo string, since *string, rev, kind, path string, rec typegen.CBORMarshaler, cid *cid.Cid) (bool, error) { - return bf.BufferOps(ctx, repo, since, rev, []*bufferedOp{&bufferedOp{ + return bf.BufferOps(ctx, repo, since, rev, []*bufferedOp{{ path: path, kind: kind, rec: rec, diff --git a/search/firehose.go b/search/firehose.go index dca859bde..795c86cac 100644 --- a/search/firehose.go +++ b/search/firehose.go @@ -12,12 +12,9 @@ import ( comatproto "github.com/bluesky-social/indigo/api/atproto" bsky "github.com/bluesky-social/indigo/api/bsky" "github.com/bluesky-social/indigo/atproto/syntax" - "github.com/bluesky-social/indigo/backfill" "github.com/bluesky-social/indigo/events" "github.com/bluesky-social/indigo/events/schedulers/autoscaling" - lexutil "github.com/bluesky-social/indigo/lex/util" "github.com/bluesky-social/indigo/repo" - "github.com/bluesky-social/indigo/repomgr" "github.com/carlmjohnson/versioninfo" "github.com/gorilla/websocket" @@ -101,59 +98,9 @@ func (s *Server) RunIndexer(ctx context.Context) error { return nil } - // Check if we've backfilled this repo, if not, we should enqueue it - job, err := s.bfs.GetJob(ctx, evt.Repo) - if job == nil && err == nil { - logEvt.Info("enqueueing backfill job for new repo") - if err := s.bfs.EnqueueJob(evt.Repo); err != nil { - logEvt.Warn("failed to enqueue backfill job", "err", err) - } - } - - r, err := repo.ReadRepoFromCar(ctx, bytes.NewReader(evt.Blocks)) - if err != nil { - // TODO: handle this case (instead of return nil) - logEvt.Error("reading repo from car", "size_bytes", len(evt.Blocks), "err", err) - return nil - } - - for _, op := range evt.Ops { - ek := repomgr.EventKind(op.Action) - logOp := logEvt.With("op_path", op.Path, "op_cid", op.Cid) - switch ek { - case repomgr.EvtKindCreateRecord, repomgr.EvtKindUpdateRecord: - rc, rec, err := r.GetRecord(ctx, op.Path) - if err != nil { - // TODO: handle this case (instead of return nil) - logOp.Error("fetching record from event CAR slice", "err", err) - return nil - } - - if lexutil.LexLink(rc) != *op.Cid { - // TODO: handle this case (instead of return nil) - logOp.Error("mismatch in record and op cid", "record_cid", rc) - return nil - } - - if strings.HasPrefix(op.Path, "app.bsky.feed.post") { - postsReceived.Inc() - } else if strings.HasPrefix(op.Path, "app.bsky.actor.profile") { - profilesReceived.Inc() - } - - if err := s.handleOp(ctx, ek, evt.Seq, op.Path, evt.Repo, &rc, rec); err != nil { - // TODO: handle this case (instead of return nil) - logOp.Error("failed to handle event op", "err", err) - return nil - } - - case repomgr.EvtKindDeleteRecord: - if err := s.handleOp(ctx, ek, evt.Seq, op.Path, evt.Repo, nil, nil); err != nil { - // TODO: handle this case (instead of return nil) - logOp.Error("failed to handle delete", "err", err) - return nil - } - } + // Pass events to the backfiller which will process or buffer as needed + if err := s.bf.HandleEvent(ctx, evt); err != nil { + logEvt.Error("failed to handle event", "err", err) } return nil @@ -240,7 +187,7 @@ func (s *Server) discoverRepos() { log.Info("finished repo discovery", "totalEnqueued", totalEnqueued, "totalSkipped", totalSkipped, "totalErrored", totalErrored) } -func (s *Server) handleCreateOrUpdate(ctx context.Context, rawDID string, path string, recP *typegen.CBORMarshaler, rcid *cid.Cid) error { +func (s *Server) handleCreateOrUpdate(ctx context.Context, rawDID string, path string, rec typegen.CBORMarshaler, rcid *cid.Cid) error { // Since this gets called in a backfill job, we need to check if the path is a post or profile if !strings.Contains(path, "app.bsky.feed.post") && !strings.Contains(path, "app.bsky.actor.profile") { return nil @@ -258,7 +205,6 @@ func (s *Server) handleCreateOrUpdate(ctx context.Context, rawDID string, path s if ident == nil { return fmt.Errorf("identity not found for did: %s", did.String()) } - rec := *recP switch rec := rec.(type) { case *bsky.FeedPost: @@ -311,63 +257,6 @@ func (s *Server) handleDelete(ctx context.Context, rawDID, path string) error { return nil } -func (s *Server) handleOp(ctx context.Context, op repomgr.EventKind, seq int64, path string, did string, rcid *cid.Cid, rec typegen.CBORMarshaler) error { - var err error - if !strings.Contains(path, "app.bsky.feed.post") && !strings.Contains(path, "app.bsky.actor.profile") { - return nil - } - - if op == repomgr.EvtKindCreateRecord || op == repomgr.EvtKindUpdateRecord { - s.logger.Debug("processing create record op", "seq", seq, "did", did, "path", path) - - // Try to buffer the op, if it fails, we need to create a backfill job - _, err := s.bfs.BufferOp(ctx, did, string(op), path, &rec, rcid) - if err == backfill.ErrJobNotFound { - s.logger.Debug("no backfill job found for repo, creating one", "did", did) - - if err := s.bfs.EnqueueJob(did); err != nil { - return fmt.Errorf("enqueueing backfill job: %w", err) - } - - // Try to buffer the op again so it gets picked up by the backfill job - _, err = s.bfs.BufferOp(ctx, did, string(op), path, &rec, rcid) - if err != nil { - return fmt.Errorf("buffering backfill op: %w", err) - } - } else if err == backfill.ErrJobComplete { - // Backfill is done for this repo so we can just index it now - err = s.handleCreateOrUpdate(ctx, did, path, &rec, rcid) - } - } else if op == repomgr.EvtKindDeleteRecord { - s.logger.Debug("processing delete record op", "seq", seq, "did", did, "path", path) - - // Try to buffer the op, if it fails, we need to create a backfill job - _, err := s.bfs.BufferOp(ctx, did, string(op), path, &rec, rcid) - if err == backfill.ErrJobNotFound { - s.logger.Debug("no backfill job found for repo, creating one", "did", did) - - if err := s.bfs.EnqueueJob(did); err != nil { - return fmt.Errorf("enqueueing backfill job: %w", err) - } - - // Try to buffer the op again so it gets picked up by the backfill job - _, err = s.bfs.BufferOp(ctx, did, string(op), path, &rec, rcid) - if err != nil { - return fmt.Errorf("buffering backfill op: %w", err) - } - } else if err == backfill.ErrJobComplete { - // Backfill is done for this repo so we can delete imemdiately - err = s.handleDelete(ctx, did, path) - } - } - - if err != nil { - return fmt.Errorf("failed to handle op: %w", err) - } - - return nil -} - func (s *Server) processTooBigCommit(ctx context.Context, evt *comatproto.SyncSubscribeRepos_Commit) error { repodata, err := comatproto.SyncGetRepo(ctx, s.bgsxrpc, evt.Repo, "") if err != nil { From b1a3705fda6d0cfaca314fb3c7feb5a01bb45ba1 Mon Sep 17 00:00:00 2001 From: whyrusleeping Date: Thu, 30 Nov 2023 15:10:29 -0800 Subject: [PATCH 06/12] dont create job on failed load --- backfill/backfill.go | 16 ++++++++++++- backfill/gormstore.go | 55 +++++++++++++------------------------------ cmd/gosky/admin.go | 1 + 3 files changed, 32 insertions(+), 40 deletions(-) diff --git a/backfill/backfill.go b/backfill/backfill.go index 2c2a65728..b3a0918a0 100644 --- a/backfill/backfill.go +++ b/backfill/backfill.go @@ -49,6 +49,8 @@ type Store interface { GetJob(ctx context.Context, repo string) (Job, error) GetNextEnqueuedJob(ctx context.Context) (Job, error) UpdateRev(ctx context.Context, repo, rev string) error + + EnqueueJob(repo string) error } // Backfiller is a struct which handles backfilling a repo @@ -511,7 +513,19 @@ func (bf *Backfiller) BufferOp(ctx context.Context, repo string, since *string, func (bf *Backfiller) BufferOps(ctx context.Context, repo string, since *string, rev string, ops []*bufferedOp) (bool, error) { j, err := bf.Store.GetJob(ctx, repo) if err != nil { - return false, err + if !errors.Is(err, ErrJobNotFound) { + return false, err + } + if qerr := bf.Store.EnqueueJob(repo); qerr != nil { + return false, fmt.Errorf("failed to enqueue job for unknown repo: %w", qerr) + } + + nj, err := bf.Store.GetJob(ctx, repo) + if err != nil { + return false, err + } + + j = nj } return j.BufferOps(ctx, since, rev, ops) diff --git a/backfill/gormstore.go b/backfill/gormstore.go index 81306be12..0294930be 100644 --- a/backfill/gormstore.go +++ b/backfill/gormstore.go @@ -54,6 +54,7 @@ func NewGormstore(db *gorm.DB) *Gormstore { } func (s *Gormstore) LoadJobs(ctx context.Context) error { + // TODO: get rid of this method, and just load on demand in GetNextEnqueuedJob limit := 20_000 offset := 0 s.lk.Lock() @@ -169,43 +170,8 @@ func (s *Gormstore) loadJob(ctx context.Context, repo string) (*Gormjob, error) return nil, err } - if dbj.ID > 0 { - j := &Gormjob{ - repo: dbj.Repo, - state: dbj.State, - createdAt: dbj.CreatedAt, - updatedAt: dbj.UpdatedAt, - - dbj: &dbj, - db: s.db, - - retryCount: dbj.RetryCount, - retryAfter: dbj.RetryAfter, - } - s.lk.Lock() - defer s.lk.Unlock() - // would imply a race condition - exist, ok := s.jobs[repo] - if ok { - return exist, nil - } - s.jobs[repo] = j - return j, nil - } - - dbj = GormDBJob{ - Repo: repo, - State: StateEnqueued, - } - - if err := s.db.Create(&dbj).Error; err != nil { - // TODO: check for unique-constraint error vs other - jc := s.checkJobCache(ctx, repo) - if jc == nil { - return nil, fmt.Errorf("job cache missing after failed create: %w", err) - } - - return jc, nil + if dbj.ID == 0 { + return nil, ErrJobNotFound } j := &Gormjob{ @@ -216,12 +182,19 @@ func (s *Gormstore) loadJob(ctx context.Context, repo string) (*Gormjob, error) dbj: &dbj, db: s.db, + + retryCount: dbj.RetryCount, + retryAfter: dbj.RetryAfter, } s.lk.Lock() defer s.lk.Unlock() + // would imply a race condition + exist, ok := s.jobs[repo] + if ok { + return exist, nil + } s.jobs[repo] = j return j, nil - } func (s *Gormstore) checkJobCache(ctx context.Context, repo string) *Gormjob { @@ -265,7 +238,11 @@ func (j *Gormjob) SetRev(ctx context.Context, r string) error { defer j.lk.Unlock() j.rev = r - return nil + j.updatedAt = time.Now() + + // Persist the job to the database + j.dbj.Rev = r + return j.db.Save(j.dbj).Error } func (j *Gormjob) Rev() string { diff --git a/cmd/gosky/admin.go b/cmd/gosky/admin.go index 969cb012a..54661f69a 100644 --- a/cmd/gosky/admin.go +++ b/cmd/gosky/admin.go @@ -766,6 +766,7 @@ var createInviteCmd = &cli.Command{ } xrpcc.AdminToken = &adminKey + resp, err := comatproto.ServerCreateInviteCodes(context.TODO(), xrpcc, &comatproto.ServerCreateInviteCodes_Input{ UseCount: int64(count), ForAccounts: usrdid, From be7d6521a3c112ed865ea4ad66f0887199f29fdd Mon Sep 17 00:00:00 2001 From: whyrusleeping Date: Fri, 1 Dec 2023 14:27:30 -0800 Subject: [PATCH 07/12] load jobs from database on demand --- backfill/backfill.go | 4 +- backfill/gormstore.go | 109 ++++++++++++++++++++++++++---------------- 2 files changed, 71 insertions(+), 42 deletions(-) diff --git a/backfill/backfill.go b/backfill/backfill.go index b3a0918a0..3632880ea 100644 --- a/backfill/backfill.go +++ b/backfill/backfill.go @@ -50,7 +50,7 @@ type Store interface { GetNextEnqueuedJob(ctx context.Context) (Job, error) UpdateRev(ctx context.Context, repo, rev string) error - EnqueueJob(repo string) error + EnqueueJob(ctx context.Context, repo string) error } // Backfiller is a struct which handles backfilling a repo @@ -516,7 +516,7 @@ func (bf *Backfiller) BufferOps(ctx context.Context, repo string, since *string, if !errors.Is(err, ErrJobNotFound) { return false, err } - if qerr := bf.Store.EnqueueJob(repo); qerr != nil { + if qerr := bf.Store.EnqueueJob(ctx, repo); qerr != nil { return false, fmt.Errorf("failed to enqueue job for unknown repo: %w", qerr) } diff --git a/backfill/gormstore.go b/backfill/gormstore.go index 0294930be..ad3f5a81e 100644 --- a/backfill/gormstore.go +++ b/backfill/gormstore.go @@ -2,6 +2,7 @@ package backfill import ( "context" + "errors" "fmt" "strings" "sync" @@ -43,7 +44,11 @@ type GormDBJob struct { type Gormstore struct { lk sync.RWMutex jobs map[string]*Gormjob - db *gorm.DB + + qlk sync.Mutex + taskQueue []string + + db *gorm.DB } func NewGormstore(db *gorm.DB) *Gormstore { @@ -54,47 +59,54 @@ func NewGormstore(db *gorm.DB) *Gormstore { } func (s *Gormstore) LoadJobs(ctx context.Context) error { - // TODO: get rid of this method, and just load on demand in GetNextEnqueuedJob - limit := 20_000 - offset := 0 - s.lk.Lock() - defer s.lk.Unlock() + s.qlk.Lock() + defer s.qlk.Unlock() + return s.loadJobs(ctx, 20_000) +} - for { - var dbjobs []*GormDBJob - // Load all jobs from the database - if err := s.db.Limit(limit).Offset(offset).Find(&dbjobs).Error; err != nil { - return err - } - if len(dbjobs) == 0 { - break - } - offset += len(dbjobs) - - // Convert them to in-memory jobs - for i := range dbjobs { - dbj := dbjobs[i] - j := &Gormjob{ - repo: dbj.Repo, - state: dbj.State, - createdAt: dbj.CreatedAt, - updatedAt: dbj.UpdatedAt, - - dbj: dbj, - db: s.db, - - retryCount: dbj.RetryCount, - retryAfter: dbj.RetryAfter, - } - s.jobs[dbj.Repo] = j - } +func (s *Gormstore) loadJobs(ctx context.Context, limit int) error { + var todo []string + if err := s.db.Model(GormDBJob{}).Limit(limit).Select("repo"). + Where("state = 'enqueued' OR (state = 'failed' AND (retry_after = NULL OR retry_after < NOW()))").Scan(&todo).Error; err != nil { + return err } + s.taskQueue = append(s.taskQueue, todo...) + return nil } -func (s *Gormstore) EnqueueJob(repo string) error { - // Persist the job to the database +func (s *Gormstore) GetOrCreateJob(ctx context.Context, repo, state string) (Job, error) { + j, err := s.getJob(ctx, repo) + if err == nil { + return j, nil + } + + if !errors.Is(err, ErrJobNotFound) { + return nil, err + } + + if err := s.createJobForRepo(repo, state); err != nil { + return nil, err + } + + return s.getJob(ctx, repo) +} + +func (s *Gormstore) EnqueueJob(ctx context.Context, repo string) error { + _, err := s.GetOrCreateJob(ctx, repo, StateEnqueued) + if err != nil { + return err + } + + s.qlk.Lock() + s.taskQueue = append(s.taskQueue, repo) + s.qlk.Unlock() + + return nil +} + +func (s *Gormstore) createJobForRepo(repo, state string) error { dbj := &GormDBJob{ Repo: repo, State: StateEnqueued, @@ -119,7 +131,7 @@ func (s *Gormstore) EnqueueJob(repo string) error { repo: repo, createdAt: time.Now(), updatedAt: time.Now(), - state: StateEnqueued, + state: state, dbj: dbj, db: s.db, @@ -209,10 +221,27 @@ func (s *Gormstore) checkJobCache(ctx context.Context, repo string) *Gormjob { } func (s *Gormstore) GetNextEnqueuedJob(ctx context.Context) (Job, error) { - s.lk.RLock() - defer s.lk.RUnlock() + s.qlk.Lock() + defer s.qlk.Unlock() + if len(s.taskQueue) == 0 { + if err := s.loadJobs(ctx, 1000); err != nil { + return nil, err + } + + if len(s.taskQueue) == 0 { + return nil, nil + } + } + + for len(s.taskQueue) > 0 { + first := s.taskQueue[0] + s.taskQueue = s.taskQueue[1:] + + j, err := s.getJob(ctx, first) + if err != nil { + return nil, err + } - for _, j := range s.jobs { shouldRetry := strings.HasPrefix(j.State(), "failed") && j.retryAfter != nil && time.Now().After(*j.retryAfter) if j.State() == StateEnqueued || shouldRetry { From f752ca1212a554fb4821271c4c3f4c2babbd230d Mon Sep 17 00:00:00 2001 From: whyrusleeping Date: Fri, 1 Dec 2023 14:30:23 -0800 Subject: [PATCH 08/12] if we have an existing rev state on a job, fetch with since param --- backfill/backfill.go | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/backfill/backfill.go b/backfill/backfill.go index 3632880ea..8951bfd55 100644 --- a/backfill/backfill.go +++ b/backfill/backfill.go @@ -279,6 +279,10 @@ func (b *Backfiller) BackfillRepo(ctx context.Context, job Job) { url := fmt.Sprintf("%s?did=%s", b.CheckoutPath, repoDid) + if job.Rev() != "" { + url = url + fmt.Sprintf("&since=%s", job.Rev()) + } + // GET and CAR decode the body client := &http.Client{ Transport: otelhttp.NewTransport(http.DefaultTransport), From 72ca3df6a2a8d3e7388d2395d4108c754f65d3a4 Mon Sep 17 00:00:00 2001 From: whyrusleeping Date: Fri, 1 Dec 2023 15:39:26 -0800 Subject: [PATCH 09/12] fix build --- backfill/backfill.go | 2 +- backfill/gormstore.go | 2 +- search/firehose.go | 2 +- search/handlers.go | 2 +- 4 files changed, 4 insertions(+), 4 deletions(-) diff --git a/backfill/backfill.go b/backfill/backfill.go index 8951bfd55..344a1d782 100644 --- a/backfill/backfill.go +++ b/backfill/backfill.go @@ -453,7 +453,7 @@ func (bf *Backfiller) HandleEvent(ctx context.Context, evt *atproto.SyncSubscrib case "create", "update": cc, rec, err := r.GetRecord(ctx, op.Path) if err != nil { - return err + return fmt.Errorf("getting record failed (%s,%s): %w", op.Action, op.Path, err) } ops = append(ops, &bufferedOp{ diff --git a/backfill/gormstore.go b/backfill/gormstore.go index ad3f5a81e..02d5eb54f 100644 --- a/backfill/gormstore.go +++ b/backfill/gormstore.go @@ -67,7 +67,7 @@ func (s *Gormstore) LoadJobs(ctx context.Context) error { func (s *Gormstore) loadJobs(ctx context.Context, limit int) error { var todo []string if err := s.db.Model(GormDBJob{}).Limit(limit).Select("repo"). - Where("state = 'enqueued' OR (state = 'failed' AND (retry_after = NULL OR retry_after < NOW()))").Scan(&todo).Error; err != nil { + Where("state = 'enqueued' OR (state = 'failed' AND (retry_after = NULL OR retry_after < ?))", time.Now()).Scan(&todo).Error; err != nil { return err } diff --git a/search/firehose.go b/search/firehose.go index 795c86cac..6cc962019 100644 --- a/search/firehose.go +++ b/search/firehose.go @@ -160,7 +160,7 @@ func (s *Server) discoverRepos() { job, err := s.bfs.GetJob(ctx, repo.Did) if job == nil && err == nil { log.Info("enqueuing backfill job for new repo", "did", repo.Did) - if err := s.bfs.EnqueueJob(repo.Did); err != nil { + if err := s.bfs.EnqueueJob(ctx, repo.Did); err != nil { log.Warn("failed to enqueue backfill job", "err", err) errored++ continue diff --git a/search/handlers.go b/search/handlers.go index a1afd5d40..f15e84bc2 100644 --- a/search/handlers.go +++ b/search/handlers.go @@ -171,7 +171,7 @@ func (s *Server) handleIndexRepos(e echo.Context) error { for _, did := range dids { job, err := s.bfs.GetJob(ctx, did) if job == nil && err == nil { - err := s.bfs.EnqueueJob(did) + err := s.bfs.EnqueueJob(ctx, did) if err != nil { errs = append(errs, IndexError{ DID: did, From 6b135a65c2760d4b8eca6592d8f07d40e4aa7231 Mon Sep 17 00:00:00 2001 From: Jaz Volpert Date: Fri, 1 Dec 2023 23:57:50 +0000 Subject: [PATCH 10/12] Check for JobNotFound errors --- search/firehose.go | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/search/firehose.go b/search/firehose.go index 6cc962019..1c77c5cdc 100644 --- a/search/firehose.go +++ b/search/firehose.go @@ -3,6 +3,7 @@ package search import ( "bytes" "context" + "errors" "fmt" "net/http" "net/url" @@ -12,6 +13,7 @@ import ( comatproto "github.com/bluesky-social/indigo/api/atproto" bsky "github.com/bluesky-social/indigo/api/bsky" "github.com/bluesky-social/indigo/atproto/syntax" + "github.com/bluesky-social/indigo/backfill" "github.com/bluesky-social/indigo/events" "github.com/bluesky-social/indigo/events/schedulers/autoscaling" "github.com/bluesky-social/indigo/repo" @@ -158,7 +160,7 @@ func (s *Server) discoverRepos() { errored := 0 for _, repo := range resp.Repos { job, err := s.bfs.GetJob(ctx, repo.Did) - if job == nil && err == nil { + if job == nil && (err == nil || errors.Is(err, backfill.ErrJobNotFound)) { log.Info("enqueuing backfill job for new repo", "did", repo.Did) if err := s.bfs.EnqueueJob(ctx, repo.Did); err != nil { log.Warn("failed to enqueue backfill job", "err", err) From 27d610c416d0b9bba4f420a5b4a2aa3ec9829699 Mon Sep 17 00:00:00 2001 From: Jaz Volpert Date: Sat, 2 Dec 2023 00:06:20 +0000 Subject: [PATCH 11/12] Use GetOrCreateJob --- search/firehose.go | 25 ++++++------------------- 1 file changed, 6 insertions(+), 19 deletions(-) diff --git a/search/firehose.go b/search/firehose.go index 1c77c5cdc..e87a8d24c 100644 --- a/search/firehose.go +++ b/search/firehose.go @@ -3,7 +3,6 @@ package search import ( "bytes" "context" - "errors" "fmt" "net/http" "net/url" @@ -143,8 +142,7 @@ func (s *Server) discoverRepos() { cursor := "" limit := int64(500) - totalEnqueued := 0 - totalSkipped := 0 + total := 0 totalErrored := 0 for { @@ -159,26 +157,15 @@ func (s *Server) discoverRepos() { skipped := 0 errored := 0 for _, repo := range resp.Repos { - job, err := s.bfs.GetJob(ctx, repo.Did) - if job == nil && (err == nil || errors.Is(err, backfill.ErrJobNotFound)) { - log.Info("enqueuing backfill job for new repo", "did", repo.Did) - if err := s.bfs.EnqueueJob(ctx, repo.Did); err != nil { - log.Warn("failed to enqueue backfill job", "err", err) - errored++ - continue - } - enqueued++ - } else if err != nil { - log.Warn("failed to get backfill job", "did", repo.Did, "err", err) + _, err := s.bfs.GetOrCreateJob(ctx, repo.Did, backfill.StateEnqueued) + if err != nil { + log.Error("failed to get or create job", "did", repo.Did, "err", err) errored++ - } else { - skipped++ } } log.Info("enqueued repos", "enqueued", enqueued, "skipped", skipped, "errored", errored) - totalEnqueued += enqueued - totalSkipped += skipped totalErrored += errored + total += len(resp.Repos) if resp.Cursor != nil && *resp.Cursor != "" { cursor = *resp.Cursor } else { @@ -186,7 +173,7 @@ func (s *Server) discoverRepos() { } } - log.Info("finished repo discovery", "totalEnqueued", totalEnqueued, "totalSkipped", totalSkipped, "totalErrored", totalErrored) + log.Info("finished repo discovery", "totalJobs", total, "totalErrored", totalErrored) } func (s *Server) handleCreateOrUpdate(ctx context.Context, rawDID string, path string, rec typegen.CBORMarshaler, rcid *cid.Cid) error { From 40259f281aa235909b846a5efffc25d1794a9336 Mon Sep 17 00:00:00 2001 From: Jaz Volpert Date: Sat, 2 Dec 2023 00:06:59 +0000 Subject: [PATCH 12/12] Cleanup --- search/firehose.go | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/search/firehose.go b/search/firehose.go index e87a8d24c..730cf0a2a 100644 --- a/search/firehose.go +++ b/search/firehose.go @@ -153,8 +153,6 @@ func (s *Server) discoverRepos() { continue } log.Info("got repo page", "count", len(resp.Repos), "cursor", resp.Cursor) - enqueued := 0 - skipped := 0 errored := 0 for _, repo := range resp.Repos { _, err := s.bfs.GetOrCreateJob(ctx, repo.Did, backfill.StateEnqueued) @@ -163,7 +161,7 @@ func (s *Server) discoverRepos() { errored++ } } - log.Info("enqueued repos", "enqueued", enqueued, "skipped", skipped, "errored", errored) + log.Info("enqueued repos", "total", len(resp.Repos), "errored", errored) totalErrored += errored total += len(resp.Repos) if resp.Cursor != nil && *resp.Cursor != "" {