diff --git a/synapse/handlers/federation_event.py b/synapse/handlers/federation_event.py index 24684f57190..4317c90dab4 100644 --- a/synapse/handlers/federation_event.py +++ b/synapse/handlers/federation_event.py @@ -152,7 +152,7 @@ def __init__(self, hs: "HomeServer"): self._clock = hs.get_clock() self._store = hs.get_datastores().main self._state_store = hs.get_datastores().state - self._state_epoch_store = hs.get_datastores().state_epochs + self._state_deletion_store = hs.get_datastores().state_deletion self._storage_controllers = hs.get_storage_controllers() self._state_storage_controller = self._storage_controllers.state @@ -583,7 +583,7 @@ async def process_remote_join( state_maps_to_resolve, event_map=None, state_res_store=StateResolutionStore( - self._store, self._state_epoch_store + self._store, self._state_deletion_store ), ) ) @@ -1184,7 +1184,7 @@ async def _compute_event_context_with_maybe_missing_prevs( state_maps, event_map={event_id: event}, state_res_store=StateResolutionStore( - self._store, self._state_epoch_store + self._store, self._state_deletion_store ), ) @@ -1881,7 +1881,7 @@ async def _check_event_auth( [local_state_id_map, claimed_auth_events_id_map], event_map=None, state_res_store=StateResolutionStore( - self._store, self._state_epoch_store + self._store, self._state_deletion_store ), ) ) @@ -2023,7 +2023,7 @@ async def _check_for_soft_fail( state_sets, event_map=None, state_res_store=StateResolutionStore( - self._store, self._state_epoch_store + self._store, self._state_deletion_store ), ) ) diff --git a/synapse/state/__init__.py b/synapse/state/__init__.py index a92cddc4617..fd509db4861 100644 --- a/synapse/state/__init__.py +++ b/synapse/state/__init__.py @@ -65,7 +65,7 @@ from synapse.server import HomeServer from synapse.storage.controllers import StateStorageController from synapse.storage.databases.main import DataStore - from synapse.storage.databases.state.epochs import StateEpochDataStore + from 
synapse.storage.databases.state.deletion import StateDeletionDataStore logger = logging.getLogger(__name__) metrics_logger = logging.getLogger("synapse.state.metrics") @@ -197,7 +197,7 @@ def __init__(self, hs: "HomeServer"): self._events_shard_config = hs.config.worker.events_shard_config self._instance_name = hs.get_instance_name() self._state_store = hs.get_datastores().state - self._state_epoch_store = hs.get_datastores().state_epochs + self._state_epoch_store = hs.get_datastores().state_deletion self._update_current_state_client = ( ReplicationUpdateCurrentStateRestServlet.make_client(hs) @@ -692,7 +692,7 @@ async def resolve_state_groups( if cache.prev_group is not None: state_groups_to_check.add(cache.prev_group) - missing_state_groups = await state_res_store.state_epoch_store.check_state_groups_and_bump_deletion( + missing_state_groups = await state_res_store.state_deletion_store.check_state_groups_and_bump_deletion( state_groups_to_check ) @@ -711,7 +711,7 @@ async def resolve_state_groups( # We double check that none of the state groups have been deleted. # They shouldn't be as all these state groups should be referenced. - missing_state_groups = await state_res_store.state_epoch_store.check_state_groups_and_bump_deletion( + missing_state_groups = await state_res_store.state_deletion_store.check_state_groups_and_bump_deletion( group_names ) if missing_state_groups: @@ -933,7 +933,7 @@ class StateResolutionStore: """ main_store: "DataStore" - state_epoch_store: "StateEpochDataStore" + state_deletion_store: "StateDeletionDataStore" def get_events( self, event_ids: StrCollection, allow_rejected: bool = False diff --git a/synapse/storage/controllers/persist_events.py b/synapse/storage/controllers/persist_events.py index 54c638f2adb..7963905479b 100644 --- a/synapse/storage/controllers/persist_events.py +++ b/synapse/storage/controllers/persist_events.py @@ -332,7 +332,7 @@ def __init__( # store for now. 
self.main_store = stores.main self.state_store = stores.state - self._state_epoch_store = stores.state_epochs + self._state_deletion_store = stores.state_deletion assert stores.persist_events self.persist_events_store = stores.persist_events @@ -551,7 +551,7 @@ async def _calculate_current_state(self, room_id: str) -> StateMap[str]: state_maps_by_state_group, event_map=None, state_res_store=StateResolutionStore( - self.main_store, self._state_epoch_store + self.main_store, self._state_deletion_store ), ) @@ -640,7 +640,7 @@ async def _persist_event_batch( # Stop the state groups from being deleted while we're persisting # them. - async with self._state_epoch_store.persisting_state_group_references( + async with self._state_deletion_store.persisting_state_group_references( events_and_contexts ): await self.persist_events_store._persist_events_and_state_updates( @@ -974,7 +974,7 @@ async def _get_new_state_after_events( state_groups, events_map, state_res_store=StateResolutionStore( - self.main_store, self._state_epoch_store + self.main_store, self._state_deletion_store ), ) diff --git a/synapse/storage/databases/__init__.py b/synapse/storage/databases/__init__.py index 98940f56c87..81886ff765b 100644 --- a/synapse/storage/databases/__init__.py +++ b/synapse/storage/databases/__init__.py @@ -26,7 +26,7 @@ from synapse.storage.database import DatabasePool, make_conn from synapse.storage.databases.main.events import PersistEventsStore from synapse.storage.databases.state import StateGroupDataStore -from synapse.storage.databases.state.epochs import StateEpochDataStore +from synapse.storage.databases.state.deletion import StateDeletionDataStore from synapse.storage.engines import create_engine from synapse.storage.prepare_database import prepare_database @@ -50,14 +50,14 @@ class Databases(Generic[DataStoreT]): main state persist_events - state_epochs + state_deletion """ databases: List[DatabasePool] main: "DataStore" # FIXME: 
https://github.com/matrix-org/synapse/issues/11165: actually an instance of `main_store_class` state: StateGroupDataStore persist_events: Optional[PersistEventsStore] - state_epochs: StateEpochDataStore + state_deletion: StateDeletionDataStore def __init__(self, main_store_class: Type[DataStoreT], hs: "HomeServer"): # Note we pass in the main store class here as workers use a different main @@ -66,7 +66,7 @@ def __init__(self, main_store_class: Type[DataStoreT], hs: "HomeServer"): self.databases = [] main: Optional[DataStoreT] = None state: Optional[StateGroupDataStore] = None - state_epochs: Optional[StateEpochDataStore] = None + state_deletion: Optional[StateDeletionDataStore] = None persist_events: Optional[PersistEventsStore] = None for database_config in hs.config.database.databases: @@ -118,8 +118,8 @@ def __init__(self, main_store_class: Type[DataStoreT], hs: "HomeServer"): if state: raise Exception("'state' data store already configured") - state_epochs = StateEpochDataStore(database, db_conn, hs) - state = StateGroupDataStore(database, db_conn, hs, state_epochs) + state_deletion = StateDeletionDataStore(database, db_conn, hs) + state = StateGroupDataStore(database, db_conn, hs, state_deletion) db_conn.commit() @@ -140,7 +140,7 @@ def __init__(self, main_store_class: Type[DataStoreT], hs: "HomeServer"): if not main: raise Exception("No 'main' database configured") - if not state or not state_epochs: + if not state or not state_deletion: raise Exception("No 'state' database configured") # We use local variables here to ensure that the databases do not have @@ -148,4 +148,4 @@ def __init__(self, main_store_class: Type[DataStoreT], hs: "HomeServer"): self.main = main # type: ignore[assignment] self.state = state self.persist_events = persist_events - self.state_epochs = state_epochs + self.state_deletion = state_deletion diff --git a/synapse/storage/databases/state/epochs.py b/synapse/storage/databases/state/deletion.py similarity index 75% rename from 
synapse/storage/databases/state/epochs.py rename to synapse/storage/databases/state/deletion.py index f813f2feea2..9e1b278842a 100644 --- a/synapse/storage/databases/state/epochs.py +++ b/synapse/storage/databases/state/deletion.py @@ -25,7 +25,6 @@ from synapse.events import EventBase from synapse.events.snapshot import EventContext -from synapse.metrics.background_process_metrics import wrap_as_background_process from synapse.storage.database import ( DatabasePool, LoggingDatabaseConnection, @@ -39,8 +38,8 @@ from synapse.server import HomeServer -class StateEpochDataStore: - """Manages state epochs and checks for state group deletion. +class StateDeletionDataStore: + """Manages deletion of state groups in a safe manner. Deleting state groups is challenging as before we actually delete them we need to ensure that there are no in-flight events that refer to the state @@ -58,20 +57,19 @@ class StateEpochDataStore: database integrity. However, we want to avoid throwing exceptions so deep in the process of - persisting events. So we use a concept of `state_epochs`, where we mark - state groups as pending/proposed for deletion and wait for a certain number - epoch increments before performing the deletion. When we come to handle new - events that reference state groups, we check if they are pending deletion - and bump the epoch when they'll be deleted in (to give a chance for the - event to be persisted, or not). + persisting events. So instead of deleting state groups immediately, we mark + them as pending/proposed for deletion and wait for a certain amount of time + before performing the deletion. When we come to handle new events that + reference state groups, we check if they are pending deletion and bump the + time for when they'll be deleted (to give a chance for the event to be + persisted, or not). """ - # How frequently, roughly, to increment epochs. 
- TIME_BETWEEN_EPOCH_INCREMENTS_MS = 5 * 60 * 1000 - - # The number of epoch increases that must have happened between marking a - # state group as pending and actually deleting it. - NUMBER_EPOCHS_BEFORE_DELETION = 3 + # How long to wait before we delete state groups. This should be long enough + # for any in-flight events to be persisted. If events take longer to persist + # and any of the state groups they reference have been deleted, then the + # event will fail to persist (as well as any event in the same batch). + DELAY_BEFORE_DELETION_MS = 10 * 60 * 1000 def __init__( self, @@ -86,34 +84,6 @@ def __init__( # TODO: Clear from `state_groups_persisting` any holdovers from previous # running instance. - if hs.config.worker.run_background_tasks: - # Add a background loop to periodically check if we should bump - # state epoch. - self._clock.looping_call_now( - self._advance_state_epoch, self.TIME_BETWEEN_EPOCH_INCREMENTS_MS / 5 - ) - - @wrap_as_background_process("_advance_state_epoch") - async def _advance_state_epoch(self) -> None: - """Advances the state epoch, checking that we haven't advanced it too - recently. - """ - - now = self._clock.time_msec() - update_if_before_ts = now - self.TIME_BETWEEN_EPOCH_INCREMENTS_MS - - def advance_state_epoch_txn(txn: LoggingTransaction) -> None: - sql = """ - UPDATE state_epoch - SET state_epoch = state_epoch + 1, updated_ts = ? - WHERE updated_ts <= ? - """ - txn.execute(sql, (now, update_if_before_ts)) - - await self.db_pool.runInteraction( - "_advance_state_epoch", advance_state_epoch_txn, db_autocommit=True - ) - async def check_state_groups_and_bump_deletion( self, state_groups: AbstractSet[int] ) -> Collection[int]: @@ -143,9 +113,10 @@ def _check_state_groups_and_bump_deletion_txn( ) sql = f""" UPDATE state_groups_pending_deletion - SET state_epoch = (SELECT state_epoch FROM state_epoch) + SET insertion_ts = ? 
WHERE {clause} """ + args.insert(0, self._clock.time_msec()) txn.execute(sql, args) @@ -219,8 +190,7 @@ async def persisting_state_group_references( def _mark_state_groups_as_used_txn( self, txn: LoggingTransaction, state_groups: Set[int] ) -> None: - """Marks the given state groups as used. Also checks that the given - state epoch is not too old.""" + """Marks the given state groups as used.""" existing_state_groups = self._get_existing_groups_with_lock(txn, state_groups) missing_state_groups = state_groups - existing_state_groups @@ -264,21 +234,14 @@ def get_state_groups_that_can_be_purged_txn( """ txn.execute(sql, args) - current_state_epoch = self.db_pool.simple_select_one_onecol_txn( - txn, - table="state_epoch", - retcol="state_epoch", - keyvalues={}, - ) - # Check the deletion status in the DB of the given state groups clause, args = make_in_list_sql_clause( self.db_pool.engine, column="state_group", iterable=state_groups ) sql = f""" - SELECT state_group, state_epoch FROM ( - SELECT state_group, state_epoch FROM state_groups_pending_deletion + SELECT state_group, insertion_ts FROM ( + SELECT state_group, insertion_ts FROM state_groups_pending_deletion UNION SELECT state_group, null FROM state_groups_persisting ) AS s @@ -288,15 +251,16 @@ def get_state_groups_that_can_be_purged_txn( txn.execute(sql, args) can_delete = set() - for state_group, state_epoch in txn: - if state_epoch is None: - # A null state epoch means that we are currently persisting + now = self._clock.time_msec() + for state_group, insertion_ts in txn: + if insertion_ts is None: + # A null insertion_ts means that we are currently persisting # events that reference the state group, so we don't delete # them. continue - if current_state_epoch - state_epoch < self.NUMBER_EPOCHS_BEFORE_DELETION: - # Not enough state epochs have occurred to allow us to delete. + if now - insertion_ts < self.DELAY_BEFORE_DELETION_MS: + # Not enough time has elapsed to allow us to delete. 
continue can_delete.add(state_group) diff --git a/synapse/storage/databases/state/store.py b/synapse/storage/databases/state/store.py index b2ef3703c33..7e986e06018 100644 --- a/synapse/storage/databases/state/store.py +++ b/synapse/storage/databases/state/store.py @@ -58,7 +58,7 @@ if TYPE_CHECKING: from synapse.server import HomeServer - from synapse.storage.databases.state.epochs import StateEpochDataStore + from synapse.storage.databases.state.deletion import StateDeletionDataStore logger = logging.getLogger(__name__) @@ -87,10 +87,10 @@ def __init__( database: DatabasePool, db_conn: LoggingDatabaseConnection, hs: "HomeServer", - epoch_store: "StateEpochDataStore", + state_deletion_store: "StateDeletionDataStore", ): super().__init__(database, db_conn, hs) - self._epoch_store = epoch_store + self._state_deletion_store = state_deletion_store # Originally the state store used a single DictionaryCache to cache the # event IDs for the state types in a given state group to avoid hammering @@ -475,9 +475,11 @@ def insert_deltas_group_txn( """ # We need to check that the prev group isn't about to be deleted - is_missing = self._epoch_store._check_state_groups_and_bump_deletion_txn( - txn, - {prev_group}, + is_missing = ( + self._state_deletion_store._check_state_groups_and_bump_deletion_txn( + txn, + {prev_group}, + ) ) if is_missing: raise Exception( @@ -609,9 +611,11 @@ def insert_delta_group_txn( """ # We need to check that the prev group isn't about to be deleted - is_missing = self._epoch_store._check_state_groups_and_bump_deletion_txn( - txn, - {prev_group}, + is_missing = ( + self._state_deletion_store._check_state_groups_and_bump_deletion_txn( + txn, + {prev_group}, + ) ) if is_missing: raise Exception( diff --git a/synapse/storage/schema/state/delta/89/01_state_groups_epochs.sql b/synapse/storage/schema/state/delta/89/01_state_groups_deletion.sql similarity index 56% rename from synapse/storage/schema/state/delta/89/01_state_groups_epochs.sql rename to 
synapse/storage/schema/state/delta/89/01_state_groups_deletion.sql index 3ffc6b1a5f1..02e97ac34c5 100644 --- a/synapse/storage/schema/state/delta/89/01_state_groups_epochs.sql +++ b/synapse/storage/schema/state/delta/89/01_state_groups_deletion.sql @@ -11,30 +11,17 @@ -- See the GNU Affero General Public License for more details: -- <https://www.gnu.org/licenses/agpl-3.0.html>. --- See the `StateEpochDataStore` for details of these tables. - --- Holds the current state epoch -CREATE TABLE IF NOT EXISTS state_epoch ( - Lock CHAR(1) NOT NULL DEFAULT 'X' UNIQUE, -- Makes sure this table only has one row. - state_epoch BIGINT NOT NULL, - updated_ts BIGINT NOT NULL, - CHECK (Lock='X') -); - --- Insert a row so that we always have one row in the table. This will get --- updated when Synapse starts. -INSERT INTO state_epoch (state_epoch, updated_ts) VALUES (0, 0); - +-- See the `StateDeletionDataStore` for details of these tables. -- We add state groups to this table when we want to later delete them. The --- `state_epoch` column indicates when the state group was inserted. +-- `insertion_ts` column indicates when the state group was proposed for +-- deletion (rather than when it should be deleted). CREATE TABLE IF NOT EXISTS state_groups_pending_deletion ( - state_group BIGINT NOT NULL, - state_epoch BIGINT NOT NULL, - PRIMARY KEY (state_group, state_epoch) + state_group BIGINT NOT NULL PRIMARY KEY, + insertion_ts BIGINT NOT NULL ); -CREATE INDEX state_groups_pending_deletion_epoch ON state_groups_pending_deletion(state_epoch); +CREATE INDEX state_groups_pending_deletion_insertion_ts ON state_groups_pending_deletion(insertion_ts); -- Holds the state groups the worker is currently persisting. 
diff --git a/tests/handlers/test_federation_event.py b/tests/handlers/test_federation_event.py index 213620caae3..80b89d5adf9 100644 --- a/tests/handlers/test_federation_event.py +++ b/tests/handlers/test_federation_event.py @@ -807,7 +807,7 @@ def test_process_pulled_event_with_rejected_missing_state(self) -> None: OTHER_USER = f"@user:{self.OTHER_SERVER_NAME}" main_store = self.hs.get_datastores().main - epoch_store = self.hs.get_datastores().state_epochs + state_deletion_store = self.hs.get_datastores().state_deletion # Create the room. kermit_user_id = self.register_user("kermit", "test") @@ -959,7 +959,9 @@ def test_process_pulled_event_with_rejected_missing_state(self) -> None: bert_member_event.event_id: bert_member_event, rejected_kick_event.event_id: rejected_kick_event, }, - state_res_store=StateResolutionStore(main_store, epoch_store), + state_res_store=StateResolutionStore( + main_store, state_deletion_store + ), ) ), [bert_member_event.event_id, rejected_kick_event.event_id], @@ -1004,7 +1006,9 @@ def test_process_pulled_event_with_rejected_missing_state(self) -> None: rejected_power_levels_event.event_id, ], event_map={}, - state_res_store=StateResolutionStore(main_store, epoch_store), + state_res_store=StateResolutionStore( + main_store, state_deletion_store + ), full_conflicted_set=set(), ) ),