mirror of
https://github.com/zulip/zulip.git
synced 2025-11-04 22:13:26 +00:00
We run this tool at DEBUG log level in production, so we will still see the notice on startup there; this avoids a spammy line in the development environment output.
175 lines
6.2 KiB
Python
Executable File
175 lines
6.2 KiB
Python
Executable File
#!/usr/bin/env python3
|
|
# Processes updates to PostgreSQL full-text search for new/edited messages.
|
|
#
|
|
# Zulip manages its PostgreSQL full-text search as follows. When the
|
|
# content of a message is modified, a PostgreSQL trigger logs the
|
|
# message ID to the `fts_update_log` table. In the background, this
|
|
# program processes `fts_update_log`, updating the PostgreSQL full-text
|
|
# search column search_tsvector in the main zerver_message table.
|
|
import sys
|
|
|
|
# In production the virtualenv lives under /home/zulip/deployments/current:
# put that checkout on sys.path so scripts.lib.setup_path can be imported
# and asked to activate it.
#
# In development that checkout does not exist, so the import fails with
# ImportError; the development virtualenv is already active, so there is
# nothing to do and the error is deliberately ignored.
sys.path.append('/home/zulip/deployments/current')
try:
    from scripts.lib.setup_path import setup_path

    setup_path()
except ImportError:
    pass  # development environment: nothing to set up
|
|
|
|
import argparse
|
|
import configparser
|
|
import logging
|
|
import os
|
|
import select
|
|
import sys
|
|
import time
|
|
|
|
import psycopg2
|
|
import psycopg2.extensions
|
|
|
|
# Maximum number of fts_update_log rows fetched per update_fts_columns() call.
BATCH_SIZE = 1000

parser = argparse.ArgumentParser()
parser.add_argument('--quiet', action='store_true')
options = parser.parse_args()

# Log timestamps in UTC, followed by the level name and the message.
logging.Formatter.converter = time.gmtime
logging.basicConfig(format="%(asctime)s %(levelname)s: %(message)s")
logger = logging.getLogger("process_fts_updates")
# --quiet suppresses DEBUG-level output; otherwise everything is logged.
logger.setLevel(logging.INFO if options.quiet else logging.DEBUG)
|
|
|
|
def update_fts_columns(cursor: psycopg2.extensions.cursor) -> int:
    """Process one batch of pending full-text search updates.

    Reads up to BATCH_SIZE rows from ``fts_update_log``, and for each
    referenced message recomputes ``zerver_message.search_tsvector`` (and
    ``search_pgroonga`` when USING_PGROONGA is set).  Afterwards, the
    processed log rows are deleted.

    Returns the number of ``fts_update_log`` rows processed.  A return value
    equal to BATCH_SIZE means more rows may still be pending.
    """
    cursor.execute(
        "SELECT id, message_id FROM fts_update_log LIMIT %s;",
        [BATCH_SIZE],
    )
    # Fetch everything up front: the same cursor is reused for the UPDATEs
    # below, and executing a new statement discards unread results.
    log_rows = cursor.fetchall()
    if not log_rows:
        # Nothing pending; skip the no-op DELETE round trip.
        return 0

    reindexed_message_ids = set()
    for _log_id, message_id in log_rows:
        # A message edited several times has several log rows.  Each UPDATE
        # reads the message's current content, so once per batch is enough.
        if message_id in reindexed_message_ids:
            continue
        reindexed_message_ids.add(message_id)

        if USING_PGROONGA:
            cursor.execute("UPDATE zerver_message SET "
                           "search_pgroonga = "
                           "escape_html(subject) || ' ' || rendered_content "
                           "WHERE id = %s", (message_id,))
        cursor.execute("UPDATE zerver_message SET "
                       "search_tsvector = to_tsvector('zulip.english_us_search', "
                       "subject || rendered_content) "
                       "WHERE id = %s", (message_id,))

    # Delete every fetched log row, including duplicates that were skipped.
    log_ids = [log_id for log_id, _message_id in log_rows]
    cursor.execute("DELETE FROM fts_update_log WHERE id = ANY(%s)", (log_ids,))
    return len(log_ids)
|
|
|
|
def am_master(cursor: psycopg2.extensions.cursor) -> bool:
    """Return True unless the connected server reports it is in recovery.

    ``pg_is_in_recovery()`` is true while a server is still replaying WAL
    (e.g. a standby), where writes to zerver_message are not possible.
    """
    cursor.execute("SELECT pg_is_in_recovery()")
    (in_recovery,) = cursor.fetchone()
    return not in_recovery
