
Fetch All Replies v2 - Service Edition #44

Merged: 22 commits, Oct 20, 2024

Commits
23b165e
Initial draft of fetching all replies on context load
sneakers-the-rat Apr 16, 2024
e53938f
committing all ugly with a bunch of logger calls in the middle but we…
May 8, 2024
5bb473d
working (i think?) recursive fetch
sneakers-the-rat Sep 19, 2024
d7c0ce0
don't do it for every create, only do recursive reply expansion when …
sneakers-the-rat Sep 19, 2024
bf86d13
correct number of args to replies worker, recursive fetching is working
sneakers-the-rat Sep 19, 2024
2b16ac9
accept review comments https://github.com/NeuromatchAcademy/mastodon/…
sneakers-the-rat Sep 19, 2024
714e490
stashing some partial work on streaming
sneakers-the-rat Sep 22, 2024
40fbfea
Revert "stashing some partial work on streaming"
sneakers-the-rat Sep 22, 2024
1632d2f
Remove recursion, separate out into separate workers/services, add li…
sneakers-the-rat Sep 30, 2024
4aaf91e
rm redundant request to fetch replies worker in controller
sneakers-the-rat Sep 30, 2024
a65e29f
rm zombie code in fetch_replies_service
sneakers-the-rat Sep 30, 2024
9d404e2
rm spurious imports and reformatting
sneakers-the-rat Sep 30, 2024
37e2c0b
rm more spurious formatting
sneakers-the-rat Sep 30, 2024
fc447f5
Working version of fetch all replies service with global maximum on f…
sneakers-the-rat Oct 13, 2024
2eeb6c4
Fix limit in fetch_replies_service to not always limit by 5 (which al…
sneakers-the-rat Oct 13, 2024
6d3ccb9
the most basic test you could imagine
sneakers-the-rat Oct 13, 2024
f7b309f
tests for the fetch all reply worker
sneakers-the-rat Oct 14, 2024
3271217
add configurability via .env file
sneakers-the-rat Oct 15, 2024
cc1a1be
remove account on_behalf_of entirely since it doesn't do anything in …
sneakers-the-rat Oct 15, 2024
f6c0079
make limits configurable too while we're at it
sneakers-the-rat Oct 15, 2024
ab35055
remove redundant params - forgot i subclassed
sneakers-the-rat Oct 15, 2024
8048524
add some more debugging messages, catch errors in fetching child uris…
sneakers-the-rat Oct 19, 2024
18 changes: 18 additions & 0 deletions .env.production.sample
@@ -312,6 +312,24 @@ MAX_POLL_OPTION_CHARS=100
# New registrations will automatically follow these accounts (separated by commas)
AUTOFOLLOW=

# -- Fetch all replies settings --
# When a user expands a post (DetailedStatus view), fetch all of its replies
# (default: true if unset, set explicitly to ``false`` to disable)
FETCH_REPLIES_ENABLED=true

# Period to wait between fetching replies (in minutes)
FETCH_REPLIES_DEBOUNCE=15

# Period to wait after a post is first created before fetching its replies (in minutes)
FETCH_REPLIES_CREATED_RECENTLY=5

# Max number of replies to fetch - total, recursively through a whole reply tree
FETCH_REPLIES_MAX_GLOBAL=1000

# Max number of replies to fetch - for a single post
FETCH_REPLIES_MAX_SINGLE=500


# IP and session retention
# -----------------------
# Make sure to modify the scheduling of ip_cleanup_scheduler in config/sidekiq.yml
9 changes: 9 additions & 0 deletions app/controllers/api/v1/statuses_controller.rb
@@ -58,6 +58,15 @@ def context
statuses = [@status] + @context.ancestors + @context.descendants

render json: @context, serializer: REST::ContextSerializer, relationships: StatusRelationshipsPresenter.new(statuses, current_user&.account_id)

if !current_account.nil? && @status.should_fetch_replies?
ActivityPub::FetchAllRepliesWorker.perform_async(
@status.id,
{
allow_synchronous_requests: true,
}
)
end
end

def create
29 changes: 29 additions & 0 deletions app/models/concerns/status/fetch_replies_concern.rb
@@ -0,0 +1,29 @@
# frozen_string_literal: true

module Status::FetchRepliesConcern
extend ActiveSupport::Concern

# enable/disable fetching all replies
FETCH_REPLIES_ENABLED = ENV.key?('FETCH_REPLIES_ENABLED') ? ENV['FETCH_REPLIES_ENABLED'] == 'true' : true

# debounce fetching all replies to minimize DoS
FETCH_REPLIES_DEBOUNCE = (ENV['FETCH_REPLIES_DEBOUNCE'] || 15).to_i.minutes
CREATED_RECENTLY_DEBOUNCE = (ENV['FETCH_REPLIES_CREATED_RECENTLY'] || 5).to_i.minutes

included do
scope :created_recently, -> { where(created_at: CREATED_RECENTLY_DEBOUNCE.ago..) }
scope :not_created_recently, -> { where(created_at: ..CREATED_RECENTLY_DEBOUNCE.ago) }
scope :fetched_recently, -> { where(fetched_replies_at: FETCH_REPLIES_DEBOUNCE.ago..) }
scope :not_fetched_recently, -> { where(fetched_replies_at: ..FETCH_REPLIES_DEBOUNCE.ago).or(where(fetched_replies_at: nil)) }

scope :shouldnt_fetch_replies, -> { local.merge(created_recently).merge(fetched_recently) }
scope :should_fetch_replies, -> { local.invert_where.merge(not_created_recently).merge(not_fetched_recently) }
end

def should_fetch_replies?
# we aren't brand new, and we haven't fetched replies since the debounce window
FETCH_REPLIES_ENABLED && !local? && created_at <= CREATED_RECENTLY_DEBOUNCE.ago && (
fetched_replies_at.nil? || fetched_replies_at <= FETCH_REPLIES_DEBOUNCE.ago
)
end
end
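Outside of Rails, the concern's debounce predicate reduces to a small pure function. The sketch below mirrors the defaults above (15 and 5 minutes); the keyword-argument signature and constant names are illustrative, not the concern's actual API.

```ruby
# Defaults mirroring FETCH_REPLIES_DEBOUNCE (15 min) and
# FETCH_REPLIES_CREATED_RECENTLY (5 min), expressed in seconds.
FETCH_DEBOUNCE   = 15 * 60
CREATED_DEBOUNCE = 5 * 60

# True when a status is remote, old enough, and its replies haven't
# been fetched within the debounce window.
def should_fetch_replies?(local:, created_at:, fetched_replies_at:, now: Time.now)
  return false if local
  return false if created_at > now - CREATED_DEBOUNCE # created too recently

  fetched_replies_at.nil? || fetched_replies_at <= now - FETCH_DEBOUNCE
end

now = Time.now
# A remote status created an hour ago, never fetched: eligible.
puts should_fetch_replies?(local: false, created_at: now - 3600, fetched_replies_at: nil)        # true
# Fetched two minutes ago: still inside the debounce window.
puts should_fetch_replies?(local: false, created_at: now - 3600, fetched_replies_at: now - 120)  # false
```

The same windows drive the `created_recently`/`fetched_recently` scopes, which express the identical conditions as SQL ranges for bulk filtering.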
2 changes: 2 additions & 0 deletions app/models/status.rb
@@ -29,13 +29,15 @@
# edited_at :datetime
# trendable :boolean
# ordered_media_attachment_ids :bigint(8) is an Array
# fetched_replies_at :datetime
#

class Status < ApplicationRecord
include Cacheable
include Discard::Model
include Paginable
include RateLimitable
include Status::FetchRepliesConcern
include Status::SafeReblogInsert
include Status::SearchConcern
include Status::SnapshotConcern
49 changes: 49 additions & 0 deletions app/services/activitypub/fetch_all_replies_service.rb
@@ -0,0 +1,49 @@
# frozen_string_literal: true

class ActivityPub::FetchAllRepliesService < ActivityPub::FetchRepliesService
include JsonLdHelper

# Limit of replies to fetch per status
MAX_REPLIES = (ENV['FETCH_REPLIES_MAX_SINGLE'] || 500).to_i

def call(collection_or_uri, allow_synchronous_requests: true, request_id: nil)
@allow_synchronous_requests = allow_synchronous_requests
@filter_by_host = false
@collection_or_uri = collection_or_uri

@items = collection_items(collection_or_uri)
@items = filtered_replies
return if @items.nil?

FetchReplyWorker.push_bulk(@items) { |reply_uri| [reply_uri, { 'request_id' => request_id }] }

@items
end

private

def filtered_replies
return if @items.nil?

# Find all statuses that we *shouldn't* update the replies for, and use that as a filter.
# We don't assume that we have the statuses before they're created,
# hence the negative filter -
# "keep all these uris except the ones we already have"
# instead of
# "keep all these uris that match some conditions on existing Status objects"
#
# Typically we assume the number of replies we *shouldn't* fetch is smaller than the
# replies we *should* fetch, so we also minimize the number of uris we should load here.
uris = @items.map { |item| value_or_id(item) }
dont_update = Status.where(uri: uris).shouldnt_fetch_replies.pluck(:uri)

# touch all statuses that already exist and that we're about to update
Status.where(uri: uris).should_fetch_replies.touch_all(:fetched_replies_at)

# Reject URIs for statuses we already have that shouldn't be re-fetched,
# then cap how many we fetch for this status
uris = uris.reject { |uri| dont_update.include?(uri) }.take(MAX_REPLIES)

Rails.logger.debug { "FetchAllRepliesService - #{@collection_or_uri}: Fetching filtered statuses: #{uris}" }
uris
end
end
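The negative filter itself is a small pure function over URI lists. This is a sketch with a lowered cap and hypothetical URIs, not the service's exact code:

```ruby
require 'set'

MAX_SINGLE = 5 # sketch value; the service reads FETCH_REPLIES_MAX_SINGLE (default 500)

# Keep only the URIs not in the "don't update" set, up to the per-status cap.
def filter_reply_uris(uris, dont_update)
  uris.reject { |uri| dont_update.include?(uri) }.take(MAX_SINGLE)
end

already_fresh = Set['https://a.example/statuses/1', 'https://a.example/statuses/2']
candidates    = (1..8).map { |i| "https://a.example/statuses/#{i}" }

p filter_reply_uris(candidates, already_fresh)
# => URIs 3 through 7: the first five candidates we don't already hold fresh
```

Building `dont_update` from a single `WHERE uri IN (...)` query keeps this to one round trip per batch, which is why the service filters negatively rather than loading every matching Status.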
41 changes: 30 additions & 11 deletions app/services/activitypub/fetch_replies_service.rb
@@ -3,9 +3,13 @@
class ActivityPub::FetchRepliesService < BaseService
include JsonLdHelper

def call(parent_status, collection_or_uri, allow_synchronous_requests: true, request_id: nil)
# Limit of fetched replies
MAX_REPLIES = 5

def call(parent_status, collection_or_uri, allow_synchronous_requests: true, request_id: nil, filter_by_host: true)
@account = parent_status.account
@allow_synchronous_requests = allow_synchronous_requests
@filter_by_host = filter_by_host

@items = collection_items(collection_or_uri)
return if @items.nil?
@@ -24,18 +28,29 @@ def collection_items(collection_or_uri)
collection = fetch_collection(collection['first']) if collection['first'].present?
return unless collection.is_a?(Hash)

case collection['type']
when 'Collection', 'CollectionPage'
as_array(collection['items'])
when 'OrderedCollection', 'OrderedCollectionPage'
as_array(collection['orderedItems'])
all_items = []
while collection.is_a?(Hash)
items = case collection['type']
when 'Collection', 'CollectionPage'
collection['items']
when 'OrderedCollection', 'OrderedCollectionPage'
collection['orderedItems']
end

all_items.concat(as_array(items))

break if all_items.size > MAX_REPLIES

collection = collection['next'].present? ? fetch_collection(collection['next']) : nil
end

all_items
end

def fetch_collection(collection_or_uri)
return collection_or_uri if collection_or_uri.is_a?(Hash)
return unless @allow_synchronous_requests
return if non_matching_uri_hosts?(@account.uri, collection_or_uri)
return if @filter_by_host && non_matching_uri_hosts?(@account.uri, collection_or_uri)

# NOTE: For backward compatibility reasons, Mastodon signs outgoing
# queries incorrectly by default.
@@ -54,10 +69,14 @@ def fetch_collection(collection_or_uri)
end

def filtered_replies
# Only fetch replies to the same server as the original status to avoid
# amplification attacks.
if @filter_by_host
# Only fetch replies to the same server as the original status to avoid
# amplification attacks.

# Also limit to 5 fetched replies to limit potential for DoS.
@items.map { |item| value_or_id(item) }.reject { |uri| non_matching_uri_hosts?(@account.uri, uri) }.take(5)
# Also limit to 5 fetched replies to limit potential for DoS.
@items.map { |item| value_or_id(item) }.reject { |uri| non_matching_uri_hosts?(@account.uri, uri) }.take(MAX_REPLIES)
else
@items.map { |item| value_or_id(item) }.take(MAX_REPLIES)
end
end
end
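The new page-walking loop can be exercised against stubbed collection pages. Here `PAGES` and the `fetch_collection` stub are hypothetical stand-ins for the signed HTTP fetch, with the cap lowered to the base service's 5:

```ruby
MAX_REPLIES = 5

# Hypothetical stand-in for remote ActivityPub collection pages.
PAGES = {
  'page1' => { 'type' => 'OrderedCollectionPage', 'orderedItems' => %w[r1 r2 r3], 'next' => 'page2' },
  'page2' => { 'type' => 'OrderedCollectionPage', 'orderedItems' => %w[r4 r5 r6] },
}.freeze

def fetch_collection(uri)
  PAGES[uri] # in the service this is a signed HTTP fetch
end

# Walk 'next' links, accumulating items until pages run out or the cap is passed.
def collection_items(collection)
  all_items = []
  while collection.is_a?(Hash)
    items = case collection['type']
            when 'Collection', 'CollectionPage' then collection['items']
            when 'OrderedCollection', 'OrderedCollectionPage' then collection['orderedItems']
            end
    all_items.concat(Array(items))
    break if all_items.size > MAX_REPLIES
    collection = collection['next'] ? fetch_collection(collection['next']) : nil
  end
  all_items
end

p collection_items(fetch_collection('page1'))
# => ["r1", "r2", "r3", "r4", "r5", "r6"]
```

Note the loop may overshoot the cap by up to one page; the later `take(MAX_REPLIES)` in `filtered_replies` trims the result to exactly 5.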
74 changes: 74 additions & 0 deletions app/workers/activitypub/fetch_all_replies_worker.rb
@@ -0,0 +1,74 @@
# frozen_string_literal: true

# Fetch all replies to a status, querying recursively through
# ActivityPub replies collections, fetching any statuses that
# we either don't already have or we haven't checked for new replies
# in the Status::FETCH_REPLIES_DEBOUNCE interval
class ActivityPub::FetchAllRepliesWorker
include Sidekiq::Worker
include ExponentialBackoff
include JsonLdHelper

sidekiq_options queue: 'pull', retry: 3

# Global max replies to fetch per request (all replies, recursively)
MAX_REPLIES = (ENV['FETCH_REPLIES_MAX_GLOBAL'] || 1000).to_i

def perform(parent_status_id, options = {})
@parent_status = Status.find(parent_status_id)
Rails.logger.debug { "FetchAllRepliesWorker - #{@parent_status.uri}: Fetching all replies for status: #{@parent_status}" }

uris_to_fetch = get_replies(@parent_status.uri, options)
return if uris_to_fetch.nil?

@parent_status.touch(:fetched_replies_at)

fetched_uris = uris_to_fetch.clone.to_set

until uris_to_fetch.empty? || fetched_uris.length >= MAX_REPLIES
next_reply = uris_to_fetch.pop
next if next_reply.nil?

new_reply_uris = get_replies(next_reply, options)
next if new_reply_uris.nil?

new_reply_uris = new_reply_uris.reject { |uri| fetched_uris.include?(uri) }

uris_to_fetch.concat(new_reply_uris)
fetched_uris = fetched_uris.merge(new_reply_uris)
end

Rails.logger.debug { "FetchAllRepliesWorker - #{parent_status_id}: fetched #{fetched_uris.length} replies" }
fetched_uris
end

private

def get_replies(status_uri, options = {})
replies_collection_or_uri = get_replies_uri(status_uri)
return if replies_collection_or_uri.nil?

ActivityPub::FetchAllRepliesService.new.call(replies_collection_or_uri, **options.deep_symbolize_keys)
end

def get_replies_uri(parent_status_uri)
begin
json_status = fetch_resource(parent_status_uri, true)
if json_status.nil?
Rails.logger.debug { "FetchAllRepliesWorker - #{@parent_status.uri}: error getting replies URI for #{parent_status_uri}, returned nil" }
nil
elsif !json_status.key?('replies')
Rails.logger.debug { "FetchAllRepliesWorker - #{@parent_status.uri}: no replies collection found in ActivityPub object: #{json_status}" }
nil
else
json_status['replies']
end
rescue => e
Rails.logger.warn { "FetchAllRepliesWorker - #{@parent_status.uri}: caught exception fetching replies URI: #{e}" }
# Raise if we can't get the collection for top-level status to trigger retry
raise e if parent_status_uri == @parent_status.uri

nil
end
end
end
7 changes: 7 additions & 0 deletions db/migrate/20240918233930_add_fetched_replies_at_to_status.rb
@@ -0,0 +1,7 @@
# frozen_string_literal: true

class AddFetchedRepliesAtToStatus < ActiveRecord::Migration[7.1]
def change
add_column :statuses, :fetched_replies_at, :datetime, null: true
end
end
25 changes: 13 additions & 12 deletions db/schema.rb
@@ -553,12 +553,12 @@
end

create_table "ip_blocks", force: :cascade do |t|
t.datetime "created_at", precision: nil, null: false
t.datetime "updated_at", precision: nil, null: false
t.datetime "expires_at", precision: nil
t.inet "ip", default: "0.0.0.0", null: false
t.integer "severity", default: 0, null: false
t.datetime "expires_at", precision: nil
t.text "comment", default: "", null: false
t.datetime "created_at", precision: nil, null: false
t.datetime "updated_at", precision: nil, null: false
t.index ["ip"], name: "index_ip_blocks_on_ip", unique: true
end

@@ -1052,6 +1052,7 @@
t.datetime "edited_at", precision: nil
t.boolean "trendable"
t.bigint "ordered_media_attachment_ids", array: true
t.datetime "fetched_replies_at"
t.index ["account_id", "id", "visibility", "updated_at"], name: "index_statuses_20190820", order: { id: :desc }, where: "(deleted_at IS NULL)"
t.index ["account_id"], name: "index_statuses_on_account_id"
t.index ["deleted_at"], name: "index_statuses_on_deleted_at", where: "(deleted_at IS NOT NULL)"
@@ -1380,9 +1381,9 @@
add_index "instances", ["domain"], name: "index_instances_on_domain", unique: true

create_view "user_ips", sql_definition: <<-SQL
SELECT user_id,
ip,
max(used_at) AS used_at
SELECT t0.user_id,
sneakers-the-rat (author) commented on this hunk: anyone know why rails always wants to do this for me when i run a migration? i didn't do anything special to the config or anything, and it seems to not do anything, but it keeps happening?

t0.ip,
max(t0.used_at) AS used_at
FROM ( SELECT users.id AS user_id,
users.sign_up_ip AS ip,
users.created_at AS used_at
@@ -1399,7 +1400,7 @@
login_activities.created_at
FROM login_activities
WHERE (login_activities.success = true)) t0
GROUP BY user_id, ip;
GROUP BY t0.user_id, t0.ip;
SQL
create_view "account_summaries", materialized: true, sql_definition: <<-SQL
SELECT accounts.id AS account_id,
@@ -1420,9 +1421,9 @@
add_index "account_summaries", ["account_id"], name: "index_account_summaries_on_account_id", unique: true

create_view "global_follow_recommendations", materialized: true, sql_definition: <<-SQL
SELECT account_id,
sum(rank) AS rank,
array_agg(reason) AS reason
SELECT t0.account_id,
sum(t0.rank) AS rank,
array_agg(t0.reason) AS reason
FROM ( SELECT account_summaries.account_id,
((count(follows.id))::numeric / (1.0 + (count(follows.id))::numeric)) AS rank,
'most_followed'::text AS reason
@@ -1446,8 +1447,8 @@
WHERE (follow_recommendation_suppressions.account_id = statuses.account_id)))))
GROUP BY account_summaries.account_id
HAVING (sum((status_stats.reblogs_count + status_stats.favourites_count)) >= (5)::numeric)) t0
GROUP BY account_id
ORDER BY (sum(rank)) DESC;
GROUP BY t0.account_id
ORDER BY (sum(t0.rank)) DESC;
SQL
add_index "global_follow_recommendations", ["account_id"], name: "index_global_follow_recommendations_on_account_id", unique: true
