process_fts_updates: Use 3 queries, instead of BATCH_SIZE + 2.

This provides at least a 10x speedup.

We could use a CTE to fetch from, and lock, `fts_update_log` at the
same time as performing the `zerver_message` update -- however, this
would mean that the number of returned rows would be less than
BATCH_SIZE if some rows in `zerver_message` no longer existed, causing
premature termination of the outer loop.

We lock the `zerver_message` rows in ascending ID order, to prevent
deadlocks.  We use `ctid` values[^1] to transfer the rows from the
inner query to the outer update; this is safe because the rows are
locked against concurrent updates, so their `ctid` values cannot
change between the inner query and the outer update.

[^1]: https://www.postgresql.org/docs/current/ddl-system-columns.html#DDL-SYSTEM-COLUMNS-CTID
This commit is contained in:
Alex Vandiver
2025-05-16 14:36:57 +00:00
committed by Tim Abbott
parent f2f94d30fe
commit 09f1195fb5

View File

@@ -41,6 +41,7 @@ import os
import select
import sys
import time
from collections.abc import Sequence
import psycopg2
import psycopg2.extensions
@@ -65,31 +66,35 @@ else:
def update_fts_columns(conn: psycopg2.extensions.connection) -> int:
with conn.cursor() as cursor:
cursor.execute(
"SELECT id, message_id FROM fts_update_log ORDER BY id LIMIT %s FOR UPDATE SKIP LOCKED;",
[BATCH_SIZE],
"SELECT id, message_id FROM fts_update_log ORDER BY id LIMIT %s FOR UPDATE SKIP LOCKED",
(BATCH_SIZE,),
)
ids = []
for id, message_id in cursor.fetchall():
cursor.arraysize = BATCH_SIZE
parts = list(zip(*cursor.fetchmany(), strict=True))
if not parts:
row_ids: Sequence[int] = []
message_ids: Sequence[int] = []
else:
row_ids, message_ids = parts[0], parts[1]
if message_ids:
if USING_PGROONGA:
cursor.execute(
"UPDATE zerver_message SET "
"search_pgroonga = "
"escape_html(subject) || ' ' || rendered_content "
"WHERE id = %s",
(message_id,),
)
update_sql = "search_pgroonga = escape_html(subject) || ' ' || rendered_content"
else:
update_sql = "search_tsvector = to_tsvector('zulip.english_us_search', subject || rendered_content)"
cursor.execute(
"UPDATE zerver_message SET "
"search_tsvector = to_tsvector('zulip.english_us_search', "
"subject || rendered_content) "
"WHERE id = %s",
(message_id,),
f"UPDATE zerver_message SET {update_sql} " # noqa: S608
"WHERE ctid IN ("
" SELECT ctid FROM zerver_message"
" WHERE id IN %s"
" ORDER BY id FOR UPDATE"
")",
(message_ids,),
)
ids.append(id)
if ids:
cursor.execute("DELETE FROM fts_update_log WHERE id = ANY(%s)", (ids,))
if row_ids:
cursor.execute("DELETE FROM fts_update_log WHERE id IN %s", (row_ids,))
conn.commit()
return len(ids)
return len(row_ids)
def update_all_rows(msg: str, conn: psycopg2.extensions.connection) -> None: