From 41eddefedb2c224d7d18bcf3e8637899de2c4a63 Mon Sep 17 00:00:00 2001
From: Brian Olson
Date: Mon, 30 Dec 2024 14:43:49 -0500
Subject: [PATCH 1/2] wip: start rainbow listRepos proxy

---
 splitter/splitter.go | 108 +++++++++++++++++++++++++++++++++++++++++--
 1 file changed, 105 insertions(+), 3 deletions(-)

diff --git a/splitter/splitter.go b/splitter/splitter.go
index 893414194..ef54ff4da 100644
--- a/splitter/splitter.go
+++ b/splitter/splitter.go
@@ -6,6 +6,8 @@ import (
 	"encoding/json"
 	"errors"
 	"fmt"
+	"go.opentelemetry.io/otel"
+	"gorm.io/gorm"
 	"io"
 	"log/slog"
 	"math/rand"
@@ -51,13 +53,27 @@ type Splitter struct {
 	nextCrawlers []*url.URL
 }
 
+const DefaultListReposCacheDuration = time.Minute * 5
+
 type SplitterConfig struct {
 	UpstreamHost  string
 	CursorFile    string
 	PebbleOptions *events.PebblePersistOptions
+
+	ListReposCacheDuration time.Duration
+}
+
+func (sc *SplitterConfig) normalize() error {
+	if sc.ListReposCacheDuration == 0 {
+		sc.ListReposCacheDuration = DefaultListReposCacheDuration
+	}
+	return nil
 }
 
 func NewSplitter(conf SplitterConfig, nextCrawlers []string) (*Splitter, error) {
+	if err := conf.normalize(); err != nil {
+		return nil, err
+	}
 	var nextCrawlerURLs []*url.URL
 	log := slog.Default().With("system", "splitter")
 	if len(nextCrawlers) > 0 {
@@ -105,9 +121,10 @@ func NewDiskSplitter(host, path string, persistHours float64, maxBytes int64) (*
 		MaxBytes:   uint64(maxBytes),
 	}
 	conf := SplitterConfig{
-		UpstreamHost:  host,
-		CursorFile:    "cursor-file",
-		PebbleOptions: &ppopts,
+		UpstreamHost:           host,
+		CursorFile:             "cursor-file",
+		PebbleOptions:          &ppopts,
+		ListReposCacheDuration: DefaultListReposCacheDuration,
 	}
 	pp, err := events.NewPebblePersistance(&ppopts)
 	if err != nil {
@@ -207,6 +224,7 @@ func (s *Splitter) StartWithListener(listen net.Listener) error {
 
 	e.POST("/xrpc/com.atproto.sync.requestCrawl", s.RequestCrawlHandler)
 	e.GET("/xrpc/com.atproto.sync.subscribeRepos", s.EventsHandler)
+	e.GET("/xrpc/com.atproto.sync.listRepos", s.HandleComAtprotoSyncListRepos)
 	e.GET("/xrpc/_health", s.HandleHealthCheck)
 	e.GET("/_health", s.HandleHealthCheck)
 
@@ -329,6 +347,90 @@ func (s *Splitter) RequestCrawlHandler(c echo.Context) error {
 	return c.JSON(200, HealthStatus{Status: "ok"})
 }
 
+func (s *Splitter) HandleComAtprotoSyncListRepos(c echo.Context) error {
+	// TODO: identical to bgs/stubs.go - re-unify?
+	ctx, span := otel.Tracer("server").Start(c.Request().Context(), "HandleComAtprotoSyncListRepos")
+	defer span.End()
+
+	cursorQuery := c.QueryParam("cursor")
+	limitQuery := c.QueryParam("limit")
+
+	var err error
+
+	limit := 500
+	if limitQuery != "" {
+		limit, err = strconv.Atoi(limitQuery)
+		if err != nil || limit < 1 || limit > 1000 {
+			return c.JSON(http.StatusBadRequest, XRPCError{Message: fmt.Sprintf("invalid limit: %s", limitQuery)})
+		}
+	}
+
+	cursor := int64(0)
+	if cursorQuery != "" {
+		cursor, err = strconv.ParseInt(cursorQuery, 10, 64)
+		if err != nil || cursor < 0 {
+			return c.JSON(http.StatusBadRequest, XRPCError{Message: fmt.Sprintf("invalid cursor: %s", cursorQuery)})
+		}
+	}
+
+	out, handleErr := s.handleComAtprotoSyncListRepos(ctx, cursor, limit)
+	if handleErr != nil {
+		return handleErr
+	}
+	return c.JSON(200, out)
+}
+
+func (s *Splitter) handleComAtprotoSyncListRepos(ctx context.Context, cursor int64, limit int) (*comatproto.SyncListRepos_Output, error) {
+	// Filter out tombstoned, taken down, and deactivated accounts
+	q := fmt.Sprintf("id > ? AND NOT tombstoned AND NOT taken_down AND (upstream_status is NULL OR (upstream_status != '%s' AND upstream_status != '%s' AND upstream_status != '%s'))",
+		events.AccountStatusDeactivated, events.AccountStatusSuspended, events.AccountStatusTakendown)
+
+	// Load the users
+	users := []*User{}
+	if err := s.db.Model(&User{}).Where(q, cursor).Order("id").Limit(limit).Find(&users).Error; err != nil {
+		if err == gorm.ErrRecordNotFound {
+			return &comatproto.SyncListRepos_Output{}, nil
+		}
+		log.Error("failed to query users", "err", err)
+		return nil, echo.NewHTTPError(http.StatusInternalServerError, "failed to query users")
+	}
+
+	if len(users) == 0 {
+		// resp.Repos is an explicit empty array, not just 'nil'
+		return &comatproto.SyncListRepos_Output{
+			Repos: []*comatproto.SyncListRepos_Repo{},
+		}, nil
+	}
+
+	resp := &comatproto.SyncListRepos_Output{
+		Repos: make([]*comatproto.SyncListRepos_Repo, len(users)),
+	}
+
+	// Fetch the repo roots for each user
+	for i := range users {
+		user := users[i]
+
+		root, err := s.repoman.GetRepoRoot(ctx, user.ID)
+		if err != nil {
+			log.Error("failed to get repo root", "err", err, "did", user.Did)
+			return nil, echo.NewHTTPError(http.StatusInternalServerError, fmt.Sprintf("failed to get repo root for (%s): %v", user.Did, err.Error()))
+		}
+
+		resp.Repos[i] = &comatproto.SyncListRepos_Repo{
+			Did:  user.Did,
+			Head: root.String(),
+		}
+	}
+
+	// If this is not the last page, set the cursor
+	if len(users) >= limit && len(users) > 1 {
+		nextCursor := fmt.Sprintf("%d", users[len(users)-1].ID)
+		resp.Cursor = &nextCursor
+	}
+
+	return resp, nil
+}
+
 func (s *Splitter) EventsHandler(c echo.Context) error {
 	var since *int64
 	if sinceVal := c.QueryParam("cursor"); sinceVal != "" {

From 6967c884cdf51abef3067e34881c0b6a448527f5 Mon Sep 17 00:00:00 2001
From: Brian Olson
Date: Tue, 31 Dec 2024 16:46:17 -0500
Subject: [PATCH 2/2] working listRepos proxy with cache

---
 cmd/rainbow/main.go  |  11 ++--
 splitter/splitter.go | 152 ++++++++++++++++++++++++++++++++-----------
 2 files changed, 119 insertions(+), 44 deletions(-)

diff --git a/cmd/rainbow/main.go b/cmd/rainbow/main.go
index d574d6260..17ef48d04 100644
--- a/cmd/rainbow/main.go
+++ b/cmd/rainbow/main.go
@@ -43,11 +43,12 @@ func run(args []string) {
 	}
 
 	app.Flags = []cli.Flag{
-		&cli.BoolFlag{
-			Name:    "crawl-insecure-ws",
-			Usage:   "when connecting to PDS instances, use ws:// instead of wss://",
-			EnvVars: []string{"RAINBOW_INSECURE_CRAWL"},
-		},
+		// TODO: unimplemented, always assumes https:// and wss://
+		//&cli.BoolFlag{
+		//	Name:    "crawl-insecure-ws",
+		//	Usage:   "when connecting to PDS instances, use ws:// instead of wss://",
+		//	EnvVars: []string{"RAINBOW_INSECURE_CRAWL"},
+		//},
 		&cli.StringFlag{
 			Name:  "splitter-host",
 			Value: "bsky.network",
diff --git a/splitter/splitter.go b/splitter/splitter.go
index ef54ff4da..563a14c88 100644
--- a/splitter/splitter.go
+++ b/splitter/splitter.go
@@ -7,7 +7,6 @@ import (
 	"errors"
 	"fmt"
 	"go.opentelemetry.io/otel"
-	"gorm.io/gorm"
 	"io"
 	"log/slog"
 	"math/rand"
@@ -51,9 +50,17 @@ type Splitter struct {
 	httpC        *http.Client
 	nextCrawlers []*url.URL
+
+	// from upstream listRepos
+	cachedRepos            []*atproto.SyncListRepos_Repo
+	cachedReposFrom        time.Time
+	cachedReposExpires     time.Time
+	cacheReposIsRefetching bool
+	cachedReposLock        sync.Mutex
+	cachedReposCond        sync.Cond
 }
 
-const DefaultListReposCacheDuration = time.Minute * 5
+const DefaultListReposCacheDuration = time.Minute * 15
 
 type SplitterConfig struct {
 	UpstreamHost  string
 	CursorFile    string
 	PebbleOptions *events.PebblePersistOptions
@@ -70,6 +77,22 @@ func (sc *SplitterConfig) normalize() error {
 	return nil
 }
 
+func (sc *SplitterConfig) XrpcRootUrl() string {
+	if strings.HasPrefix(sc.UpstreamHost, "http://") {
+		return sc.UpstreamHost
+	}
+	if strings.HasPrefix(sc.UpstreamHost, "https://") {
+		return sc.UpstreamHost
+	}
+	if strings.HasPrefix(sc.UpstreamHost, "ws://") {
+		return "http://" + sc.UpstreamHost[5:]
+	}
+	if strings.HasPrefix(sc.UpstreamHost, "wss://") {
+		return "https://" + sc.UpstreamHost[6:]
+	}
+	return "https://" + sc.UpstreamHost
+}
+
 func NewSplitter(conf SplitterConfig, nextCrawlers []string) (*Splitter, error) {
 	if err := conf.normalize(); err != nil {
 		return nil, err
@@ -95,6 +118,7 @@ func NewSplitter(conf SplitterConfig, nextCrawlers []string) (*Splitter, error)
 		httpC:        util.RobustHTTPClient(),
 		nextCrawlers: nextCrawlerURLs,
 	}
+	s.cachedReposCond.L = &s.cachedReposLock
 
 	if conf.PebbleOptions == nil {
 		// mem splitter
@@ -348,7 +372,7 @@ func (s *Splitter) RequestCrawlHandler(c echo.Context) error {
 }
 
 func (s *Splitter) HandleComAtprotoSyncListRepos(c echo.Context) error {
-	// TODO: identical to bgs/stubs.go - re-unify?
+	// TODO: similar to bgs/stubs.go - re-unify?
 	ctx, span := otel.Tracer("server").Start(c.Request().Context(), "HandleComAtprotoSyncListRepos")
 	defer span.End()
 
@@ -365,12 +389,13 @@ func (s *Splitter) HandleComAtprotoSyncListRepos(c echo.Context) error {
 		}
 	}
 
-	cursor := int64(0)
+	cursor := int(0)
 	if cursorQuery != "" {
-		cursor, err = strconv.ParseInt(cursorQuery, 10, 64)
-		if err != nil || cursor < 0 {
+		tcursor, err := strconv.ParseInt(cursorQuery, 10, 64)
+		if err != nil || tcursor < 0 {
			return c.JSON(http.StatusBadRequest, XRPCError{Message: fmt.Sprintf("invalid cursor: %s", cursorQuery)})
 		}
+		cursor = int(tcursor)
 	}
 
 	out, handleErr := s.handleComAtprotoSyncListRepos(ctx, cursor, limit)
@@ -380,54 +405,103 @@ func (s *Splitter) HandleComAtprotoSyncListRepos(c echo.Context) error {
 	return c.JSON(200, out)
 }
 
-func (s *Splitter) handleComAtprotoSyncListRepos(ctx context.Context, cursor int64, limit int) (*comatproto.SyncListRepos_Output, error) {
-	// Filter out tombstoned, taken down, and deactivated accounts
-	q := fmt.Sprintf("id > ? AND NOT tombstoned AND NOT taken_down AND (upstream_status is NULL OR (upstream_status != '%s' AND upstream_status != '%s' AND upstream_status != '%s'))",
-		events.AccountStatusDeactivated, events.AccountStatusSuspended, events.AccountStatusTakendown)
-
-	// Load the users
-	users := []*User{}
-	if err := s.db.Model(&User{}).Where(q, cursor).Order("id").Limit(limit).Find(&users).Error; err != nil {
-		if err == gorm.ErrRecordNotFound {
-			return &comatproto.SyncListRepos_Output{}, nil
-		}
-		log.Error("failed to query users", "err", err)
-		return nil, echo.NewHTTPError(http.StatusInternalServerError, "failed to query users")
-	}
-
-	if len(users) == 0 {
-		// resp.Repos is an explicit empty array, not just 'nil'
-		return &comatproto.SyncListRepos_Output{
-			Repos: []*comatproto.SyncListRepos_Repo{},
-		}, nil
-	}
-
-	resp := &comatproto.SyncListRepos_Output{
-		Repos: make([]*comatproto.SyncListRepos_Repo, len(users)),
-	}
-
-	// Fetch the repo roots for each user
-	for i := range users {
-		user := users[i]
-
-		root, err := s.repoman.GetRepoRoot(ctx, user.ID)
-		if err != nil {
-			log.Error("failed to get repo root", "err", err, "did", user.Did)
-			return nil, echo.NewHTTPError(http.StatusInternalServerError, fmt.Sprintf("failed to get repo root for (%s): %v", user.Did, err.Error()))
-		}
-
-		resp.Repos[i] = &comatproto.SyncListRepos_Repo{
-			Did:  user.Did,
-			Head: root.String(),
-		}
-	}
-
-	// If this is not the last page, set the cursor
-	if len(users) >= limit && len(users) > 1 {
-		nextCursor := fmt.Sprintf("%d", users[len(users)-1].ID)
-		resp.Cursor = &nextCursor
-	}
-
-	return resp, nil
-}
-
+// fetchUpstreamRepoList fetches the full repo list (all pages) from the upstream source and keeps it locally cached for redistribution.
+// usage: go s.fetchUpstreamRepoList()
+func (s *Splitter) fetchUpstreamRepoList() {
+	// setup
+	s.cachedReposLock.Lock()
+	if s.cacheReposIsRefetching {
+		s.cachedReposLock.Unlock()
+		return
+	}
+	start := time.Now()
+	if start.Before(s.cachedReposExpires) {
+		s.cachedReposLock.Unlock()
+		return
+	}
+	s.cacheReposIsRefetching = true
+	s.cachedReposLock.Unlock()
+
+	var client xrpc.Client
+	client.Host = s.conf.XrpcRootUrl()
+	cursor := ""
+	// TODO: upstream list is stable by User.ID in database there, ID is lost in atproto record, create a local ID and keep a stable map[DID]int and sort full set on those IDs
+	var repos []*atproto.SyncListRepos_Repo
+	ok := true
+	s.log.Info("listRepos from upstream", "host", client.Host)
+	for {
+		response, err := atproto.SyncListRepos(context.Background(), &client, cursor, 1000)
+		if err != nil {
+			s.log.Warn("failed to fetch upstream repo list", "err", err)
+			ok = false
+			break
+		}
+		s.log.Debug("some repos", "count", len(response.Repos))
+		if len(response.Repos) == 0 {
+			break
+		}
+		repos = append(repos, response.Repos...)
+		if response.Cursor == nil || len(response.Repos) == 0 {
+			break
+		}
+		cursor = *response.Cursor
+	}
+
+	s.cachedReposLock.Lock()
+	if ok {
+		s.cachedReposFrom = time.Now()
+		s.cachedReposExpires = s.cachedReposFrom.Add(s.conf.ListReposCacheDuration)
+		s.cachedRepos = repos
+		s.cachedReposCond.Broadcast()
+		dt := s.cachedReposFrom.Sub(start)
+		s.log.Info("fetched upstream repo list", "duration", dt.String(), "count", len(repos))
+	}
+	s.cacheReposIsRefetching = false
+	s.cachedReposLock.Unlock()
+}
+
+// getLatestUpstreamRepoList will usually return the most recently fetched list, but may block.
+// On the first call it should block until it has data from upstream.
+// It may spawn an async fetch of a new listRepos from upstream which will be available on the _next_ call.
+// The return from this function should be treated as read-only.
+func (s *Splitter) getLatestUpstreamRepoList() []*atproto.SyncListRepos_Repo {
+	s.cachedReposLock.Lock()
+	defer s.cachedReposLock.Unlock()
+	now := time.Now()
+	if now.After(s.cachedReposExpires) && !s.cacheReposIsRefetching {
+		go s.fetchUpstreamRepoList()
+	}
+	for {
+		if len(s.cachedRepos) > 0 || now.Before(s.cachedReposExpires) {
+			// either we have some data, or we have a legit empty response
+			return s.cachedRepos
+		}
+		s.cachedReposCond.Wait()
+		now = time.Now()
+		if now.After(s.cachedReposExpires) && !s.cacheReposIsRefetching {
+			go s.fetchUpstreamRepoList()
+		}
+	}
+}
+
+func (s *Splitter) handleComAtprotoSyncListRepos(ctx context.Context, cursor int, limit int) (*comatproto.SyncListRepos_Output, error) {
+	repos := s.getLatestUpstreamRepoList()
+
+	resp := &comatproto.SyncListRepos_Output{}
+
+	if cursor > len(repos) {
+		return resp, nil
+	}
+
+	repos = repos[cursor:]
+	if limit < len(repos) {
+		repos = repos[:limit]
+		nextCursor := strconv.Itoa(cursor + limit)
+		resp.Cursor = &nextCursor
+	}
+
+	resp.Repos = repos
+
+	return resp, nil
+}
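
For reference only (not part of the patches above): a minimal client-side sketch of paging through the proxied com.atproto.sync.listRepos endpoint, using the same indigo xrpc/atproto calls that fetchUpstreamRepoList itself uses. The host value below is a placeholder for wherever a rainbow instance happens to be listening; it is not defined by these patches.

	package main

	import (
		"context"
		"fmt"

		"github.com/bluesky-social/indigo/api/atproto"
		"github.com/bluesky-social/indigo/xrpc"
	)

	func main() {
		// placeholder host; point this at a running rainbow instance
		client := &xrpc.Client{Host: "http://localhost:2480"}
		cursor := ""
		total := 0
		for {
			// same call shape the splitter uses upstream: cursor + limit paging
			page, err := atproto.SyncListRepos(context.Background(), client, cursor, 1000)
			if err != nil {
				fmt.Println("listRepos failed:", err)
				return
			}
			total += len(page.Repos)
			if page.Cursor == nil || len(page.Repos) == 0 {
				break
			}
			cursor = *page.Cursor
		}
		fmt.Println("total repos:", total)
	}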