From fd898dafae9dbd79ac3a9748404d6a043083cee8 Mon Sep 17 00:00:00 2001 From: Alex Vandiver Date: Fri, 16 May 2025 14:05:17 +0000 Subject: [PATCH] process_fts_updates: Use row locking and transactions. --- .../files/postgresql/process_fts_updates | 62 ++++++++++--------- 1 file changed, 34 insertions(+), 28 deletions(-) diff --git a/puppet/zulip/files/postgresql/process_fts_updates b/puppet/zulip/files/postgresql/process_fts_updates index c80834019b..f8eecfbb18 100755 --- a/puppet/zulip/files/postgresql/process_fts_updates +++ b/puppet/zulip/files/postgresql/process_fts_updates @@ -62,36 +62,39 @@ else: logger.setLevel(logging.DEBUG) -def update_fts_columns(cursor: psycopg2.extensions.cursor) -> int: - cursor.execute( - "SELECT id, message_id FROM fts_update_log LIMIT %s;", - [BATCH_SIZE], - ) - ids = [] - for id, message_id in cursor.fetchall(): - if USING_PGROONGA: +def update_fts_columns(conn: psycopg2.extensions.connection) -> int: + with conn.cursor() as cursor: + cursor.execute( + "SELECT id, message_id FROM fts_update_log ORDER BY id LIMIT %s FOR UPDATE SKIP LOCKED;", + [BATCH_SIZE], + ) + ids = [] + for id, message_id in cursor.fetchall(): + if USING_PGROONGA: + cursor.execute( + "UPDATE zerver_message SET " + "search_pgroonga = " + "escape_html(subject) || ' ' || rendered_content " + "WHERE id = %s", + (message_id,), + ) cursor.execute( "UPDATE zerver_message SET " - "search_pgroonga = " - "escape_html(subject) || ' ' || rendered_content " + "search_tsvector = to_tsvector('zulip.english_us_search', " + "subject || rendered_content) " "WHERE id = %s", (message_id,), ) - cursor.execute( - "UPDATE zerver_message SET " - "search_tsvector = to_tsvector('zulip.english_us_search', " - "subject || rendered_content) " - "WHERE id = %s", - (message_id,), - ) - ids.append(id) - cursor.execute("DELETE FROM fts_update_log WHERE id = ANY(%s)", (ids,)) - return len(ids) + ids.append(id) + cursor.execute("DELETE FROM fts_update_log WHERE id = ANY(%s)", (ids,)) + conn.commit() + return len(ids) -def am_master(cursor: psycopg2.extensions.cursor) -> bool: - cursor.execute("SELECT pg_is_in_recovery()") - return not cursor.fetchall()[0][0] +def am_master(conn: psycopg2.extensions.connection) -> bool: + with conn.cursor() as cursor: + cursor.execute("SELECT pg_is_in_recovery()") + return not cursor.fetchall()[0][0] def get_config( @@ -201,13 +204,13 @@ while True: if conn is None: # connection_factory=None lets mypy understand the return type conn = psycopg2.connect(connection_factory=None, **pg_args) - cursor = conn.cursor() retries = 30 - conn.set_isolation_level(psycopg2.extensions.ISOLATION_LEVEL_AUTOCOMMIT) + conn.autocommit = False + conn.isolation_level = psycopg2.extensions.ISOLATION_LEVEL_READ_COMMITTED first_check = True - while not am_master(cursor): + while not am_master(conn): if first_check: first_check = False logger.warning("In recovery; sleeping") @@ -215,10 +218,13 @@ while True: logger.debug("process_fts_updates: listening for search index updates") - cursor.execute("LISTEN fts_update_log;") + with conn.cursor() as cursor: + cursor.execute("LISTEN fts_update_log;") + conn.commit() + # Catch up on any historical columns while True: - rows_updated = update_fts_columns(cursor) + rows_updated = update_fts_columns(conn) logger.log( logging.INFO if rows_updated > 0 else logging.DEBUG, "process_fts_updates: Processed %d rows catching up",