#!/usr/bin/env python
"""Worker that keeps ``zerver_message.search_tsvector`` up to date.

On startup the script waits until the PostgreSQL server it is connected to
reports that it is not in recovery, processes every row already queued in
``fts_update_log``, and then listens on the ``fts_update_log`` notification
channel, re-processing the queue whenever a notification arrives.
"""
import logging
import select
import time

import psycopg2
import psycopg2.extensions


def update_fts_columns(cursor):
    """Recompute the search vector for every message queued in fts_update_log.

    Each queued row names a message whose ``search_tsvector`` is rebuilt from
    its ``subject`` and ``rendered_content``.  The log rows that were read are
    deleted afterwards; rows inserted after the initial SELECT are left for
    the next call.

    ``cursor`` is an open database cursor; nothing is returned.
    """
    cursor.execute("SELECT id, message_id FROM fts_update_log;")
    pending = cursor.fetchall()
    if not pending:
        # Nothing queued: skip the pointless DELETE round-trip.
        return

    processed_log_ids = []
    for (log_id, message_id) in pending:
        cursor.execute("UPDATE zerver_message SET "
                       "search_tsvector = to_tsvector('zulip.english_us_search', "
                       "subject || rendered_content) "
                       "WHERE id = %s", (message_id,))
        processed_log_ids.append(log_id)

    # Delete only the rows we actually handled, by primary key, so that
    # entries queued concurrently are not lost.
    cursor.execute("DELETE FROM fts_update_log WHERE id = ANY(%s)",
                   (processed_log_ids,))


def am_master(cursor):
    """Return True when the server reports that it is not in recovery."""
    cursor.execute("SELECT pg_is_in_recovery()")
    in_recovery = cursor.fetchone()[0]
    return not in_recovery


logging.basicConfig(format="%(asctime)s %(levelname)s: %(message)s")
logger = logging.getLogger("process_fts_updates")
logger.setLevel(logging.DEBUG)

logger.info("process_fts_updates starting")

conn = psycopg2.connect("user=zulip")
cursor = conn.cursor()

# Autocommit: every statement takes effect immediately, and LISTEN/NOTIFY
# delivery does not depend on an explicit commit from this script.
conn.set_isolation_level(psycopg2.extensions.ISOLATION_LEVEL_AUTOCOMMIT)

# Wait (polling every 5 seconds) until the server leaves recovery.  The
# "sleeping" message is logged only once to avoid flooding the log.
first_check = True
while not am_master(cursor):
    if first_check:
        first_check = False
        logger.info("In recovery; sleeping")
    time.sleep(5)

logger.info("Not in recovery; listening for FTS updates")

cursor.execute("LISTEN fts_update_log;")

# Handle anything that was queued before we started listening.
update_fts_columns(cursor)

# TODO: If we go back into recovery, we should stop processing updates
while True:
    # Block for up to 30 seconds waiting for traffic on the connection.
    if select.select([conn], [], [], 30) != ([], [], []):
        conn.poll()
        # One pass over fts_update_log handles every row queued so far, so
        # all notifications received up to now are collapsed into a single
        # pass.  The loop condition is re-checked afterwards because
        # psycopg2 can queue further notifications while the queries of the
        # pass itself are executing.
        while conn.notifies:
            del conn.notifies[:]
            update_fts_columns(cursor)