Skip to content

Commit

Permalink
simple proxy of listRepos to rainbow upstream
Browse files Browse the repository at this point in the history
  • Loading branch information
brianolson committed Jan 9, 2025
1 parent 3232473 commit eab5204
Show file tree
Hide file tree
Showing 2 changed files with 53 additions and 5 deletions.
11 changes: 6 additions & 5 deletions cmd/rainbow/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,11 +43,12 @@ func run(args []string) {
}

app.Flags = []cli.Flag{
&cli.BoolFlag{
Name: "crawl-insecure-ws",
Usage: "when connecting to PDS instances, use ws:// instead of wss://",
EnvVars: []string{"RAINBOW_INSECURE_CRAWL"},
},
// TODO: unimplemented, always assumes https:// and wss://
//&cli.BoolFlag{
// Name: "crawl-insecure-ws",
// Usage: "when connecting to PDS instances, use ws:// instead of wss://",
// EnvVars: []string{"RAINBOW_INSECURE_CRAWL"},
//},
&cli.StringFlag{
Name: "splitter-host",
Value: "bsky.network",
Expand Down
47 changes: 47 additions & 0 deletions splitter/splitter.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"encoding/json"
"errors"
"fmt"
"go.opentelemetry.io/otel"
"io"
"log/slog"
"math/rand"
Expand Down Expand Up @@ -57,6 +58,22 @@ type SplitterConfig struct {
PebbleOptions *events.PebblePersistOptions
}

func (sc *SplitterConfig) XrpcRootUrl() string {
if strings.HasPrefix(sc.UpstreamHost, "http://") {
return sc.UpstreamHost
}
if strings.HasPrefix(sc.UpstreamHost, "https://") {
return sc.UpstreamHost
}
if strings.HasPrefix(sc.UpstreamHost, "ws://") {
return "http://" + sc.UpstreamHost[5:]
}
if strings.HasPrefix(sc.UpstreamHost, "wss://") {
return "https://" + sc.UpstreamHost[6:]
}
return "https://" + sc.UpstreamHost
}

func NewSplitter(conf SplitterConfig, nextCrawlers []string) (*Splitter, error) {
var nextCrawlerURLs []*url.URL
log := slog.Default().With("system", "splitter")
Expand Down Expand Up @@ -207,6 +224,7 @@ func (s *Splitter) StartWithListener(listen net.Listener) error {

e.POST("/xrpc/com.atproto.sync.requestCrawl", s.RequestCrawlHandler)
e.GET("/xrpc/com.atproto.sync.subscribeRepos", s.EventsHandler)
e.GET("/xrpc/com.atproto.sync.listRepos", s.HandleComAtprotoSyncListRepos)

e.GET("/xrpc/_health", s.HandleHealthCheck)
e.GET("/_health", s.HandleHealthCheck)
Expand Down Expand Up @@ -329,6 +347,35 @@ func (s *Splitter) RequestCrawlHandler(c echo.Context) error {
return c.JSON(200, HealthStatus{Status: "ok"})
}

func (s *Splitter) HandleComAtprotoSyncListRepos(c echo.Context) error {
ctx, span := otel.Tracer("server").Start(c.Request().Context(), "HandleComAtprotoSyncListRepos")
defer span.End()

cursorQuery := c.QueryParam("cursor")
limitQuery := c.QueryParam("limit")

var err error

limit := int64(500)
if limitQuery != "" {
limit, err = strconv.ParseInt(limitQuery, 10, 64)
if err != nil || limit < 1 || limit > 1000 {
return c.JSON(http.StatusBadRequest, XRPCError{Message: fmt.Sprintf("invalid limit: %s", limitQuery)})
}
}

client := xrpc.Client{
Client: s.httpC,
Host: s.conf.XrpcRootUrl(),
}

out, handleErr := atproto.SyncListRepos(ctx, &client, cursorQuery, limit)
if handleErr != nil {
return handleErr
}
return c.JSON(200, out)
}

func (s *Splitter) EventsHandler(c echo.Context) error {
var since *int64
if sinceVal := c.QueryParam("cursor"); sinceVal != "" {
Expand Down

0 comments on commit eab5204

Please sign in to comment.