diff --git a/zerver/tests/test_message_topics.py b/zerver/tests/test_message_topics.py index a9f0b714b2..a054595203 100644 --- a/zerver/tests/test_message_topics.py +++ b/zerver/tests/test_message_topics.py @@ -4,8 +4,6 @@ from django.utils.timezone import now as timezone_now from zerver.actions.streams import do_change_stream_permission from zerver.lib.test_classes import ZulipTestCase -from zerver.lib.test_helpers import timeout_mock -from zerver.lib.timeout import TimeoutExpiredError from zerver.models import Message, UserMessage from zerver.models.clients import get_client from zerver.models.realms import get_realm @@ -291,26 +289,24 @@ class TopicDeleteTest(ZulipTestCase): acting_user=user_profile, ) # Delete the topic should now remove all messages - with timeout_mock("zerver.views.streams"): - result = self.client_post( - endpoint, - { - "topic_name": topic_name, - }, - ) + result = self.client_post( + endpoint, + { + "topic_name": topic_name, + }, + ) result_dict = self.assert_json_success(result) self.assertTrue(result_dict["complete"]) self.assertFalse(Message.objects.filter(id=last_msg_id).exists()) self.assertTrue(Message.objects.filter(id=initial_last_msg_id).exists()) # Delete again, to test the edge case of deleting an empty topic. - with timeout_mock("zerver.views.streams"): - result = self.client_post( - endpoint, - { - "topic_name": topic_name, - }, - ) + result = self.client_post( + endpoint, + { + "topic_name": topic_name, + }, + ) result_dict = self.assert_json_success(result) self.assertTrue(result_dict["complete"]) self.assertFalse(Message.objects.filter(id=last_msg_id).exists()) @@ -328,7 +324,7 @@ class TopicDeleteTest(ZulipTestCase): self.login_user(user_profile) endpoint = "/json/streams/" + str(stream.id) + "/delete_topic" - with mock.patch("zerver.views.streams.timeout", side_effect=TimeoutExpiredError): + with mock.patch("time.monotonic", side_effect=[10000, 10051]): result = self.client_post( endpoint, { diff --git a/zerver/views/streams.py b/zerver/views/streams.py index c51b85254f..7bde4904e0 100644 --- a/zerver/views/streams.py +++ b/zerver/views/streams.py @@ -1,5 +1,6 @@ +import time from collections import defaultdict -from typing import Any, Callable, Dict, List, Literal, Mapping, Optional, Sequence, Set, Union +from typing import Any, Callable, Dict, List, Mapping, Optional, Sequence, Set, Union import orjson from django.conf import settings @@ -76,7 +77,6 @@ from zerver.lib.streams import ( ) from zerver.lib.string_validation import check_stream_name from zerver.lib.subscription_info import gather_subscriptions -from zerver.lib.timeout import TimeoutExpiredError, timeout from zerver.lib.topic import ( get_topic_history_for_public_stream, get_topic_history_for_stream, @@ -930,29 +930,23 @@ def delete_in_topic( # the user can see are returned in the query. messages = bulk_access_stream_messages_query(user_profile, messages, stream) - def delete_in_batches() -> Literal[True]: - # Topics can be large enough that this request will inevitably time out. - # In such a case, it's good for some progress to be accomplished, so that - # full deletion can be achieved by repeating the request. For that purpose, - # we delete messages in atomic batches, committing after each batch. - # TODO: Ideally this should be moved to the deferred_work queue. - batch_size = RETENTION_STREAM_MESSAGE_BATCH_SIZE - while True: - with transaction.atomic(durable=True): - messages_to_delete = messages.order_by("-id")[0:batch_size].select_for_update( - of=("self",) - ) - if not messages_to_delete: - break - do_delete_messages(user_profile.realm, messages_to_delete) - - # timeout() in which we call this function requires non-None return value. - return True - - try: - timeout(50, delete_in_batches) - except TimeoutExpiredError: - return json_success(request, data={"complete": False}) + # Topics can be large enough that this request will inevitably time out. + # In such a case, it's good for some progress to be accomplished, so that + # full deletion can be achieved by repeating the request. For that purpose, + # we delete messages in atomic batches, committing after each batch. + # TODO: Ideally this should be moved to the deferred_work queue. + start_time = time.monotonic() + batch_size = RETENTION_STREAM_MESSAGE_BATCH_SIZE + while True: + if time.monotonic() >= start_time + 50: + return json_success(request, data={"complete": False}) + with transaction.atomic(durable=True): + messages_to_delete = messages.order_by("-id")[0:batch_size].select_for_update( + of=("self",) + ) + if not messages_to_delete: + break + do_delete_messages(user_profile.realm, messages_to_delete) return json_success(request, data={"complete": True})