mirror of
https://github.com/zulip/zulip.git
synced 2025-11-18 12:54:58 +00:00
topic: Use a single SQL statement to propagate message moves.
Rather than use `bulk_update()` to batch-move chunks of messages, use
a single SQL query to move the messages. This is much more efficient
for large topic moves. Since the `edit_history` field is not yet
JSON (see #26496) this requires that PostgreSQL cast the current data
into `jsonb`, append the new data (also cast to `jsonb`), and then
re-cast that as text.
For single-message moves, this _increases_ the SQL query count by one,
since we have to re-query for the updated data from the database after
the bulk update. However, this is overall still a performance
improvement, which improves to 2x or 3x for larger topic moves. Below
is a table of duration in seconds to run `do_update_message` to move a
topic to a new stream, based on messages in the topic, for before and
after this change:
| Topic size | Before | After |
| ---------- | -------- | ------- |
| 1 | 0.1036 | 0.0868 |
| 2 | 0.1108 | 0.0925 |
| 5 | 0.1139 | 0.0959 |
| 10 | 0.1218 | 0.0972 |
| 20 | 0.1310 | 0.1098 |
| 50 | 0.1759 | 0.1366 |
| 100 | 0.2307 | 0.1662 |
| 200 | 0.3880 | 0.2229 |
| 500 | 0.7676 | 0.4052 |
| 1000 | 1.3990 | 0.6848 |
| 2000 | 2.9706 | 1.3370 |
| 5000 | 7.5218 | 3.2882 |
| 10000 | 14.0272 | 5.4434 |
(cherry picked from commit a2657b843c)
This commit is contained in:
committed by
Tim Abbott
parent
9a2a5b5910
commit
210c9aaf1c
@@ -3,12 +3,14 @@ from typing import Any, Dict, List, Optional, Set, Tuple
|
||||
|
||||
import orjson
|
||||
from django.db import connection
|
||||
from django.db.models import Q, QuerySet, Subquery
|
||||
from django.db.models import F, Func, JSONField, Q, QuerySet, Subquery, TextField, Value
|
||||
from django.db.models.functions import Cast
|
||||
from sqlalchemy.sql import ColumnElement, column, func, literal
|
||||
from sqlalchemy.types import Boolean, Text
|
||||
|
||||
from zerver.lib.request import REQ
|
||||
from zerver.lib.types import EditHistoryEvent
|
||||
from zerver.lib.utils import assert_is_not_none
|
||||
from zerver.models import Message, Reaction, Stream, UserMessage, UserProfile
|
||||
|
||||
# Only use these constants for events.
|
||||
@@ -153,21 +155,16 @@ def update_messages_for_topic_edit(
|
||||
edit_history_event: EditHistoryEvent,
|
||||
last_edit_time: datetime,
|
||||
) -> List[Message]:
|
||||
propagate_query = Q(
|
||||
recipient_id=old_stream.recipient_id,
|
||||
# Uses index: zerver_message_realm_recipient_upper_subject
|
||||
messages = Message.objects.filter(
|
||||
realm_id=old_stream.realm_id,
|
||||
recipient_id=assert_is_not_none(old_stream.recipient_id),
|
||||
subject__iexact=orig_topic_name,
|
||||
)
|
||||
if propagate_mode == "change_all":
|
||||
propagate_query = propagate_query & ~Q(id=edited_message.id)
|
||||
messages = messages.exclude(id=edited_message.id)
|
||||
if propagate_mode == "change_later":
|
||||
propagate_query = propagate_query & Q(id__gt=edited_message.id)
|
||||
|
||||
# Uses index: zerver_message_realm_recipient_upper_subject
|
||||
messages = Message.objects.filter(propagate_query, realm_id=old_stream.realm_id).select_related(
|
||||
*Message.DEFAULT_SELECT_RELATED
|
||||
)
|
||||
|
||||
update_fields = ["edit_history", "last_edit_time"]
|
||||
messages = messages.filter(id__gt=edited_message.id)
|
||||
|
||||
if new_stream is not None:
|
||||
# If we're moving the messages between streams, only move
|
||||
@@ -175,33 +172,62 @@ def update_messages_for_topic_edit(
|
||||
# gain access to messages through moving them.
|
||||
from zerver.lib.message import bulk_access_stream_messages_query
|
||||
|
||||
messages_list = list(bulk_access_stream_messages_query(acting_user, messages, old_stream))
|
||||
messages = bulk_access_stream_messages_query(acting_user, messages, old_stream)
|
||||
else:
|
||||
# For single-message edits or topic moves within a stream, we
|
||||
# allow moving history the user may not have access in order
|
||||
# to keep topics together.
|
||||
messages_list = list(messages)
|
||||
pass
|
||||
|
||||
# The cached ORM objects are not changed by the upcoming
|
||||
# messages.update(), and the remote cache update (done by the
|
||||
# caller) requires the new value, so we manually update the
|
||||
# objects in addition to sending a bulk query to the database.
|
||||
update_fields: Dict[str, object] = {
|
||||
"last_edit_time": last_edit_time,
|
||||
# We cast the `edit_history` column to jsonb (defaulting NULL
|
||||
# to `[]`), apply the `||` array concatenation operator to it,
|
||||
# and cast the result back to text. See #26496 for making
|
||||
# this column itself jsonb, which is a complicated migration.
|
||||
#
|
||||
# This equates to:
|
||||
# "edit_history" = (
|
||||
# (COALESCE("zerver_message"."edit_history", '[]'))::jsonb
|
||||
# ||
|
||||
# ( '[{ ..json event.. }]' )::jsonb
|
||||
# )::text
|
||||
"edit_history": Cast(
|
||||
Func(
|
||||
Cast(
|
||||
Func(
|
||||
F("edit_history"),
|
||||
Value("[]"),
|
||||
function="COALESCE",
|
||||
),
|
||||
JSONField(),
|
||||
),
|
||||
Cast(
|
||||
Value(orjson.dumps([edit_history_event]).decode()),
|
||||
JSONField(),
|
||||
),
|
||||
function="",
|
||||
arg_joiner=" || ",
|
||||
),
|
||||
TextField(),
|
||||
),
|
||||
}
|
||||
if new_stream is not None:
|
||||
update_fields.append("recipient")
|
||||
for m in messages_list:
|
||||
assert new_stream.recipient is not None
|
||||
m.recipient = new_stream.recipient
|
||||
update_fields["recipient"] = new_stream.recipient
|
||||
if topic_name is not None:
|
||||
update_fields.append("subject")
|
||||
for m in messages_list:
|
||||
m.set_topic_name(topic_name)
|
||||
update_fields["subject"] = topic_name
|
||||
|
||||
for message in messages_list:
|
||||
update_edit_history(message, last_edit_time, edit_history_event)
|
||||
# The update will cause the 'messages' query to no longer match
|
||||
# any rows; we capture the set of matching ids first, do the
|
||||
# update, and then return a fresh collection -- so we know their
|
||||
# metadata has been updated for the UPDATE command, and the caller
|
||||
# can update the remote cache with that.
|
||||
message_ids = list(messages.values_list("id", flat=True))
|
||||
messages.update(**update_fields)
|
||||
|
||||
Message.objects.bulk_update(messages_list, update_fields, batch_size=100)
|
||||
|
||||
return messages_list
|
||||
return list(
|
||||
Message.objects.filter(id__in=message_ids).select_related(*Message.DEFAULT_SELECT_RELATED)
|
||||
)
|
||||
|
||||
|
||||
def generate_topic_history_from_db_rows(rows: List[Tuple[str, int]]) -> List[Dict[str, Any]]:
|
||||
|
||||
Reference in New Issue
Block a user