Skip to content

Commit

Permalink
Make the backfiller aware of revs and fill gaps (#454)
Browse files Browse the repository at this point in the history
  • Loading branch information
whyrusleeping authored Dec 4, 2023
2 parents 18ae508 + 40259f2 commit a857fdc
Show file tree
Hide file tree
Showing 9 changed files with 465 additions and 260 deletions.
146 changes: 138 additions & 8 deletions backfill/backfill.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package backfill

import (
"bytes"
"context"
"errors"
"fmt"
Expand All @@ -9,6 +10,7 @@ import (
"sync"
"time"

"github.com/bluesky-social/indigo/api/atproto"
// Blank import to register types for CBORGEN
_ "github.com/bluesky-social/indigo/api/bsky"
lexutil "github.com/bluesky-social/indigo/lex/util"
Expand All @@ -25,13 +27,17 @@ import (
type Job interface {
Repo() string
State() string
Rev() string
SetState(ctx context.Context, state string) error
SetRev(ctx context.Context, rev string) error
RetryCount() int

BufferOps(ctx context.Context, since *string, rev string, ops []*bufferedOp) (bool, error)
// FlushBufferedOps calls the given callback for each buffered operation
// Once done it clears the buffer and marks the job as "complete"
// Allowing the Job interface to abstract away the details of how buffered
// operations are stored and/or locked
FlushBufferedOps(ctx context.Context, cb func(kind, path string, rec *typegen.CBORMarshaler, cid *cid.Cid) error) error
FlushBufferedOps(ctx context.Context, cb func(kind, path string, rec typegen.CBORMarshaler, cid *cid.Cid) error) error

ClearBufferedOps(ctx context.Context) error
}
Expand All @@ -40,16 +46,18 @@ type Job interface {
type Store interface {
// BufferOp buffers an operation for a job and returns true if the operation was buffered
// If the operation was not buffered, it returns false and an error (ErrJobNotFound or ErrJobComplete)
BufferOp(ctx context.Context, repo string, kind, path string, rec *typegen.CBORMarshaler, cid *cid.Cid) (bool, error)
GetJob(ctx context.Context, repo string) (Job, error)
GetNextEnqueuedJob(ctx context.Context) (Job, error)
UpdateRev(ctx context.Context, repo, rev string) error

EnqueueJob(ctx context.Context, repo string) error
}

// Backfiller is a struct which handles backfilling a repo
type Backfiller struct {
Name string
HandleCreateRecord func(ctx context.Context, repo string, path string, rec *typegen.CBORMarshaler, cid *cid.Cid) error
HandleUpdateRecord func(ctx context.Context, repo string, path string, rec *typegen.CBORMarshaler, cid *cid.Cid) error
HandleCreateRecord func(ctx context.Context, repo string, path string, rec typegen.CBORMarshaler, cid *cid.Cid) error
HandleUpdateRecord func(ctx context.Context, repo string, path string, rec typegen.CBORMarshaler, cid *cid.Cid) error
HandleDeleteRecord func(ctx context.Context, repo string, path string) error
Store Store

Expand Down Expand Up @@ -85,6 +93,9 @@ var ErrJobComplete = errors.New("job is complete")
// ErrJobNotFound is returned when trying to buffer an op for a job that doesn't exist
var ErrJobNotFound = errors.New("job not found")

// ErrEventGap is returned when an event is received with a since that doesn't match the current rev
var ErrEventGap = fmt.Errorf("buffered event revs did not line up")

var tracer = otel.Tracer("backfiller")

type BackfillOptions struct {
Expand All @@ -109,8 +120,8 @@ func DefaultBackfillOptions() *BackfillOptions {
func NewBackfiller(
name string,
store Store,
handleCreate func(ctx context.Context, repo string, path string, rec *typegen.CBORMarshaler, cid *cid.Cid) error,
handleUpdate func(ctx context.Context, repo string, path string, rec *typegen.CBORMarshaler, cid *cid.Cid) error,
handleCreate func(ctx context.Context, repo string, path string, rec typegen.CBORMarshaler, cid *cid.Cid) error,
handleUpdate func(ctx context.Context, repo string, path string, rec typegen.CBORMarshaler, cid *cid.Cid) error,
handleDelete func(ctx context.Context, repo string, path string) error,
opts *BackfillOptions,
) *Backfiller {
Expand Down Expand Up @@ -199,7 +210,7 @@ func (b *Backfiller) FlushBuffer(ctx context.Context, job Job) int {

// Flush buffered operations, clear the buffer, and mark the job as "complete"
// Clearning and marking are handled by the job interface
err := job.FlushBufferedOps(ctx, func(kind, path string, rec *typegen.CBORMarshaler, cid *cid.Cid) error {
err := job.FlushBufferedOps(ctx, func(kind, path string, rec typegen.CBORMarshaler, cid *cid.Cid) error {
switch repomgr.EventKind(kind) {
case repomgr.EvtKindCreateRecord:
err := b.HandleCreateRecord(ctx, repo, path, rec, cid)
Expand All @@ -223,6 +234,13 @@ func (b *Backfiller) FlushBuffer(ctx context.Context, job Job) int {
})
if err != nil {
log.Error("failed to flush buffered ops", "error", err)
if errors.Is(err, ErrEventGap) {
if sserr := job.SetState(ctx, StateEnqueued); sserr != nil {
log.Error("failed to reset job state after failed buffer flush", "error", sserr)
}
// TODO: need to re-queue this job for later
return processed
}
}

// Mark the job as "complete"
Expand Down Expand Up @@ -254,10 +272,17 @@ func (b *Backfiller) BackfillRepo(ctx context.Context, job Job) {
repoDid := job.Repo()

log := slog.With("source", "backfiller_backfill_repo", "repo", repoDid)
if job.RetryCount() > 0 {
log = log.With("retry_count", job.RetryCount())
}
log.Info(fmt.Sprintf("processing backfill for %s", repoDid))

url := fmt.Sprintf("%s?did=%s", b.CheckoutPath, repoDid)

if job.Rev() != "" {
url = url + fmt.Sprintf("&since=%s", job.Rev())
}

// GET and CAR decode the body
client := &http.Client{
Transport: otelhttp.NewTransport(http.DefaultTransport),
Expand Down Expand Up @@ -374,7 +399,7 @@ func (b *Backfiller) BackfillRepo(ctx context.Context, job Job) {
continue
}

err = b.HandleCreateRecord(ctx, repoDid, item.recordPath, &recM, &item.nodeCid)
err = b.HandleCreateRecord(ctx, repoDid, item.recordPath, recM, &item.nodeCid)
if err != nil {
recordResults <- recordResult{recordPath: item.recordPath, err: fmt.Errorf("failed to handle create record: %w", err)}
continue
Expand Down Expand Up @@ -402,6 +427,10 @@ func (b *Backfiller) BackfillRepo(ctx context.Context, job Job) {
close(recordResults)
resultWG.Wait()

if err := job.SetRev(ctx, r.SignedCommit().Rev); err != nil {
log.Error("failed to update rev after backfilling repo", "err", err)
}

// Process buffered operations, marking the job as "complete" when done
numProcessed := b.FlushBuffer(ctx, job)

Expand All @@ -411,3 +440,104 @@ func (b *Backfiller) BackfillRepo(ctx context.Context, job Job) {
"duration", time.Since(start),
)
}

func (bf *Backfiller) HandleEvent(ctx context.Context, evt *atproto.SyncSubscribeRepos_Commit) error {
r, err := repo.ReadRepoFromCar(ctx, bytes.NewReader(evt.Blocks))
if err != nil {
return fmt.Errorf("failed to read event repo: %w", err)
}

var ops []*bufferedOp
for _, op := range evt.Ops {
switch op.Action {
case "create", "update":
cc, rec, err := r.GetRecord(ctx, op.Path)
if err != nil {
return fmt.Errorf("getting record failed (%s,%s): %w", op.Action, op.Path, err)
}

ops = append(ops, &bufferedOp{
kind: op.Action,
path: op.Path,
rec: rec,
cid: &cc,
})
case "delete":
ops = append(ops, &bufferedOp{
kind: op.Action,
path: op.Path,
})
default:
return fmt.Errorf("invalid op action: %q", op.Action)
}
}

buffered, err := bf.BufferOps(ctx, evt.Repo, evt.Since, evt.Rev, ops)
if err != nil {
return fmt.Errorf("buffer ops failed: %w", err)
}

if buffered {
return nil
}

for _, op := range ops {
switch op.kind {
case "create":
if err := bf.HandleCreateRecord(ctx, evt.Repo, op.path, op.rec, op.cid); err != nil {
return fmt.Errorf("create record failed: %w", err)
}
case "update":
if err := bf.HandleUpdateRecord(ctx, evt.Repo, op.path, op.rec, op.cid); err != nil {
return fmt.Errorf("update record failed: %w", err)
}
case "delete":
if err := bf.HandleDeleteRecord(ctx, evt.Repo, op.path); err != nil {
return fmt.Errorf("delete record failed: %w", err)
}
}
}

if err := bf.Store.UpdateRev(ctx, evt.Repo, evt.Rev); err != nil {
return fmt.Errorf("failed to update rev: %w", err)
}

return nil
}

func (bf *Backfiller) BufferOp(ctx context.Context, repo string, since *string, rev, kind, path string, rec typegen.CBORMarshaler, cid *cid.Cid) (bool, error) {
return bf.BufferOps(ctx, repo, since, rev, []*bufferedOp{{
path: path,
kind: kind,
rec: rec,
cid: cid,
}})
}

func (bf *Backfiller) BufferOps(ctx context.Context, repo string, since *string, rev string, ops []*bufferedOp) (bool, error) {
j, err := bf.Store.GetJob(ctx, repo)
if err != nil {
if !errors.Is(err, ErrJobNotFound) {
return false, err
}
if qerr := bf.Store.EnqueueJob(ctx, repo); qerr != nil {
return false, fmt.Errorf("failed to enqueue job for unknown repo: %w", qerr)
}

nj, err := bf.Store.GetJob(ctx, repo)
if err != nil {
return false, err
}

j = nj
}

return j.BufferOps(ctx, since, rev, ops)
}

// MaxRetries is the maximum number of times to retry a backfill job
var MaxRetries = 10

func computeExponentialBackoff(attempt int) time.Duration {
return time.Duration(1<<uint(attempt)) * 10 * time.Second
}
35 changes: 20 additions & 15 deletions backfill/backfill_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,7 @@ import (
"log/slog"
"sync"
"testing"
"time"

"github.com/bluesky-social/indigo/backfill"
"github.com/ipfs/go-cid"
typegen "github.com/whyrusleeping/cbor-gen"
)
Expand All @@ -20,6 +18,7 @@ type testState struct {
}

func TestBackfill(t *testing.T) {
/* this test depends on being able to hit the live production bgs...
ctx := context.Background()
testRepos := []string{
Expand All @@ -28,7 +27,12 @@ func TestBackfill(t *testing.T) {
"did:plc:t7y4sud4dhptvzz7ibnv5cbt",
}
mem := backfill.NewMemstore()
db, err := gorm.Open(sqlite.Open("sqlite://:memory"))
if err != nil {
t.Fatal(err)
}
store := backfill.NewGormstore(db)
ts := &testState{}
opts := backfill.DefaultBackfillOptions()
Expand All @@ -37,7 +41,7 @@ func TestBackfill(t *testing.T) {
bf := backfill.NewBackfiller(
"backfill-test",
mem,
store,
ts.handleCreate,
ts.handleUpdate,
ts.handleDelete,
Expand All @@ -49,25 +53,25 @@ func TestBackfill(t *testing.T) {
go bf.Start()
for _, repo := range testRepos {
mem.EnqueueJob(repo)
store.EnqueueJob(repo)
}
// Wait until job 0 is in progress
for {
s, err := mem.GetJob(ctx, testRepos[0])
s, err := store.GetJob(ctx, testRepos[0])
if err != nil {
t.Fatal(err)
}
if s.State() == backfill.StateInProgress {
mem.BufferOp(ctx, testRepos[0], "delete", "app.bsky.feed.follow/1", nil, &cid.Undef)
mem.BufferOp(ctx, testRepos[0], "delete", "app.bsky.feed.follow/2", nil, &cid.Undef)
mem.BufferOp(ctx, testRepos[0], "delete", "app.bsky.feed.follow/3", nil, &cid.Undef)
mem.BufferOp(ctx, testRepos[0], "delete", "app.bsky.feed.follow/4", nil, &cid.Undef)
mem.BufferOp(ctx, testRepos[0], "delete", "app.bsky.feed.follow/5", nil, &cid.Undef)
bf.BufferOp(ctx, testRepos[0], "delete", "app.bsky.feed.follow/1", nil, &cid.Undef)
bf.BufferOp(ctx, testRepos[0], "delete", "app.bsky.feed.follow/2", nil, &cid.Undef)
bf.BufferOp(ctx, testRepos[0], "delete", "app.bsky.feed.follow/3", nil, &cid.Undef)
bf.BufferOp(ctx, testRepos[0], "delete", "app.bsky.feed.follow/4", nil, &cid.Undef)
bf.BufferOp(ctx, testRepos[0], "delete", "app.bsky.feed.follow/5", nil, &cid.Undef)
mem.BufferOp(ctx, testRepos[0], "create", "app.bsky.feed.follow/1", nil, &cid.Undef)
bf.BufferOp(ctx, testRepos[0], "create", "app.bsky.feed.follow/1", nil, &cid.Undef)
mem.BufferOp(ctx, testRepos[0], "update", "app.bsky.feed.follow/1", nil, &cid.Undef)
bf.BufferOp(ctx, testRepos[0], "update", "app.bsky.feed.follow/1", nil, &cid.Undef)
break
}
Expand All @@ -87,17 +91,18 @@ func TestBackfill(t *testing.T) {
bf.Stop()
slog.Info("shutting down")
*/
}

func (ts *testState) handleCreate(ctx context.Context, repo string, path string, rec *typegen.CBORMarshaler, cid *cid.Cid) error {
func (ts *testState) handleCreate(ctx context.Context, repo string, path string, rec typegen.CBORMarshaler, cid *cid.Cid) error {
slog.Info("got create", "repo", repo, "path", path)
ts.lk.Lock()
ts.creates++
ts.lk.Unlock()
return nil
}

func (ts *testState) handleUpdate(ctx context.Context, repo string, path string, rec *typegen.CBORMarshaler, cid *cid.Cid) error {
func (ts *testState) handleUpdate(ctx context.Context, repo string, path string, rec typegen.CBORMarshaler, cid *cid.Cid) error {
slog.Info("got update", "repo", repo, "path", path)
ts.lk.Lock()
ts.updates++
Expand Down
Loading

0 comments on commit a857fdc

Please sign in to comment.