diff --git a/backfill/backfill.go b/backfill/backfill.go index a67861dcf..344a1d782 100644 --- a/backfill/backfill.go +++ b/backfill/backfill.go @@ -1,6 +1,7 @@ package backfill import ( + "bytes" "context" "errors" "fmt" @@ -9,6 +10,7 @@ import ( "sync" "time" + "github.com/bluesky-social/indigo/api/atproto" // Blank import to register types for CBORGEN _ "github.com/bluesky-social/indigo/api/bsky" lexutil "github.com/bluesky-social/indigo/lex/util" @@ -25,13 +27,17 @@ import ( type Job interface { Repo() string State() string + Rev() string SetState(ctx context.Context, state string) error + SetRev(ctx context.Context, rev string) error + RetryCount() int + BufferOps(ctx context.Context, since *string, rev string, ops []*bufferedOp) (bool, error) // FlushBufferedOps calls the given callback for each buffered operation // Once done it clears the buffer and marks the job as "complete" // Allowing the Job interface to abstract away the details of how buffered // operations are stored and/or locked - FlushBufferedOps(ctx context.Context, cb func(kind, path string, rec *typegen.CBORMarshaler, cid *cid.Cid) error) error + FlushBufferedOps(ctx context.Context, cb func(kind, path string, rec typegen.CBORMarshaler, cid *cid.Cid) error) error ClearBufferedOps(ctx context.Context) error } @@ -40,16 +46,18 @@ type Job interface { type Store interface { // BufferOp buffers an operation for a job and returns true if the operation was buffered // If the operation was not buffered, it returns false and an error (ErrJobNotFound or ErrJobComplete) - BufferOp(ctx context.Context, repo string, kind, path string, rec *typegen.CBORMarshaler, cid *cid.Cid) (bool, error) GetJob(ctx context.Context, repo string) (Job, error) GetNextEnqueuedJob(ctx context.Context) (Job, error) + UpdateRev(ctx context.Context, repo, rev string) error + + EnqueueJob(ctx context.Context, repo string) error } // Backfiller is a struct which handles backfilling a repo type Backfiller struct { Name string - HandleCreateRecord func(ctx context.Context, repo string, path string, rec *typegen.CBORMarshaler, cid *cid.Cid) error - HandleUpdateRecord func(ctx context.Context, repo string, path string, rec *typegen.CBORMarshaler, cid *cid.Cid) error + HandleCreateRecord func(ctx context.Context, repo string, path string, rec typegen.CBORMarshaler, cid *cid.Cid) error + HandleUpdateRecord func(ctx context.Context, repo string, path string, rec typegen.CBORMarshaler, cid *cid.Cid) error HandleDeleteRecord func(ctx context.Context, repo string, path string) error Store Store @@ -85,6 +93,9 @@ var ErrJobComplete = errors.New("job is complete") // ErrJobNotFound is returned when trying to buffer an op for a job that doesn't exist var ErrJobNotFound = errors.New("job not found") +// ErrEventGap is returned when an event is received with a since that doesn't match the current rev +var ErrEventGap = fmt.Errorf("buffered event revs did not line up") + var tracer = otel.Tracer("backfiller") type BackfillOptions struct { @@ -109,8 +120,8 @@ func DefaultBackfillOptions() *BackfillOptions { func NewBackfiller( name string, store Store, - handleCreate func(ctx context.Context, repo string, path string, rec *typegen.CBORMarshaler, cid *cid.Cid) error, - handleUpdate func(ctx context.Context, repo string, path string, rec *typegen.CBORMarshaler, cid *cid.Cid) error, + handleCreate func(ctx context.Context, repo string, path string, rec typegen.CBORMarshaler, cid *cid.Cid) error, + handleUpdate func(ctx context.Context, repo string, path string, rec typegen.CBORMarshaler, cid *cid.Cid) error, handleDelete func(ctx context.Context, repo string, path string) error, opts *BackfillOptions, ) *Backfiller { @@ -199,7 +210,7 @@ func (b *Backfiller) FlushBuffer(ctx context.Context, job Job) int { // Flush buffered operations, clear the buffer, and mark the job as "complete" // Clearning and marking are handled by the job interface - err := job.FlushBufferedOps(ctx, func(kind, path string, rec *typegen.CBORMarshaler, cid *cid.Cid) error { + err := job.FlushBufferedOps(ctx, func(kind, path string, rec typegen.CBORMarshaler, cid *cid.Cid) error { switch repomgr.EventKind(kind) { case repomgr.EvtKindCreateRecord: err := b.HandleCreateRecord(ctx, repo, path, rec, cid) @@ -223,6 +234,13 @@ func (b *Backfiller) FlushBuffer(ctx context.Context, job Job) int { }) if err != nil { log.Error("failed to flush buffered ops", "error", err) + if errors.Is(err, ErrEventGap) { + if sserr := job.SetState(ctx, StateEnqueued); sserr != nil { + log.Error("failed to reset job state after failed buffer flush", "error", sserr) + } + // TODO: need to re-queue this job for later + return processed + } } // Mark the job as "complete" @@ -254,10 +272,17 @@ func (b *Backfiller) BackfillRepo(ctx context.Context, job Job) { repoDid := job.Repo() log := slog.With("source", "backfiller_backfill_repo", "repo", repoDid) + if job.RetryCount() > 0 { + log = log.With("retry_count", job.RetryCount()) + } log.Info(fmt.Sprintf("processing backfill for %s", repoDid)) url := fmt.Sprintf("%s?did=%s", b.CheckoutPath, repoDid) + if job.Rev() != "" { + url = url + fmt.Sprintf("&since=%s", job.Rev()) + } + // GET and CAR decode the body client := &http.Client{ Transport: otelhttp.NewTransport(http.DefaultTransport), @@ -374,7 +399,7 @@ func (b *Backfiller) BackfillRepo(ctx context.Context, job Job) { continue } - err = b.HandleCreateRecord(ctx, repoDid, item.recordPath, &recM, &item.nodeCid) + err = b.HandleCreateRecord(ctx, repoDid, item.recordPath, recM, &item.nodeCid) if err != nil { recordResults <- recordResult{recordPath: item.recordPath, err: fmt.Errorf("failed to handle create record: %w", err)} continue @@ -402,6 +427,10 @@ func (b *Backfiller) BackfillRepo(ctx context.Context, job Job) { close(recordResults) resultWG.Wait() + if err := job.SetRev(ctx, r.SignedCommit().Rev); err != nil { + log.Error("failed to update rev after backfilling repo", "err", err) + } + // Process buffered operations, marking the job as "complete" when done numProcessed := b.FlushBuffer(ctx, job) @@ -411,3 +440,104 @@ func (b *Backfiller) BackfillRepo(ctx context.Context, job Job) { "duration", time.Since(start), ) } + +func (bf *Backfiller) HandleEvent(ctx context.Context, evt *atproto.SyncSubscribeRepos_Commit) error { + r, err := repo.ReadRepoFromCar(ctx, bytes.NewReader(evt.Blocks)) + if err != nil { + return fmt.Errorf("failed to read event repo: %w", err) + } + + var ops []*bufferedOp + for _, op := range evt.Ops { + switch op.Action { + case "create", "update": + cc, rec, err := r.GetRecord(ctx, op.Path) + if err != nil { + return fmt.Errorf("getting record failed (%s,%s): %w", op.Action, op.Path, err) + } + + ops = append(ops, &bufferedOp{ + kind: op.Action, + path: op.Path, + rec: rec, + cid: &cc, + }) + case "delete": + ops = append(ops, &bufferedOp{ + kind: op.Action, + path: op.Path, + }) + default: + return fmt.Errorf("invalid op action: %q", op.Action) + } + } + + buffered, err := bf.BufferOps(ctx, evt.Repo, evt.Since, evt.Rev, ops) + if err != nil { + return fmt.Errorf("buffer ops failed: %w", err) + } + + if buffered { + return nil + } + + for _, op := range ops { + switch op.kind { + case "create": + if err := bf.HandleCreateRecord(ctx, evt.Repo, op.path, op.rec, op.cid); err != nil { + return fmt.Errorf("create record failed: %w", err) + } + case "update": + if err := bf.HandleUpdateRecord(ctx, evt.Repo, op.path, op.rec, op.cid); err != nil { + return fmt.Errorf("update record failed: %w", err) + } + case "delete": + if err := bf.HandleDeleteRecord(ctx, evt.Repo, op.path); err != nil { + return fmt.Errorf("delete record failed: %w", err) + } + } + } + + if err := bf.Store.UpdateRev(ctx, evt.Repo, evt.Rev); err != nil { + return fmt.Errorf("failed to update rev: %w", err) + } + + return nil +} + +func (bf *Backfiller) BufferOp(ctx context.Context, repo string, since *string, rev, kind, path string, rec typegen.CBORMarshaler, cid *cid.Cid) (bool, error) { + return bf.BufferOps(ctx, repo, since, rev, []*bufferedOp{{ + path: path, + kind: kind, + rec: rec, + cid: cid, + }}) +} + +func (bf *Backfiller) BufferOps(ctx context.Context, repo string, since *string, rev string, ops []*bufferedOp) (bool, error) { + j, err := bf.Store.GetJob(ctx, repo) + if err != nil { + if !errors.Is(err, ErrJobNotFound) { + return false, err + } + if qerr := bf.Store.EnqueueJob(ctx, repo); qerr != nil { + return false, fmt.Errorf("failed to enqueue job for unknown repo: %w", qerr) + } + + nj, err := bf.Store.GetJob(ctx, repo) + if err != nil { + return false, err + } + + j = nj + } + + return j.BufferOps(ctx, since, rev, ops) +} + +// MaxRetries is the maximum number of times to retry a backfill job +var MaxRetries = 10 + +func computeExponentialBackoff(attempt int) time.Duration { + return time.Duration(1< 0 { + first := s.taskQueue[0] + s.taskQueue = s.taskQueue[1:] + + j, err := s.getJob(ctx, first) + if err != nil { + return nil, err + } + + shouldRetry := strings.HasPrefix(j.State(), "failed") && j.retryAfter != nil && time.Now().After(*j.retryAfter) + + if j.State() == StateEnqueued || shouldRetry { return j, nil } } @@ -183,6 +262,25 @@ func (j *Gormjob) State() string { return j.state } +func (j *Gormjob) SetRev(ctx context.Context, r string) error { + j.lk.Lock() + defer j.lk.Unlock() + + j.rev = r + j.updatedAt = time.Now() + + // Persist the job to the database + j.dbj.Rev = r + return j.db.Save(j.dbj).Error +} + +func (j *Gormjob) Rev() string { + j.lk.Lock() + defer j.lk.Unlock() + + return j.rev +} + func (j *Gormjob) SetState(ctx context.Context, state string) error { j.lk.Lock() defer j.lk.Unlock() @@ -190,24 +288,52 @@ func (j *Gormjob) SetState(ctx context.Context, state string) error { j.state = state j.updatedAt = time.Now() + if strings.HasPrefix(state, "failed") { + if j.retryCount < MaxRetries { + next := time.Now().Add(computeExponentialBackoff(j.retryCount)) + j.retryAfter = &next + j.retryCount++ + } else { + j.retryAfter = nil + } + } + // Persist the job to the database j.dbj.State = state return j.db.Save(j.dbj).Error } -func (j *Gormjob) FlushBufferedOps(ctx context.Context, fn func(kind, path string, rec *typegen.CBORMarshaler, cid *cid.Cid) error) error { +func (j *Gormjob) FlushBufferedOps(ctx context.Context, fn func(kind, path string, rec typegen.CBORMarshaler, cid *cid.Cid) error) error { + // TODO: this will block any events for this repo while this flush is ongoing, is that okay? j.lk.Lock() defer j.lk.Unlock() - for path, ops := range j.bufferedOps { - for _, op := range ops { - if err := fn(op.kind, path, op.rec, op.cid); err != nil { + for _, opset := range j.bufferedOps { + if opset.rev < j.rev { + // stale events, skip + continue + } + + if opset.since == nil { + // TODO: what does this mean? + return fmt.Errorf("nil since in event after backfill: %w", ErrEventGap) + } + + if j.rev != *opset.since { + // we've got a discontinuity + return fmt.Errorf("event since did not match current rev (%s != %s): %w", *opset.since, j.rev, ErrEventGap) + } + + for _, op := range opset.ops { + if err := fn(op.kind, op.path, op.rec, op.cid); err != nil { return err } } + + j.rev = opset.rev } - j.bufferedOps = map[string][]*bufferedOp{} + j.bufferedOps = []*opSet{} j.state = StateComplete return nil @@ -217,7 +343,22 @@ func (j *Gormjob) ClearBufferedOps(ctx context.Context) error { j.lk.Lock() defer j.lk.Unlock() - j.bufferedOps = map[string][]*bufferedOp{} + j.bufferedOps = []*opSet{} j.updatedAt = time.Now() return nil } + +func (j *Gormjob) RetryCount() int { + j.lk.Lock() + defer j.lk.Unlock() + return j.retryCount +} + +func (s *Gormstore) UpdateRev(ctx context.Context, repo, rev string) error { + j, err := s.GetJob(ctx, repo) + if err != nil { + return err + } + + return j.SetRev(ctx, rev) +} diff --git a/backfill/memstore.go b/backfill/memstore.go index 85e552efa..62cfa3c5d 100644 --- a/backfill/memstore.go +++ b/backfill/memstore.go @@ -12,15 +12,23 @@ import ( type bufferedOp struct { kind string - rec *typegen.CBORMarshaler + path string + rec typegen.CBORMarshaler cid *cid.Cid } +type opSet struct { + since *string + rev string + ops []*bufferedOp +} + type Memjob struct { repo string state string + rev string lk sync.Mutex - bufferedOps map[string][]*bufferedOp + bufferedOps []*opSet createdAt time.Time updatedAt time.Time @@ -47,17 +55,16 @@ func (s *Memstore) EnqueueJob(repo string) error { } j := &Memjob{ - repo: repo, - createdAt: time.Now(), - updatedAt: time.Now(), - state: StateEnqueued, - bufferedOps: map[string][]*bufferedOp{}, + repo: repo, + createdAt: time.Now(), + updatedAt: time.Now(), + state: StateEnqueued, } s.jobs[repo] = j return nil } -func (s *Memstore) BufferOp(ctx context.Context, repo, kind, path string, rec *typegen.CBORMarshaler, cid *cid.Cid) (bool, error) { +func (s *Memstore) BufferOp(ctx context.Context, repo string, since *string, rev, kind, path string, rec typegen.CBORMarshaler, cid *cid.Cid) (bool, error) { s.lk.Lock() // If the job doesn't exist, we can't buffer an op for it @@ -79,10 +86,37 @@ func (s *Memstore) BufferOp(ctx context.Context, repo, kind, path string, rec *t return false, nil } - j.bufferedOps[path] = append(j.bufferedOps[path], &bufferedOp{ - kind: kind, - rec: rec, - cid: cid, + j.bufferedOps = append(j.bufferedOps, &opSet{ + since: since, + rev: rev, + ops: []*bufferedOp{&bufferedOp{ + path: path, + kind: kind, + rec: rec, + cid: cid, + }}, + }) + j.updatedAt = time.Now() + return true, nil +} + +func (j *Memjob) BufferOps(ctx context.Context, since *string, rev string, ops []*bufferedOp) (bool, error) { + j.lk.Lock() + defer j.lk.Unlock() + + switch j.state { + case StateComplete: + return false, ErrJobComplete + case StateInProgress: + // keep going and buffer the op + default: + return false, nil + } + + j.bufferedOps = append(j.bufferedOps, &opSet{ + since: since, + rev: rev, + ops: ops, }) j.updatedAt = time.Now() return true, nil @@ -131,29 +165,47 @@ func (j *Memjob) SetState(ctx context.Context, state string) error { return nil } -func (j *Memjob) FlushBufferedOps(ctx context.Context, fn func(kind, path string, rec *typegen.CBORMarshaler, cid *cid.Cid) error) error { - j.lk.Lock() - defer j.lk.Unlock() +func (j *Memjob) Rev() string { + return j.rev +} - for path, ops := range j.bufferedOps { - for _, op := range ops { - if err := fn(op.kind, path, op.rec, op.cid); err != nil { - return err +func (j *Memjob) SetRev(ctx context.Context, rev string) error { + j.rev = rev + return nil +} + +func (j *Memjob) FlushBufferedOps(ctx context.Context, fn func(kind, path string, rec typegen.CBORMarshaler, cid *cid.Cid) error) error { + panic("TODO: copy what we end up doing from the gormstore") + /* + j.lk.Lock() + defer j.lk.Unlock() + + for _, opset := range j.bufferedOps { + for _, op := range opset.ops { + if err := fn(op.kind, op.path, op.rec, op.cid); err != nil { + return err + } } } - } - j.bufferedOps = map[string][]*bufferedOp{} - j.state = StateComplete + j.bufferedOps = map[string][]*bufferedOp{} + j.state = StateComplete - return nil + return nil + */ } func (j *Memjob) ClearBufferedOps(ctx context.Context) error { j.lk.Lock() defer j.lk.Unlock() - j.bufferedOps = map[string][]*bufferedOp{} + j.bufferedOps = []*opSet{} j.updatedAt = time.Now() return nil } + +func (j *Memjob) RetryCount() int { + j.lk.Lock() + defer j.lk.Unlock() + return 0 +} diff --git a/bgs/bgs.go b/bgs/bgs.go index c9eae0c7e..0da2ff501 100644 --- a/bgs/bgs.go +++ b/bgs/bgs.go @@ -851,7 +851,7 @@ func (bgs *BGS) handleFedEvent(ctx context.Context, host *models.PDS, env *event } // skip the fast path for rebases or if the user is already in the slow path - if bgs.Index.Crawler.RepoInSlowPath(ctx, host, u.ID) { + if bgs.Index.Crawler.RepoInSlowPath(ctx, u.ID) { rebasesCounter.WithLabelValues(host.Host).Add(1) ai, err := bgs.Index.LookupUser(ctx, u.ID) if err != nil { diff --git a/cmd/gosky/admin.go b/cmd/gosky/admin.go index 969cb012a..54661f69a 100644 --- a/cmd/gosky/admin.go +++ b/cmd/gosky/admin.go @@ -766,6 +766,7 @@ var createInviteCmd = &cli.Command{ } xrpcc.AdminToken = &adminKey + resp, err := comatproto.ServerCreateInviteCodes(context.TODO(), xrpcc, &comatproto.ServerCreateInviteCodes_Input{ UseCount: int64(count), ForAccounts: usrdid, diff --git a/indexer/crawler.go b/indexer/crawler.go index c5ed5e552..3970bfd7d 100644 --- a/indexer/crawler.go +++ b/indexer/crawler.go @@ -256,7 +256,7 @@ func (c *CrawlDispatcher) AddToCatchupQueue(ctx context.Context, host *models.PD } } -func (c *CrawlDispatcher) RepoInSlowPath(ctx context.Context, host *models.PDS, uid models.Uid) bool { +func (c *CrawlDispatcher) RepoInSlowPath(ctx context.Context, uid models.Uid) bool { c.maplk.Lock() defer c.maplk.Unlock() if _, ok := c.todo[uid]; ok { diff --git a/search/firehose.go b/search/firehose.go index dca859bde..730cf0a2a 100644 --- a/search/firehose.go +++ b/search/firehose.go @@ -15,9 +15,7 @@ import ( "github.com/bluesky-social/indigo/backfill" "github.com/bluesky-social/indigo/events" "github.com/bluesky-social/indigo/events/schedulers/autoscaling" - lexutil "github.com/bluesky-social/indigo/lex/util" "github.com/bluesky-social/indigo/repo" - "github.com/bluesky-social/indigo/repomgr" "github.com/carlmjohnson/versioninfo" "github.com/gorilla/websocket" @@ -101,59 +99,9 @@ func (s *Server) RunIndexer(ctx context.Context) error { return nil } - // Check if we've backfilled this repo, if not, we should enqueue it - job, err := s.bfs.GetJob(ctx, evt.Repo) - if job == nil && err == nil { - logEvt.Info("enqueueing backfill job for new repo") - if err := s.bfs.EnqueueJob(evt.Repo); err != nil { - logEvt.Warn("failed to enqueue backfill job", "err", err) - } - } - - r, err := repo.ReadRepoFromCar(ctx, bytes.NewReader(evt.Blocks)) - if err != nil { - // TODO: handle this case (instead of return nil) - logEvt.Error("reading repo from car", "size_bytes", len(evt.Blocks), "err", err) - return nil - } - - for _, op := range evt.Ops { - ek := repomgr.EventKind(op.Action) - logOp := logEvt.With("op_path", op.Path, "op_cid", op.Cid) - switch ek { - case repomgr.EvtKindCreateRecord, repomgr.EvtKindUpdateRecord: - rc, rec, err := r.GetRecord(ctx, op.Path) - if err != nil { - // TODO: handle this case (instead of return nil) - logOp.Error("fetching record from event CAR slice", "err", err) - return nil - } - - if lexutil.LexLink(rc) != *op.Cid { - // TODO: handle this case (instead of return nil) - logOp.Error("mismatch in record and op cid", "record_cid", rc) - return nil - } - - if strings.HasPrefix(op.Path, "app.bsky.feed.post") { - postsReceived.Inc() - } else if strings.HasPrefix(op.Path, "app.bsky.actor.profile") { - profilesReceived.Inc() - } - - if err := s.handleOp(ctx, ek, evt.Seq, op.Path, evt.Repo, &rc, rec); err != nil { - // TODO: handle this case (instead of return nil) - logOp.Error("failed to handle event op", "err", err) - return nil - } - - case repomgr.EvtKindDeleteRecord: - if err := s.handleOp(ctx, ek, evt.Seq, op.Path, evt.Repo, nil, nil); err != nil { - // TODO: handle this case (instead of return nil) - logOp.Error("failed to handle delete", "err", err) - return nil - } - } + // Pass events to the backfiller which will process or buffer as needed + if err := s.bf.HandleEvent(ctx, evt); err != nil { + logEvt.Error("failed to handle event", "err", err) } return nil @@ -194,8 +142,7 @@ func (s *Server) discoverRepos() { cursor := "" limit := int64(500) - totalEnqueued := 0 - totalSkipped := 0 + total := 0 totalErrored := 0 for { @@ -206,30 +153,17 @@ func (s *Server) discoverRepos() { continue } log.Info("got repo page", "count", len(resp.Repos), "cursor", resp.Cursor) - enqueued := 0 - skipped := 0 errored := 0 for _, repo := range resp.Repos { - job, err := s.bfs.GetJob(ctx, repo.Did) - if job == nil && err == nil { - log.Info("enqueuing backfill job for new repo", "did", repo.Did) - if err := s.bfs.EnqueueJob(repo.Did); err != nil { - log.Warn("failed to enqueue backfill job", "err", err) - errored++ - continue - } - enqueued++ - } else if err != nil { - log.Warn("failed to get backfill job", "did", repo.Did, "err", err) + _, err := s.bfs.GetOrCreateJob(ctx, repo.Did, backfill.StateEnqueued) + if err != nil { + log.Error("failed to get or create job", "did", repo.Did, "err", err) errored++ - } else { - skipped++ } } - log.Info("enqueued repos", "enqueued", enqueued, "skipped", skipped, "errored", errored) - totalEnqueued += enqueued - totalSkipped += skipped + log.Info("enqueued repos", "total", len(resp.Repos), "errored", errored) totalErrored += errored + total += len(resp.Repos) if resp.Cursor != nil && *resp.Cursor != "" { cursor = *resp.Cursor } else { @@ -237,10 +171,10 @@ func (s *Server) discoverRepos() { } } - log.Info("finished repo discovery", "totalEnqueued", totalEnqueued, "totalSkipped", totalSkipped, "totalErrored", totalErrored) + log.Info("finished repo discovery", "totalJobs", total, "totalErrored", totalErrored) } -func (s *Server) handleCreateOrUpdate(ctx context.Context, rawDID string, path string, recP *typegen.CBORMarshaler, rcid *cid.Cid) error { +func (s *Server) handleCreateOrUpdate(ctx context.Context, rawDID string, path string, rec typegen.CBORMarshaler, rcid *cid.Cid) error { // Since this gets called in a backfill job, we need to check if the path is a post or profile if !strings.Contains(path, "app.bsky.feed.post") && !strings.Contains(path, "app.bsky.actor.profile") { return nil @@ -258,7 +192,6 @@ func (s *Server) handleCreateOrUpdate(ctx context.Context, rawDID string, path s if ident == nil { return fmt.Errorf("identity not found for did: %s", did.String()) } - rec := *recP switch rec := rec.(type) { case *bsky.FeedPost: @@ -311,63 +244,6 @@ func (s *Server) handleDelete(ctx context.Context, rawDID, path string) error { return nil } -func (s *Server) handleOp(ctx context.Context, op repomgr.EventKind, seq int64, path string, did string, rcid *cid.Cid, rec typegen.CBORMarshaler) error { - var err error - if !strings.Contains(path, "app.bsky.feed.post") && !strings.Contains(path, "app.bsky.actor.profile") { - return nil - } - - if op == repomgr.EvtKindCreateRecord || op == repomgr.EvtKindUpdateRecord { - s.logger.Debug("processing create record op", "seq", seq, "did", did, "path", path) - - // Try to buffer the op, if it fails, we need to create a backfill job - _, err := s.bfs.BufferOp(ctx, did, string(op), path, &rec, rcid) - if err == backfill.ErrJobNotFound { - s.logger.Debug("no backfill job found for repo, creating one", "did", did) - - if err := s.bfs.EnqueueJob(did); err != nil { - return fmt.Errorf("enqueueing backfill job: %w", err) - } - - // Try to buffer the op again so it gets picked up by the backfill job - _, err = s.bfs.BufferOp(ctx, did, string(op), path, &rec, rcid) - if err != nil { - return fmt.Errorf("buffering backfill op: %w", err) - } - } else if err == backfill.ErrJobComplete { - // Backfill is done for this repo so we can just index it now - err = s.handleCreateOrUpdate(ctx, did, path, &rec, rcid) - } - } else if op == repomgr.EvtKindDeleteRecord { - s.logger.Debug("processing delete record op", "seq", seq, "did", did, "path", path) - - // Try to buffer the op, if it fails, we need to create a backfill job - _, err := s.bfs.BufferOp(ctx, did, string(op), path, &rec, rcid) - if err == backfill.ErrJobNotFound { - s.logger.Debug("no backfill job found for repo, creating one", "did", did) - - if err := s.bfs.EnqueueJob(did); err != nil { - return fmt.Errorf("enqueueing backfill job: %w", err) - } - - // Try to buffer the op again so it gets picked up by the backfill job - _, err = s.bfs.BufferOp(ctx, did, string(op), path, &rec, rcid) - if err != nil { - return fmt.Errorf("buffering backfill op: %w", err) - } - } else if err == backfill.ErrJobComplete { - // Backfill is done for this repo so we can delete imemdiately - err = s.handleDelete(ctx, did, path) - } - } - - if err != nil { - return fmt.Errorf("failed to handle op: %w", err) - } - - return nil -} - func (s *Server) processTooBigCommit(ctx context.Context, evt *comatproto.SyncSubscribeRepos_Commit) error { repodata, err := comatproto.SyncGetRepo(ctx, s.bgsxrpc, evt.Repo, "") if err != nil { diff --git a/search/handlers.go b/search/handlers.go index a1afd5d40..f15e84bc2 100644 --- a/search/handlers.go +++ b/search/handlers.go @@ -171,7 +171,7 @@ func (s *Server) handleIndexRepos(e echo.Context) error { for _, did := range dids { job, err := s.bfs.GetJob(ctx, did) if job == nil && err == nil { - err := s.bfs.EnqueueJob(did) + err := s.bfs.EnqueueJob(ctx, did) if err != nil { errs = append(errs, IndexError{ DID: did,