Skip to content

Commit

Permalink
Use time not epochs
Browse files Browse the repository at this point in the history
  • Loading branch information
erikjohnston committed Jan 27, 2025
1 parent 53a7438 commit 989eaef
Show file tree
Hide file tree
Showing 8 changed files with 72 additions and 113 deletions.
10 changes: 5 additions & 5 deletions synapse/handlers/federation_event.py
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,7 @@ def __init__(self, hs: "HomeServer"):
self._clock = hs.get_clock()
self._store = hs.get_datastores().main
self._state_store = hs.get_datastores().state
self._state_epoch_store = hs.get_datastores().state_epochs
self._state_deletion_store = hs.get_datastores().state_deletion
self._storage_controllers = hs.get_storage_controllers()
self._state_storage_controller = self._storage_controllers.state

Expand Down Expand Up @@ -583,7 +583,7 @@ async def process_remote_join(
state_maps_to_resolve,
event_map=None,
state_res_store=StateResolutionStore(
self._store, self._state_epoch_store
self._store, self._state_deletion_store
),
)
)
Expand Down Expand Up @@ -1184,7 +1184,7 @@ async def _compute_event_context_with_maybe_missing_prevs(
state_maps,
event_map={event_id: event},
state_res_store=StateResolutionStore(
self._store, self._state_epoch_store
self._store, self._state_deletion_store
),
)

Expand Down Expand Up @@ -1881,7 +1881,7 @@ async def _check_event_auth(
[local_state_id_map, claimed_auth_events_id_map],
event_map=None,
state_res_store=StateResolutionStore(
self._store, self._state_epoch_store
self._store, self._state_deletion_store
),
)
)
Expand Down Expand Up @@ -2023,7 +2023,7 @@ async def _check_for_soft_fail(
state_sets,
event_map=None,
state_res_store=StateResolutionStore(
self._store, self._state_epoch_store
self._store, self._state_deletion_store
),
)
)
Expand Down
10 changes: 5 additions & 5 deletions synapse/state/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@
from synapse.server import HomeServer
from synapse.storage.controllers import StateStorageController
from synapse.storage.databases.main import DataStore
from synapse.storage.databases.state.epochs import StateEpochDataStore
from synapse.storage.databases.state.deletion import StateDeletionDataStore

logger = logging.getLogger(__name__)
metrics_logger = logging.getLogger("synapse.state.metrics")
Expand Down Expand Up @@ -197,7 +197,7 @@ def __init__(self, hs: "HomeServer"):
self._events_shard_config = hs.config.worker.events_shard_config
self._instance_name = hs.get_instance_name()
self._state_store = hs.get_datastores().state
self._state_epoch_store = hs.get_datastores().state_epochs
self._state_epoch_store = hs.get_datastores().state_deletion

