diff --git a/cmd/rainbow/main.go b/cmd/rainbow/main.go index d574d6260..17ef48d04 100644 --- a/cmd/rainbow/main.go +++ b/cmd/rainbow/main.go @@ -43,11 +43,12 @@ func run(args []string) { } app.Flags = []cli.Flag{ - &cli.BoolFlag{ - Name: "crawl-insecure-ws", - Usage: "when connecting to PDS instances, use ws:// instead of wss://", - EnvVars: []string{"RAINBOW_INSECURE_CRAWL"}, - }, + // TODO: unimplemented, always assumes https:// and wss:// + //&cli.BoolFlag{ + // Name: "crawl-insecure-ws", + // Usage: "when connecting to PDS instances, use ws:// instead of wss://", + // EnvVars: []string{"RAINBOW_INSECURE_CRAWL"}, + //}, &cli.StringFlag{ Name: "splitter-host", Value: "bsky.network", diff --git a/splitter/splitter.go b/splitter/splitter.go index 893414194..e4af6b043 100644 --- a/splitter/splitter.go +++ b/splitter/splitter.go @@ -6,6 +6,7 @@ import ( "encoding/json" "errors" "fmt" + "go.opentelemetry.io/otel" "io" "log/slog" "math/rand" @@ -57,6 +58,22 @@ type SplitterConfig struct { PebbleOptions *events.PebblePersistOptions } +func (sc *SplitterConfig) XrpcRootUrl() string { + if strings.HasPrefix(sc.UpstreamHost, "http://") { + return sc.UpstreamHost + } + if strings.HasPrefix(sc.UpstreamHost, "https://") { + return sc.UpstreamHost + } + if strings.HasPrefix(sc.UpstreamHost, "ws://") { + return "http://" + sc.UpstreamHost[5:] + } + if strings.HasPrefix(sc.UpstreamHost, "wss://") { + return "https://" + sc.UpstreamHost[6:] + } + return "https://" + sc.UpstreamHost +} + func NewSplitter(conf SplitterConfig, nextCrawlers []string) (*Splitter, error) { var nextCrawlerURLs []*url.URL log := slog.Default().With("system", "splitter") @@ -207,6 +224,7 @@ func (s *Splitter) StartWithListener(listen net.Listener) error { e.POST("/xrpc/com.atproto.sync.requestCrawl", s.RequestCrawlHandler) e.GET("/xrpc/com.atproto.sync.subscribeRepos", s.EventsHandler) + e.GET("/xrpc/com.atproto.sync.listRepos", s.HandleComAtprotoSyncListRepos) e.GET("/xrpc/_health", s.HandleHealthCheck) e.GET("/_health", s.HandleHealthCheck) @@ -329,6 +347,35 @@ func (s *Splitter) RequestCrawlHandler(c echo.Context) error { return c.JSON(200, HealthStatus{Status: "ok"}) } +func (s *Splitter) HandleComAtprotoSyncListRepos(c echo.Context) error { + ctx, span := otel.Tracer("server").Start(c.Request().Context(), "HandleComAtprotoSyncListRepos") + defer span.End() + + cursorQuery := c.QueryParam("cursor") + limitQuery := c.QueryParam("limit") + + var err error + + limit := int64(500) + if limitQuery != "" { + limit, err = strconv.ParseInt(limitQuery, 10, 64) + if err != nil || limit < 1 || limit > 1000 { + return c.JSON(http.StatusBadRequest, XRPCError{Message: fmt.Sprintf("invalid limit: %s", limitQuery)}) + } + } + + client := xrpc.Client{ + Client: s.httpC, + Host: s.conf.XrpcRootUrl(), + } + + out, handleErr := atproto.SyncListRepos(ctx, &client, cursorQuery, limit) + if handleErr != nil { + return handleErr + } + return c.JSON(200, out) +} + func (s *Splitter) EventsHandler(c echo.Context) error { var since *int64 if sinceVal := c.QueryParam("cursor"); sinceVal != "" {