From 8fc7e476638640c9a05c680243b128d2a13dfac5 Mon Sep 17 00:00:00 2001 From: Jaz Volpert Date: Wed, 8 Nov 2023 18:41:00 +0000 Subject: [PATCH 1/7] Add configurable trusted slurp domains --- bgs/fedmgr.go | 76 ++++++++++++++++++++++++++++++++++++++++++++++++++- 1 file changed, 75 insertions(+), 1 deletion(-) diff --git a/bgs/fedmgr.go b/bgs/fedmgr.go index 656b00ac9..dae58e22e 100644 --- a/bgs/fedmgr.go +++ b/bgs/fedmgr.go @@ -5,6 +5,7 @@ import ( "errors" "fmt" "math/rand" + "strings" "sync" "time" @@ -36,6 +37,7 @@ type Slurper struct { DefaultCrawlLimit rate.Limit newSubsDisabled bool + trustedDomains []string shutdownChan chan bool shutdownResult chan []error @@ -171,6 +173,7 @@ func (s *Slurper) loadConfig() error { } s.newSubsDisabled = sc.NewSubsDisabled + s.trustedDomains = sc.TrustedDomains return nil } @@ -179,6 +182,7 @@ type SlurpConfig struct { gorm.Model NewSubsDisabled bool + TrustedDomains []string } func (s *Slurper) SetNewSubsDisabled(dis bool) error { @@ -199,13 +203,83 @@ func (s *Slurper) GetNewSubsDisabledState() bool { return s.newSubsDisabled } +func (s *Slurper) AddTrustedDomain(domain string) error { + s.lk.Lock() + defer s.lk.Unlock() + + if err := s.db.Model(SlurpConfig{}).Where("id = 1").Update("trusted_domains", gorm.Expr("array_append(trusted_domains, ?)", domain)).Error; err != nil { + return err + } + + s.trustedDomains = append(s.trustedDomains, domain) + return nil +} + +func (s *Slurper) RemoveTrustedDomain(domain string) error { + s.lk.Lock() + defer s.lk.Unlock() + + if err := s.db.Model(SlurpConfig{}).Where("id = 1").Update("trusted_domains", gorm.Expr("array_remove(trusted_domains, ?)", domain)).Error; err != nil { + if errors.Is(err, gorm.ErrRecordNotFound) { + return nil + } + return err + } + + for i, d := range s.trustedDomains { + if d == domain { + s.trustedDomains = append(s.trustedDomains[:i], s.trustedDomains[i+1:]...) + break + } + } + + return nil +} + +func (s *Slurper) SetTrustedDomains(domains []string) error { + s.lk.Lock() + defer s.lk.Unlock() + + if err := s.db.Model(SlurpConfig{}).Where("id = 1").Update("trusted_domains", domains).Error; err != nil { + return err + } + + s.trustedDomains = domains + return nil +} + +func (s *Slurper) GetTrustedDomains() []string { + s.lk.Lock() + defer s.lk.Unlock() + return s.trustedDomains +} + var ErrNewSubsDisabled = fmt.Errorf("new subscriptions temporarily disabled") func (s *Slurper) SubscribeToPds(ctx context.Context, host string, reg bool) error { // TODO: for performance, lock on the hostname instead of global s.lk.Lock() defer s.lk.Unlock() - if s.newSubsDisabled { + + isTrusted := false + + // Check if the host is a trusted domain + for _, d := range s.trustedDomains { + // If the domain starts with a *., it's a wildcard + if strings.HasPrefix(d, "*.") { + if strings.HasSuffix(host, strings.TrimPrefix(d, "*")) { + isTrusted = true + break + } + } else { + if host == d { + isTrusted = true + break + } + } + } + + if s.newSubsDisabled && !isTrusted { return ErrNewSubsDisabled } From 565932d14acc73f975260d17c47490be2ca3df5c Mon Sep 17 00:00:00 2001 From: Jaz Volpert Date: Wed, 8 Nov 2023 19:38:47 +0000 Subject: [PATCH 2/7] Use pq.stringArray for string slice --- bgs/fedmgr.go | 3 ++- go.mod | 1 + go.sum | 2 ++ 3 files changed, 5 insertions(+), 1 deletion(-) diff --git a/bgs/fedmgr.go b/bgs/fedmgr.go index dae58e22e..f5fe9fc21 100644 --- a/bgs/fedmgr.go +++ b/bgs/fedmgr.go @@ -17,6 +17,7 @@ import ( "golang.org/x/time/rate" "github.com/gorilla/websocket" + pq "github.com/lib/pq" "gorm.io/gorm" ) @@ -182,7 +183,7 @@ type SlurpConfig struct { gorm.Model NewSubsDisabled bool - TrustedDomains []string + TrustedDomains pq.StringArray `gorm:"type:text[]"` } func (s *Slurper) SetNewSubsDisabled(dis bool) error { diff --git a/go.mod b/go.mod index 9ecb7ff9c..1266f4f13 100644 --- a/go.mod +++ b/go.mod @@ -38,6 +38,7 @@ require ( github.com/labstack/echo/v4 v4.11.1 github.com/labstack/gommon v0.4.0 github.com/lestrrat-go/jwx/v2 v2.0.12 + github.com/lib/pq v1.10.9 github.com/minio/sha256-simd v1.0.0 github.com/mitchellh/go-homedir v1.1.0 github.com/mr-tron/base58 v1.2.0 diff --git a/go.sum b/go.sum index b2aaa467c..60b4d3b72 100644 --- a/go.sum +++ b/go.sum @@ -405,6 +405,8 @@ github.com/lestrrat-go/jwx/v2 v2.0.12/go.mod h1:Mq4KN1mM7bp+5z/W5HS8aCNs5RKZ911G github.com/lestrrat-go/option v1.0.0/go.mod h1:5ZHFbivi4xwXxhxY9XHDe2FHo6/Z7WWmtT7T5nBBp3I= github.com/lestrrat-go/option v1.0.1 h1:oAzP2fvZGQKWkvHa1/SAcFolBEca1oN+mQ7eooNBEYU= github.com/lestrrat-go/option v1.0.1/go.mod h1:5ZHFbivi4xwXxhxY9XHDe2FHo6/Z7WWmtT7T5nBBp3I= +github.com/lib/pq v1.10.9 h1:YXG7RB+JIjhP29X+OtkiDnYaXQwpS4JEWq7dtCCRUEw= +github.com/lib/pq v1.10.9/go.mod h1:AlVN5x4E4T544tWzH6hKfbfQvm3HdbOxrmggDNAPY9o= github.com/libp2p/go-buffer-pool v0.0.2/go.mod h1:MvaB6xw5vOrDl8rYZGLFdKAuk/hRoRZd1Vi32+RXyFM= github.com/libp2p/go-buffer-pool v0.1.0 h1:oK4mSFcQz7cTQIfqbe4MIj9gLW+mnanjyFtc6cdF0Y8= github.com/libp2p/go-buffer-pool v0.1.0/go.mod h1:N+vh8gMqimBzdKkSMVuydVDq+UV5QTWy5HSiZacSbPg= From 27e38bfd87b768f01a6686e02babe8ab20f5eeeb Mon Sep 17 00:00:00 2001 From: Jaz Volpert Date: Wed, 8 Nov 2023 19:40:36 +0000 Subject: [PATCH 3/7] Fix backfill test --- backfill/backfill_test.go | 1 + 1 file changed, 1 insertion(+) diff --git a/backfill/backfill_test.go b/backfill/backfill_test.go index 4c7f7f4b7..885196e21 100644 --- a/backfill/backfill_test.go +++ b/backfill/backfill_test.go @@ -32,6 +32,7 @@ func TestBackfill(t *testing.T) { ts := &testState{} opts := backfill.DefaultBackfillOptions() + opts.CheckoutPath = "https://bsky.network/xrpc/com.atproto.sync.getRepo" opts.NSIDFilter = "app.bsky.feed.follow/" bf := backfill.NewBackfiller( From 765c6ab88be4253f3aa0a602c9942b7f4a2dbb29 Mon Sep 17 00:00:00 2001 From: Jaz Volpert Date: Wed, 8 Nov 2023 23:31:46 +0000 Subject: [PATCH 4/7] Clean up trusted host logic a tiny bit --- bgs/fedmgr.go | 27 +++++++++++++++------------ 1 file changed, 15 insertions(+), 12 deletions(-) diff --git a/bgs/fedmgr.go b/bgs/fedmgr.go index f5fe9fc21..483ac0641 100644 --- a/bgs/fedmgr.go +++ b/bgs/fedmgr.go @@ -257,30 +257,33 @@ func (s *Slurper) GetTrustedDomains() []string { var ErrNewSubsDisabled = fmt.Errorf("new subscriptions temporarily disabled") -func (s *Slurper) SubscribeToPds(ctx context.Context, host string, reg bool) error { - // TODO: for performance, lock on the hostname instead of global - s.lk.Lock() - defer s.lk.Unlock() - - isTrusted := false - +// Checks whether a host is allowed to be subscribed to +// must be called with the slurper lock held +func (s *Slurper) canSlurpHost(host string) bool { // Check if the host is a trusted domain for _, d := range s.trustedDomains { // If the domain starts with a *., it's a wildcard if strings.HasPrefix(d, "*.") { + // Cut off the * so we have .domain.com if strings.HasSuffix(host, strings.TrimPrefix(d, "*")) { - isTrusted = true - break + return true } } else { if host == d { - isTrusted = true - break + return true } } } - if s.newSubsDisabled && !isTrusted { + return !s.newSubsDisabled +} + +func (s *Slurper) SubscribeToPds(ctx context.Context, host string, reg bool) error { + // TODO: for performance, lock on the hostname instead of global + s.lk.Lock() + defer s.lk.Unlock() + + if !s.canSlurpHost(host) { return ErrNewSubsDisabled } From 744cd01182b9a3cf78bbc9d4ac684c87bae028d1 Mon Sep 17 00:00:00 2001 From: Jaz Volpert Date: Wed, 8 Nov 2023 23:40:35 +0000 Subject: [PATCH 5/7] Fix panic --- bgs/bgs.go | 18 ++++++++++++------ 1 file changed, 12 insertions(+), 6 deletions(-) diff --git a/bgs/bgs.go b/bgs/bgs.go index d44963a1c..25061ec58 100644 --- a/bgs/bgs.go +++ b/bgs/bgs.go @@ -596,20 +596,26 @@ func (bgs *BGS) EventsHandler(c echo.Context) error { consumerID := bgs.registerConsumer(&consumer) defer bgs.cleanupConsumer(consumerID) - log.Infow("new consumer", + logger := log.With( + "consumer_id", consumerID, "remote_addr", consumer.RemoteAddr, "user_agent", consumer.UserAgent, - "cursor", since, - "consumer_id", consumerID, ) + logger.Infow("new consumer", "cursor", since) + header := events.EventHeader{Op: events.EvtKindMessage} for { select { - case evt := <-evts: + case evt, ok := <-evts: + if !ok { + logger.Error("event stream closed unexpectedly") + return nil + } + wc, err := conn.NextWriter(websocket.BinaryMessage) if err != nil { - log.Errorf("failed to get next writer: %s", err) + logger.Errorf("failed to get next writer: %s", err) return err } @@ -647,7 +653,7 @@ func (bgs *BGS) EventsHandler(c echo.Context) error { } if err := wc.Close(); err != nil { - log.Warnf("failed to flush-close our event write: %s", err) + logger.Warnf("failed to flush-close our event write: %s", err) return nil } From 2e6231bbe5f5e70a010e1d56a5fcd6ff9f55182b Mon Sep 17 00:00:00 2001 From: Jaz Volpert Date: Thu, 9 Nov 2023 18:48:18 +0000 Subject: [PATCH 6/7] Harden requestCrawl host parsing --- bgs/handlers.go | 40 +++++++++++++++++++++++++++++----------- 1 file changed, 29 insertions(+), 11 deletions(-) diff --git a/bgs/handlers.go b/bgs/handlers.go index 33b0bcef8..b62bf3368 100644 --- a/bgs/handlers.go +++ b/bgs/handlers.go @@ -7,6 +7,7 @@ import ( "fmt" "io" "net/http" + "net/url" "strconv" "strings" @@ -16,7 +17,6 @@ import ( "github.com/bluesky-social/indigo/mst" "gorm.io/gorm" - "github.com/bluesky-social/indigo/util" "github.com/bluesky-social/indigo/xrpc" "github.com/ipfs/go-cid" "github.com/labstack/echo/v4" @@ -106,15 +106,37 @@ func (s *BGS) handleComAtprotoSyncRequestCrawl(ctx context.Context, body *comatp return echo.NewHTTPError(http.StatusBadRequest, "must pass hostname") } - if strings.HasPrefix(host, "https://") || strings.HasPrefix(host, "http://") { - return echo.NewHTTPError(http.StatusBadRequest, "must pass domain without protocol scheme") + if !strings.HasPrefix(host, "http://") && !strings.HasPrefix(host, "https://") { + if s.ssl { + host = "https://" + host + } else { + host = "http://" + host + } } - norm, err := util.NormalizeHostname(host) + u, err := url.Parse(host) if err != nil { - return echo.NewHTTPError(http.StatusBadRequest, "failed to normalize hostname") + return echo.NewHTTPError(http.StatusBadRequest, "failed to parse hostname") + } + + if u.Scheme == "http" && s.ssl { + return echo.NewHTTPError(http.StatusBadRequest, "this server requires https") + } + + if u.Scheme == "https" && !s.ssl { + return echo.NewHTTPError(http.StatusBadRequest, "this server does not support https") + } + + if u.Path != "" { + return echo.NewHTTPError(http.StatusBadRequest, "must pass hostname without path") + } + + if u.Query().Encode() != "" { + return echo.NewHTTPError(http.StatusBadRequest, "must pass hostname without query") } + host = u.Host // potentially hostname:port + banned, err := s.domainIsBanned(ctx, host) if banned { return echo.NewHTTPError(http.StatusUnauthorized, "domain is banned") @@ -123,14 +145,10 @@ func (s *BGS) handleComAtprotoSyncRequestCrawl(ctx context.Context, body *comatp log.Warnf("TODO: better host validation for crawl requests") c := &xrpc.Client{ - Host: "https://" + host, + Host: fmt.Sprintf("%s://%s", u.Scheme, host), Client: http.DefaultClient, // not using the client that auto-retries } - if !s.ssl { - c.Host = "http://" + host - } - desc, err := atproto.ServerDescribeServer(ctx, c) if err != nil { return echo.NewHTTPError(http.StatusBadRequest, "requested host failed to respond to describe request") @@ -139,7 +157,7 @@ func (s *BGS) handleComAtprotoSyncRequestCrawl(ctx context.Context, body *comatp // Maybe we could do something with this response later _ = desc - return s.slurper.SubscribeToPds(ctx, norm, true) + return s.slurper.SubscribeToPds(ctx, host, true) } func (s *BGS) handleComAtprotoSyncNotifyOfUpdate(ctx context.Context, body *comatprototypes.SyncNotifyOfUpdate_Input) error { From f7372c1934eafec1b4ba3246fc10bfcd68afde87 Mon Sep 17 00:00:00 2001 From: Jaz Volpert Date: Thu, 9 Nov 2023 18:53:14 +0000 Subject: [PATCH 7/7] More verbose client error --- bgs/handlers.go | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/bgs/handlers.go b/bgs/handlers.go index b62bf3368..cb7ea6170 100644 --- a/bgs/handlers.go +++ b/bgs/handlers.go @@ -144,14 +144,17 @@ func (s *BGS) handleComAtprotoSyncRequestCrawl(ctx context.Context, body *comatp log.Warnf("TODO: better host validation for crawl requests") + clientHost := fmt.Sprintf("%s://%s", u.Scheme, host) + c := &xrpc.Client{ - Host: fmt.Sprintf("%s://%s", u.Scheme, host), + Host: clientHost, Client: http.DefaultClient, // not using the client that auto-retries } desc, err := atproto.ServerDescribeServer(ctx, c) if err != nil { - return echo.NewHTTPError(http.StatusBadRequest, "requested host failed to respond to describe request") + errMsg := fmt.Sprintf("requested host (%s) failed to respond to describe request", clientHost) + return echo.NewHTTPError(http.StatusBadRequest, errMsg) } // Maybe we could do something with this response later