queue processor tests: Call consume by default.

This significantly improves the queue_json_publish API, which was previously
overly focused on controlling how the function should behave in our unit
tests.
Author:    Robert Hönig, 2017-11-24 13:18:46 +01:00
Committer: Tim Abbott
Parent:    5e23bfa779
Commit:    0e0a8a2b14
12 changed files with 22 additions and 38 deletions
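
At the call sites, the change removes boilerplate that every publisher
previously had to carry. A minimal before/after sketch of the pattern, using
the 'embed_links' call from the first hunk below:

    # Before: each call site passed a throwaway processor and opted in
    # to consuming the event during tests.
    queue_json_publish('embed_links', event_data, lambda x: None,
                       call_consume_in_tests=True)

    # After: consuming via the real queue worker is the default in tests,
    # so most call sites shrink to queue name plus event.
    queue_json_publish('embed_links', event_data)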


@@ -1231,7 +1231,7 @@ def do_send_messages(messages_maybe_none, email_gateway=False):
                           'message_content': message['message'].content,
                           'message_realm_id': message['realm'].id,
                           'urls': links_for_embed}
-            queue_json_publish('embed_links', event_data, lambda x: None, call_consume_in_tests=True)
+            queue_json_publish('embed_links', event_data)
 
         if (settings.ENABLE_FEEDBACK and settings.FEEDBACK_BOT and
                 message['message'].recipient.type == Recipient.PERSONAL):
@@ -1258,9 +1258,7 @@ def do_send_messages(messages_maybe_none, email_gateway=False):
                         "message": wide_message_dict,
                         "trigger": event['trigger'],
                         "user_profile_id": event["user_profile_id"],
-                    },
-                    lambda x: None,
-                    call_consume_in_tests=True
-                )
+                    }
+                )
 
     # Note that this does not preserve the order of message ids
@@ -2446,7 +2444,7 @@ def bulk_remove_subscriptions(users, streams, acting_user=None):
         event = {'type': 'mark_stream_messages_as_read',
                  'user_profile_id': user_profile.id,
                  'stream_ids': [stream.id for stream in streams]}
-        queue_json_publish("deferred_work", event, lambda x: None, call_consume_in_tests=True)
+        queue_json_publish("deferred_work", event)
 
     all_subscribers_by_stream = get_user_ids_for_streams(streams=streams)
@@ -3187,7 +3185,7 @@ def update_user_activity_interval(user_profile, log_time):
     # type: (UserProfile, datetime.datetime) -> None
     event = {'user_profile_id': user_profile.id,
              'time': datetime_to_timestamp(log_time)}
-    queue_json_publish("user_activity_interval", event, lambda x: None, call_consume_in_tests=True)
+    queue_json_publish("user_activity_interval", event)
 
 def update_user_presence(user_profile, client, log_time, status,
                          new_user_input):
@@ -3197,7 +3195,7 @@ def update_user_presence(user_profile, client, log_time, status,
              'time': datetime_to_timestamp(log_time),
              'client': client.name}
-    queue_json_publish("user_presence", event, lambda x: None, call_consume_in_tests=True)
+    queue_json_publish("user_presence", event)
 
     if new_user_input:
         update_user_activity_interval(user_profile, log_time)
@@ -3989,9 +3987,7 @@ def do_invite_users(user_profile, invitee_emails, streams, invite_as_admin=False
         prereg_user.streams.set(stream_ids)
 
         event = {"email": prereg_user.email, "referrer_id": user_profile.id, "email_body": body}
-        queue_json_publish("invites", event,
-                           lambda event: None,
-                           call_consume_in_tests=True)
+        queue_json_publish("invites", event)
 
     if skipped:
         raise InvitationError(_("Some of those addresses are already using Zulip, "


@@ -56,7 +56,7 @@ def queue_digest_recipient(user_profile: UserProfile, cutoff: datetime.datetime)
     # Convert cutoff to epoch seconds for transit.
     event = {"user_profile_id": user_profile.id,
              "cutoff": cutoff.strftime('%s')}
-    queue_json_publish("digest_emails", event, lambda event: None, call_consume_in_tests=True)
+    queue_json_publish("digest_emails", event)
 
 def enqueue_emails(cutoff: datetime.datetime) -> None:
     # To be really conservative while we don't have user timezones or


@@ -361,8 +361,6 @@ def mirror_email_message(data: Dict[Text, Text]) -> Dict[str, str]:
         {
             "message": data['msg_text'],
             "rcpt_to": rcpt_to
-        },
-        lambda x: None,
-        call_consume_in_tests=True
+        }
     )
     return {"status": "success"}


@@ -380,8 +380,7 @@ def do_send_missedmessage_events_reply_in_zulip(user_profile, missed_messages, m
                   'from_address': from_address,
                   'reply_to_email': formataddr((reply_to_name, reply_to_address)),
                   'context': context}
-    queue_json_publish("missedmessage_email_senders", email_dict, send_email_from_dict,
-                       call_consume_in_tests=True)
+    queue_json_publish("missedmessage_email_senders", email_dict, send_email_from_dict)
 
     user_profile.last_reminder = timezone_now()
     user_profile.save(update_fields=['last_reminder'])


@@ -277,18 +277,17 @@ queue_lock = threading.RLock()
 def queue_json_publish(queue_name: str,
                        event: Union[Dict[str, Any], str],
-                       processor: Callable[[Any], None],
-                       call_consume_in_tests: bool=False) -> None:
+                       processor: Callable[[Any], None]=None) -> None:
     # most events are dicts, but zerver.middleware.write_log_line uses a str
     with queue_lock:
         if settings.USING_RABBITMQ:
             get_queue_client().json_publish(queue_name, event)
-        elif call_consume_in_tests:
+        elif processor:
+            processor(event)
+        else:
             # Must be imported here: A top section import leads to obscure not-defined-ish errors.
             from zerver.worker.queue_processors import get_worker
             get_worker(queue_name).consume_wrapper(event)  # type: ignore # https://github.com/python/mypy/issues/3360
-        else:
-            processor(event)
 
 def retry_event(queue_name: str,
                 event: Dict[str, Any],
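
The new dispatch order is: publish to RabbitMQ when it is configured, run an
explicit processor when one is given, and otherwise hand the event to the
real queue worker's consume_wrapper, which is what the commit title means by
calling consume by default. A minimal sketch of the three paths (my_processor
is a hypothetical handler, not part of the diff):

    # Production (settings.USING_RABBITMQ is True): the event is
    # published to RabbitMQ and handled by a separate worker process.
    queue_json_publish("user_presence", event)

    # Tests, explicit processor: my_processor(event) runs synchronously.
    queue_json_publish("user_presence", event, my_processor)

    # Tests, no processor: get_worker("user_presence").consume_wrapper(event)
    # runs, exercising the real queue processor code.
    queue_json_publish("user_presence", event)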


@@ -106,7 +106,7 @@ class AdminZulipHandler(logging.Handler):
                 queue_json_publish('error_reports', dict(
                     type = "server",
                     report = report,
-                ), lambda x: None, call_consume_in_tests=True)
+                ))
             except Exception:
                 # If this breaks, complain loudly but don't pass the traceback up the stream
                 # However, we *don't* want to use logging.exception since that could trigger a loop.


@@ -206,10 +206,7 @@ def write_log_line(log_data, path, method, remote_ip, email, client_name,
     logger.info(logger_line)
 
     if (is_slow_query(time_delta, path)):
-        # Since the slow query worker patches code, we can't directly
-        # use call_consume_in_tests here without further work.
-        queue_json_publish("slow_queries", "%s (%s)" % (logger_line, email), lambda e: None,
-                           call_consume_in_tests=True)
+        queue_json_publish("slow_queries", "%s (%s)" % (logger_line, email))
 
     if settings.PROFILE_ALL_REQUESTS:
         log_data["prof"].disable()


@@ -540,8 +540,7 @@ class TestEmailMirrorTornadoView(ZulipTestCase):
         def check_queue_json_publish(queue_name: str,
                                      event: Union[Mapping[str, Any], str],
-                                     processor: Callable[[Any], None],
-                                     call_consume_in_tests: bool) -> None:
+                                     processor: Callable[[Any], None]=None) -> None:
             self.assertEqual(queue_name, "email_mirror")
             self.assertEqual(event, {"rcpt_to": to_address, "message": mail})
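
Because check_queue_json_publish mirrors the new optional-processor
signature, it can stand in for queue_json_publish directly. A sketch of the
usual wiring, assuming the test patches the publisher at its call site (the
mock.patch target below is an assumption, not shown in the diff):

    from unittest import mock

    # Route every publish through the checker above so its assertions
    # run against whatever the view enqueues. The patched module path
    # is hypothetical.
    with mock.patch('zerver.lib.email_mirror.queue_json_publish',
                    side_effect=check_queue_json_publish):
        ...  # POST the mirrored email to the endpoint under test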


@@ -322,8 +322,7 @@ class TestServiceBotEventTriggers(ZulipTestCase):
         def check_values_passed(queue_name: Any,
                                 trigger_event: Union[Mapping[Any, Any], Any],
-                                x: Callable[[Any], None],
-                                call_consume_in_tests: bool) -> None:
+                                x: Callable[[Any], None]=None) -> None:
             self.assertEqual(queue_name, expected_queue_name)
             self.assertEqual(trigger_event["message"]["content"], content)
             self.assertEqual(trigger_event["message"]["display_recipient"], recipient)
@@ -368,8 +367,7 @@ class TestServiceBotEventTriggers(ZulipTestCase):
         def check_values_passed(queue_name: Any,
                                 trigger_event: Union[Mapping[Any, Any], Any],
-                                x: Callable[[Any], None],
-                                call_consume_in_tests: bool) -> None:
+                                x: Callable[[Any], None]=None) -> None:
             self.assertEqual(queue_name, expected_queue_name)
             self.assertEqual(trigger_event["user_profile_id"], self.bot_profile.id)
             self.assertEqual(trigger_event["trigger"], "private_message")
@@ -411,8 +409,7 @@ class TestServiceBotEventTriggers(ZulipTestCase):
         def check_values_passed(queue_name: Any,
                                 trigger_event: Union[Mapping[Any, Any], Any],
-                                x: Callable[[Any], None],
-                                call_consume_in_tests: bool) -> None:
+                                x: Callable[[Any], None]=None) -> None:
             self.assertEqual(queue_name, expected_queue_name)
             self.assertIn(trigger_event["user_profile_id"], profile_ids)
             profile_ids.remove(trigger_event["user_profile_id"])


@@ -222,8 +222,7 @@ class SocketConnection(sockjs.tornado.SockJSConnection):
                                                  client_id=self.client_id,
                                                  return_queue="tornado_return",
                                                  log_data=log_data,
-                                                 request_environ=request_environ)),
-                           lambda x: None, call_consume_in_tests=True)
+                                                 request_environ=request_environ)))
 
     def on_close(self) -> None:
         log_data = dict(extra='[transport=%s]' % (self.session.transport_name,))


@@ -1136,7 +1136,7 @@ def update_message_backend(request, user_profile,
             # `render_incoming_message` call earlier in this function.
             'message_realm_id': user_profile.realm_id,
             'urls': links_for_embed}
-        queue_json_publish('embed_links', event_data, lambda x: None, call_consume_in_tests=True)
+        queue_json_publish('embed_links', event_data)
 
     return json_success()


@@ -140,6 +140,6 @@ def report_error(request, user_profile, message=REQ(), stacktrace=REQ(),
             log = log,
             more_info = more_info,
         )
-    ), lambda x: None, call_consume_in_tests=True)
+    ))
 
     return json_success()