self._update_current_state_client = (
ReplicationUpdateCurrentStateRestServlet.make_client(hs)
Expand Down Expand Up @@ -692,7 +692,7 @@ async def resolve_state_groups(
if cache.prev_group is not None:
state_groups_to_check.add(cache.prev_group)

missing_state_groups = await state_res_store.state_epoch_store.check_state_groups_and_bump_deletion(
missing_state_groups = await state_res_store.state_deletion_store.check_state_groups_and_bump_deletion(
state_groups_to_check
)

Expand All @@ -711,7 +711,7 @@ async def resolve_state_groups(

# We double check that none of the state groups have been deleted.
# They shouldn't be as all these state groups should be referenced.
missing_state_groups = await state_res_store.state_epoch_store.check_state_groups_and_bump_deletion(
missing_state_groups = await state_res_store.state_deletion_store.check_state_groups_and_bump_deletion(
group_names
)
if missing_state_groups:
Expand Down Expand Up @@ -933,7 +933,7 @@ class StateResolutionStore:
"""

main_store: "DataStore"
state_epoch_store: "StateEpochDataStore"
state_deletion_store: "StateDeletionDataStore"

def get_events(
self, event_ids: StrCollection, allow_rejected: bool = False
Expand Down
8 changes: 4 additions & 4 deletions synapse/storage/controllers/persist_events.py
Original file line number Diff line number Diff line change
Expand Up @@ -332,7 +332,7 @@ def __init__(
# store for now.
self.main_store = stores.main
self.state_store = stores.state
self._state_epoch_store = stores.state_epochs
self._state_deletion_store = stores.state_deletion

assert stores.persist_events
self.persist_events_store = stores.persist_events
Expand Down Expand Up @@ -551,7 +551,7 @@ async def _calculate_current_state(self, room_id: str) -> StateMap[str]:
state_maps_by_state_group,
event_map=None,
state_res_store=StateResolutionStore(
self.main_store, self._state_epoch_store
self.main_store, self._state_deletion_store
),
)

Expand Down Expand Up @@ -640,7 +640,7 @@ async def _persist_event_batch(

# Stop the state groups from being deleted while we're persisting
# them.
async with self._state_epoch_store.persisting_state_group_references(
async with self._state_deletion_store.persisting_state_group_references(
events_and_contexts
):
await self.persist_events_store._persist_events_and_state_updates(
Expand Down Expand Up @@ -974,7 +974,7 @@ async def _get_new_state_after_events(
state_groups,
events_map,
state_res_store=StateResolutionStore(
self.main_store, self._state_epoch_store
self.main_store, self._state_deletion_store
),
)

Expand Down
16 changes: 8 additions & 8 deletions synapse/storage/databases/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
from synapse.storage.database import DatabasePool, make_conn
from synapse.storage.databases.main.events import PersistEventsStore
from synapse.storage.databases.state import StateGroupDataStore
from synapse.storage.databases.state.epochs import StateEpochDataStore
from synapse.storage.databases.state.deletion import StateDeletionDataStore
from synapse.storage.engines import create_engine
from synapse.storage.prepare_database import prepare_database

Expand All @@ -50,14 +50,14 @@ class Databases(Generic[DataStoreT]):
main
state
persist_events
state_epochs
state_deletion
"""

databases: List[DatabasePool]
main: "DataStore" # FIXME: https://github.com/matrix-org/synapse/issues/11165: actually an instance of `main_store_class`
state: StateGroupDataStore
persist_events: Optional[PersistEventsStore]
state_epochs: StateEpochDataStore
state_deletion: StateDeletionDataStore

def __init__(self, main_store_class: Type[DataStoreT], hs: "HomeServer"):
# Note we pass in the main store class here as workers use a different main
Expand All @@ -66,7 +66,7 @@ def __init__(self, main_store_class: Type[DataStoreT], hs: "HomeServer"):
self.databases = []
main: Optional[DataStoreT] = None
state: Optional[StateGroupDataStore] = None
state_epochs: Optional[StateEpochDataStore] = None
state_deletion: Optional[StateDeletionDataStore] = None
persist_events: Optional[PersistEventsStore] = None

for database_config in hs.config.database.databases:
Expand Down Expand Up @@ -118,8 +118,8 @@ def __init__(self, main_store_class: Type[DataStoreT], hs: "HomeServer"):
if state:
raise Exception("'state' data store already configured")

state_epochs = StateEpochDataStore(database, db_conn, hs)
state = StateGroupDataStore(database, db_conn, hs, state_epochs)
state_deletion = StateDeletionDataStore(database, db_conn, hs)
state = StateGroupDataStore(database, db_conn, hs, state_deletion)

db_conn.commit()

Expand All @@ -140,12 +140,12 @@ def __init__(self, main_store_class: Type[DataStoreT], hs: "HomeServer"):
if not main:
raise Exception("No 'main' database configured")

if not state or not state_epochs:
if not state or not state_deletion:
raise Exception("No 'state' database configured")

# We use local variables here to ensure that the databases do not have
# optional types.
self.main = main # type: ignore[assignment]
self.state = state
self.persist_events = persist_events
self.state_epochs = state_epochs
self.state_deletion = state_deletion
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@

from synapse.events import EventBase
from synapse.events.snapshot import EventContext
from synapse.metrics.background_process_metrics import wrap_as_background_process
from synapse.storage.database import (
DatabasePool,
LoggingDatabaseConnection,
Expand All @@ -39,8 +38,8 @@
from synapse.server import HomeServer


class StateEpochDataStore:
"""Manages state epochs and checks for state group deletion.
class StateDeletionDataStore:
"""Manages deletion of state groups in a safe manner.
Deleting state groups is challenging as before we actually delete them we
need to ensure that there are no in-flight events that refer to the state
Expand All @@ -58,20 +57,19 @@ class StateEpochDataStore:
database integrity.
However, we want to avoid throwing exceptions so deep in the process of
persisting events. So we use a concept of `state_epochs`, where we mark
state groups as pending/proposed for deletion and wait for a certain number
epoch increments before performing the deletion. When we come to handle new
events that reference state groups, we check if they are pending deletion
and bump the epoch when they'll be deleted in (to give a chance for the
event to be persisted, or not).
persisting events. So instead of deleting state groups immediately, we mark
them as pending/proposed for deletion and wait for a certain amount of time
before performing the deletion. When we come to handle new events that
reference state groups, we check if they are pending deletion and bump the
time for when they'll be deleted (to give a chance for the event to be
persisted, or not).
"""

# How frequently, roughly, to increment epochs.
TIME_BETWEEN_EPOCH_INCREMENTS_MS = 5 * 60 * 1000

# The number of epoch increases that must have happened between marking a
# state group as pending and actually deleting it.
NUMBER_EPOCHS_BEFORE_DELETION = 3
# How long to wait before we delete state groups. This should be long enough
# for any in-flight events to be persisted. If events take longer to persist
# and any of the state groups they reference have been deleted, then the
# event will fail to persist (as well as any event in the same batch).
DELAY_BEFORE_DELETION_MS = 10 * 60 * 1000

def __init__(
self,
Expand All @@ -86,34 +84,6 @@ def __init__(
# TODO: Clear from `state_groups_persisting` any holdovers from previous
# running instance.

if hs.config.worker.run_background_tasks:
# Add a background loop to periodically check if we should bump
# state epoch.
self._clock.looping_call_now(
self._advance_state_epoch, self.TIME_BETWEEN_EPOCH_INCREMENTS_MS / 5
)

@wrap_as_background_process("_advance_state_epoch")
async def _advance_state_epoch(self) -> None:
"""Advances the state epoch, checking that we haven't advanced it too
recently.
"""

now = self._clock.time_msec()
update_if_before_ts = now - self.TIME_BETWEEN_EPOCH_INCREMENTS_MS

def advance_state_epoch_txn(txn: LoggingTransaction) -> None:
sql = """
UPDATE state_epoch
SET state_epoch = state_epoch + 1, updated_ts = ?
WHERE updated_ts <= ?
"""
txn.execute(sql, (now, update_if_before_ts))

await self.db_pool.runInteraction(
"_advance_state_epoch", advance_state_epoch_txn, db_autocommit=True
)

async def check_state_groups_and_bump_deletion(
self, state_groups: AbstractSet[int]
) -> Collection[int]:
Expand Down Expand Up @@ -143,9 +113,10 @@ def _check_state_groups_and_bump_deletion_txn(
)
sql = f"""
UPDATE state_groups_pending_deletion
SET state_epoch = (SELECT state_epoch FROM state_epoch)
SET insertion_ts = ?
WHERE {clause}
"""
args.insert(0, self._clock.time_msec())

txn.execute(sql, args)

Expand Down Expand Up @@ -219,8 +190,7 @@ async def persisting_state_group_references(
def _mark_state_groups_as_used_txn(
self, txn: LoggingTransaction, state_groups: Set[int]
) -> None:
"""Marks the given state groups as used. Also checks that the given
state epoch is not too old."""
"""Marks the given state groups as used."""

existing_state_groups = self._get_existing_groups_with_lock(txn, state_groups)
missing_state_groups = state_groups - existing_state_groups
Expand Down Expand Up @@ -264,21 +234,14 @@ def get_state_groups_that_can_be_purged_txn(
"""
txn.execute(sql, args)

current_state_epoch = self.db_pool.simple_select_one_onecol_txn(
txn,
table="state_epoch",
retcol="state_epoch",
keyvalues={},
)

# Check the deletion status in the DB of the given state groups
clause, args = make_in_list_sql_clause(
self.db_pool.engine, column="state_group", iterable=state_groups
)

sql = f"""
SELECT state_group, state_epoch FROM (
SELECT state_group, state_epoch FROM state_groups_pending_deletion
SELECT state_group, insertion_ts FROM (
SELECT state_group, insertion_ts FROM state_groups_pending_deletion
UNION
SELECT state_group, null FROM state_groups_persisting
) AS s
Expand All @@ -288,15 +251,16 @@ def get_state_groups_that_can_be_purged_txn(
txn.execute(sql, args)

can_delete = set()
for state_group, state_epoch in txn:
if state_epoch is None:
# A null state epoch means that we are currently persisting
now = self._clock.time_msec()
for state_group, insertion_ts in txn:
if insertion_ts is None:
# A null insertion_ts means that we are currently persisting
# events that reference the state group, so we don't delete
# them.
continue

if current_state_epoch - state_epoch < self.NUMBER_EPOCHS_BEFORE_DELETION:
# Not enough state epochs have occurred to allow us to delete.
if now - insertion_ts < self.DELAY_BEFORE_DELETION_MS:
# Not enough time has elapsed to allow us to delete.
continue

can_delete.add(state_group)
Expand Down
22 changes: 13 additions & 9 deletions synapse/storage/databases/state/store.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@

if TYPE_CHECKING:
from synapse.server import HomeServer
from synapse.storage.databases.state.epochs import StateEpochDataStore
from synapse.storage.databases.state.deletion import StateDeletionDataStore

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -87,10 +87,10 @@ def __init__(
database: DatabasePool,
db_conn: LoggingDatabaseConnection,
hs: "HomeServer",
epoch_store: "StateEpochDataStore",
state_deletion_store: "StateDeletionDataStore",
):
super().__init__(database, db_conn, hs)
self._epoch_store = epoch_store
self._state_deletion_store = state_deletion_store

# Originally the state store used a single DictionaryCache to cache the
# event IDs for the state types in a given state group to avoid hammering
Expand Down Expand Up @@ -475,9 +475,11 @@ def insert_deltas_group_txn(
"""

# We need to check that the prev group isn't about to be deleted
is_missing = self._epoch_store._check_state_groups_and_bump_deletion_txn(
txn,
{prev_group},
is_missing = (
self._state_deletion_store._check_state_groups_and_bump_deletion_txn(
txn,
{prev_group},
)
)
if is_missing:
raise Exception(
Expand Down Expand Up @@ -609,9 +611,11 @@ def insert_delta_group_txn(
"""

# We need to check that the prev group isn't about to be deleted
is_missing = self._epoch_store._check_state_groups_and_bump_deletion_txn(
txn,
{prev_group},
is_missing = (
self._state_deletion_store._check_state_groups_and_bump_deletion_txn(
txn,
{prev_group},
)
)
if is_missing:
raise Exception(
Expand Down
Loading

0 comments on commit 989eaef

Please sign in to comment.