events: Pass a realm object into send_event.

This is a preparator refactor for supporting hosting different Tornado
processes on different servers; to look up which Tornado server we
should be sending the event to, we'll need the realm object.
This commit is contained in:
Tim Abbott
2018-11-02 15:33:54 -07:00
parent 75e48459b5
commit ea1ec68899
9 changed files with 119 additions and 99 deletions

View File

@@ -51,15 +51,16 @@ problems in a scalable, correct, and predictable way.
## Generation system
Zulip's generation system is built around a Python function,
`send_event(event, users)`. It accepts an event data structure (just
a Python dictionary with some keys and value; `type` is always one of
the keys but the rest depends on the specific event) and a list of
user IDs for the users whose clients should receive the event. In
special cases such as message delivery, the list of users will instead
be a list of dicts mapping user IDs to user-specific data like whether
that user was mentioned in that message. The data passed to
`send_event` are simply marshalled as JSON and placed in the
`notify_tornado` RabbitMQ queue to be consumed by the delivery system.
`send_event(realm, event, users)`. It accepts the realm (used for
sharding), the event data structure (just a Python dictionary with
some keys and value; `type` is always one of the keys but the rest
depends on the specific event) and a list of user IDs for the users
whose clients should receive the event. In special cases such as
message delivery, the list of users will instead be a list of dicts
mapping user IDs to user-specific data like whether that user was
mentioned in that message. The data passed to `send_event` are simply
marshalled as JSON and placed in the `notify_tornado` RabbitMQ queue
to be consumed by the delivery system.
Usually, this list of users is one of 3 things:

View File

@@ -307,7 +307,7 @@ active users in a realm.
property=name,
value=value,
)
send_event(event, active_user_ids(realm))
send_event(realm, event, active_user_ids(realm))
If the new realm property being added does not fit into the
`property_types` framework (such as the `authentication_methods`
@@ -327,7 +327,7 @@ field and send an event. For example:
property='default',
data=dict(authentication_methods=realm.authentication_methods_dict())
)
send_event(event, active_user_ids(realm))
send_event(realm, event, active_user_ids(realm))
### Update application state

View File

