process_fts_updates: Use row locking and transactions.

This commit is contained in:
Alex Vandiver
2025-05-16 14:05:17 +00:00
committed by Tim Abbott
parent 4d4a73c492
commit fd898dafae

View File

@@ -62,9 +62,10 @@ else:
logger.setLevel(logging.DEBUG)
def update_fts_columns(cursor: psycopg2.extensions.cursor) -> int:
def update_fts_columns(conn: psycopg2.extensions.connection) -> int:
with conn.cursor() as cursor:
cursor.execute(
"SELECT id, message_id FROM fts_update_log LIMIT %s;",
"SELECT id, message_id FROM fts_update_log ORDER BY id LIMIT %s FOR UPDATE SKIP LOCKED;",
[BATCH_SIZE],
)
ids = []
@@ -86,10 +87,12 @@ def update_fts_columns(cursor: psycopg2.extensions.cursor) -> int:
)
ids.append(id)
cursor.execute("DELETE FROM fts_update_log WHERE id = ANY(%s)", (ids,))
conn.commit()
return len(ids)
def am_master(cursor: psycopg2.extensions.cursor) -> bool:
def am_master(conn: psycopg2.extensions.connection) -> bool:
with conn.cursor() as cursor:
cursor.execute("SELECT pg_is_in_recovery()")
return not cursor.fetchall()[0][0]
@@ -201,13 +204,13 @@ while True:
if conn is None:
# connection_factory=None lets mypy understand the return type
conn = psycopg2.connect(connection_factory=None, **pg_args)
cursor = conn.cursor()
retries = 30
conn.set_isolation_level(psycopg2.extensions.ISOLATION_LEVEL_AUTOCOMMIT)
conn.autocommit = False
conn.isolation_level = psycopg2.extensions.ISOLATION_LEVEL_READ_COMMITTED
first_check = True
while not am_master(cursor):
while not am_master(conn):
if first_check:
first_check = False
logger.warning("In recovery; sleeping")
@@ -215,10 +218,13 @@ while True:
logger.debug("process_fts_updates: listening for search index updates")
with conn.cursor() as cursor:
cursor.execute("LISTEN fts_update_log;")
conn.commit()
# Catch up on any historical columns
while True:
rows_updated = update_fts_columns(cursor)
rows_updated = update_fts_columns(conn)
logger.log(
logging.INFO if rows_updated > 0 else logging.DEBUG,
"process_fts_updates: Processed %d rows catching up",