diff --git a/automod/README.md b/automod/README.md index 3177f4318..701698642 100644 --- a/automod/README.md +++ b/automod/README.md @@ -1,5 +1,5 @@ -indigo/automod -============== +`indigo/automod` +================ This package (`github.com/bluesky-social/indigo/automod`) contains a "rules engine" to augment human moderators in the atproto network. Batches of rules are processed for novel "events" such as a new post or update of an account handle. Counters and other statistics are collected, which can drive subsequent rule invocations. The outcome of rules can be moderation events like "report account for human review" or "label post". A lot of what this package does is collect and maintain caches of relevant metadata about accounts and pieces of content, so that rules have efficient access to this information. @@ -7,12 +7,71 @@ A primary design goal is to have a flexible framework to allow new rules to be w Some example rules are included in the `automod/rules` package, but the expectation is that some real-world rules will be kept secret. -Code for subscribing to a firehose is not included here; see `cmd/hepa` for a complete service built on this library. +Code for subscribing to a firehose is not included here; see `../cmd/hepa` for a service daemon built on this package. +API reference documentation can be found on [pkg.go.dev](https://pkg.go.dev/github.com/bluesky-social/indigo/automod). -## Design +## Architecture -Prior art and inspiration: +The runtime (`automod.Engine`) manages network requests, caching, and configuration. Outside calling code makes concurrent calls to the `Process*Event` methods that the runtime provides. The runtime constructs event structs (eg, `automod.RecordEvent`), hydrates relevant context metadata from (cached) external services, and then executes a configured set of rules on the event. Rules may request additional context, do arbitrary local compute, and mute the event with any moderation "actions". After all rules have run, the runtime will inspect the event, update counter state, and push any new moderation actions to external services. + +The runtime keeps state in several "stores", each of which has an interface and both in-memory and Redis implementations. It is expected that Redis is used in virtually all deployments. The store types are: + +- `automod.CacheStore`: generic data caching with expiration (TTL) and explicit purging. Used to cache account-level metadata, including identity lookups and (if available) private account metadata +- `automod.CountStore`: keyed integer counters with time bucketing (eg, "hour", "day", "total"). Also includes probabilistic "distinct value" counters (eg, Redis HyperLogLog counters, with roughly 2% precision) +- `automod.SetStore`: configurable static string sets. May eventually be runtime configurable +- `automod.FlagStore`: mechanism to keep track of automod-generated "flags" (like labels or hashtags) on accounts or records. Mostly used to detect *new* flags. May eventually be moved in to the moderation service itself, similar to labels + + +## Rule API + +Here is a simple example rule, which handles creation of new events: + +```golang +var gtubeString = "XJS*C4JDBQADN1.NSBN3*2IDNEN*GTUBE-STANDARD-ANTI-UBE-TEST-EMAIL*C.34X" + +func GtubePostRule(evt *automod.RecordEvent, post *appbsky.FeedPost) error { + if strings.Contains(post.Text, gtubeString) { + evt.AddRecordLabel("spam") + } + return nil +} +``` + +Every new post record will be inspected to see if it contains a static test string. If it does, the label `spam` will be applied to the record itself. + +The `evt` parameter provides access to relevant pre-fetched metadata; methods to fetch additional metadata from the network; a `slog` logging interface; and methods to store output decisions. The runtime will catch and recover from unexpected panics, and will log returned errors, but rules are generally expected to run robustly and efficiently, and not have complex control flow needs. + +Some of the more commonly used features of `evt` (`automod.RecordEvent`): + +- `evt.Logger`: a `log/slog` logging interface +- `evt.Account.Identity`: atproto identity for the author account, including DID, handle, and PDS endpoint +- `evt.Account.Private`: when not-null (aka, when the runtime has administrator access) will contain things like `.IndexedAt` (account first seen) and `.Email` (the current registered account email) +- `evt.Account.Profile`: a cached subset of the account's `app.bsky.actor.profile` record (if non-null) +- `evt.GetCount(, , )` and `evt.Increment(, )`: to access and update simple counters (by hour, day, or total). Incrementing counters is lazy and happens in batch after all rules have executed: this means that multiple calls are de-duplicated, and that `GetCount` will not reflect any prior `Increment` calls in the same rule (or between rules). +- `evt.GetCountDistinct(, , )` and `evt.IncrementDistinct(, , )`: similar to simple counters, but counts "unique distinct values" +- `evt.InSet(, )`: checks if a string is in a named set + + +## Developing New Rules + +The current tl;dr process to deploy a new rule: + +- copy a similar existing rule from `automod/rules` +- add the new rule to a `RuleSet`, so it will be invoked +- test against content that triggers the new rule +- deploy + +You'll usually want to start with both a known pattern you are looking for, and some example real-world content which you want to trigger on. + +The `automod/rules` package contains a set of example rules and some shared helper functions, and demonstrates some patterns for how to use counters, sets, filters, and account metadata to compose a rule pattern. + +The `hepa` command provides `process-record` and `process-recent` sub-commands which will pull an existing individual record (by AT-URI) or all recent bsky posts for an account (by handle or DID), which can be helpful for testing. + +When deploying a new rule, it is recommended to start with a minimal action, like setting a flag or just logging. Any "action" (including new flag creation) can result in a Slack notification. You can gain confidence in the rule by running against the full firehose with these limited actions, tweaking the rule until it seems to have acceptable sensitivity (eg, few false positives), and then escalate the actions to reporting (adds to the human review queue), or action-and-report (label or takedown, and concurrently report for humans to review the action). + + +## Prior Art * The [SQRL language](https://sqrl-lang.github.io/sqrl/) and runtime was originally developed by an industry vendor named Smyte, then acquired by Twitter, with some core Javascript components released open source in 2023. The SQRL documentation is extensive and describes many of the design trade-offs and features specific to rules engines. Bluesky considered adopting SQRL but decided to start with a simpler runtime with rules in a known language (golang). diff --git a/automod/account_meta.go b/automod/account_meta.go index 625205c04..a498e3830 100644 --- a/automod/account_meta.go +++ b/automod/account_meta.go @@ -26,13 +26,16 @@ type AccountPrivate struct { // information about a repo/account/identity, always pre-populated and relevant to many rules type AccountMeta struct { - Identity *identity.Identity - Profile ProfileSummary - Private *AccountPrivate - AccountLabels []string - FollowersCount int64 - FollowsCount int64 - PostsCount int64 + Identity *identity.Identity + Profile ProfileSummary + Private *AccountPrivate + AccountLabels []string + AccountNegatedLabels []string + AccountFlags []string + FollowersCount int64 + FollowsCount int64 + PostsCount int64 + Takendown bool } func (e *Engine) GetAccountMeta(ctx context.Context, ident *identity.Identity) (*AccountMeta, error) { @@ -71,8 +74,18 @@ func (e *Engine) GetAccountMeta(ctx context.Context, ident *identity.Identity) ( } var labels []string + var negLabels []string for _, lbl := range pv.Labels { - labels = append(labels, lbl.Val) + if lbl.Neg != nil && *lbl.Neg == true { + negLabels = append(negLabels, lbl.Val) + } else { + labels = append(labels, lbl.Val) + } + } + + flags, err := e.Flags.Get(ctx, ident.DID.String()) + if err != nil { + return nil, err } am := AccountMeta{ @@ -82,7 +95,9 @@ func (e *Engine) GetAccountMeta(ctx context.Context, ident *identity.Identity) ( Description: pv.Description, DisplayName: pv.DisplayName, }, - AccountLabels: dedupeStrings(labels), + AccountLabels: dedupeStrings(labels), + AccountNegatedLabels: dedupeStrings(negLabels), + AccountFlags: flags, } if pv.PostsCount != nil { am.PostsCount = *pv.PostsCount diff --git a/automod/cachestore.go b/automod/cachestore.go index 4fd33585e..2c7c855ec 100644 --- a/automod/cachestore.go +++ b/automod/cachestore.go @@ -10,6 +10,7 @@ import ( type CacheStore interface { Get(ctx context.Context, name, key string) (string, error) Set(ctx context.Context, name, key string, val string) error + Purge(ctx context.Context, name, key string) error } type MemCacheStore struct { @@ -34,3 +35,8 @@ func (s MemCacheStore) Set(ctx context.Context, name, key string, val string) er s.Data.Add(name+"/"+key, val) return nil } + +func (s MemCacheStore) Purge(ctx context.Context, name, key string) error { + s.Data.Remove(name + "/" + key) + return nil +} diff --git a/automod/countstore.go b/automod/countstore.go index 641594b41..056ce5bb1 100644 --- a/automod/countstore.go +++ b/automod/countstore.go @@ -17,16 +17,20 @@ type CountStore interface { GetCount(ctx context.Context, name, val, period string) (int, error) Increment(ctx context.Context, name, val string) error // TODO: batch increment method + GetCountDistinct(ctx context.Context, name, bucket, period string) (int, error) + IncrementDistinct(ctx context.Context, name, bucket, val string) error } // TODO: this implementation isn't race-safe (yet)! type MemCountStore struct { - Counts map[string]int + Counts map[string]int + DistinctCounts map[string]map[string]bool } func NewMemCountStore() MemCountStore { return MemCountStore{ - Counts: make(map[string]int), + Counts: make(map[string]int), + DistinctCounts: make(map[string]map[string]bool), } } @@ -66,3 +70,24 @@ func (s MemCountStore) Increment(ctx context.Context, name, val string) error { } return nil } + +func (s MemCountStore) GetCountDistinct(ctx context.Context, name, bucket, period string) (int, error) { + v, ok := s.DistinctCounts[PeriodBucket(name, bucket, period)] + if !ok { + return 0, nil + } + return len(v), nil +} + +func (s MemCountStore) IncrementDistinct(ctx context.Context, name, bucket, val string) error { + for _, p := range []string{PeriodTotal, PeriodDay, PeriodHour} { + k := PeriodBucket(name, bucket, p) + m, ok := s.DistinctCounts[k] + if !ok { + m = make(map[string]bool) + } + m[val] = true + s.DistinctCounts[k] = m + } + return nil +} diff --git a/automod/countstore_test.go b/automod/countstore_test.go new file mode 100644 index 000000000..8f080718a --- /dev/null +++ b/automod/countstore_test.go @@ -0,0 +1,40 @@ +package automod + +import ( + "context" + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestMemCountStoreBasics(t *testing.T) { + assert := assert.New(t) + ctx := context.Background() + + cs := NewMemCountStore() + + c, err := cs.GetCount(ctx, "test1", "val1", PeriodTotal) + assert.NoError(err) + assert.Equal(0, c) + assert.NoError(cs.Increment(ctx, "test1", "val1")) + assert.NoError(cs.Increment(ctx, "test1", "val1")) + c, err = cs.GetCount(ctx, "test1", "val1", PeriodTotal) + assert.NoError(err) + assert.Equal(2, c) + + c, err = cs.GetCountDistinct(ctx, "test2", "val2", PeriodTotal) + assert.NoError(err) + assert.Equal(0, c) + assert.NoError(cs.IncrementDistinct(ctx, "test2", "val2", "one")) + assert.NoError(cs.IncrementDistinct(ctx, "test2", "val2", "one")) + assert.NoError(cs.IncrementDistinct(ctx, "test2", "val2", "one")) + c, err = cs.GetCountDistinct(ctx, "test2", "val2", PeriodTotal) + assert.NoError(err) + assert.Equal(1, c) + + assert.NoError(cs.IncrementDistinct(ctx, "test2", "val2", "two")) + assert.NoError(cs.IncrementDistinct(ctx, "test2", "val2", "three")) + c, err = cs.GetCountDistinct(ctx, "test2", "val2", PeriodTotal) + assert.NoError(err) + assert.Equal(3, c) +} diff --git a/automod/engine.go b/automod/engine.go index 1bb2a7cc2..53ccb0878 100644 --- a/automod/engine.go +++ b/automod/engine.go @@ -22,17 +22,19 @@ type Engine struct { Counters CountStore Sets SetStore Cache CacheStore + Flags FlagStore RelayClient *xrpc.Client BskyClient *xrpc.Client // used to persist moderation actions in mod service (optional) - AdminClient *xrpc.Client + AdminClient *xrpc.Client + SlackWebhookURL string } func (e *Engine) ProcessIdentityEvent(ctx context.Context, t string, did syntax.DID) error { // similar to an HTTP server, we want to recover any panics from rule execution defer func() { if r := recover(); r != nil { - e.Logger.Error("automod event execution exception", "err", r) + e.Logger.Error("automod event execution exception", "err", r, "did", did, "type", t) } }() @@ -62,9 +64,13 @@ func (e *Engine) ProcessIdentityEvent(ctx context.Context, t string, did syntax. return evt.Err } evt.CanonicalLogLine() + e.PurgeAccountCaches(ctx, am.Identity.DID) if err := evt.PersistActions(ctx); err != nil { return err } + if err := evt.PersistCounters(ctx); err != nil { + return err + } return nil } @@ -72,7 +78,7 @@ func (e *Engine) ProcessRecord(ctx context.Context, did syntax.DID, path, recCID // similar to an HTTP server, we want to recover any panics from rule execution defer func() { if r := recover(); r != nil { - e.Logger.Error("automod event execution exception", "err", r) + e.Logger.Error("automod event execution exception", "err", r, "did", did, "path", path) } }() @@ -97,24 +103,65 @@ func (e *Engine) ProcessRecord(ctx context.Context, did syntax.DID, path, recCID return evt.Err } evt.CanonicalLogLine() + // purge the account meta cache when profile is updated + if evt.Collection == "app.bsky.actor.profile" { + e.PurgeAccountCaches(ctx, am.Identity.DID) + } if err := evt.PersistActions(ctx); err != nil { return err } if err := evt.PersistCounters(ctx); err != nil { return err } - return nil } -func (e *Engine) FetchAndProcessRecord(ctx context.Context, uri string) error { - // resolve URI, identity, and record - aturi, err := syntax.ParseATURI(uri) +func (e *Engine) ProcessRecordDelete(ctx context.Context, did syntax.DID, path string) error { + // similar to an HTTP server, we want to recover any panics from rule execution + defer func() { + if r := recover(); r != nil { + e.Logger.Error("automod event execution exception", "err", r, "did", did, "path", path) + } + }() + + ident, err := e.Directory.LookupDID(ctx, did) + if err != nil { + return fmt.Errorf("resolving identity: %w", err) + } + if ident == nil { + return fmt.Errorf("identity not found for did: %s", did.String()) + } + + am, err := e.GetAccountMeta(ctx, ident) if err != nil { - return fmt.Errorf("parsing AT-URI argument: %v", err) + return err + } + evt := e.NewRecordDeleteEvent(*am, path) + e.Logger.Debug("processing record deletion", "did", ident.DID, "path", path) + if err := e.Rules.CallRecordDeleteRules(&evt); err != nil { + return err + } + if evt.Err != nil { + return evt.Err } + evt.CanonicalLogLine() + // purge the account meta cache when profile is updated + if evt.Collection == "app.bsky.actor.profile" { + e.PurgeAccountCaches(ctx, am.Identity.DID) + } + if err := evt.PersistActions(ctx); err != nil { + return err + } + if err := evt.PersistCounters(ctx); err != nil { + return err + } + return nil +} + +func (e *Engine) FetchAndProcessRecord(ctx context.Context, aturi syntax.ATURI) error { + // resolve URI, identity, and record if aturi.RecordKey() == "" { - return fmt.Errorf("need a full, not partial, AT-URI: %s", uri) + return fmt.Errorf("need a full, not partial, AT-URI: %s", aturi) } ident, err := e.Directory.Lookup(ctx, aturi.Authority()) if err != nil { @@ -137,6 +184,39 @@ func (e *Engine) FetchAndProcessRecord(ctx context.Context, uri string) error { return e.ProcessRecord(ctx, ident.DID, aturi.Path(), *out.Cid, out.Value.Val) } +func (e *Engine) FetchAndProcessRecent(ctx context.Context, atid syntax.AtIdentifier, limit int) error { + + ident, err := e.Directory.Lookup(ctx, atid) + if err != nil { + return fmt.Errorf("failed to resolve AT identifier: %v", err) + } + pdsURL := ident.PDSEndpoint() + if pdsURL == "" { + return fmt.Errorf("could not resolve PDS endpoint for account: %s", ident.DID.String()) + } + pdsClient := xrpc.Client{Host: ident.PDSEndpoint()} + + resp, err := comatproto.RepoListRecords(ctx, &pdsClient, "app.bsky.feed.post", "", int64(limit), ident.DID.String(), false, "", "") + if err != nil { + return fmt.Errorf("failed to fetch record list: %v", err) + } + + e.Logger.Info("got recent posts", "did", ident.DID.String(), "pds", pdsURL, "count", len(resp.Records)) + // records are most-recent first; we want recent but oldest-first, so iterate backwards + for i := range resp.Records { + rec := resp.Records[len(resp.Records)-i-1] + aturi, err := syntax.ParseATURI(rec.Uri) + if err != nil { + return fmt.Errorf("parsing PDS record response: %v", err) + } + err = e.ProcessRecord(ctx, ident.DID, aturi.Path(), rec.Cid, rec.Value.Val) + if err != nil { + return err + } + } + return nil +} + func (e *Engine) NewRecordEvent(am AccountMeta, path, recCID string, rec any) RecordEvent { parts := strings.SplitN(path, "/", 2) return RecordEvent{ @@ -156,11 +236,34 @@ func (e *Engine) NewRecordEvent(am AccountMeta, path, recCID string, rec any) Re } } +func (e *Engine) NewRecordDeleteEvent(am AccountMeta, path string) RecordDeleteEvent { + parts := strings.SplitN(path, "/", 2) + return RecordDeleteEvent{ + RepoEvent{ + Engine: e, + Logger: e.Logger.With("did", am.Identity.DID, "collection", parts[0], "rkey", parts[1]), + Account: am, + }, + parts[0], + parts[1], + } +} + func (e *Engine) GetCount(name, val, period string) (int, error) { return e.Counters.GetCount(context.TODO(), name, val, period) } +func (e *Engine) GetCountDistinct(name, bucket, period string) (int, error) { + return e.Counters.GetCountDistinct(context.TODO(), name, bucket, period) +} + // checks if `val` is an element of set `name` func (e *Engine) InSet(name, val string) (bool, error) { return e.Sets.InSet(context.TODO(), name, val) } + +// purge caches of any exiting metadata +func (e *Engine) PurgeAccountCaches(ctx context.Context, did syntax.DID) error { + e.Directory.Purge(ctx, did.AtIdentifier()) + return e.Cache.Purge(ctx, "acct", did.String()) +} diff --git a/automod/engine_test.go b/automod/engine_test.go index a597f2c9c..25994cd9a 100644 --- a/automod/engine_test.go +++ b/automod/engine_test.go @@ -4,6 +4,7 @@ import ( "context" "log/slog" "testing" + "time" appbsky "github.com/bluesky-social/indigo/api/bsky" "github.com/bluesky-social/indigo/atproto/identity" @@ -14,7 +15,7 @@ import ( func simpleRule(evt *RecordEvent, post *appbsky.FeedPost) error { for _, tag := range post.Tags { - if evt.InSet("banned-hashtags", tag) { + if evt.InSet("bad-hashtags", tag) { evt.AddRecordLabel("bad-hashtag") break } @@ -23,7 +24,7 @@ func simpleRule(evt *RecordEvent, post *appbsky.FeedPost) error { for _, feat := range facet.Features { if feat.RichtextFacet_Tag != nil { tag := feat.RichtextFacet_Tag.Tag - if evt.InSet("banned-hashtags", tag) { + if evt.InSet("bad-hashtags", tag) { evt.AddRecordLabel("bad-hashtag") break } @@ -39,9 +40,11 @@ func engineFixture() Engine { simpleRule, }, } + cache := NewMemCacheStore(10, time.Hour) + flags := NewMemFlagStore() sets := NewMemSetStore() - sets.Sets["banned-hashtags"] = make(map[string]bool) - sets.Sets["banned-hashtags"]["slur"] = true + sets.Sets["bad-hashtags"] = make(map[string]bool) + sets.Sets["bad-hashtags"]["slur"] = true dir := identity.NewMockDirectory() id1 := identity.Identity{ DID: syntax.DID("did:plc:abc111"), @@ -53,6 +56,8 @@ func engineFixture() Engine { Directory: &dir, Counters: NewMemCountStore(), Sets: sets, + Flags: flags, + Cache: cache, Rules: rules, } return engine diff --git a/automod/event.go b/automod/event.go index c9ef6673a..69a17543f 100644 --- a/automod/event.go +++ b/automod/event.go @@ -4,6 +4,7 @@ import ( "context" "fmt" "log/slog" + "strings" comatproto "github.com/bluesky-social/indigo/api/atproto" appbsky "github.com/bluesky-social/indigo/api/bsky" @@ -19,19 +20,26 @@ type CounterRef struct { Val string } +type CounterDistinctRef struct { + Name string + Bucket string + Val string +} + // base type for events specific to an account, usually derived from a repo event stream message (one such message may result in multiple `RepoEvent`) // // events are both containers for data about the event itself (similar to an HTTP request type); aggregate results and state (counters, mod actions) to be persisted after all rules are run; and act as an API for additional network reads and operations. type RepoEvent struct { - Engine *Engine - Err error - Logger *slog.Logger - Account AccountMeta - CounterIncrements []CounterRef - AccountLabels []string - AccountFlags []string - AccountReports []ModReport - AccountTakedown bool + Engine *Engine + Err error + Logger *slog.Logger + Account AccountMeta + CounterIncrements []CounterRef + CounterDistinctIncrements []CounterDistinctRef // TODO: better variable names + AccountLabels []string + AccountFlags []string + AccountReports []ModReport + AccountTakedown bool } func (e *RepoEvent) GetCount(name, val, period string) int { @@ -43,6 +51,23 @@ func (e *RepoEvent) GetCount(name, val, period string) int { return v } +func (e *RepoEvent) Increment(name, val string) { + e.CounterIncrements = append(e.CounterIncrements, CounterRef{Name: name, Val: val}) +} + +func (e *RepoEvent) GetCountDistinct(name, bucket, period string) int { + v, err := e.Engine.GetCountDistinct(name, bucket, period) + if err != nil { + e.Err = err + return 0 + } + return v +} + +func (e *RepoEvent) IncrementDistinct(name, bucket, val string) { + e.CounterDistinctIncrements = append(e.CounterDistinctIncrements, CounterDistinctRef{Name: name, Bucket: bucket, Val: val}) +} + func (e *RepoEvent) InSet(name, val string) bool { v, err := e.Engine.InSet(name, val) if err != nil { @@ -52,10 +77,6 @@ func (e *RepoEvent) InSet(name, val string) bool { return v } -func (e *RepoEvent) Increment(name, val string) { - e.CounterIncrements = append(e.CounterIncrements, CounterRef{Name: name, Val: val}) -} - func (e *RepoEvent) TakedownAccount() { e.AccountTakedown = true } @@ -72,18 +93,95 @@ func (e *RepoEvent) ReportAccount(reason, comment string) { e.AccountReports = append(e.AccountReports, ModReport{ReasonType: reason, Comment: comment}) } +func slackBody(msg string, newLabels, newFlags []string, newReports []ModReport, newTakedown bool) string { + if len(newLabels) > 0 { + msg += fmt.Sprintf("New Labels: `%s`\n", strings.Join(newLabels, ", ")) + } + if len(newFlags) > 0 { + msg += fmt.Sprintf("New Flags: `%s`\n", strings.Join(newFlags, ", ")) + } + for _, rep := range newReports { + msg += fmt.Sprintf("Report `%s`: %s\n", rep.ReasonType, rep.Comment) + } + if newTakedown { + msg += fmt.Sprintf("Takedown!\n") + } + return msg +} + +// Persists account-level moderation actions: new labels, new flags, new takedowns, and reports. +// +// If necessary, will "purge" identity and account caches, so that state updates will be picked up for subsequent events. +// +// TODO: de-dupe reports based on existing state, similar to other state func (e *RepoEvent) PersistAccountActions(ctx context.Context) error { + + // de-dupe actions + newLabels := []string{} + for _, val := range dedupeStrings(e.AccountLabels) { + exists := false + for _, e := range e.Account.AccountNegatedLabels { + if val == e { + exists = true + break + } + } + for _, e := range e.Account.AccountLabels { + if val == e { + exists = true + break + } + } + if !exists { + newLabels = append(newLabels, val) + } + } + newFlags := []string{} + for _, val := range dedupeStrings(e.AccountFlags) { + exists := false + for _, e := range e.Account.AccountFlags { + if val == e { + exists = true + break + } + } + if !exists { + newFlags = append(newFlags, val) + } + } + newReports := e.AccountReports + newTakedown := e.AccountTakedown && !e.Account.Takendown + + if newTakedown || len(newLabels) > 0 || len(newFlags) > 0 || len(newReports) > 0 { + if e.Engine.SlackWebhookURL != "" { + msg := fmt.Sprintf("⚠️ Automod Account Action ⚠️\n") + msg += fmt.Sprintf("`%s` / `%s` / / \n", + e.Account.Identity.DID, + e.Account.Identity.Handle, + e.Account.Identity.DID, + e.Account.Identity.DID, + ) + msg = slackBody(msg, newLabels, newFlags, newReports, newTakedown) + if err := e.Engine.SendSlackMsg(ctx, msg); err != nil { + e.Logger.Error("sending slack webhook", "err", err) + } + } + } + if e.Engine.AdminClient == nil { return nil } + + needsPurge := false xrpcc := e.Engine.AdminClient - if len(e.AccountLabels) > 0 { + if len(newLabels) > 0 { comment := "automod" _, err := comatproto.AdminEmitModerationEvent(ctx, xrpcc, &comatproto.AdminEmitModerationEvent_Input{ CreatedBy: xrpcc.Auth.Did, Event: &comatproto.AdminEmitModerationEvent_Input_Event{ AdminDefs_ModEventLabel: &comatproto.AdminDefs_ModEventLabel{ - CreateLabelVals: dedupeStrings(e.AccountLabels), + CreateLabelVals: newLabels, + NegateLabelVals: []string{}, Comment: &comment, }, }, @@ -96,9 +194,13 @@ func (e *RepoEvent) PersistAccountActions(ctx context.Context) error { if err != nil { return err } + needsPurge = true + } + if len(newFlags) > 0 { + e.Engine.Flags.Add(ctx, e.Account.Identity.DID.String(), newFlags) + needsPurge = true } - // TODO: AccountFlags - for _, mr := range e.AccountReports { + for _, mr := range newReports { _, err := comatproto.ModerationCreateReport(ctx, xrpcc, &comatproto.ModerationCreateReport_Input{ ReasonType: &mr.ReasonType, Reason: &mr.Comment, @@ -112,7 +214,7 @@ func (e *RepoEvent) PersistAccountActions(ctx context.Context) error { return err } } - if e.AccountTakedown { + if newTakedown { comment := "automod" _, err := comatproto.AdminEmitModerationEvent(ctx, xrpcc, &comatproto.AdminEmitModerationEvent_Input{ CreatedBy: xrpcc.Auth.Did, @@ -130,6 +232,10 @@ func (e *RepoEvent) PersistAccountActions(ctx context.Context) error { if err != nil { return err } + needsPurge = true + } + if needsPurge { + return e.Engine.PurgeAccountCaches(ctx, e.Account.Identity.DID) } return nil } @@ -146,6 +252,12 @@ func (e *RepoEvent) PersistCounters(ctx context.Context) error { return err } } + for _, ref := range e.CounterDistinctIncrements { + err := e.Engine.Counters.IncrementDistinct(ctx, ref.Name, ref.Bucket, ref.Val) + if err != nil { + return err + } + } return nil } @@ -192,22 +304,50 @@ func (e *RecordEvent) ReportRecord(reason, comment string) { e.RecordReports = append(e.RecordReports, ModReport{ReasonType: reason, Comment: comment}) } +// Persists some record-level state: labels, takedowns, reports. +// +// NOTE: this method currently does *not* persist record-level flags to any storage, and does not de-dupe most actions, on the assumption that the record is new (from firehose) and has no existing mod state. func (e *RecordEvent) PersistRecordActions(ctx context.Context) error { + + // TODO: consider de-duping record-level actions? at least for updates and deletes. + newLabels := dedupeStrings(e.RecordLabels) + newFlags := dedupeStrings(e.RecordFlags) + newReports := e.RecordReports + newTakedown := e.RecordTakedown + atURI := fmt.Sprintf("at://%s/%s/%s", e.Account.Identity.DID, e.Collection, e.RecordKey) + + if newTakedown || len(newLabels) > 0 || len(newFlags) > 0 || len(newReports) > 0 { + if e.Engine.SlackWebhookURL != "" { + msg := fmt.Sprintf("⚠️ Automod Record Action ⚠️\n") + msg += fmt.Sprintf("`%s` / `%s` / / \n", + e.Account.Identity.DID, + e.Account.Identity.Handle, + e.Account.Identity.DID, + e.Account.Identity.DID, + ) + msg += fmt.Sprintf("`%s`\n", atURI) + msg = slackBody(msg, newLabels, newFlags, newReports, newTakedown) + if err := e.Engine.SendSlackMsg(ctx, msg); err != nil { + e.Logger.Error("sending slack webhook", "err", err) + } + } + } if e.Engine.AdminClient == nil { return nil } strongRef := comatproto.RepoStrongRef{ Cid: e.CID, - Uri: fmt.Sprintf("at://%s/%s/%s", e.Account.Identity.DID, e.Collection, e.RecordKey), + Uri: atURI, } xrpcc := e.Engine.AdminClient - if len(e.RecordLabels) > 0 { + if len(newLabels) > 0 { comment := "automod" _, err := comatproto.AdminEmitModerationEvent(ctx, xrpcc, &comatproto.AdminEmitModerationEvent_Input{ CreatedBy: xrpcc.Auth.Did, Event: &comatproto.AdminEmitModerationEvent_Input_Event{ AdminDefs_ModEventLabel: &comatproto.AdminDefs_ModEventLabel{ - CreateLabelVals: dedupeStrings(e.RecordLabels), + CreateLabelVals: newLabels, + NegateLabelVals: []string{}, Comment: &comment, }, }, @@ -219,8 +359,10 @@ func (e *RecordEvent) PersistRecordActions(ctx context.Context) error { return err } } - // TODO: AccountFlags - for _, mr := range e.RecordReports { + if len(newFlags) > 0 { + e.Engine.Flags.Add(ctx, atURI, newFlags) + } + for _, mr := range newReports { _, err := comatproto.ModerationCreateReport(ctx, xrpcc, &comatproto.ModerationCreateReport_Input{ ReasonType: &mr.ReasonType, Reason: &mr.Comment, @@ -232,7 +374,7 @@ func (e *RecordEvent) PersistRecordActions(ctx context.Context) error { return err } } - if e.RecordTakedown { + if newTakedown { comment := "automod" _, err := comatproto.AdminEmitModerationEvent(ctx, xrpcc, &comatproto.AdminEmitModerationEvent_Input{ CreatedBy: xrpcc.Auth.Did, @@ -272,7 +414,15 @@ func (e *RecordEvent) CanonicalLogLine() { ) } +type RecordDeleteEvent struct { + RepoEvent + + Collection string + RecordKey string +} + type IdentityRuleFunc = func(evt *IdentityEvent) error type RecordRuleFunc = func(evt *RecordEvent) error type PostRuleFunc = func(evt *RecordEvent, post *appbsky.FeedPost) error type ProfileRuleFunc = func(evt *RecordEvent, profile *appbsky.ActorProfile) error +type RecordDeleteRuleFunc = func(evt *RecordDeleteEvent) error diff --git a/automod/flagstore.go b/automod/flagstore.go new file mode 100644 index 000000000..b61402781 --- /dev/null +++ b/automod/flagstore.go @@ -0,0 +1,66 @@ +package automod + +import ( + "context" +) + +type FlagStore interface { + Get(ctx context.Context, key string) ([]string, error) + Add(ctx context.Context, key string, flags []string) error + Remove(ctx context.Context, key string, flags []string) error +} + +type MemFlagStore struct { + Data map[string][]string +} + +func NewMemFlagStore() MemFlagStore { + return MemFlagStore{ + Data: make(map[string][]string), + } +} + +func (s MemFlagStore) Get(ctx context.Context, key string) ([]string, error) { + v, ok := s.Data[key] + if !ok { + return []string{}, nil + } + return v, nil +} + +func (s MemFlagStore) Add(ctx context.Context, key string, flags []string) error { + v, ok := s.Data[key] + if !ok { + v = []string{} + } + for _, f := range flags { + v = append(v, f) + } + v = dedupeStrings(v) + s.Data[key] = v + return nil +} + +// does not error if flags not in set +func (s MemFlagStore) Remove(ctx context.Context, key string, flags []string) error { + if len(flags) == 0 { + return nil + } + v, ok := s.Data[key] + if !ok { + v = []string{} + } + m := make(map[string]bool, len(v)) + for _, f := range v { + m[f] = true + } + for _, f := range flags { + delete(m, f) + } + out := []string{} + for f, _ := range m { + out = append(out, f) + } + s.Data[key] = out + return nil +} diff --git a/automod/flagstore_test.go b/automod/flagstore_test.go new file mode 100644 index 000000000..a64ac67df --- /dev/null +++ b/automod/flagstore_test.go @@ -0,0 +1,30 @@ +package automod + +import ( + "context" + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestFlagStoreBasics(t *testing.T) { + assert := assert.New(t) + ctx := context.Background() + + fs := NewMemFlagStore() + + l, err := fs.Get(ctx, "test1") + assert.NoError(err) + assert.Empty(l) + + assert.NoError(fs.Add(ctx, "test1", []string{"red", "green"})) + assert.NoError(fs.Add(ctx, "test1", []string{"red", "blue"})) + l, err = fs.Get(ctx, "test1") + assert.NoError(err) + assert.Equal(3, len(l)) + + assert.NoError(fs.Remove(ctx, "test1", []string{"red", "blue"})) + l, err = fs.Get(ctx, "test1") + assert.NoError(err) + assert.Equal([]string{"green"}, l) +} diff --git a/automod/redis_cache.go b/automod/redis_cache.go index c5724826d..f057ff11f 100644 --- a/automod/redis_cache.go +++ b/automod/redis_cache.go @@ -53,11 +53,18 @@ func (s RedisCacheStore) Get(ctx context.Context, name, key string) (string, err } func (s RedisCacheStore) Set(ctx context.Context, name, key string, val string) error { - s.Data.Set(&cache.Item{ + return s.Data.Set(&cache.Item{ Ctx: ctx, Key: redisCacheKey(name, key), Value: val, TTL: s.TTL, }) - return nil +} + +func (s RedisCacheStore) Purge(ctx context.Context, name, key string) error { + err := s.Data.Delete(ctx, redisCacheKey(name, key)) + if err == cache.ErrCacheMiss { + return nil + } + return err } diff --git a/automod/redis_counters.go b/automod/redis_counters.go index f95fdbd8c..8076d0d5d 100644 --- a/automod/redis_counters.go +++ b/automod/redis_counters.go @@ -8,6 +8,7 @@ import ( ) var redisCountPrefix string = "count/" +var redisDistinctPrefix string = "distinct/" type RedisCountStore struct { Client *redis.Client @@ -63,3 +64,37 @@ func (s *RedisCountStore) Increment(ctx context.Context, name, val string) error _, err := multi.Exec(ctx) return err } + +func (s *RedisCountStore) GetCountDistinct(ctx context.Context, name, val, period string) (int, error) { + key := redisDistinctPrefix + PeriodBucket(name, val, period) + c, err := s.Client.PFCount(ctx, key).Result() + if err == redis.Nil { + return 0, nil + } else if err != nil { + return 0, err + } + return int(c), nil +} + +func (s *RedisCountStore) IncrementDistinct(ctx context.Context, name, bucket, val string) error { + + var key string + + // increment multiple counters in a single redis round-trip + multi := s.Client.Pipeline() + + key = redisDistinctPrefix + PeriodBucket(name, bucket, PeriodHour) + multi.PFAdd(ctx, key, val) + multi.Expire(ctx, key, 2*time.Hour) + + key = redisDistinctPrefix + PeriodBucket(name, bucket, PeriodDay) + multi.PFAdd(ctx, key, val) + multi.Expire(ctx, key, 48*time.Hour) + + key = redisDistinctPrefix + PeriodBucket(name, bucket, PeriodTotal) + multi.PFAdd(ctx, key, val) + // no expiration for total + + _, err := multi.Exec(ctx) + return err +} diff --git a/automod/redis_directory.go b/automod/redis_directory.go index 4a76c8b4d..064a5380b 100644 --- a/automod/redis_directory.go +++ b/automod/redis_directory.go @@ -86,6 +86,7 @@ func (d *RedisDirectory) IsIdentityStale(e *IdentityEntry) bool { } func (d *RedisDirectory) updateHandle(ctx context.Context, h syntax.Handle) (*HandleEntry, error) { + h = h.Normalize() ident, err := d.Inner.LookupHandle(ctx, h) if err != nil { he := HandleEntry{ @@ -314,6 +315,7 @@ func (d *RedisDirectory) Lookup(ctx context.Context, a syntax.AtIdentifier) (*id func (d *RedisDirectory) Purge(ctx context.Context, a syntax.AtIdentifier) error { handle, err := a.AsHandle() if nil == err { // if not an error, is a handle + handle = handle.Normalize() return d.handleCache.Delete(ctx, handle.String()) } did, err := a.AsDID() diff --git a/automod/redis_flags.go b/automod/redis_flags.go new file mode 100644 index 000000000..83a37e68d --- /dev/null +++ b/automod/redis_flags.go @@ -0,0 +1,65 @@ +package automod + +import ( + "context" + + "github.com/redis/go-redis/v9" +) + +var redisFlagsPrefix string = "flags/" + +type RedisFlagStore struct { + Client *redis.Client +} + +func NewRedisFlagStore(redisURL string) (*RedisFlagStore, error) { + opt, err := redis.ParseURL(redisURL) + if err != nil { + return nil, err + } + rdb := redis.NewClient(opt) + // check redis connection + _, err = rdb.Ping(context.TODO()).Result() + if err != nil { + return nil, err + } + rcs := RedisFlagStore{ + Client: rdb, + } + return &rcs, nil +} + +func (s *RedisFlagStore) Get(ctx context.Context, key string) ([]string, error) { + rkey := redisFlagsPrefix + key + l, err := s.Client.SMembers(ctx, rkey).Result() + if err == redis.Nil { + return []string{}, nil + } else if err != nil { + return nil, err + } + return l, nil +} + +func (s *RedisFlagStore) Add(ctx context.Context, key string, flags []string) error { + if len(flags) == 0 { + return nil + } + l := []interface{}{} + for _, v := range flags { + l = append(l, v) + } + rkey := redisFlagsPrefix + key + return s.Client.SAdd(ctx, rkey, l...).Err() +} + +func (s *RedisFlagStore) Remove(ctx context.Context, key string, flags []string) error { + if len(flags) == 0 { + return nil + } + l := []interface{}{} + for _, v := range flags { + l = append(l, v) + } + rkey := redisFlagsPrefix + key + return s.Client.SRem(ctx, rkey, l...).Err() +} diff --git a/automod/redis_flagstore_test.go b/automod/redis_flagstore_test.go new file mode 100644 index 000000000..a295baa4f --- /dev/null +++ b/automod/redis_flagstore_test.go @@ -0,0 +1,35 @@ +package automod + +import ( + "context" + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestRedisFlagStoreBasics(t *testing.T) { + t.Skip("live test, need redis running locally") + assert := assert.New(t) + ctx := context.Background() + + fs, err := NewRedisFlagStore("redis://localhost:6379/0") + if err != nil { + t.Fail() + } + + l, err := fs.Get(ctx, "test1") + assert.NoError(err) + assert.Empty(l) + + assert.NoError(fs.Add(ctx, "test1", []string{"red", "green"})) + assert.NoError(fs.Add(ctx, "test1", []string{"red", "blue"})) + l, err = fs.Get(ctx, "test1") + assert.NoError(err) + assert.Equal(3, len(l)) + + assert.NoError(fs.Remove(ctx, "test1", []string{"red", "blue", "orange"})) + l, err = fs.Get(ctx, "test1") + assert.NoError(err) + assert.Equal([]string{"green"}, l) + assert.NoError(fs.Remove(ctx, "test1", []string{"green"})) +} diff --git a/automod/rules/all.go b/automod/rules/all.go index 86f4b6bdf..530ea4886 100644 --- a/automod/rules/all.go +++ b/automod/rules/all.go @@ -10,9 +10,27 @@ func DefaultRules() automod.RuleSet { MisleadingURLPostRule, MisleadingMentionPostRule, ReplyCountPostRule, - BanHashtagsPostRule, - AccountDemoPostRule, + BadHashtagsPostRule, + //TooManyHashtagsPostRule, + //AccountDemoPostRule, AccountPrivateDemoPostRule, + GtubePostRule, + KeywordPostRule, + ReplySingleKeywordPostRule, + AggressivePromotionRule, + }, + ProfileRules: []automod.ProfileRuleFunc{ + GtubeProfileRule, + KeywordProfileRule, + }, + RecordRules: []automod.RecordRuleFunc{ + InteractionChurnRule, + }, + RecordDeleteRules: []automod.RecordDeleteRuleFunc{ + DeleteInteractionRule, + }, + IdentityRules: []automod.IdentityRuleFunc{ + NewAccountRule, }, } return rules diff --git a/automod/rules/example_sets.json b/automod/rules/example_sets.json index d66de7831..4d2367f92 100644 --- a/automod/rules/example_sets.json +++ b/automod/rules/example_sets.json @@ -1,6 +1,12 @@ { - "banned-hashtags": [ + "bad-hashtags": [ "slur", - "anotherslur" + "deathtooutgroup" + ], + "bad-words": [ + "hardar" + ], + "promo-domain": [ + "buy-crypto.example.com" ] } diff --git a/automod/rules/fixture_test.go b/automod/rules/fixture_test.go index 6328c4702..d092c8406 100644 --- a/automod/rules/fixture_test.go +++ b/automod/rules/fixture_test.go @@ -2,6 +2,7 @@ package rules import ( "log/slog" + "time" "github.com/bluesky-social/indigo/atproto/identity" "github.com/bluesky-social/indigo/atproto/syntax" @@ -12,12 +13,14 @@ import ( func engineFixture() automod.Engine { rules := automod.RuleSet{ PostRules: []automod.PostRuleFunc{ - BanHashtagsPostRule, + BadHashtagsPostRule, }, } + flags := automod.NewMemFlagStore() + cache := automod.NewMemCacheStore(10, time.Hour) sets := automod.NewMemSetStore() - sets.Sets["banned-hashtags"] = make(map[string]bool) - sets.Sets["banned-hashtags"]["slur"] = true + sets.Sets["bad-hashtags"] = make(map[string]bool) + sets.Sets["bad-hashtags"]["slur"] = true dir := identity.NewMockDirectory() id1 := identity.Identity{ DID: syntax.DID("did:plc:abc111"), @@ -37,6 +40,8 @@ func engineFixture() automod.Engine { Directory: &dir, Counters: automod.NewMemCountStore(), Sets: sets, + Flags: flags, + Cache: cache, Rules: rules, AdminClient: &adminc, } diff --git a/automod/rules/gtube.go b/automod/rules/gtube.go new file mode 100644 index 000000000..a281e4c1e --- /dev/null +++ b/automod/rules/gtube.go @@ -0,0 +1,25 @@ +package rules + +import ( + "strings" + + appbsky "github.com/bluesky-social/indigo/api/bsky" + "github.com/bluesky-social/indigo/automod" +) + +// https://en.wikipedia.org/wiki/GTUBE +var gtubeString = "XJS*C4JDBQADN1.NSBN3*2IDNEN*GTUBE-STANDARD-ANTI-UBE-TEST-EMAIL*C.34X" + +func GtubePostRule(evt *automod.RecordEvent, post *appbsky.FeedPost) error { + if strings.Contains(post.Text, gtubeString) { + evt.AddRecordLabel("spam") + } + return nil +} + +func GtubeProfileRule(evt *automod.RecordEvent, profile *appbsky.ActorProfile) error { + if profile.Description != nil && strings.Contains(*profile.Description, gtubeString) { + evt.AddRecordLabel("spam") + } + return nil +} diff --git a/automod/rules/hashtags.go b/automod/rules/hashtags.go index aa047a207..a9e934483 100644 --- a/automod/rules/hashtags.go +++ b/automod/rules/hashtags.go @@ -5,12 +5,31 @@ import ( "github.com/bluesky-social/indigo/automod" ) -func BanHashtagsPostRule(evt *automod.RecordEvent, post *appbsky.FeedPost) error { +// looks for specific hashtags from known lists +func BadHashtagsPostRule(evt *automod.RecordEvent, post *appbsky.FeedPost) error { for _, tag := range ExtractHashtags(post) { - if evt.InSet("banned-hashtags", tag) { + tag = NormalizeHashtag(tag) + if evt.InSet("bad-hashtags", tag) { evt.AddRecordFlag("bad-hashtag") break } } return nil } + +// if a post is "almost all" hashtags, it might be a form of search spam +func TooManyHashtagsPostRule(evt *automod.RecordEvent, post *appbsky.FeedPost) error { + tags := ExtractHashtags(post) + tagChars := 0 + for _, tag := range tags { + tagChars += len(tag) + } + tagTextRatio := float64(tagChars) / float64(len(post.Text)) + // if there is an image, allow some more tags + if len(tags) > 4 && tagTextRatio > 0.6 && post.Embed.EmbedImages == nil { + evt.AddRecordFlag("many-hashtags") + } else if len(tags) > 7 && tagTextRatio > 0.8 { + evt.AddRecordFlag("many-hashtags") + } + return nil +} diff --git a/automod/rules/hashtags_test.go b/automod/rules/hashtags_test.go index 678aa0299..ace69485d 100644 --- a/automod/rules/hashtags_test.go +++ b/automod/rules/hashtags_test.go @@ -11,7 +11,7 @@ import ( "github.com/stretchr/testify/assert" ) -func TestBanHashtagPostRule(t *testing.T) { +func TestBadHashtagPostRule(t *testing.T) { assert := assert.New(t) engine := engineFixture() @@ -27,7 +27,7 @@ func TestBanHashtagPostRule(t *testing.T) { Text: "some post blah", } evt1 := engine.NewRecordEvent(am1, path, cid1, &p1) - assert.NoError(BanHashtagsPostRule(&evt1, &p1)) + assert.NoError(BadHashtagsPostRule(&evt1, &p1)) assert.Empty(evt1.RecordFlags) p2 := appbsky.FeedPost{ @@ -35,6 +35,6 @@ func TestBanHashtagPostRule(t *testing.T) { Tags: []string{"one", "slur"}, } evt2 := engine.NewRecordEvent(am1, path, cid1, &p2) - assert.NoError(BanHashtagsPostRule(&evt2, &p2)) + assert.NoError(BadHashtagsPostRule(&evt2, &p2)) assert.NotEmpty(evt2.RecordFlags) } diff --git a/automod/rules/helpers.go b/automod/rules/helpers.go index d702ef059..5037fb380 100644 --- a/automod/rules/helpers.go +++ b/automod/rules/helpers.go @@ -2,8 +2,13 @@ package rules import ( "fmt" + "regexp" + "strings" + "unicode" appbsky "github.com/bluesky-social/indigo/api/bsky" + "github.com/bluesky-social/indigo/atproto/syntax" + "github.com/bluesky-social/indigo/automod" ) func dedupeStrings(in []string) []string { @@ -33,6 +38,10 @@ func ExtractHashtags(post *appbsky.FeedPost) []string { return dedupeStrings(tags) } +func NormalizeHashtag(raw string) string { + return strings.ToLower(raw) +} + type PostFacet struct { Text string URL *string @@ -76,3 +85,95 @@ func ExtractFacets(post *appbsky.FeedPost) ([]PostFacet, error) { } return out, nil } + +func ExtractPostBlobCIDsPost(post *appbsky.FeedPost) []string { + var out []string + if post.Embed.EmbedImages != nil { + for _, img := range post.Embed.EmbedImages.Images { + out = append(out, img.Image.Ref.String()) + } + } + if post.Embed.EmbedRecordWithMedia != nil { + media := post.Embed.EmbedRecordWithMedia.Media + if media.EmbedImages != nil { + for _, img := range media.EmbedImages.Images { + out = append(out, img.Image.Ref.String()) + } + } + } + return dedupeStrings(out) +} + +func ExtractBlobCIDsProfile(profile *appbsky.ActorProfile) []string { + var out []string + if profile.Avatar != nil { + out = append(out, profile.Avatar.Ref.String()) + } + if profile.Banner != nil { + out = append(out, profile.Banner.Ref.String()) + } + return dedupeStrings(out) +} + +// NOTE: this function has not been optimiszed at all! +func ExtractTextTokens(raw string) []string { + raw = strings.ToLower(raw) + f := func(c rune) bool { + return !unicode.IsLetter(c) && !unicode.IsNumber(c) + } + return strings.FieldsFunc(raw, f) +} + +func ExtractTextTokensPost(post *appbsky.FeedPost) []string { + return ExtractTextTokens(post.Text) +} + +func ExtractTextTokensProfile(profile *appbsky.ActorProfile) []string { + s := "" + if profile.Description != nil { + s += " " + *profile.Description + } + if profile.DisplayName != nil { + s += " " + *profile.DisplayName + } + return ExtractTextTokens(s) +} + +// based on: https://stackoverflow.com/a/48769624, with no trailing period allowed +var urlRegex = regexp.MustCompile(`(?:(?:https?|ftp):\/\/)?[\w/\-?=%.]+\.[\w/\-&?=%.]*[\w/\-&?=%]+`) + +func ExtractTextURLs(raw string) []string { + return urlRegex.FindAllString(raw, -1) +} + +func ExtractTextURLsProfile(profile *appbsky.ActorProfile) []string { + s := "" + if profile.Description != nil { + s += " " + *profile.Description + } + if profile.DisplayName != nil { + s += " " + *profile.DisplayName + } + return ExtractTextURLs(s) +} + +// checks if the post event is a reply post for which the author is replying to themselves, or author is the root author (OP) +func IsSelfThread(evt *automod.RecordEvent, post *appbsky.FeedPost) bool { + if post.Reply == nil { + return false + } + did := evt.Account.Identity.DID.String() + parentURI, err := syntax.ParseATURI(post.Reply.Parent.Uri) + if err != nil { + return false + } + rootURI, err := syntax.ParseATURI(post.Reply.Root.Uri) + if err != nil { + return false + } + + if parentURI.Authority().String() == did || rootURI.Authority().String() == did { + return true + } + return false +} diff --git a/automod/rules/helpers_test.go b/automod/rules/helpers_test.go new file mode 100644 index 000000000..050b572a0 --- /dev/null +++ b/automod/rules/helpers_test.go @@ -0,0 +1,55 @@ +package rules + +import ( + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestTokenizeText(t *testing.T) { + assert := assert.New(t) + + fixtures := []struct { + s string + out []string + }{ + { + s: "1 'Two' three!", + out: []string{"1", "two", "three"}, + }, + { + s: " foo1;bar2,baz3...", + out: []string{"foo1", "bar2", "baz3"}, + }, + { + s: "https://example.com/index.html", + out: []string{"https", "example", "com", "index", "html"}, + }, + } + + for _, fix := range fixtures { + assert.Equal(fix.out, ExtractTextTokens(fix.s)) + } +} + +func TestExtractURL(t *testing.T) { + assert := assert.New(t) + + fixtures := []struct { + s string + out []string + }{ + { + s: "this is a description with example.com mentioned in the middle", + out: []string{"example.com"}, + }, + { + s: "this is another example with https://en.wikipedia.org/index.html: and archive.org, and https://eff.org/... and bsky.app.", + out: []string{"https://en.wikipedia.org/index.html", "archive.org", "https://eff.org/", "bsky.app"}, + }, + } + + for _, fix := range fixtures { + assert.Equal(fix.out, ExtractTextURLs(fix.s)) + } +} diff --git a/automod/rules/identity.go b/automod/rules/identity.go new file mode 100644 index 000000000..00c1cadac --- /dev/null +++ b/automod/rules/identity.go @@ -0,0 +1,45 @@ +package rules + +import ( + "net/url" + "strings" + "time" + + "github.com/bluesky-social/indigo/automod" +) + +// triggers on first identity event for an account (DID) +func NewAccountRule(evt *automod.IdentityEvent) error { + // need access to IndexedAt for this rule + if evt.Account.Private == nil || evt.Account.Identity == nil { + return nil + } + + did := evt.Account.Identity.DID.String() + age := time.Since(evt.Account.Private.IndexedAt) + if age > 2*time.Hour { + return nil + } + exists := evt.GetCount("acct/exists", did, automod.PeriodTotal) + if exists == 0 { + evt.Logger.Info("new account") + evt.Increment("acct/exists", did) + + pdsURL, err := url.Parse(evt.Account.Identity.PDSEndpoint()) + if err != nil { + evt.Logger.Warn("invalid PDS URL", "err", err, "endpoint", evt.Account.Identity.PDSEndpoint()) + return nil + } + pdsHost := strings.ToLower(pdsURL.Host) + existingAccounts := evt.GetCount("host/newacct", pdsHost, automod.PeriodTotal) + evt.Increment("host/newacct", pdsHost) + + // new PDS host + if existingAccounts == 0 { + evt.Logger.Info("new PDS instance", "host", pdsHost) + evt.Increment("host", "new") + evt.AddAccountFlag("host-first-account") + } + } + return nil +} diff --git a/automod/rules/interaction.go b/automod/rules/interaction.go new file mode 100644 index 000000000..fc4cadc42 --- /dev/null +++ b/automod/rules/interaction.go @@ -0,0 +1,44 @@ +package rules + +import ( + "github.com/bluesky-social/indigo/automod" +) + +var interactionDailyThreshold = 500 + +// looks for accounts which do frequent interaction churn, such as follow-unfollow. +func InteractionChurnRule(evt *automod.RecordEvent) error { + did := evt.Account.Identity.DID.String() + switch evt.Collection { + case "app.bsky.feed.like": + evt.Increment("like", did) + created := evt.GetCount("like", did, automod.PeriodDay) + deleted := evt.GetCount("unlike", did, automod.PeriodDay) + ratio := float64(deleted) / float64(created) + if created > interactionDailyThreshold && deleted > interactionDailyThreshold && ratio > 0.5 { + evt.Logger.Info("high-like-churn", "created-today", created, "deleted-today", deleted) + evt.AddAccountFlag("high-like-churn") + } + case "app.bsky.graph.follow": + evt.Increment("follow", did) + created := evt.GetCount("follow", did, automod.PeriodDay) + deleted := evt.GetCount("unfollow", did, automod.PeriodDay) + ratio := float64(deleted) / float64(created) + if created > interactionDailyThreshold && deleted > interactionDailyThreshold && ratio > 0.5 { + evt.Logger.Info("high-follow-churn", "created-today", created, "deleted-today", deleted) + evt.AddAccountFlag("high-follow-churn") + } + } + return nil +} + +func DeleteInteractionRule(evt *automod.RecordDeleteEvent) error { + did := evt.Account.Identity.DID.String() + switch evt.Collection { + case "app.bsky.feed.like": + evt.Increment("unlike", did) + case "app.bsky.graph.follow": + evt.Increment("unfollow", did) + } + return nil +} diff --git a/automod/rules/keyword.go b/automod/rules/keyword.go new file mode 100644 index 000000000..ff2c644b4 --- /dev/null +++ b/automod/rules/keyword.go @@ -0,0 +1,36 @@ +package rules + +import ( + appbsky "github.com/bluesky-social/indigo/api/bsky" + "github.com/bluesky-social/indigo/automod" +) + +func KeywordPostRule(evt *automod.RecordEvent, post *appbsky.FeedPost) error { + for _, tok := range ExtractTextTokensPost(post) { + if evt.InSet("bad-words", tok) { + evt.AddRecordFlag("bad-word") + break + } + } + return nil +} + +func KeywordProfileRule(evt *automod.RecordEvent, profile *appbsky.ActorProfile) error { + for _, tok := range ExtractTextTokensProfile(profile) { + if evt.InSet("bad-words", tok) { + evt.AddRecordFlag("bad-word") + break + } + } + return nil +} + +func ReplySingleKeywordPostRule(evt *automod.RecordEvent, post *appbsky.FeedPost) error { + if post.Reply != nil && !IsSelfThread(evt, post) { + tokens := ExtractTextTokensPost(post) + if len(tokens) == 1 && evt.InSet("bad-words", tokens[0]) { + evt.AddRecordFlag("reply-single-bad-word") + } + } + return nil +} diff --git a/automod/rules/misleading.go b/automod/rules/misleading.go index b8668ead2..04e3c9827 100644 --- a/automod/rules/misleading.go +++ b/automod/rules/misleading.go @@ -2,52 +2,98 @@ package rules import ( "context" + "log/slog" "net/url" "strings" + "unicode" appbsky "github.com/bluesky-social/indigo/api/bsky" "github.com/bluesky-social/indigo/atproto/syntax" "github.com/bluesky-social/indigo/automod" ) +func isMisleadingURLFacet(facet PostFacet, logger *slog.Logger) bool { + linkURL, err := url.Parse(*facet.URL) + if err != nil { + logger.Warn("invalid link metadata URL", "url", facet.URL) + return false + } + + // basic text string pre-cleanups + text := strings.ToLower(strings.TrimSpace(facet.Text)) + + // remove square brackets + if strings.HasPrefix(text, "[") && strings.HasSuffix(text, "]") { + text = text[1 : len(text)-1] + } + + // truncated and not an obvious prefix hack (TODO: more special domains? regex?) + if strings.HasSuffix(text, "...") && !strings.HasSuffix(text, ".com...") && !strings.HasSuffix(text, ".org...") { + return false + } + if strings.HasSuffix(text, "…") && !strings.HasSuffix(text, ".com…") && !strings.HasSuffix(text, ".org…") { + return false + } + + // remove any other truncation suffix + text = strings.TrimSuffix(strings.TrimSuffix(text, "..."), "…") + + if len(text) == 0 { + logger.Warn("empty facet text", "text", facet.Text) + return false + } + + // if really not-a-domain, just skip + if !strings.Contains(text, ".") { + return false + } + + // hostnames can't start with a digit (eg, arxiv or DOI links) + for _, c := range text[0:1] { + if unicode.IsNumber(c) { + return false + } + } + + // try to fix any missing method in the text + if !strings.Contains(text, "://") { + text = "https://" + text + } + + // try parsing as a full URL (with whitespace trimmed) + textURL, err := url.Parse(text) + if err != nil { + logger.Warn("invalid link text URL", "url", facet.Text) + return false + } + + // for now just compare domains to handle the most obvious cases + // this public code will obviously get discovered and bypassed. this doesn't earn you any security cred! + linkHost := strings.TrimPrefix(strings.ToLower(linkURL.Host), "www.") + textHost := strings.TrimPrefix(strings.ToLower(textURL.Host), "www.") + if textHost != linkHost { + logger.Warn("misleading mismatched domains", "linkHost", linkURL.Host, "textHost", textURL.Host, "text", facet.Text) + return true + } + return false +} + func MisleadingURLPostRule(evt *automod.RecordEvent, post *appbsky.FeedPost) error { + // TODO: make this an InSet() config? + if evt.Account.Identity.Handle == "nowbreezing.ntw.app" { + return nil + } facets, err := ExtractFacets(post) if err != nil { evt.Logger.Warn("invalid facets", "err", err) - evt.AddRecordFlag("invalid") // TODO: or some other "this record is corrupt" indicator? + // TODO: or some other "this record is corrupt" indicator? + //evt.AddRecordFlag("broken-post") return nil } for _, facet := range facets { if facet.URL != nil { - linkURL, err := url.Parse(*facet.URL) - if err != nil { - evt.Logger.Warn("invalid link metadata URL", "url", facet.URL) - continue - } - - // basic text string pre-cleanups - text := strings.ToLower(strings.TrimSuffix(strings.TrimSpace(facet.Text), "...")) - // if really not a domain, just skipp - if !strings.Contains(text, ".") { - continue - } - // try to fix any missing method in the text - if !strings.Contains(text, "://") { - text = "https://" + text - } - - // try parsing as a full URL (with whitespace trimmed) - textURL, err := url.Parse(text) - if err != nil { - evt.Logger.Warn("invalid link text URL", "url", facet.Text) - continue - } - - // for now just compare domains to handle the most obvious cases - // this public code will obviously get discovered and bypassed. this doesn't earn you any security cred! - if linkURL.Host != textURL.Host && linkURL.Host != "www."+linkURL.Host { - evt.Logger.Warn("misleading mismatched domains", "linkHost", linkURL.Host, "textHost", textURL.Host, "text", facet.Text) - evt.AddRecordFlag("misleading") + if isMisleadingURLFacet(facet, evt.Logger) { + evt.AddRecordFlag("misleading-link") } } } @@ -60,7 +106,8 @@ func MisleadingMentionPostRule(evt *automod.RecordEvent, post *appbsky.FeedPost) facets, err := ExtractFacets(post) if err != nil { evt.Logger.Warn("invalid facets", "err", err) - evt.AddRecordFlag("invalid") // TODO: or some other "this record is corrupt" indicator? + // TODO: or some other "this record is corrupt" indicator? + //evt.AddRecordFlag("broken-post") return nil } for _, facet := range facets { @@ -69,7 +116,7 @@ func MisleadingMentionPostRule(evt *automod.RecordEvent, post *appbsky.FeedPost) if txt[0] == '@' { txt = txt[1:] } - handle, err := syntax.ParseHandle(txt) + handle, err := syntax.ParseHandle(strings.ToLower(txt)) if err != nil { evt.Logger.Warn("mention was not a valid handle", "text", txt) continue @@ -78,14 +125,14 @@ func MisleadingMentionPostRule(evt *automod.RecordEvent, post *appbsky.FeedPost) mentioned, err := evt.Engine.Directory.LookupHandle(ctx, handle) if err != nil { evt.Logger.Warn("could not resolve handle", "handle", handle) - evt.AddRecordFlag("misleading") + evt.AddRecordFlag("broken-mention") break } // TODO: check if mentioned DID was recently updated? might be a caching issue if mentioned.DID.String() != *facet.DID { evt.Logger.Warn("misleading mention", "text", txt, "did", facet.DID) - evt.AddRecordFlag("misleading") + evt.AddRecordFlag("misleading-mention") continue } } diff --git a/automod/rules/misleading_test.go b/automod/rules/misleading_test.go index fee4444ec..16ccc42b1 100644 --- a/automod/rules/misleading_test.go +++ b/automod/rules/misleading_test.go @@ -1,6 +1,7 @@ package rules import ( + "log/slog" "testing" appbsky "github.com/bluesky-social/indigo/api/bsky" @@ -80,3 +81,85 @@ func TestMisleadingMentionPostRule(t *testing.T) { assert.NoError(MisleadingMentionPostRule(&evt1, &p1)) assert.NotEmpty(evt1.RecordFlags) } + +func pstr(raw string) *string { + return &raw +} + +func TestIsMisleadingURL(t *testing.T) { + assert := assert.New(t) + logger := slog.Default() + + fixtures := []struct { + facet PostFacet + out bool + }{ + { + facet: PostFacet{ + Text: "https://atproto.com", + URL: pstr("https://atproto.com"), + }, + out: false, + }, + { + facet: PostFacet{ + Text: "https://atproto.com", + URL: pstr("https://evil.com"), + }, + out: true, + }, + { + facet: PostFacet{ + Text: "https://www.atproto.com", + URL: pstr("https://atproto.com"), + }, + out: false, + }, + { + facet: PostFacet{ + Text: "https://atproto.com", + URL: pstr("https://www.atproto.com"), + }, + out: false, + }, + { + facet: PostFacet{ + Text: "[example.com]", + URL: pstr("https://www.example.com"), + }, + out: false, + }, + { + facet: PostFacet{ + Text: "example.com...", + URL: pstr("https://example.com.evil.com"), + }, + out: true, + }, + { + facet: PostFacet{ + Text: "ATPROTO.com...", + URL: pstr("https://atproto.com"), + }, + out: false, + }, + { + facet: PostFacet{ + Text: "1234.5678", + URL: pstr("https://arxiv.org/abs/1234.5678"), + }, + out: false, + }, + { + facet: PostFacet{ + Text: "www.techdirt.com…", + URL: pstr("https://www.techdirt.com/"), + }, + out: false, + }, + } + + for _, fix := range fixtures { + assert.Equal(fix.out, isMisleadingURLFacet(fix.facet, logger)) + } +} diff --git a/automod/rules/promo.go b/automod/rules/promo.go new file mode 100644 index 000000000..17f1c6a14 --- /dev/null +++ b/automod/rules/promo.go @@ -0,0 +1,60 @@ +package rules + +import ( + "net/url" + "strings" + "time" + + appbsky "github.com/bluesky-social/indigo/api/bsky" + "github.com/bluesky-social/indigo/automod" +) + +// looks for new accounts, with a commercial or donation link in profile, which directly reply to several accounts +// +// this rule depends on ReplyCountPostRule() to set counts +func AggressivePromotionRule(evt *automod.RecordEvent, post *appbsky.FeedPost) error { + if evt.Account.Private == nil || evt.Account.Identity == nil { + return nil + } + // TODO: helper for account age + age := time.Since(evt.Account.Private.IndexedAt) + if age > 7*24*time.Hour { + return nil + } + if post.Reply == nil || IsSelfThread(evt, post) { + return nil + } + + allURLs := ExtractTextURLs(post.Text) + if evt.Account.Profile.Description != nil { + profileURLs := ExtractTextURLs(*evt.Account.Profile.Description) + allURLs = append(allURLs, profileURLs...) + } + hasPromo := false + for _, s := range allURLs { + if !strings.Contains(s, "://") { + s = "https://" + s + } + u, err := url.Parse(s) + if err != nil { + evt.Logger.Warn("failed to parse URL", "url", s) + continue + } + host := strings.TrimPrefix(strings.ToLower(u.Host), "www.") + if evt.InSet("promo-domain", host) { + hasPromo = true + break + } + } + if !hasPromo { + return nil + } + + did := evt.Account.Identity.DID.String() + uniqueReplies := evt.GetCountDistinct("reply-to", did, automod.PeriodDay) + if uniqueReplies >= 5 { + evt.AddAccountFlag("promo-multi-reply") + } + + return nil +} diff --git a/automod/rules/replies.go b/automod/rules/replies.go index db086cba2..ef37bc31f 100644 --- a/automod/rules/replies.go +++ b/automod/rules/replies.go @@ -2,16 +2,28 @@ package rules import ( appbsky "github.com/bluesky-social/indigo/api/bsky" + "github.com/bluesky-social/indigo/atproto/syntax" "github.com/bluesky-social/indigo/automod" ) +// does not count "self-replies" (direct to self, or in own post thread) func ReplyCountPostRule(evt *automod.RecordEvent, post *appbsky.FeedPost) error { - if post.Reply != nil { - did := evt.Account.Identity.DID.String() - if evt.GetCount("reply", did, automod.PeriodDay) > 3 { - evt.AddAccountFlag("frequent-replier") - } - evt.Increment("reply", did) + if post.Reply == nil || IsSelfThread(evt, post) { + return nil } + + did := evt.Account.Identity.DID.String() + if evt.GetCount("reply", did, automod.PeriodDay) > 3 { + // TODO: disabled, too noisy for prod + //evt.AddAccountFlag("frequent-replier") + } + evt.Increment("reply", did) + + parentURI, err := syntax.ParseATURI(post.Reply.Parent.Uri) + if err != nil { + evt.Logger.Warn("failed to parse reply AT-URI", "uri", post.Reply.Parent.Uri) + return nil + } + evt.IncrementDistinct("reply-to", did, parentURI.Authority().String()) return nil } diff --git a/automod/ruleset.go b/automod/ruleset.go index 12d0794c4..c91a0430c 100644 --- a/automod/ruleset.go +++ b/automod/ruleset.go @@ -7,10 +7,11 @@ import ( ) type RuleSet struct { - PostRules []PostRuleFunc - ProfileRules []ProfileRuleFunc - RecordRules []RecordRuleFunc - IdentityRules []IdentityRuleFunc + PostRules []PostRuleFunc + ProfileRules []ProfileRuleFunc + RecordRules []RecordRuleFunc + RecordDeleteRules []RecordDeleteRuleFunc + IdentityRules []IdentityRuleFunc } func (r *RuleSet) CallRecordRules(evt *RecordEvent) error { @@ -58,6 +59,19 @@ func (r *RuleSet) CallRecordRules(evt *RecordEvent) error { return nil } +func (r *RuleSet) CallRecordDeleteRules(evt *RecordDeleteEvent) error { + for _, f := range r.RecordDeleteRules { + err := f(evt) + if err != nil { + return err + } + if evt.Err != nil { + return evt.Err + } + } + return nil +} + func (r *RuleSet) CallIdentityRules(evt *IdentityEvent) error { for _, f := range r.IdentityRules { err := f(evt) diff --git a/automod/slack.go b/automod/slack.go new file mode 100644 index 000000000..6191452fc --- /dev/null +++ b/automod/slack.go @@ -0,0 +1,42 @@ +package automod + +import ( + "bytes" + "context" + "encoding/json" + "fmt" + "net/http" +) + +type SlackWebhookBody struct { + Text string `json:"text"` +} + +// Sends a simple slack message to a channel via "incoming webhook". +// +// The slack incoming webhook must be already configured in the slack workplace. +func (e *Engine) SendSlackMsg(ctx context.Context, msg string) error { + // loosely based on: https://golangcode.com/send-slack-messages-without-a-library/ + + body, _ := json.Marshal(SlackWebhookBody{Text: msg}) + req, err := http.NewRequestWithContext(ctx, http.MethodPost, e.SlackWebhookURL, bytes.NewBuffer(body)) + if err != nil { + return err + } + req.Header.Add("Content-Type", "application/json") + client := http.DefaultClient + resp, err := client.Do(req) + if err != nil { + return err + } + + defer resp.Body.Close() + + buf := new(bytes.Buffer) + buf.ReadFrom(resp.Body) + if resp.StatusCode != 200 || buf.String() != "ok" { + // TODO: in some cases print body? eg, if short and text + return fmt.Errorf("failed slack webhook POST request. status=%d", resp.StatusCode) + } + return nil +} diff --git a/cmd/hepa/consumer.go b/cmd/hepa/consumer.go index b9b1434f4..e62c29ad4 100644 --- a/cmd/hepa/consumer.go +++ b/cmd/hepa/consumer.go @@ -61,6 +61,18 @@ func (s *Server) RunConsumer(ctx context.Context) error { } return nil }, + RepoMigrate: func(evt *comatproto.SyncSubscribeRepos_Migrate) error { + s.lastSeq = evt.Seq + did, err := syntax.ParseDID(evt.Did) + if err != nil { + s.logger.Error("bad DID in RepoMigrate event", "did", evt.Did, "seq", evt.Seq, "err", err) + return nil + } + if err := s.engine.ProcessIdentityEvent(ctx, "migrate", did); err != nil { + s.logger.Error("processing repo migrate failed", "did", evt.Did, "seq", evt.Seq, "err", err) + } + return nil + }, // TODO: other event callbacks as needed } @@ -121,6 +133,12 @@ func (s *Server) HandleRepoCommit(ctx context.Context, evt *comatproto.SyncSubsc logger.Error("engine failed to process record", "err", err) continue } + case repomgr.EvtKindDeleteRecord: + err = s.engine.ProcessRecordDelete(ctx, did, op.Path) + if err != nil { + logger.Error("engine failed to process record", "err", err) + continue + } default: // TODO: other event types: update, delete } diff --git a/cmd/hepa/main.go b/cmd/hepa/main.go index c652c432b..5c3d112aa 100644 --- a/cmd/hepa/main.go +++ b/cmd/hepa/main.go @@ -9,6 +9,7 @@ import ( "time" "github.com/bluesky-social/indigo/atproto/identity" + "github.com/bluesky-social/indigo/atproto/syntax" "github.com/bluesky-social/indigo/automod" "github.com/carlmjohnson/versioninfo" @@ -95,6 +96,7 @@ func run(args []string) error { app.Commands = []*cli.Command{ runCmd, processRecordCmd, + processRecentCmd, } return app.Run(args) @@ -134,6 +136,12 @@ var runCmd = &cli.Command{ Value: ":3989", EnvVars: []string{"HEPA_METRICS_LISTEN"}, }, + &cli.StringFlag{ + Name: "slack-webhook-url", + // eg: https://hooks.slack.com/services/X1234 + Usage: "full URL of slack webhook", + EnvVars: []string{"SLACK_WEBHOOK_URL"}, + }, }, Action: func(cctx *cli.Context) error { ctx := context.Background() @@ -152,15 +160,16 @@ var runCmd = &cli.Command{ srv, err := NewServer( dir, Config{ - BGSHost: cctx.String("atp-bgs-host"), - BskyHost: cctx.String("atp-bsky-host"), - Logger: logger, - ModHost: cctx.String("atp-mod-host"), - ModAdminToken: cctx.String("mod-admin-token"), - ModUsername: cctx.String("mod-handle"), - ModPassword: cctx.String("mod-password"), - SetsFileJSON: cctx.String("sets-json-path"), - RedisURL: cctx.String("redis-url"), + BGSHost: cctx.String("atp-bgs-host"), + BskyHost: cctx.String("atp-bsky-host"), + Logger: logger, + ModHost: cctx.String("atp-mod-host"), + ModAdminToken: cctx.String("mod-admin-token"), + ModUsername: cctx.String("mod-handle"), + ModPassword: cctx.String("mod-password"), + SetsFileJSON: cctx.String("sets-json-path"), + RedisURL: cctx.String("redis-url"), + SlackWebhookURL: cctx.String("slack-webhook-url"), }, ) if err != nil { @@ -195,10 +204,68 @@ var processRecordCmd = &cli.Command{ ArgsUsage: ``, Flags: []cli.Flag{}, Action: func(cctx *cli.Context) error { - uri := cctx.Args().First() - if uri == "" { + uriArg := cctx.Args().First() + if uriArg == "" { return fmt.Errorf("expected a single AT-URI argument") } + aturi, err := syntax.ParseATURI(uriArg) + if err != nil { + return fmt.Errorf("not a valid AT-URI: %v", err) + } + + ctx := context.Background() + logger := slog.New(slog.NewJSONHandler(os.Stdout, &slog.HandlerOptions{ + Level: slog.LevelInfo, + })) + slog.SetDefault(logger) + + dir, err := configDirectory(cctx) + if err != nil { + return err + } + + srv, err := NewServer( + dir, + Config{ + BGSHost: cctx.String("atp-bgs-host"), + BskyHost: cctx.String("atp-bsky-host"), + Logger: logger, + ModHost: cctx.String("atp-mod-host"), + ModAdminToken: cctx.String("mod-admin-token"), + ModUsername: cctx.String("mod-handle"), + ModPassword: cctx.String("mod-password"), + SetsFileJSON: cctx.String("sets-json-path"), + RedisURL: cctx.String("redis-url"), + }, + ) + if err != nil { + return err + } + + return srv.engine.FetchAndProcessRecord(ctx, aturi) + }, +} + +var processRecentCmd = &cli.Command{ + Name: "process-recent", + Usage: "fetch and process recent posts for an account", + ArgsUsage: ``, + Flags: []cli.Flag{ + &cli.IntFlag{ + Name: "limit", + Usage: "how many post records to parse", + Value: 20, + }, + }, + Action: func(cctx *cli.Context) error { + idArg := cctx.Args().First() + if idArg == "" { + return fmt.Errorf("expected a single AT identifier (handle or DID) argument") + } + atid, err := syntax.ParseAtIdentifier(idArg) + if err != nil { + return fmt.Errorf("not a valid handle or DID: %v", err) + } ctx := context.Background() logger := slog.New(slog.NewJSONHandler(os.Stdout, &slog.HandlerOptions{ @@ -229,6 +296,6 @@ var processRecordCmd = &cli.Command{ return err } - return srv.engine.FetchAndProcessRecord(ctx, uri) + return srv.engine.FetchAndProcessRecent(ctx, *atid, cctx.Int("limit")) }, } diff --git a/cmd/hepa/server.go b/cmd/hepa/server.go index a09e37c7d..560b85fe7 100644 --- a/cmd/hepa/server.go +++ b/cmd/hepa/server.go @@ -29,15 +29,16 @@ type Server struct { } type Config struct { - BGSHost string - BskyHost string - ModHost string - ModAdminToken string - ModUsername string - ModPassword string - SetsFileJSON string - RedisURL string - Logger *slog.Logger + BGSHost string + BskyHost string + ModHost string + ModAdminToken string + ModUsername string + ModPassword string + SetsFileJSON string + RedisURL string + SlackWebhookURL string + Logger *slog.Logger } func NewServer(dir identity.Directory, config Config) (*Server, error) { @@ -87,6 +88,7 @@ func NewServer(dir identity.Directory, config Config) (*Server, error) { var counters automod.CountStore var cache automod.CacheStore + var flags automod.FlagStore var rdb *redis.Client if config.RedisURL != "" { // generic client, for cursor state @@ -112,9 +114,16 @@ func NewServer(dir identity.Directory, config Config) (*Server, error) { return nil, err } cache = csh + + flg, err := automod.NewRedisFlagStore(config.RedisURL) + if err != nil { + return nil, err + } + flags = flg } else { counters = automod.NewMemCountStore() cache = automod.NewMemCacheStore(5_000, 30*time.Minute) + flags = automod.NewMemFlagStore() } engine := automod.Engine{ @@ -122,6 +131,7 @@ func NewServer(dir identity.Directory, config Config) (*Server, error) { Directory: dir, Counters: counters, Sets: sets, + Flags: flags, Cache: cache, Rules: rules.DefaultRules(), AdminClient: xrpcc, @@ -129,6 +139,7 @@ func NewServer(dir identity.Directory, config Config) (*Server, error) { Client: util.RobustHTTPClient(), Host: config.BskyHost, }, + SlackWebhookURL: config.SlackWebhookURL, } s := &Server{ diff --git a/go.sum b/go.sum index 72f8facb9..195978f00 100644 --- a/go.sum +++ b/go.sum @@ -80,7 +80,6 @@ github.com/carlmjohnson/versioninfo v0.22.5/go.mod h1:QT9mph3wcVfISUKd0i9sZfVrPv github.com/cenkalti/backoff/v4 v4.2.1 h1:y4OZtCnogmCPw98Zjyt5a6+QwPLGkiQsYW5oUqylYbM= github.com/cenkalti/backoff/v4 v4.2.1/go.mod h1:Y3VNntkOUPxTVeUxJ/G5vcM//AlwfmyYozVcomhLiZE= github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU= -github.com/cespare/xxhash v1.1.0 h1:a6HrQnmkObjyL+Gs60czilIUGqrzKutQD6XZog3p+ko= github.com/cespare/xxhash/v2 v2.1.1/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= github.com/cespare/xxhash/v2 v2.1.2/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= github.com/cespare/xxhash/v2 v2.2.0 h1:DC2CZ1Ep5Y4k3ZQ899DldepgrayRUGE6BBZ/cd9Cj44= @@ -301,8 +300,6 @@ github.com/ipfs/go-ipfs-routing v0.3.0 h1:9W/W3N+g+y4ZDeffSgqhgo7BsBSJwPMcyssET9 github.com/ipfs/go-ipfs-routing v0.3.0/go.mod h1:dKqtTFIql7e1zYsEuWLyuOU+E0WJWW8JjbTPLParDWo= github.com/ipfs/go-ipfs-util v0.0.3 h1:2RFdGez6bu2ZlZdI+rWfIdbQb1KudQp3VGwPtdNCmE0= github.com/ipfs/go-ipfs-util v0.0.3/go.mod h1:LHzG1a0Ig4G+iZ26UUOMjHd+lfM84LZCrn17xAKWBvs= -github.com/ipfs/go-ipld-cbor v0.0.7-0.20230126201833-a73d038d90bc h1:eUEo764smNy0EVRuMTSmirmuh552Mf2aBjfpDcLnDa8= -github.com/ipfs/go-ipld-cbor v0.0.7-0.20230126201833-a73d038d90bc/go.mod h1:X7SgEIwC4COC5OWfcepZBWafO5kA1Rmt9ZsLLbhihQk= github.com/ipfs/go-ipld-cbor v0.1.0 h1:dx0nS0kILVivGhfWuB6dUpMa/LAwElHPw1yOGYopoYs= github.com/ipfs/go-ipld-cbor v0.1.0/go.mod h1:U2aYlmVrJr2wsUBU67K4KgepApSZddGRDWBYR0H4sCk= github.com/ipfs/go-ipld-format v0.6.0 h1:VEJlA2kQ3LqFSIm5Vu6eIlSxD/Ze90xtc4Meten1F5U= @@ -324,14 +321,12 @@ github.com/ipfs/go-metrics-interface v0.0.1 h1:j+cpbjYvu4R8zbleSs36gvB7jR+wsL2fG github.com/ipfs/go-metrics-interface v0.0.1/go.mod h1:6s6euYU4zowdslK0GKHmqaIZ3j/b/tL7HTWtJ4VPgWY= github.com/ipfs/go-peertaskqueue v0.8.1 h1:YhxAs1+wxb5jk7RvS0LHdyiILpNmRIRnZVztekOF0pg= github.com/ipfs/go-peertaskqueue v0.8.1/go.mod h1:Oxxd3eaK279FxeydSPPVGHzbwVeHjatZ2GA8XD+KbPU= -github.com/ipfs/go-unixfsnode v1.6.0 h1:JOSA02yaLylRNi2rlB4ldPr5VcZhcnaIVj5zNLcOjDo= -github.com/ipfs/go-unixfsnode v1.6.0/go.mod h1:PVfoyZkX1B34qzT3vJO4nsLUpRCyhnMuHBznRcXirlk= +github.com/ipfs/go-unixfsnode v1.8.0 h1:yCkakzuE365glu+YkgzZt6p38CSVEBPgngL9ZkfnyQU= +github.com/ipfs/go-unixfsnode v1.8.0/go.mod h1:HxRu9HYHOjK6HUqFBAi++7DVoWAHn0o4v/nZ/VA+0g8= github.com/ipfs/go-verifcid v0.0.3 h1:gmRKccqhWDocCRkC+a59g5QW7uJw5bpX9HWBevXa0zs= github.com/ipfs/go-verifcid v0.0.3/go.mod h1:gcCtGniVzelKrbk9ooUSX/pM3xlH73fZZJDzQJRvOUw= github.com/ipld/go-car v0.6.1-0.20230509095817-92d28eb23ba4 h1:oFo19cBmcP0Cmg3XXbrr0V/c+xU9U1huEZp8+OgBzdI= github.com/ipld/go-car v0.6.1-0.20230509095817-92d28eb23ba4/go.mod h1:6nkFF8OmR5wLKBzRKi7/YFJpyYR7+oEn1DX+mMWnlLA= -github.com/ipld/go-car/v2 v2.9.0 h1:mkMSfh9NpnfdFe30xBFTQiKZ6+LY+mwOPrq6r56xsPo= -github.com/ipld/go-car/v2 v2.9.0/go.mod h1:UeIST4b5Je6LEx8GjFysgeCYwxAHKtAcsWxmF6PupNQ= github.com/ipld/go-car/v2 v2.13.1 h1:KnlrKvEPEzr5IZHKTXLAEub+tPrzeAFQVRlSQvuxBO4= github.com/ipld/go-car/v2 v2.13.1/go.mod h1:QkdjjFNGit2GIkpQ953KBwowuoukoM75nP/JI1iDJdo= github.com/ipld/go-codec-dagpb v1.6.0 h1:9nYazfyu9B1p3NAgfVdpRco3Fs2nFC72DqVsMj6rOcc= @@ -377,7 +372,6 @@ github.com/julienschmidt/httprouter v1.3.0/go.mod h1:JR6WtHb+2LUe8TCKY3cZOxFyyO8 github.com/kisielk/errcheck v1.1.0/go.mod h1:EZBBE59ingxPouuu3KfxchcWSUPOHkagtvWXihfKN4Q= github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI2bnpBCr8= github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck= -github.com/klauspost/compress v1.13.6 h1:P76CopJELS0TiO2mebmnzgWaajssP/EszplttgQxcgc= github.com/klauspost/compress v1.13.6/go.mod h1:/3/Vjq9QcHkK5uEr5lBEmyoZ1iFhe47etQ6QUkpK6sk= github.com/klauspost/compress v1.17.3 h1:qkRjuerhUU1EmXLYGkSH6EZL+vPSxIrYjLNAK4slzwA= github.com/klauspost/compress v1.17.3/go.mod h1:/dCuZOvVtNoHsyb+cuJD3itjs3NbnF6KH9zAO4BDxPM= @@ -629,7 +623,6 @@ github.com/valyala/fasttemplate v1.2.2 h1:lxLXG0uE3Qnshl9QyaK6XJxMXlQZELvChBOCmQ github.com/valyala/fasttemplate v1.2.2/go.mod h1:KHLXt3tVN2HBp8eijSv/kGJopbvo7S+qRAEEKiv+SiQ= github.com/vmihailenco/go-tinylfu v0.2.2 h1:H1eiG6HM36iniK6+21n9LLpzx1G9R3DJa2UjUjbynsI= github.com/vmihailenco/go-tinylfu v0.2.2/go.mod h1:CutYi2Q9puTxfcolkliPq4npPuofg9N9t8JVrjzwa3Q= -github.com/vmihailenco/msgpack/v5 v5.3.4 h1:qMKAwOV+meBw2Y8k9cVwAy7qErtYCwBzZ2ellBfvnqc= github.com/vmihailenco/msgpack/v5 v5.3.4/go.mod h1:7xyJ9e+0+9SaZT0Wt1RGleJXzli6Q/V5KbhBonMG9jc= github.com/vmihailenco/msgpack/v5 v5.4.1 h1:cQriyiUvjTwOHg8QZaPihLWeRAAVoCpE00IUPn0Bjt8= github.com/vmihailenco/msgpack/v5 v5.4.1/go.mod h1:GaZTsDaehaPpQVyxrf5mtQlH+pc21PIudVV/E3rRQok= @@ -641,8 +634,6 @@ github.com/warpfork/go-wish v0.0.0-20220906213052-39a1cc7a02d0 h1:GDDkbFiaK8jsSD github.com/warpfork/go-wish v0.0.0-20220906213052-39a1cc7a02d0/go.mod h1:x6AKhvSSexNrVSrViXSHUEbICjmGXhtgABaHIySUSGw= github.com/whyrusleeping/cbor v0.0.0-20171005072247-63513f603b11 h1:5HZfQkwe0mIfyDmc1Em5GqlNRzcdtlv4HTNmdpt7XH0= github.com/whyrusleeping/cbor v0.0.0-20171005072247-63513f603b11/go.mod h1:Wlo/SzPmxVp6vXpGt/zaXhHH0fn4IxgqZc82aKg6bpQ= -github.com/whyrusleeping/cbor-gen v0.0.0-20230818171029-f91ae536ca25 h1:yVYDLoN2gmB3OdBXFW8e1UwgVbmCvNlnAKhvHPaNARI= -github.com/whyrusleeping/cbor-gen v0.0.0-20230818171029-f91ae536ca25/go.mod h1:fgkXqYy7bV2cFeIEOkVTZS/WjXARfBqSH6Q2qHL33hQ= github.com/whyrusleeping/cbor-gen v0.0.0-20230923211252-36a87e1ba72f h1:SBuSxXJL0/ZJMtTxbXZgHZkThl9dNrzyaNhlyaqscRo= github.com/whyrusleeping/cbor-gen v0.0.0-20230923211252-36a87e1ba72f/go.mod h1:fgkXqYy7bV2cFeIEOkVTZS/WjXARfBqSH6Q2qHL33hQ= github.com/whyrusleeping/chunker v0.0.0-20181014151217-fe64bd25879f h1:jQa4QT2UP9WYv2nzyawpKMOCl+Z/jW7djv2/J50lj9E= @@ -992,8 +983,6 @@ golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8T golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= -golang.org/x/xerrors v0.0.0-20220907171357-04be3eba64a2 h1:H2TDz8ibqkAF6YGhCdN3jS9O0/s90v0rJh3X/OLHEUk= -golang.org/x/xerrors v0.0.0-20220907171357-04be3eba64a2/go.mod h1:K8+ghG5WaK9qNqU5K3HdILfMLy1f3aNYFI/wnl100a8= golang.org/x/xerrors v0.0.0-20231012003039-104605ab7028 h1:+cNy6SZtPcJQH3LJVLOSmiC7MMxXNOb3PU/VUEz+EhU= golang.org/x/xerrors v0.0.0-20231012003039-104605ab7028/go.mod h1:NDW/Ps6MPRej6fsCIbMTohpP40sJ/P/vI1MoTEGwX90= google.golang.org/api v0.4.0/go.mod h1:8k5glujaEP+g9n7WNsDg8QP6cUVNI86fCNMcbazEtwE=