mirror of
				https://github.com/zulip/docker-zulip.git
				synced 2025-11-04 05:53:19 +00:00 
			
		
		
		
	
		
			
				
	
	
		
			54 lines
		
	
	
		
			1.6 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			54 lines
		
	
	
		
			1.6 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
#!/usr/bin/env python
 | 
						|
import psycopg2
 | 
						|
import psycopg2.extensions
 | 
						|
import select
 | 
						|
import time
 | 
						|
import logging
 | 
						|
 | 
						|
def update_fts_columns(cursor):
    """Recompute the search vector for every message queued in fts_update_log.

    Reads all (id, message_id) rows currently in ``fts_update_log``, issues
    one UPDATE per row that rebuilds ``zerver_message.search_tsvector`` from
    ``subject || rendered_content``, and finally deletes exactly the log
    rows that were read.  Rows added to the log after the initial SELECT are
    not deleted here, so they are left for a later call.

    Args:
        cursor: A DB-API cursor using ``%s`` placeholders (the script passes
            a psycopg2 cursor).  Only ``execute`` and ``fetchall`` are used.
    """
    cursor.execute("SELECT id, message_id FROM fts_update_log;")
    log_ids = []
    # `log_id` rather than `id`, so the builtin id() is not shadowed.
    for log_id, message_id in cursor.fetchall():
        cursor.execute("UPDATE zerver_message SET "
                       "search_tsvector = to_tsvector('zulip.english_us_search', "
                       "subject || rendered_content) "
                       "WHERE id = %s", (message_id,))
        log_ids.append(log_id)

    if not log_ids:
        # The log was empty: there is nothing to delete, so skip the
        # otherwise no-op DELETE round trip.
        return

    cursor.execute("DELETE FROM fts_update_log WHERE id = ANY(%s)", (log_ids,))
 | 
						|
 | 
						|
 | 
						|
def am_master(cursor):
    """Return True when the connected server is not in recovery.

    Runs ``SELECT pg_is_in_recovery()`` on ``cursor`` and negates the first
    column of the first row of the result.
    """
    cursor.execute("SELECT pg_is_in_recovery()")
    result_rows = cursor.fetchall()
    in_recovery = result_rows[0][0]
    return not in_recovery
 | 
						|
 | 
						|
# Timestamped log lines; this logger emits everything from DEBUG upwards.
logging.basicConfig(format="%(asctime)s %(levelname)s: %(message)s")
logger = logging.getLogger("process_fts_updates")
logger.setLevel(logging.DEBUG)

logger.info("process_fts_updates starting")

conn = psycopg2.connect("user=zulip")
cursor = conn.cursor()

# Autocommit: each statement takes effect immediately, with no explicit
# commit() calls anywhere in this script.
conn.set_isolation_level(psycopg2.extensions.ISOLATION_LEVEL_AUTOCOMMIT)

# Block until the server leaves recovery, re-checking every 5 seconds.
# The "In recovery" message is logged once, not on every re-check.
if not am_master(cursor):
    logger.info("In recovery; sleeping")
    time.sleep(5)
    while not am_master(cursor):
        time.sleep(5)

logger.info("Not in recovery; listening for FTS updates")
 | 
						|
 | 
						|
# Subscribe first, then drain whatever was queued before we were listening,
# so no log row is missed between the two steps.
cursor.execute("LISTEN fts_update_log;")
update_fts_columns(cursor)

# TODO: If we go back into recovery, we should stop processing updates
while True:
    # Wait up to 30 seconds for the connection's socket to become readable;
    # on timeout nothing is done and we simply wait again.
    if select.select([conn], [], [], 30) != ([], [], []):
        conn.poll()
        # One update_fts_columns() call processes every row currently in the
        # log, so a burst of notifications needs a single pass, not one pass
        # per notification.  Clear the pending list before the pass and
        # re-check afterwards: notifications that arrive while the pass is
        # running trigger another one instead of being dropped.
        while conn.notifies:
            del conn.notifies[:]
            update_fts_columns(cursor)
 |