@@ -283,7 +283,7 @@ def do_set_realm_name(realm, name):
property='name',
value=name,
)
send_event(event, active_user_ids(realm))
send_event(realm, event, active_user_ids(realm))
```
`realm.save()` actually saves the changes to the realm to the

View File

@@ -352,7 +352,7 @@ def send_signup_message(sender: UserProfile, admin_realm_signup_notifications_st
def notify_invites_changed(user_profile: UserProfile) -> None:
event = dict(type="invites_changed")
admin_ids = [user.id for user in user_profile.realm.get_admin_users()]
send_event(event, admin_ids)
send_event(user_profile.realm, event, admin_ids)
def notify_new_user(user_profile: UserProfile, internal: bool=False) -> None:
if settings.NOTIFICATION_BOT is not None:
@@ -486,7 +486,7 @@ def notify_created_user(user_profile: UserProfile) -> None:
is_bot=user_profile.is_bot)) # type: Dict[str, Any]
if not user_profile.is_bot:
event["person"]["profile_data"] = {}
send_event(event, active_user_ids(user_profile.realm_id))
send_event(user_profile.realm, event, active_user_ids(user_profile.realm_id))
def created_bot_event(user_profile: UserProfile) -> Dict[str, Any]:
def stream_name(stream: Optional[Stream]) -> Optional[str]:
@@ -520,7 +520,7 @@ def created_bot_event(user_profile: UserProfile) -> Dict[str, Any]:
def notify_created_bot(user_profile: UserProfile) -> None:
event = created_bot_event(user_profile)
send_event(event, bot_owner_user_ids(user_profile))
send_event(user_profile.realm, event, bot_owner_user_ids(user_profile))
def create_users(realm: Realm, name_list: Iterable[Tuple[str, str]], bot_type: int=None) -> None:
user_set = set()
@@ -627,7 +627,7 @@ def do_set_realm_property(realm: Realm, name: str, value: Any) -> None:
property=name,
value=value,
)
send_event(event, active_user_ids(realm.id))
send_event(realm, event, active_user_ids(realm.id))
def do_set_realm_authentication_methods(realm: Realm,
@@ -642,7 +642,7 @@ def do_set_realm_authentication_methods(realm: Realm,
property='default',
data=dict(authentication_methods=realm.authentication_methods_dict())
)
send_event(event, active_user_ids(realm.id))
send_event(realm, event, active_user_ids(realm.id))
def do_set_realm_message_editing(realm: Realm,
allow_message_editing: bool,
@@ -664,7 +664,7 @@ def do_set_realm_message_editing(realm: Realm,
message_content_edit_limit_seconds=message_content_edit_limit_seconds,
allow_community_topic_editing=allow_community_topic_editing),
)
send_event(event, active_user_ids(realm.id))
send_event(realm, event, active_user_ids(realm.id))
def do_set_realm_message_deleting(realm: Realm,
message_content_delete_limit_seconds: int) -> None:
@@ -676,7 +676,7 @@ def do_set_realm_message_deleting(realm: Realm,
property="default",
data=dict(message_content_delete_limit_seconds=message_content_delete_limit_seconds),
)
send_event(event, active_user_ids(realm.id))
send_event(realm, event, active_user_ids(realm.id))
def do_set_realm_notifications_stream(realm: Realm, stream: Stream, stream_id: int) -> None:
realm.notifications_stream = stream
@@ -687,7 +687,7 @@ def do_set_realm_notifications_stream(realm: Realm, stream: Stream, stream_id: i
property="notifications_stream_id",
value=stream_id
)
send_event(event, active_user_ids(realm.id))
send_event(realm, event, active_user_ids(realm.id))
def do_set_realm_signup_notifications_stream(realm: Realm, stream: Stream,
stream_id: int) -> None:
@@ -699,7 +699,7 @@ def do_set_realm_signup_notifications_stream(realm: Realm, stream: Stream,
property="signup_notifications_stream_id",
value=stream_id
)
send_event(event, active_user_ids(realm.id))
send_event(realm, event, active_user_ids(realm.id))
def do_deactivate_realm(realm: Realm) -> None:
"""
@@ -727,7 +727,7 @@ def do_deactivate_realm(realm: Realm) -> None:
event = dict(type="realm", op="deactivated",
realm_id=realm.id)
send_event(event, active_user_ids(realm.id))
send_event(realm, event, active_user_ids(realm.id))
def do_reactivate_realm(realm: Realm) -> None:
realm.deactivated = False
@@ -778,14 +778,14 @@ def do_deactivate_user(user_profile: UserProfile,
person=dict(email=user_profile.email,
user_id=user_profile.id,
full_name=user_profile.full_name))
send_event(event, active_user_ids(user_profile.realm_id))
send_event(user_profile.realm, event, active_user_ids(user_profile.realm_id))
if user_profile.is_bot:
event = dict(type="realm_bot", op="remove",
bot=dict(email=user_profile.email,
user_id=user_profile.id,
full_name=user_profile.full_name))
send_event(event, bot_owner_user_ids(user_profile))
send_event(user_profile.realm, event, bot_owner_user_ids(user_profile))
if _cascade:
bot_profiles = UserProfile.objects.filter(is_bot=True, is_active=True,
@@ -835,7 +835,7 @@ def do_deactivate_stream(stream: Stream, log: bool=True) -> None:
stream_dict.update(dict(name=old_name, invite_only=was_invite_only))
event = dict(type="stream", op="delete",
streams=[stream_dict])
send_event(event, affected_user_ids)
send_event(stream.realm, event, affected_user_ids)
def do_change_user_email(user_profile: UserProfile, new_email: str) -> None:
delete_user_profile_caches([user_profile])
@@ -846,7 +846,8 @@ def do_change_user_email(user_profile: UserProfile, new_email: str) -> None:
payload = dict(user_id=user_profile.id,
new_email=new_email)
send_event(dict(type='realm_user', op='update', person=payload),
send_event(user_profile.realm,
dict(type='realm_user', op='update', person=payload),
active_user_ids(user_profile.realm_id))
event_time = timezone_now()
RealmAuditLog.objects.create(realm=user_profile.realm, acting_user=user_profile,
@@ -1398,7 +1399,7 @@ def do_send_messages(messages_maybe_none: Sequence[Optional[MutableMapping[str,
event['local_id'] = message['local_id']
if message['sender_queue_id'] is not None:
event['sender_queue_id'] = message['sender_queue_id']
send_event(event, users)
send_event(message['realm'], event, users)
if url_embed_preview_enabled_for_realm(message['message']) and links_for_embed:
event_data = {
@@ -1539,7 +1540,8 @@ def bulk_insert_ums(ums: List[UserMessageLite]) -> None:
with connection.cursor() as cursor:
cursor.execute(query)
def do_add_submessage(sender_id: int,
def do_add_submessage(realm: Realm,
sender_id: int,
message_id: int,
msg_type: str,
content: str,
@@ -1563,7 +1565,7 @@ def do_add_submessage(sender_id: int,
ums = UserMessage.objects.filter(message_id=message_id)
target_user_ids = [um.user_profile_id for um in ums]
send_event(event, target_user_ids)
send_event(realm, event, target_user_ids)
def notify_reaction_update(user_profile: UserProfile, message: Message,
reaction: Reaction, op: str) -> None:
@@ -1594,7 +1596,7 @@ def notify_reaction_update(user_profile: UserProfile, message: Message,
# "historical" UserMessage row for any user who reacts to message,
# subscribing them to future notifications.
ums = UserMessage.objects.filter(message=message.id)
send_event(event, [um.user_profile_id for um in ums])
send_event(user_profile.realm, event, [um.user_profile_id for um in ums])
def do_add_reaction_legacy(user_profile: UserProfile, message: Message, emoji_name: str) -> None:
(emoji_code, reaction_type) = emoji_name_to_emoji_code(user_profile.realm, emoji_name)
@@ -1628,7 +1630,7 @@ def do_remove_reaction(user_profile: UserProfile, message: Message,
reaction.delete()
notify_reaction_update(user_profile, message, reaction, "remove")
def do_send_typing_notification(notification: Dict[str, Any]) -> None:
def do_send_typing_notification(realm: Realm, notification: Dict[str, Any]) -> None:
recipient_user_profiles = get_typing_user_profiles(notification['recipient'],
notification['sender'].id)
# Only deliver the notification to active user recipients
@@ -1643,14 +1645,14 @@ def do_send_typing_notification(notification: Dict[str, Any]) -> None:
sender = sender_dict,
recipients = recipient_dicts)
send_event(event, user_ids_to_notify)
send_event(realm, event, user_ids_to_notify)
# check_send_typing_notification:
# Checks the typing notification and sends it
def check_send_typing_notification(sender: UserProfile, notification_to: Sequence[str],
operator: str) -> None:
typing_notification = check_typing_notification(sender, notification_to, operator)
do_send_typing_notification(typing_notification)
do_send_typing_notification(sender.realm, typing_notification)
# check_typing_notification:
# Returns typing notification ready for sending with do_send_typing_notification on success
@@ -1697,7 +1699,7 @@ def prep_stream_welcome_message(stream: Stream) -> Optional[Dict[str, Any]]:
def send_stream_creation_event(stream: Stream, user_ids: List[int]) -> None:
event = dict(type="stream", op="create",
streams=[stream.to_dict()])
send_event(event, user_ids)
send_event(stream.realm, event, user_ids)
def get_default_value_for_history_public_to_subscribers(
realm: Realm,
@@ -2500,7 +2502,7 @@ def notify_subscriptions_added(user_profile: UserProfile,
for (subscription, stream) in sub_pairs]
event = dict(type="subscription", op="add",
subscriptions=payload)
send_event(event, [user_profile.id])
send_event(user_profile.realm, event, [user_profile.id])
def get_peer_user_ids_for_stream_change(stream: Stream,
altered_user_ids: Iterable[int],
@@ -2654,7 +2656,7 @@ def bulk_add_subscriptions(streams: Iterable[Stream],
event = dict(type="stream", op="occupy",
streams=[stream.to_dict()
for stream in new_occupied_streams])
send_event(event, active_user_ids(user_profile.realm_id))
send_event(user_profile.realm, event, active_user_ids(user_profile.realm_id))
# Notify all existing users on streams that users have joined
@@ -2723,7 +2725,7 @@ def bulk_add_subscriptions(streams: Iterable[Stream],
event = dict(type="subscription", op="peer_add",
subscriptions=[stream.name],
user_id=new_user_id)
send_event(event, peer_user_ids)
send_event(stream.realm, event, peer_user_ids)
return ([(user_profile, stream) for (user_profile, recipient_id, stream) in new_subs] +
[(sub.user_profile, stream) for (sub, stream) in subs_to_activate],
@@ -2740,7 +2742,7 @@ def notify_subscriptions_removed(user_profile: UserProfile, streams: Iterable[St
payload = [dict(name=stream.name, stream_id=stream.id) for stream in streams]
event = dict(type="subscription", op="remove",
subscriptions=payload)
send_event(event, [user_profile.id])
send_event(user_profile.realm, event, [user_profile.id])
SubAndRemovedT = Tuple[List[Tuple[UserProfile, Stream]], List[Tuple[UserProfile, Stream]]]
def bulk_remove_subscriptions(users: Iterable[UserProfile],
@@ -2852,7 +2854,7 @@ def bulk_remove_subscriptions(users: Iterable[UserProfile],
op="peer_remove",
subscriptions=[stream.name],
user_id=removed_user.id)
send_event(event, peer_user_ids)
send_event(our_realm, event, peer_user_ids)
for stream in streams:
send_peer_remove_event(stream=stream)
@@ -2867,7 +2869,7 @@ def bulk_remove_subscriptions(users: Iterable[UserProfile],
event = dict(type="stream", op="vacate",
streams=[stream.to_dict()
for stream in new_vacant_public_streams])
send_event(event, active_user_ids(our_realm.id))
send_event(our_realm, event, active_user_ids(our_realm.id))
if new_vacant_private_streams:
# Deactivate any newly-vacant private streams
for stream in new_vacant_private_streams:
@@ -2902,7 +2904,7 @@ def do_change_subscription_property(user_profile: UserProfile, sub: Subscription
value=value,
stream_id=stream.id,
name=stream.name)
send_event(event, [user_profile.id])
send_event(user_profile.realm, event, [user_profile.id])
def do_change_password(user_profile: UserProfile, password: str, commit: bool=True) -> None:
user_profile.set_password(password)
@@ -2925,10 +2927,12 @@ def do_change_full_name(user_profile: UserProfile, full_name: str,
payload = dict(email=user_profile.email,
user_id=user_profile.id,
full_name=user_profile.full_name)
send_event(dict(type='realm_user', op='update', person=payload),
send_event(user_profile.realm,
dict(type='realm_user', op='update', person=payload),
active_user_ids(user_profile.realm_id))
if user_profile.is_bot:
send_event(dict(type='realm_bot', op='update', bot=payload),
send_event(user_profile.realm,
dict(type='realm_bot', op='update', bot=payload),
bot_owner_user_ids(user_profile))
def check_change_full_name(user_profile: UserProfile, full_name_raw: str,
@@ -2975,7 +2979,8 @@ def do_change_bot_owner(user_profile: UserProfile, bot_owner: UserProfile,
# Delete the bot from previous owner's bot data.
if previous_owner and not previous_owner.is_realm_admin:
send_event(dict(type='realm_bot',
send_event(user_profile.realm,
dict(type='realm_bot',
op="delete",
bot=dict(email=user_profile.email,
user_id=user_profile.id,
@@ -2987,11 +2992,12 @@ def do_change_bot_owner(user_profile: UserProfile, bot_owner: UserProfile,
# Notify the new owner that the bot has been added.
if not bot_owner.is_realm_admin:
add_event = created_bot_event(user_profile)
send_event(add_event, {bot_owner.id, })
send_event(user_profile.realm, add_event, {bot_owner.id, })
# Do not send update event for bot_owner.
update_users = update_users - {bot_owner.id, }
send_event(dict(type='realm_bot',
send_event(user_profile.realm,
dict(type='realm_bot',
op='update',
bot=dict(email=user_profile.email,
user_id=user_profile.id,
@@ -3017,7 +3023,8 @@ def do_regenerate_api_key(user_profile: UserProfile, acting_user: UserProfile) -
event_time=event_time)
if user_profile.is_bot:
send_event(dict(type='realm_bot',
send_event(user_profile.realm,
dict(type='realm_bot',
op='update',
bot=dict(email=user_profile.email,
user_id=user_profile.id,
@@ -3036,7 +3043,8 @@ def do_change_avatar_fields(user_profile: UserProfile, avatar_source: str) -> No
event_time=event_time)
if user_profile.is_bot:
send_event(dict(type='realm_bot',
send_event(user_profile.realm,
dict(type='realm_bot',
op='update',
bot=dict(email=user_profile.email,
user_id=user_profile.id,
@@ -3052,7 +3060,8 @@ def do_change_avatar_fields(user_profile: UserProfile, avatar_source: str) -> No
user_id=user_profile.id
)
send_event(dict(type='realm_user',
send_event(user_profile.realm,
dict(type='realm_user',
op='update',
person=payload),
active_user_ids(user_profile.realm_id))
@@ -3071,7 +3080,8 @@ def do_change_icon_source(realm: Realm, icon_source: str, log: bool=True) -> Non
'realm': realm.string_id,
'icon_source': icon_source})
send_event(dict(type='realm',
send_event(realm,
dict(type='realm',
op='update_dict',
property="icon",
data=dict(icon_source=realm.icon_source,
@@ -3111,7 +3121,8 @@ def do_change_default_sending_stream(user_profile: UserProfile, stream: Optional
stream_name = stream.name # type: Optional[str]
else:
stream_name = None
send_event(dict(type='realm_bot',
send_event(user_profile.realm,
dict(type='realm_bot',
op='update',
bot=dict(email=user_profile.email,
user_id=user_profile.id,
@@ -3133,7 +3144,8 @@ def do_change_default_events_register_stream(user_profile: UserProfile,
stream_name = stream.name # type: Optional[str]
else:
stream_name = None
send_event(dict(type='realm_bot',
send_event(user_profile.realm,
dict(type='realm_bot',
op='update',
bot=dict(email=user_profile.email,
user_id=user_profile.id,
@@ -3150,7 +3162,8 @@ def do_change_default_all_public_streams(user_profile: UserProfile, value: bool,
'user': user_profile.email,
'value': str(value)})
if user_profile.is_bot:
send_event(dict(type='realm_bot',
send_event(user_profile.realm,
dict(type='realm_bot',
op='update',
bot=dict(email=user_profile.email,
user_id=user_profile.id,
@@ -3174,7 +3187,7 @@ def do_change_is_admin(user_profile: UserProfile, value: bool,
person=dict(email=user_profile.email,
user_id=user_profile.id,
is_admin=value))
send_event(event, active_user_ids(user_profile.realm_id))
send_event(user_profile.realm, event, active_user_ids(user_profile.realm_id))
def do_change_is_guest(user_profile: UserProfile, value: bool) -> None:
user_profile.is_guest = value
@@ -3183,7 +3196,7 @@ def do_change_is_guest(user_profile: UserProfile, value: bool) -> None:
person=dict(email=user_profile.email,
user_id=user_profile.id,
is_guest=value))
send_event(event, active_user_ids(user_profile.realm_id))
send_event(user_profile.realm, event, active_user_ids(user_profile.realm_id))
def do_change_stream_invite_only(stream: Stream, invite_only: bool,
@@ -3255,7 +3268,7 @@ def do_rename_stream(stream: Stream, new_name: str, log: bool=True) -> Dict[str,
stream_id=stream.id,
name=old_name,
)
send_event(event, can_access_stream_user_ids(stream))
send_event(stream.realm, event, can_access_stream_user_ids(stream))
# Even though the token doesn't change, the web client needs to update the
# email forwarding address to display the correctly-escaped new name.
@@ -3273,7 +3286,7 @@ def do_change_stream_description(stream: Stream, new_description: str) -> None:
stream_id=stream.id,
value=new_description,
)
send_event(event, can_access_stream_user_ids(stream))
send_event(stream.realm, event, can_access_stream_user_ids(stream))
def do_create_realm(string_id: str, name: str,
emails_restricted_to_domains: Optional[bool]=None) -> Realm:
@@ -3338,7 +3351,7 @@ def do_change_notification_settings(user_profile: UserProfile, name: str, value:
'setting': value}
if log:
log_event(event)
send_event(event, [user_profile.id])
send_event(user_profile.realm, event, [user_profile.id])
def do_change_enter_sends(user_profile: UserProfile, enter_sends: bool) -> None:
user_profile.enter_sends = enter_sends
@@ -3359,14 +3372,15 @@ def do_set_user_display_setting(user_profile: UserProfile,
assert isinstance(setting_value, str)
event['language_name'] = get_language_name(setting_value)
send_event(event, [user_profile.id])
send_event(user_profile.realm, event, [user_profile.id])
# Updates to the timezone display setting are sent to all users
if setting_name == "timezone":
payload = dict(email=user_profile.email,
user_id=user_profile.id,
timezone=user_profile.timezone)
send_event(dict(type='realm_user', op='update', person=payload),
send_event(user_profile.realm,
dict(type='realm_user', op='update', person=payload),
active_user_ids(user_profile.realm_id))
def lookup_default_stream_groups(default_stream_group_names: List[str],
@@ -3400,32 +3414,32 @@ def set_default_streams(realm: Realm, stream_dict: Dict[str, Dict[str, Any]]) ->
'realm': realm.string_id,
'streams': stream_names})
def notify_default_streams(realm_id: int) -> None:
def notify_default_streams(realm: Realm) -> None:
event = dict(
type="default_streams",
default_streams=streams_to_dicts_sorted(get_default_streams_for_realm(realm_id))
default_streams=streams_to_dicts_sorted(get_default_streams_for_realm(realm.id))
)
send_event(event, active_user_ids(realm_id))
send_event(realm, event, active_user_ids(realm.id))
def notify_default_stream_groups(realm: Realm) -> None:
event = dict(
type="default_stream_groups",
default_stream_groups=default_stream_groups_to_dicts_sorted(get_default_stream_groups(realm))
)
send_event(event, active_user_ids(realm.id))
send_event(realm, event, active_user_ids(realm.id))
def do_add_default_stream(stream: Stream) -> None:
realm_id = stream.realm_id
stream_id = stream.id
if not DefaultStream.objects.filter(realm_id=realm_id, stream_id=stream_id).exists():
DefaultStream.objects.create(realm_id=realm_id, stream_id=stream_id)
notify_default_streams(realm_id)
notify_default_streams(stream.realm)
def do_remove_default_stream(stream: Stream) -> None:
realm_id = stream.realm_id
stream_id = stream.id
DefaultStream.objects.filter(realm_id=realm_id, stream_id=stream_id).delete()
notify_default_streams(realm_id)
notify_default_streams(stream.realm)
def do_create_default_stream_group(realm: Realm, group_name: str,
description: str, streams: List[Stream]) -> None:
@@ -3559,7 +3573,7 @@ def send_presence_changed(user_profile: UserProfile, presence: UserPresence) ->
event = dict(type="presence", email=user_profile.email,
server_timestamp=time.time(),
presence={presence_dict['client']: presence_dict})
send_event(event, active_user_ids(user_profile.realm_id))
send_event(user_profile.realm, event, active_user_ids(user_profile.realm_id))
def consolidate_client(client: Client) -> Client:
# The web app reports a client as 'website'
@@ -3665,7 +3679,7 @@ def do_update_pointer(user_profile: UserProfile, client: Client,
do_clear_mobile_push_notifications_for_ids(user_profile, app_message_ids)
event = dict(type='pointer', pointer=pointer)
send_event(event, [user_profile.id])
send_event(user_profile.realm, event, [user_profile.id])
def do_mark_all_as_read(user_profile: UserProfile, client: Client) -> int:
log_statsd_event('bankruptcy')
@@ -3687,7 +3701,7 @@ def do_mark_all_as_read(user_profile: UserProfile, client: Client) -> int:
messages=[], # we don't send messages, since the client reloads anyway
all=True
)
send_event(event, [user_profile.id])
send_event(user_profile.realm, event, [user_profile.id])
statsd.incr("mark_all_as_read", count)
@@ -3733,7 +3747,7 @@ def do_mark_stream_messages_as_read(user_profile: UserProfile,
messages=message_ids,
all=False,
)
send_event(event, [user_profile.id])
send_event(user_profile.realm, event, [user_profile.id])
do_clear_mobile_push_notifications_for_ids(user_profile, message_ids)
statsd.incr("mark_stream_as_read", count)
@@ -3794,7 +3808,7 @@ def do_update_message_flags(user_profile: UserProfile,
'flag': flag,
'messages': messages,
'all': False}
send_event(event, [user_profile.id])
send_event(user_profile.realm, event, [user_profile.id])
if flag == "read" and operation == "add":
do_clear_mobile_push_notifications_for_ids(user_profile, messages)
@@ -3929,7 +3943,7 @@ def do_update_embedded_data(user_profile: UserProfile,
'id': um.user_profile_id,
'flags': um.flags_list()
}
send_event(event, list(map(user_info, ums)))
send_event(user_profile.realm, event, list(map(user_info, ums)))
# We use transaction.atomic to support select_for_update in the attachment codepath.
@transaction.atomic
@@ -4067,7 +4081,7 @@ def do_update_message(user_profile: UserProfile, message: Message, topic_name: O
'id': um.user_profile_id,
'flags': um.flags_list()
}
send_event(event, list(map(user_info, ums)))
send_event(user_profile.realm, event, list(map(user_info, ums)))
return len(changed_messages)
@@ -4090,7 +4104,7 @@ def do_delete_message(user_profile: UserProfile, message: Message) -> None:
ums = [{'id': um.user_profile_id} for um in
UserMessage.objects.filter(message=message.id)]
move_messages_to_archive([message.id])
send_event(event, ums)
send_event(user_profile.realm, event, ums)
def do_delete_messages(user: UserProfile) -> None:
message_ids = Message.objects.filter(sender=user).values_list('id', flat=True).order_by('id')
@@ -4687,7 +4701,7 @@ def do_resend_user_invite_email(prereg_user: PreregistrationUser) -> int:
def notify_realm_emoji(realm: Realm) -> None:
event = dict(type="realm_emoji", op="update",
realm_emoji=realm.get_emoji())
send_event(event, active_user_ids(realm.id))
send_event(realm, event, active_user_ids(realm.id))
def check_add_realm_emoji(realm: Realm,
name: str,
@@ -4720,7 +4734,7 @@ def do_remove_realm_emoji(realm: Realm, name: str) -> None:
def notify_alert_words(user_profile: UserProfile, words: Iterable[str]) -> None:
event = dict(type="alert_words", alert_words=words)
send_event(event, [user_profile.id])
send_event(user_profile.realm, event, [user_profile.id])
def do_add_alert_words(user_profile: UserProfile, alert_words: Iterable[str]) -> None:
words = add_user_alert_words(user_profile, alert_words)
@@ -4737,22 +4751,22 @@ def do_set_alert_words(user_profile: UserProfile, alert_words: List[str]) -> Non
def do_mute_topic(user_profile: UserProfile, stream: Stream, recipient: Recipient, topic: str) -> None:
add_topic_mute(user_profile, stream.id, recipient.id, topic)
event = dict(type="muted_topics", muted_topics=get_topic_mutes(user_profile))
send_event(event, [user_profile.id])
send_event(user_profile.realm, event, [user_profile.id])
def do_unmute_topic(user_profile: UserProfile, stream: Stream, topic: str) -> None:
remove_topic_mute(user_profile, stream.id, topic)
event = dict(type="muted_topics", muted_topics=get_topic_mutes(user_profile))
send_event(event, [user_profile.id])
send_event(user_profile.realm, event, [user_profile.id])
def do_mark_hotspot_as_read(user: UserProfile, hotspot: str) -> None:
UserHotspot.objects.get_or_create(user=user, hotspot=hotspot)
event = dict(type="hotspots", hotspots=get_next_hotspots(user))
send_event(event, [user.id])
send_event(user.realm, event, [user.id])
def notify_realm_filters(realm: Realm) -> None:
realm_filters = realm_filters_for_realm(realm.id)
event = dict(type="realm_filters", realm_filters=realm_filters)
send_event(event, active_user_ids(realm.id))
send_event(realm, event, active_user_ids(realm.id))
# NOTE: Regexes must be simple enough that they can be easily translated to JavaScript
# RegExp syntax. In addition to JS-compatible syntax, the following features are available:
@@ -4788,7 +4802,7 @@ def do_add_realm_domain(realm: Realm, domain: str, allow_subdomains: bool) -> (R
event = dict(type="realm_domains", op="add",
realm_domain=dict(domain=realm_domain.domain,
allow_subdomains=realm_domain.allow_subdomains))
send_event(event, active_user_ids(realm.id))
send_event(realm, event, active_user_ids(realm.id))
return realm_domain
def do_change_realm_domain(realm_domain: RealmDomain, allow_subdomains: bool) -> None:
@@ -4797,7 +4811,7 @@ def do_change_realm_domain(realm_domain: RealmDomain, allow_subdomains: bool) ->
event = dict(type="realm_domains", op="change",
realm_domain=dict(domain=realm_domain.domain,
allow_subdomains=realm_domain.allow_subdomains))
send_event(event, active_user_ids(realm_domain.realm_id))
send_event(realm_domain.realm, event, active_user_ids(realm_domain.realm_id))
def do_remove_realm_domain(realm_domain: RealmDomain) -> None:
realm = realm_domain.realm
@@ -4810,7 +4824,7 @@ def do_remove_realm_domain(realm_domain: RealmDomain) -> None:
# confusing than the alternative.
do_set_realm_property(realm, 'emails_restricted_to_domains', False)
event = dict(type="realm_domains", op="remove", domain=domain)
send_event(event, active_user_ids(realm.id))
send_event(realm, event, active_user_ids(realm.id))
def get_occupied_streams(realm: Realm) -> QuerySet:
# TODO: Make a generic stub for QuerySet
@@ -4876,7 +4890,7 @@ def notify_attachment_update(user_profile: UserProfile, op: str,
'op': op,
'attachment': attachment_dict,
}
send_event(event, [user_profile.id])
send_event(user_profile.realm, event, [user_profile.id])
def do_claim_attachments(message: Message) -> None:
attachment_url_list = attachment_url_re.findall(message.content)
@@ -4935,7 +4949,7 @@ def notify_realm_custom_profile_fields(realm: Realm, operation: str) -> None:
event = dict(type="custom_profile_fields",
op=operation,
fields=[f.as_dict() for f in fields])
send_event(event, active_user_ids(realm.id))
send_event(realm, event, active_user_ids(realm.id))
def try_add_realm_custom_profile_field(realm: Realm, name: str, field_type: int,
hint: str='',
@@ -4992,7 +5006,7 @@ def notify_user_update_custom_profile_data(user_profile: UserProfile,
payload = dict(user_id=user_profile.id, custom_profile_field=dict(id=field['id'],
value=field_value))
event = dict(type="realm_user", op="update", person=payload)
send_event(event, active_user_ids(user_profile.realm.id))
send_event(user_profile.realm, event, active_user_ids(user_profile.realm.id))
def do_update_user_custom_profile_data(user_profile: UserProfile,
data: List[Dict[str, Union[int, str, List[int]]]]) -> None:
@@ -5016,7 +5030,7 @@ def do_send_create_user_group_event(user_group: UserGroup, members: List[UserPro
id=user_group.id,
),
)
send_event(event, active_user_ids(user_group.realm_id))
send_event(user_group.realm, event, active_user_ids(user_group.realm_id))
def check_add_user_group(realm: Realm, name: str, initial_members: List[UserProfile],
description: str) -> None:
@@ -5028,7 +5042,7 @@ def check_add_user_group(realm: Realm, name: str, initial_members: List[UserProf
def do_send_user_group_update_event(user_group: UserGroup, data: Dict[str, Any]) -> None:
event = dict(type="user_group", op='update', group_id=user_group.id, data=data)
send_event(event, active_user_ids(user_group.realm_id))
send_event(user_group.realm, event, active_user_ids(user_group.realm_id))
def do_update_user_group_name(user_group: UserGroup, name: str) -> None:
try:
@@ -5052,7 +5066,8 @@ def do_update_outgoing_webhook_service(bot_profile: UserProfile,
service.base_url = service_payload_url
service.interface = service_interface
service.save()
send_event(dict(type='realm_bot',
send_event(bot_profile.realm,
dict(type='realm_bot',
op='update',
bot=dict(email=bot_profile.email,
user_id=bot_profile.id,
@@ -5068,7 +5083,8 @@ def do_update_bot_config_data(bot_profile: UserProfile,
for key, value in config_data.items():
set_bot_config(bot_profile, key, value)
updated_config_data = get_bot_config(bot_profile)
send_event(dict(type='realm_bot',
send_event(bot_profile.realm,
dict(type='realm_bot',
op='update',
bot=dict(email=bot_profile.email,
user_id=bot_profile.id,
@@ -5159,7 +5175,7 @@ def do_send_user_group_members_update_event(event_name: str,
op=event_name,
group_id=user_group.id,
user_ids=user_ids)
send_event(event, active_user_ids(user_group.realm_id))
send_event(user_group.realm, event, active_user_ids(user_group.realm_id))
def bulk_add_members_to_user_group(user_group: UserGroup,
user_profiles: List[UserProfile]) -> None:
@@ -5180,16 +5196,17 @@ def remove_members_from_user_group(user_group: UserGroup,
user_ids = [up.id for up in user_profiles]
do_send_user_group_members_update_event('remove_members', user_group, user_ids)
def do_send_delete_user_group_event(user_group_id: int, realm_id: int) -> None:
def do_send_delete_user_group_event(realm: Realm, user_group_id: int,
realm_id: int) -> None:
event = dict(type="user_group",
op="remove",
group_id=user_group_id)
send_event(event, active_user_ids(realm_id))
send_event(realm, event, active_user_ids(realm_id))
def check_delete_user_group(user_group_id: int, user_profile: UserProfile) -> None:
user_group = access_user_group_by_id(user_group_id, user_profile)
user_group.delete()
do_send_delete_user_group_event(user_group_id, user_profile.realm.id)
do_send_delete_user_group_event(user_profile.realm, user_group_id, user_profile.realm.id)
def missing_any_realm_internal_bots() -> bool:
bot_emails = [bot['email_template'] % (settings.INTERNAL_BOT_DOMAIN,)

View File

@@ -890,6 +890,7 @@ class EventsRegisterTest(ZulipTestCase):
)
events = self.do_test(
lambda: do_add_submessage(
realm=cordelia.realm,
sender_id=cordelia.id,
message_id=message_id,
msg_type='whatever',

View File

@@ -853,7 +853,7 @@ class StreamMessagesTest(ZulipTestCase):
content=content
)
self.assertEqual(m.call_count, 1)
users = m.call_args[0][1]
users = m.call_args[0][2]
user_ids = {u['id'] for u in users}
return user_ids

View File

@@ -137,9 +137,9 @@ class TestBasics(ZulipTestCase):
)
self.assertEqual(m.call_count, 1)
data = m.call_args[0][0]
data = m.call_args[0][1]
self.assertEqual(data, expected_data)
users = m.call_args[0][1]
users = m.call_args[0][2]
self.assertIn(cordelia.id, users)
self.assertIn(hamlet.id, users)

View File

@@ -18,7 +18,7 @@ import signal
import tornado.autoreload
import tornado.ioloop
import random
from zerver.models import UserProfile, Client
from zerver.models import UserProfile, Client, Realm
from zerver.decorator import cachify
from zerver.tornado.handlers import clear_handler_by_id, get_handler_by_id, \
finish_handler, handler_stats_string
@@ -997,7 +997,7 @@ def send_notification_http(data: Mapping[str, Any]) -> None:
else:
process_notification(data)
def send_event(event: Mapping[str, Any],
def send_event(realm: Realm, event: Mapping[str, Any],
users: Union[Iterable[int], Iterable[Mapping[str, Any]]]) -> None:
"""`users` is a list of user IDs, or in the case of `message` type
events, a list of dicts describing the users and metadata about

View File

@@ -37,6 +37,7 @@ def process_submessage(request: HttpRequest,
return json_error(_("Invalid json for submessage"))
do_add_submessage(
realm=user_profile.realm,
sender_id=user_profile.id,
message_id=message.id,
msg_type=msg_type,