mirror of
https://github.com/zulip/zulip.git
synced 2025-11-04 22:13:26 +00:00
128 lines
4.9 KiB
Python
Executable File
128 lines
4.9 KiB
Python
Executable File
#!/usr/bin/env python3
|
|
# Processes updates to postgres Full Text Search for new/edited messages.
|
|
#
|
|
# Zulip manages its postgres full-text search as follows. When the
|
|
# content of a message is modified, a postgres trigger logs the
|
|
# message ID to the `fts_update_log` table. In the background, this
|
|
# program processes `fts_update_log`, updating the postgres full-text
|
|
# search column search_tsvector in the main zerver_message table.
|
|
import sys
|
|
|
|
# We want to use a virtualenv in production, which will be in /home/zulip/deployments/current.
|
|
# So we should add that path to sys.path and then import scripts.lib.setup_path_on_import.
|
|
# But this file is also used in development, where the above path will not exist.
|
|
# So `import scripts.lib.setup_path_on_import` will raise an ImportError.
|
|
# In development, we just want to skip this step since we know that virtualenv will already be in use.
|
|
# So catch the ImportError and do nothing.
|
|
sys.path.append('/home/zulip/deployments/current')
|
|
try:
|
|
import scripts.lib.setup_path_on_import
|
|
except ImportError:
|
|
pass
|
|
|
|
import psycopg2
|
|
import psycopg2.extensions
|
|
import select
|
|
import time
|
|
import logging
|
|
import configparser
|
|
import sys
|
|
import os
|
|
|
|
def update_fts_columns(cursor):
    # type: (psycopg2.extensions.cursor) -> None
    """Refresh the search columns of every message queued in fts_update_log.

    Reads all (id, message_id) rows from `fts_update_log`, recomputes
    `search_tsvector` (and `search_pgroonga` when USING_PGROONGA is set)
    on the matching `zerver_message` rows, and finally deletes exactly
    the log rows that were read.

    cursor: an open database cursor; every statement is issued through it.
    """
    cursor.execute("SELECT id, message_id FROM fts_update_log;")
    rows = cursor.fetchall()
    if not rows:
        # Nothing is queued, so there is nothing to update or delete.
        return

    log_ids = [log_id for (log_id, _) in rows]

    # A message edited several times shows up once per edit in the log.
    # The UPDATEs below recompute from the message's current content, so
    # running them once per distinct message gives the same result.
    seen_message_ids = set()
    for (_, message_id) in rows:
        if message_id in seen_message_ids:
            continue
        seen_message_ids.add(message_id)

        if USING_PGROONGA:
            cursor.execute("UPDATE zerver_message SET "
                           "search_pgroonga = "
                           "subject || ' ' || rendered_content "
                           "WHERE id = %s", (message_id,))
        cursor.execute("UPDATE zerver_message SET "
                       "search_tsvector = to_tsvector('zulip.english_us_search', "
                       "subject || rendered_content) "
                       "WHERE id = %s", (message_id,))

    # Delete only the log rows read above; anything logged after the
    # SELECT stays queued for the next pass.
    cursor.execute("DELETE FROM fts_update_log WHERE id = ANY(%s)", (log_ids,))
|
|
|
|
|
|
def am_master(cursor):
    # type: (psycopg2.extensions.cursor) -> bool
    """Report whether the connected database server is out of recovery.

    Runs `SELECT pg_is_in_recovery()` through `cursor` and returns the
    negation of the single value it yields: True when the server is not
    in recovery, False while it is.
    """
    cursor.execute("SELECT pg_is_in_recovery()")
    result_rows = cursor.fetchall()
    in_recovery = result_rows[0][0]
    return not in_recovery
|
|
|
|
# Logging: records are formatted as "<time> <level>: <message>", and this
# script's own logger is opened up to DEBUG so its info messages are emitted.
logging.basicConfig(format="%(asctime)s %(levelname)s: %(message)s")

logger = logging.getLogger("process_fts_updates")
logger.setLevel(logging.DEBUG)

