mirror of
https://github.com/zulip/zulip.git
synced 2025-11-11 09:27:43 +00:00
process_fts_updates: After getting notifies, process them all.
Every insert into `fts_update_log` triggers a `NOTIFY`; processing a batch of 1000 for every NOTIFY that we get thus results in a lot of extra processing of no rows. Instead, we clear all notifies, and repeat the pattern of processing rows until the batch comes up short. This may still result in extra wake-ups, in the event that a new NOTIFY occurs after we clear them, and that new row is processed by us in `update_all_rows`. However, these wake-ups are necessary for correctness, and this change will result in drastically fewer extra checks.
This commit is contained in:
committed by
Tim Abbott
parent
fd898dafae
commit
eeb485f894
@@ -91,6 +91,23 @@ def update_fts_columns(conn: psycopg2.extensions.connection) -> int:
|
|||||||
return len(ids)
|
return len(ids)
|
||||||
|
|
||||||
|
|
||||||
|
def update_all_rows(msg: str, conn: psycopg2.extensions.connection) -> None:
|
||||||
|
while True:
|
||||||
|
start_time = time.perf_counter()
|
||||||
|
rows_updated = update_fts_columns(conn)
|
||||||
|
if rows_updated:
|
||||||
|
logger.log(
|
||||||
|
logging.INFO,
|
||||||
|
"process_fts_updates: %s %d rows, %d rows/sec",
|
||||||
|
msg,
|
||||||
|
rows_updated,
|
||||||
|
rows_updated / (time.perf_counter() - start_time),
|
||||||
|
)
|
||||||
|
|
||||||
|
if rows_updated != BATCH_SIZE:
|
||||||
|
return
|
||||||
|
|
||||||
|
|
||||||
def am_master(conn: psycopg2.extensions.connection) -> bool:
|
def am_master(conn: psycopg2.extensions.connection) -> bool:
|
||||||
with conn.cursor() as cursor:
|
with conn.cursor() as cursor:
|
||||||
cursor.execute("SELECT pg_is_in_recovery()")
|
cursor.execute("SELECT pg_is_in_recovery()")
|
||||||
@@ -223,24 +240,14 @@ while True:
|
|||||||
conn.commit()
|
conn.commit()
|
||||||
|
|
||||||
# Catch up on any historical columns
|
# Catch up on any historical columns
|
||||||
while True:
|
update_all_rows("Caught up", conn)
|
||||||
rows_updated = update_fts_columns(conn)
|
|
||||||
logger.log(
|
|
||||||
logging.INFO if rows_updated > 0 else logging.DEBUG,
|
|
||||||
"process_fts_updates: Processed %d rows catching up",
|
|
||||||
rows_updated,
|
|
||||||
)
|
|
||||||
|
|
||||||
if rows_updated != BATCH_SIZE:
|
|
||||||
# We're caught up, so proceed to the listening for updates phase.
|
|
||||||
break
|
|
||||||
|
|
||||||
# TODO: If we go back into recovery, we should stop processing updates
|
# TODO: If we go back into recovery, we should stop processing updates
|
||||||
if select.select([conn], [], [], 30) != ([], [], []):
|
if select.select([conn], [], [], 30) != ([], [], []):
|
||||||
conn.poll()
|
conn.poll()
|
||||||
while conn.notifies:
|
conn.notifies.clear()
|
||||||
conn.notifies.pop()
|
update_all_rows("Updated", conn)
|
||||||
update_fts_columns(cursor)
|
|
||||||
except psycopg2.OperationalError as e:
|
except psycopg2.OperationalError as e:
|
||||||
retries -= 1
|
retries -= 1
|
||||||
if retries <= 0:
|
if retries <= 0:
|
||||||
|
|||||||
Reference in New Issue
Block a user