Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Bulk-invalidate e2e cached queries after claiming keys #16613

Merged
merged 15 commits into from
Nov 9, 2023
Prev Previous commit
Next Next commit
Mark multiple IDs as finished.
  • Loading branch information
clokep committed Nov 8, 2023
commit 307c77abead5210518622100c5e9a4c24a9a11e5
24 changes: 13 additions & 11 deletions synapse/storage/util/id_generators.py
Original file line number Diff line number Diff line change
Expand Up @@ -650,8 +650,8 @@ def get_next_txn(self, txn: LoggingTransaction) -> int:

next_id = self._load_next_id_txn(txn)

txn.call_after(self._mark_id_as_finished, next_id)
txn.call_on_exception(self._mark_id_as_finished, next_id)
txn.call_after(self._mark_id_as_finished, [next_id])
txn.call_on_exception(self._mark_id_as_finished, [next_id])
txn.call_after(self._notifier.notify_replication)

# Update the `stream_positions` table with newly updated stream
Expand Down Expand Up @@ -686,9 +686,8 @@ def get_next_mult_txn(self, txn: LoggingTransaction, n: int) -> List[int]:

next_ids = self._load_next_mult_id_txn(txn, n)

for next_id in next_ids:
txn.call_after(self._mark_id_as_finished, next_id)
txn.call_on_exception(self._mark_id_as_finished, next_id)
txn.call_after(self._mark_ids_as_finished, next_ids)
txn.call_on_exception(self._mark_ids_as_finished, next_ids)
txn.call_after(self._notifier.notify_replication)

# Update the `stream_positions` table with newly updated stream
Expand All @@ -708,14 +707,15 @@ def get_next_mult_txn(self, txn: LoggingTransaction, n: int) -> List[int]:

return [self._return_factor * next_id for next_id in next_ids]

def _mark_id_as_finished(self, next_id: int) -> None:
def _mark_ids_as_finished(self, next_ids: List[int]) -> None:
"""The given IDs have finished being processed so we should advance
the current position if possible.
"""

with self._lock:
self._unfinished_ids.discard(next_id)
self._finished_ids.add(next_id)
self._unfinished_ids.difference_update(next_ids)
clokep marked this conversation as resolved.
Show resolved Hide resolved
for next_id in next_ids:
self._finished_ids.add(next_id)
DMRobertson marked this conversation as resolved.
Show resolved Hide resolved

new_cur: Optional[int] = None

Expand Down Expand Up @@ -764,7 +764,10 @@ def _mark_id_as_finished(self, next_id: int) -> None:
curr, new_cur, self._max_position_of_local_instance
)

self._add_persisted_position(next_id)
# TODO: Can we call this for just the last position, or somehow batch
# the calls to _add_persisted_position?
Comment on lines +766 to +767
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this is a question for @erikjohnston -- _add_persisted_position seems fairly heavy and if we could optimize what we call it with (or take multiple IDs at once) I think that'd be a win.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we certainly could make it take multiple IDs? It does expect to be called with every position (though I don't think anything would break, it would just be less efficient).

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I guess if it has to be called w/ every position maybe it isn't a huge deal...

for next_id in next_ids:
self._add_persisted_position(next_id)

def get_current_token(self) -> int:
return self.get_persisted_upto_position()
Expand Down Expand Up @@ -970,8 +973,7 @@ async def __aexit__(
exc: Optional[BaseException],
tb: Optional[TracebackType],
) -> bool:
for i in self.stream_ids:
self.id_gen._mark_id_as_finished(i)
self.id_gen._mark_ids_as_finished(self.stream_ids)

self.notifier.notify_replication()

Expand Down