rainbow list repos #892

Open · wants to merge 2 commits into base: main
11 changes: 6 additions & 5 deletions cmd/rainbow/main.go
@@ -43,11 +43,12 @@ func run(args []string) {
}

app.Flags = []cli.Flag{
&cli.BoolFlag{
Name: "crawl-insecure-ws",
Usage: "when connecting to PDS instances, use ws:// instead of wss://",
EnvVars: []string{"RAINBOW_INSECURE_CRAWL"},
},
// TODO: unimplemented, always assumes https:// and wss://
//&cli.BoolFlag{
// Name: "crawl-insecure-ws",
// Usage: "when connecting to PDS instances, use ws:// instead of wss://",
// EnvVars: []string{"RAINBOW_INSECURE_CRAWL"},
//},
&cli.StringFlag{
Name: "splitter-host",
Value: "bsky.network",
182 changes: 179 additions & 3 deletions splitter/splitter.go
@@ -6,6 +6,7 @@ import (
"encoding/json"
"errors"
"fmt"
"go.opentelemetry.io/otel"
"io"
"log/slog"
"math/rand"
@@ -49,15 +50,53 @@ type Splitter struct {

httpC *http.Client
nextCrawlers []*url.URL

	// from upstream listRepos
	cachedRepos []*comatproto.SyncListRepos_Repo
cachedReposFrom time.Time
cachedReposExpires time.Time
cacheReposIsRefetching bool
cachedReposLock sync.Mutex
cachedReposCond sync.Cond
}

const DefaultListReposCacheDuration = time.Minute * 15

type SplitterConfig struct {
UpstreamHost string
CursorFile string
PebbleOptions *events.PebblePersistOptions

ListReposCacheDuration time.Duration
}

func (sc *SplitterConfig) normalize() error {
if sc.ListReposCacheDuration == 0 {
sc.ListReposCacheDuration = DefaultListReposCacheDuration
}
return nil
}

func (sc *SplitterConfig) XrpcRootUrl() string {
if strings.HasPrefix(sc.UpstreamHost, "http://") {
return sc.UpstreamHost
}
if strings.HasPrefix(sc.UpstreamHost, "https://") {
return sc.UpstreamHost
}
if strings.HasPrefix(sc.UpstreamHost, "ws://") {
return "http://" + sc.UpstreamHost[5:]
}
if strings.HasPrefix(sc.UpstreamHost, "wss://") {
return "https://" + sc.UpstreamHost[5:]
}
return "https://" + sc.UpstreamHost
}
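
For illustration, a minimal sketch (not part of this diff) of the scheme mapping XrpcRootUrl implements, written as a hypothetical test in the splitter package:

package splitter

import "testing"

// TestXrpcRootUrl is an illustrative sketch, not part of the PR: it checks the
// upstream-host scheme mapping implemented by XrpcRootUrl above.
func TestXrpcRootUrl(t *testing.T) {
	cases := map[string]string{
		"bsky.network":          "https://bsky.network",  // bare host defaults to https
		"http://localhost:2470": "http://localhost:2470", // http/https pass through
		"ws://localhost:2470":   "http://localhost:2470", // ws maps to http
		"wss://bsky.network":    "https://bsky.network",  // wss maps to https
	}
	for in, want := range cases {
		sc := SplitterConfig{UpstreamHost: in}
		if got := sc.XrpcRootUrl(); got != want {
			t.Errorf("XrpcRootUrl(%q) = %q, want %q", in, got, want)
		}
	}
}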

func NewSplitter(conf SplitterConfig, nextCrawlers []string) (*Splitter, error) {
if err := conf.normalize(); err != nil {
return nil, err
}
var nextCrawlerURLs []*url.URL
log := slog.Default().With("system", "splitter")
if len(nextCrawlers) > 0 {
@@ -79,6 +118,7 @@ func NewSplitter(conf SplitterConfig, nextCrawlers []string) (*Splitter, error)
httpC: util.RobustHTTPClient(),
nextCrawlers: nextCrawlerURLs,
}
s.cachedReposCond.L = &s.cachedReposLock

if conf.PebbleOptions == nil {
// mem splitter
@@ -105,9 +145,10 @@ func NewDiskSplitter(host, path string, persistHours float64, maxBytes int64) (*
MaxBytes: uint64(maxBytes),
}
conf := SplitterConfig{
UpstreamHost: host,
CursorFile: "cursor-file",
PebbleOptions: &ppopts,
UpstreamHost: host,
CursorFile: "cursor-file",
PebbleOptions: &ppopts,
ListReposCacheDuration: DefaultListReposCacheDuration,
}
pp, err := events.NewPebblePersistance(&ppopts)
if err != nil {
@@ -207,6 +248,7 @@ func (s *Splitter) StartWithListener(listen net.Listener) error {

e.POST("/xrpc/com.atproto.sync.requestCrawl", s.RequestCrawlHandler)
e.GET("/xrpc/com.atproto.sync.subscribeRepos", s.EventsHandler)
e.GET("/xrpc/com.atproto.sync.listRepos", s.HandleComAtprotoSyncListRepos)

e.GET("/xrpc/_health", s.HandleHealthCheck)
e.GET("/_health", s.HandleHealthCheck)
@@ -329,6 +371,140 @@ func (s *Splitter) RequestCrawlHandler(c echo.Context) error {
return c.JSON(200, HealthStatus{Status: "ok"})
}

func (s *Splitter) HandleComAtprotoSyncListRepos(c echo.Context) error {
// TODO: similar to bgs/stubs.go - re-unify?
ctx, span := otel.Tracer("server").Start(c.Request().Context(), "HandleComAtprotoSyncListRepos")
defer span.End()

cursorQuery := c.QueryParam("cursor")
limitQuery := c.QueryParam("limit")

var err error

limit := 500
if limitQuery != "" {
limit, err = strconv.Atoi(limitQuery)
if err != nil || limit < 1 || limit > 1000 {
return c.JSON(http.StatusBadRequest, XRPCError{Message: fmt.Sprintf("invalid limit: %s", limitQuery)})
}
}

	cursor := 0
	if cursorQuery != "" {
		tcursor, err := strconv.ParseInt(cursorQuery, 10, 64)
		if err != nil || tcursor < 0 {
			return c.JSON(http.StatusBadRequest, XRPCError{Message: fmt.Sprintf("invalid cursor: %s", cursorQuery)})
		}
		cursor = int(tcursor)
	}

out, handleErr := s.handleComAtprotoSyncListRepos(ctx, cursor, limit)
if handleErr != nil {
return handleErr
}
return c.JSON(200, out)
}

// fetchUpstreamRepoList fetches the full repo list (all pages) from the upstream
// source and caches it locally for redistribution.
// Usage: go s.fetchUpstreamRepoList()
func (s *Splitter) fetchUpstreamRepoList() {
// setup
s.cachedReposLock.Lock()
if s.cacheReposIsRefetching {
s.cachedReposLock.Unlock()
return
}
start := time.Now()
if start.Before(s.cachedReposExpires) {
s.cachedReposLock.Unlock()
return
}
s.cacheReposIsRefetching = true
s.cachedReposLock.Unlock()

var client xrpc.Client
client.Host = s.conf.XrpcRootUrl()
cursor := ""
	// TODO: the upstream list is ordered by User.ID in its database, and that ID is lost in the atproto record; create a local ID, keep a stable map[DID]int, and sort the full set on those IDs
	var repos []*comatproto.SyncListRepos_Repo
ok := true
s.log.Info("listRepos from upstream", "host", client.Host)
for {
		response, err := comatproto.SyncListRepos(context.Background(), &client, cursor, 1000)
if err != nil {
s.log.Warn("failed to fetch upstream repo list", "err", err)
ok = false
break
}
s.log.Debug("some repos", "count", len(response.Repos))
if len(response.Repos) == 0 {
break
}
repos = append(repos, response.Repos...)
		// stop when upstream stops returning a cursor (or returns an empty one)
		if response.Cursor == nil || *response.Cursor == "" {
break
}
cursor = *response.Cursor
}

	s.cachedReposLock.Lock()
	if ok {
		s.cachedReposFrom = time.Now()
		s.cachedReposExpires = s.cachedReposFrom.Add(s.conf.ListReposCacheDuration)
		s.cachedRepos = repos
		dt := s.cachedReposFrom.Sub(start)
		s.log.Info("fetched upstream repo list", "duration", dt.String(), "count", len(repos))
	}
	s.cacheReposIsRefetching = false
	// wake waiters even on a failed fetch so they can re-check and retry,
	// rather than blocking until some later request triggers a refetch
	s.cachedReposCond.Broadcast()
	s.cachedReposLock.Unlock()
}

// getLatestUpstreamRepoList usually returns the most recently fetched list, but may block.
// On the first call it blocks until it has data from upstream.
// It may spawn an async fetch of a fresh listRepos from upstream, whose result becomes
// available on a _subsequent_ call.
// The returned slice should be treated as read-only.
func (s *Splitter) getLatestUpstreamRepoList() []*comatproto.SyncListRepos_Repo {
s.cachedReposLock.Lock()
defer s.cachedReposLock.Unlock()
now := time.Now()
if now.After(s.cachedReposExpires) && !s.cacheReposIsRefetching {
go s.fetchUpstreamRepoList()
}
for {
if len(s.cachedRepos) > 0 || now.Before(s.cachedReposExpires) {
// either we have some data, or we have a legit empty response
return s.cachedRepos
}
s.cachedReposCond.Wait()
now = time.Now()
if now.After(s.cachedReposExpires) && !s.cacheReposIsRefetching {
go s.fetchUpstreamRepoList()
}
}
}

func (s *Splitter) handleComAtprotoSyncListRepos(ctx context.Context, cursor int, limit int) (*comatproto.SyncListRepos_Output, error) {
repos := s.getLatestUpstreamRepoList()

resp := &comatproto.SyncListRepos_Output{}

if cursor > len(repos) {
return resp, nil
}

repos = repos[cursor:]
	if limit < len(repos) {
		repos = repos[:limit]
		nextCursor := strconv.Itoa(cursor + limit)
		resp.Cursor = &nextCursor
	}

resp.Repos = repos

return resp, nil
}
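
For illustration, a hedged sketch of a client paging through the new endpoint. The host and port are assumptions, not something this diff defines; the call itself is the same generated SyncListRepos helper the splitter uses against its upstream:

package main

import (
	"context"
	"fmt"
	"log"

	comatproto "github.com/bluesky-social/indigo/api/atproto"
	"github.com/bluesky-social/indigo/xrpc"
)

func main() {
	// Hypothetical rainbow instance; adjust host/port for a real deployment.
	client := &xrpc.Client{Host: "http://localhost:2480"}
	cursor := ""
	total := 0
	for {
		out, err := comatproto.SyncListRepos(context.Background(), client, cursor, 500)
		if err != nil {
			log.Fatal(err)
		}
		total += len(out.Repos)
		// the handler above only sets Cursor when another page remains
		if out.Cursor == nil {
			break
		}
		cursor = *out.Cursor
	}
	fmt.Println("total repos:", total)
}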

func (s *Splitter) EventsHandler(c echo.Context) error {
var since *int64
if sinceVal := c.QueryParam("cursor"); sinceVal != "" {