|
|
|
|
pg_args = {}

# Make the Zulip codebase importable so its Django settings can be read.
# Two roots are added: the production deployment, then the source tree
# containing this script (development).  The later insert ends up first.
sys.path.insert(0, '/home/zulip/deployments/current')
sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), "../../../..")))

try:
    os.environ['DJANGO_SETTINGS_MODULE'] = 'zproject.settings'
    from django.conf import settings

    # Only pass host/port through when the settings provide a non-empty value.
    for setting_name, pg_key in (('REMOTE_POSTGRES_HOST', 'host'),
                                 ('REMOTE_POSTGRES_PORT', 'port')):
        setting_value = getattr(settings, setting_name)
        if setting_value != '':
            pg_args[pg_key] = setting_value
    USING_PGROONGA = settings.USING_PGROONGA
except ImportError:
    # Without Zulip's settings (e.g. running directly on a PostgreSQL
    # server), we just connect locally.
    USING_PGROONGA = False

# Zulip's settings may not be available here, so /etc/zulip/zulip.conf
# can also turn PGroonga on.  In development the settings value above is
# the only source.
config_file = configparser.RawConfigParser()
config_file.read("/etc/zulip/zulip.conf")
if config_file.has_option('machine', 'pgroonga'):
    USING_PGROONGA = True

if 'host' not in pg_args:
    pg_args['user'] = 'zulip'
else:
    default_db = settings.DATABASES['default']
    db_password = default_db['PASSWORD']
    pg_args['password'] = '' if db_password is None else db_password
    pg_args['user'] = default_db['USER']
    pg_args['dbname'] = default_db['NAME']
    remote_sslmode = settings.REMOTE_POSTGRES_SSLMODE
    pg_args['sslmode'] = remote_sslmode if remote_sslmode != '' else 'verify-full'
    pg_args['connect_timeout'] = '600'
|
|
|
conn = None

# Before the first successful connection only one attempt is allowed, so a
# misconfiguration fails immediately.  Once connected, up to 30 consecutive
# OperationalErrors (5 seconds apart) are tolerated before giving up.
retries = 1

while True:
    try:
        if conn is None:
            conn = psycopg2.connect(**pg_args)
            cursor = conn.cursor()
            retries = 30

            # psycopg2 recommends autocommit for LISTEN so that notifications
            # are delivered promptly; each UPDATE/DELETE also commits on its own.
            conn.set_isolation_level(psycopg2.extensions.ISOLATION_LEVEL_AUTOCOMMIT)

            # Wait (warning only once) until the server is out of recovery.
            first_check = True
            while not am_master(cursor):
                if first_check:
                    first_check = False
                    logger.warning("In recovery; sleeping")
                time.sleep(5)

            logger.debug("process_fts_updates: listening for search index updates")

            cursor.execute("LISTEN fts_update_log;")
            # Catch up on any log rows written while we were not listening.
            while True:
                rows_updated = update_fts_columns(cursor)
                notice = f"process_fts_updates: Processed {rows_updated} rows catching up"
                if rows_updated > 0:
                    logger.info(notice)
                else:
                    logger.debug(notice)

                if rows_updated != BATCH_SIZE:
                    # We're caught up, so proceed to the listening for updates phase.
                    break

        # TODO: If we go back into recovery, we should stop processing updates
        # Wait up to 30 seconds for the connection to become readable.
        if select.select([conn], [], [], 30) != ([], [], []):
            conn.poll()
            if conn.notifies:
                # A notification only signals that fts_update_log has new rows;
                # its payload is unused.  Drop them all at once and process
                # batches until the log is drained.  This replaces running
                # one (mostly empty) batch per notification, which could also
                # leave a backlog larger than BATCH_SIZE unprocessed.
                conn.notifies.clear()
                while update_fts_columns(cursor) == BATCH_SIZE:
                    pass
    except psycopg2.OperationalError as e:
        retries -= 1
        if retries <= 0:
            raise
        # pgerror is None when the error did not come from the server (e.g. a
        # failed connect); fall back to the exception's own text.
        logger.info("%s", e.pgerror or e, exc_info=True)
        logger.info("Sleeping and reconnecting")
        time.sleep(5)
        if conn is not None:
            conn.close()
            conn = None
    except KeyboardInterrupt:
        print(sys.argv[0], "exited after receiving KeyboardInterrupt")
        if conn is not None:
            conn.close()
        break
|