logger.info("process_fts_updates starting")
|
|
|
|
# Put both possible roots of the Zulip codebase at the front of sys.path:
# first the production deployment, then the development checkout (located
# relative to this file). Each is inserted at index 0, so the development
# path ends up ahead of the production one.
for codebase_root in (
        '/home/zulip/deployments/current',
        os.path.abspath(os.path.join(os.path.dirname(__file__), "../../../..")),
):
    sys.path.insert(0, codebase_root)

os.environ['DJANGO_SETTINGS_MODULE'] = 'zproject.settings'
try:
    # Everything that touches the Zulip settings stays inside the try, so
    # an ImportError from any of these lines selects the fallback below.
    from django.conf import settings
    remote_postgres_host = settings.REMOTE_POSTGRES_HOST
    USING_PGROONGA = settings.USING_PGROONGA
except ImportError:
    # process_fts_updates also supports running locally on a remote
    # postgres server; in that case, one can just connect to localhost
    remote_postgres_host = ''
    USING_PGROONGA = False

# The Zulip settings may be unavailable on this machine (see the fallback
# above), so /etc/zulip/zulip.conf is consulted as well: the mere presence
# of a `pgroonga` option in its [machine] section turns pgroonga on.
#
# The value taken from the settings is still honoured, since that is all
# a development environment has.
config_file = configparser.RawConfigParser()
config_file.read("/etc/zulip/zulip.conf")
if config_file.has_option('machine', 'pgroonga'):
    USING_PGROONGA = True
|
|
|
|
# Open the database connection and the cursor used by the rest of the script.
if remote_postgres_host != '':
    # Remote database: take the credentials from the Django settings.
    # The parameters are handed to psycopg2 as keyword arguments rather
    # than interpolated into a DSN string, so a password, user or database
    # name containing a quote or backslash cannot corrupt the connection
    # string.
    db_settings = settings.DATABASES['default']

    if settings.REMOTE_POSTGRES_SSLMODE != '':
        postgres_sslmode = settings.REMOTE_POSTGRES_SSLMODE
    else:
        # No explicit mode configured: default to verify-full.
        postgres_sslmode = 'verify-full'

    connect_kwargs = {
        'user': db_settings['USER'],
        'host': remote_postgres_host,
        'dbname': db_settings['NAME'],
        'connect_timeout': 600,
        'sslmode': postgres_sslmode,
    }
    # A password of None means "send no password parameter at all".
    if db_settings['PASSWORD'] is not None:
        connect_kwargs['password'] = db_settings['PASSWORD']

    conn = psycopg2.connect(**connect_kwargs)
else:
    # No remote host configured: connect with only the zulip user named.
    conn = psycopg2.connect("user=zulip")
cursor = conn.cursor()

# Autocommit: the script never commits explicitly, so each statement must
# take effect on its own.
conn.set_isolation_level(psycopg2.extensions.ISOLATION_LEVEL_AUTOCOMMIT)
|
|
|
|
# Block until the server is out of recovery, polling every five seconds.
# The "In recovery" message is logged once, on the first failed check.
first_check = True
while True:
    if am_master(cursor):
        break
    if first_check:
        logger.info("In recovery; sleeping")
        first_check = False
    time.sleep(5)

logger.info("Not in recovery; listening for FTS updates")

# Subscribe to notifications, then immediately process whatever is already
# sitting in fts_update_log.
cursor.execute("LISTEN fts_update_log;")
update_fts_columns(cursor)
|
|
|
|
# TODO: If we go back into recovery, we should stop processing updates
try:
    while True:
        # Wait up to 30 seconds for the connection to become readable.
        # Only a read list is passed, so "nothing ready" is exactly an
        # empty first element.
        readable, _, _ = select.select([conn], [], [], 30)
        if not readable:
            continue

        conn.poll()
        # The notifications carry no information this script uses; they
        # only signal that a pass is due, so discard all of them and run
        # one pass.
        del conn.notifies[:]
        update_fts_columns(cursor)
except KeyboardInterrupt:
    print(sys.argv[0], "exited after receiving KeyboardInterrupt")
|