Files
zulip/zerver/lib/event_schema.py
Sahil Batra 0a3800332f streams: Remove "email_address" field from Subscription objects.
This commit removes the "email_address" field from Subscription objects;
the next commit will instead add a new endpoint to get the email
address for a stream with a proper access check.

This change also fixes a bug where we would include the email address
for an unsubscribed private stream even when the user did not have
permission to send messages to the stream; having the email allowed
the unsubscribed user to send messages to the stream.

Note that an unsubscribed user can still send messages to the stream
if the user noted down the email address before being unsubscribed
and the stream token was not changed after unsubscribing the user.
2023-11-16 11:11:26 -05:00

1803 lines
47 KiB
Python

# This module is a collection of testing helpers for validating the
# schema of "events" sent by Zulip's server-to-client push system.
#
# By policy, every event generated by Zulip's API should be validated
# by a test in test_events.py with a schema checker here (which is
# validated, in turn, against the OpenAPI documentation for GET
# /events in zulip.yaml and the fixtures used by the Zulip web app
# frontend).
#
# See https://zulip.readthedocs.io/en/latest/subsystems/events-system.html
#
# The general paradigm here is that if you have an event with type foo_bar
# then you declare foo_bar_event to be an instance of event_dict_type. And
# then you make a checker function by saying:
#
# check_foo_bar = make_checker(foo_bar_event)
#
# And then the caller can use the checker as follows:
#
# check_foo_bar(var_name, event)
#
# For more complicated events, you may write custom checkers that check
# aspects of the data that go beyond simply validating that the data
# matches an event_dict_type based schema. This typically happens with
# events where you either have a Union type or optional_keys.
#
# See check_delete_message and check_presence for examples of this
# paradigm.
from typing import Dict, List, Sequence, Set, Tuple, Union
from zerver.lib.data_types import (
DictType,
EnumType,
Equals,
ListType,
NumberType,
OptionalType,
StringDictType,
TupleType,
UnionType,
UrlType,
check_data,
event_dict_type,
make_checker,
)
from zerver.lib.topic import ORIG_TOPIC, TOPIC_LINKS, TOPIC_NAME
from zerver.models import Realm, RealmUserDefault, Stream, UserProfile
# These fields are used for "stream" events, and are included in the
# larger "subscription" events that also contain personal settings.
# Each entry is a (key, type) pair; the list is handed to DictType
# wherever a bare stream object is validated in this module.
basic_stream_fields = [
("can_remove_subscribers_group_id", int),
("date_created", int),
("description", str),
("first_message_id", OptionalType(int)),
("history_public_to_subscribers", bool),
("invite_only", bool),
("is_announcement_only", bool),
("is_web_public", bool),
("message_retention_days", OptionalType(int)),
("name", str),
("rendered_description", str),
("stream_id", int),
("stream_post_policy", int),
]
# Fields of a full subscription object: every basic stream field
# followed by the per-user settings below. Used as the required keys
# of single_subscription_type.
subscription_fields: Sequence[Tuple[str, object]] = [
*basic_stream_fields,
("audible_notifications", OptionalType(bool)),
("color", str),
("desktop_notifications", OptionalType(bool)),
("email_notifications", OptionalType(bool)),
("in_home_view", bool),
("is_muted", bool),
("pin_to_top", bool),
("push_notifications", OptionalType(bool)),
("stream_weekly_traffic", OptionalType(int)),
# We may try to remove subscribers from some events in
# the future for clients that don't want subscriber
# info.
("subscribers", ListType(int)),
("wildcard_mentions_notify", OptionalType(bool)),
]
# Loose type for the "value"/"setting" key of the generic update
# events in this module: any of bool, int or str. The custom check_*
# functions narrow the type per property where they can.
value_type = UnionType(
[
# force vertical formatting
bool,
int,
str,
]
)
# Same as value_type, but additionally accepts None (used by the
# stream update event).
optional_value_type = UnionType(
[
# force vertical formatting
bool,
int,
str,
Equals(None),
]
)
# Schema and checker for the "alert_words" event, which carries a
# list of strings under the "alert_words" key.
alert_words_event = event_dict_type(
required_keys=[
# force vertical formatting
("type", Equals("alert_words")),
("alert_words", ListType(str)),
]
)
check_alert_words = make_checker(alert_words_event)
# Entry of an attachment's "messages" list.
attachment_message_type = DictType(
required_keys=[
# force vertical
("id", int),
("date_sent", int),
]
)
# Full attachment object, shared by the "add" and "update" events.
attachment_type = DictType(
required_keys=[
("id", int),
("name", str),
("size", int),
("path_id", str),
("create_time", int),
("messages", ListType(attachment_message_type)),
]
)
# attachment/add: full attachment object plus the "upload_space_used" total.
attachment_add_event = event_dict_type(
required_keys=[
("type", Equals("attachment")),
("op", Equals("add")),
("attachment", attachment_type),
("upload_space_used", int),
]
)
check_attachment_add = make_checker(attachment_add_event)
# attachment/remove: only the attachment's id is sent, not the full object.
attachment_remove_event = event_dict_type(
required_keys=[
("type", Equals("attachment")),
("op", Equals("remove")),
("attachment", DictType([("id", int)])),
("upload_space_used", int),
]
)
check_attachment_remove = make_checker(attachment_remove_event)
# attachment/update: same shape as attachment/add apart from "op".
attachment_update_event = event_dict_type(
required_keys=[
("type", Equals("attachment")),
("op", Equals("update")),
("attachment", attachment_type),
("upload_space_used", int),
]
)
check_attachment_update = make_checker(attachment_update_event)
# Definition of one custom profile field, as listed in the
# "custom_profile_fields" event.
# NOTE(review): the name custom_profile_field_type is bound a second
# time further down in this module (to a per-user field *value* type).
# The event below captures this first object before that rebinding, so
# it is unaffected, but later references see the second definition.
custom_profile_field_type = DictType(
required_keys=[
("id", int),
("type", int),
("name", str),
("hint", str),
("field_data", str),
("order", int),
],
optional_keys=[
("display_in_profile_summary", bool),
],
)
custom_profile_fields_event = event_dict_type(
required_keys=[
("type", Equals("custom_profile_fields")),
("fields", ListType(custom_profile_field_type)),
]
)
check_custom_profile_fields = make_checker(custom_profile_fields_event)
# One default stream group: its metadata plus the member streams,
# each validated against basic_stream_fields. Despite the _check_
# prefix this is a DictType schema, not a checker function.
_check_stream_group = DictType(
required_keys=[
("name", str),
("id", int),
("description", str),
("streams", ListType(DictType(basic_stream_fields))),
]
)
default_stream_groups_event = event_dict_type(
required_keys=[
# force vertical
("type", Equals("default_stream_groups")),
("default_stream_groups", ListType(_check_stream_group)),
]
)
check_default_stream_groups = make_checker(default_stream_groups_event)
# "default_streams" event: a flat list of basic stream objects.
default_streams_event = event_dict_type(
required_keys=[
("type", Equals("default_streams")),
("default_streams", ListType(DictType(basic_stream_fields))),
]
)
check_default_streams = make_checker(default_streams_event)
# The event type has an unusual number of optional fields. The
# message_id/message_ids fields are conditional on the
# bulk_message_deletion client_capability, whereas the other fields
# are conditional on private vs. stream messages.
#
# Because the schema alone cannot express which optional keys belong
# together, callers use the stricter check_delete_message wrapper
# rather than this private checker.
delete_message_event = event_dict_type(
required_keys=[
# force vertical
("type", Equals("delete_message")),
("message_type", EnumType(["private", "stream"])),
],
optional_keys=[
("message_id", int),
("message_ids", ListType(int)),
("stream_id", int),
("topic", str),
],
)
_check_delete_message = make_checker(delete_message_event)
def check_delete_message(
    var_name: str,
    event: Dict[str, object],
    message_type: str,
    num_message_ids: int,
    is_legacy: bool,
) -> None:
    """
    Validate a delete_message event and the exact set of keys it has.

    Beyond the basic schema, this verifies that the event carries the
    expected message_type, that stream events (and only those) include
    stream_id/topic, and that the event uses either the legacy single
    message_id form or the bulk message_ids form with num_message_ids
    entries.
    """
    _check_delete_message(var_name, event)
    assert event["message_type"] == message_type

    # Keys present on every delete_message event.
    expected_keys = {"id", "type", "message_type"}

    if message_type == "stream":
        expected_keys.update({"stream_id", "topic"})
    elif message_type != "private":
        raise AssertionError("unexpected message_type")

    if is_legacy:
        # The legacy format can only describe a single message.
        assert num_message_ids == 1
        expected_keys.add("message_id")
    else:
        message_ids = event["message_ids"]
        assert isinstance(message_ids, list)
        assert num_message_ids == len(message_ids)
        expected_keys.add("message_ids")

    assert set(event.keys()) == expected_keys
# "has_zoom_token" event: a single boolean "value". The public
# check_has_zoom_token wrapper also compares the value itself.
has_zoom_token_event = event_dict_type(
required_keys=[
# force vertical
("type", Equals("has_zoom_token")),
("value", bool),
]
)
_check_has_zoom_token = make_checker(has_zoom_token_event)
def check_has_zoom_token(
    # force vertical
    var_name: str,
    event: Dict[str, object],
    value: bool,
) -> None:
    """
    Validate a has_zoom_token event and confirm that its "value"
    field equals the expected value.
    """
    _check_has_zoom_token(var_name, event)
    actual_value = event["value"]
    assert actual_value == value
# "heartbeat" event: carries no payload beyond its type.
heartbeat_event = event_dict_type(
required_keys=[
# force vertical
("type", Equals("heartbeat")),
]
)
_check_heartbeat = make_checker(heartbeat_event)
def check_heartbeat(
    # force vertical
    var_name: str,
    event: Dict[str, object],
) -> None:
    """Validate a heartbeat event against the heartbeat schema."""
    _check_heartbeat(var_name, event)
# A single hotspot entry; "delay" is any number (NumberType), the
# other fields are strings.
_hotspot = DictType(
required_keys=[
# force vertical
("name", str),
("title", str),
("description", str),
("delay", NumberType()),
]
)
hotspots_event = event_dict_type(
required_keys=[
# force vertical
("type", Equals("hotspots")),
(
"hotspots",
ListType(_hotspot),
),
]
)
check_hotspots = make_checker(hotspots_event)
invites_changed_event = event_dict_type(
required_keys=[
# the most boring event...no metadata
("type", Equals("invites_changed")),
]
)
check_invites_changed = make_checker(invites_changed_event)
# This type, like other instances of TupleType, is a legacy feature of
# a very old Zulip API; we plan to replace it with an object as those
# are more extensible.
muted_topic_type = TupleType(
[
str, # stream name
str, # topic name
int, # timestamp
]
)
muted_topics_event = event_dict_type(
required_keys=[
("type", Equals("muted_topics")),
("muted_topics", ListType(muted_topic_type)),
]
)
check_muted_topics = make_checker(muted_topics_event)
# NOTE(review): unlike its neighbours, this event is declared with
# DictType and lists the "id" key explicitly instead of going through
# event_dict_type -- presumably equivalent; confirm against
# zerver.lib.data_types before changing it.
user_topic_event = DictType(
required_keys=[
("id", int),
("type", Equals("user_topic")),
("stream_id", int),
("topic_name", str),
("last_updated", int),
("visibility_policy", int),
]
)
check_user_topic = make_checker(user_topic_event)
# One muted user: the user's id and an integer timestamp.
muted_user_type = DictType(
required_keys=[
("id", int),
("timestamp", int),
]
)
muted_users_event = event_dict_type(
required_keys=[
("type", Equals("muted_users")),
("muted_users", ListType(muted_user_type)),
]
)
check_muted_users = make_checker(muted_users_event)
# Element of a message's topic links list (see message_fields).
# Despite the _check_ prefix this is a DictType schema, not a checker.
_check_topic_links = DictType(
required_keys=[
("text", str),
("url", str),
]
)
# Keys of the "message" object inside a message event. The topic keys
# come from the TOPIC_NAME / TOPIC_LINKS constants in zerver.lib.topic.
# "display_recipient" is typed as str and "stream_id" is required, so
# this schema appears to describe stream messages only -- TODO confirm.
message_fields = [
("avatar_url", OptionalType(str)),
("client", str),
("content", str),
("content_type", Equals("text/html")),
("display_recipient", str),
("id", int),
("is_me_message", bool),
("reactions", ListType(dict)),
("recipient_id", int),
("sender_realm_str", str),
("sender_email", str),
("sender_full_name", str),
("sender_id", int),
("stream_id", int),
(TOPIC_NAME, str),
(TOPIC_LINKS, ListType(_check_topic_links)),
("submessages", ListType(dict)),
("timestamp", int),
("type", str),
]
# The event wraps the message object and adds the list of flags.
message_event = event_dict_type(
required_keys=[
("type", Equals("message")),
("flags", ListType(str)),
("message", DictType(message_fields)),
]
)
check_message = make_checker(message_event)
# This legacy presence structure is intended to be replaced by a more
# sensible data structure.
presence_type = DictType(
required_keys=[
("status", EnumType(["active", "idle"])),
("timestamp", int),
("client", str),
("pushable", bool),
]
)
# "presence" maps string keys to presence_type dicts; "email" is
# optional, and check_presence asserts whether it is present.
presence_event = event_dict_type(
required_keys=[
("type", Equals("presence")),
("user_id", int),
("server_timestamp", NumberType()),
("presence", StringDictType(presence_type)),
],
optional_keys=[
# force vertical
("email", str),
],
)
_check_presence = make_checker(presence_event)
def check_presence(
    var_name: str,
    event: Dict[str, object],
    has_email: bool,
    presence_key: str,
    status: str,
) -> None:
    """
    Validate a presence event, then check the details the schema
    cannot express: whether "email" is present, and that the single
    presence entry has the expected key and status.
    """
    _check_presence(var_name, event)
    assert ("email" in event) == has_email

    presence = event["presence"]
    assert isinstance(presence, dict)

    # Our tests only have one presence value, so unpacking exactly one
    # item is intentional; anything else fails loudly here.
    ((actual_key, actual_presence),) = presence.items()
    assert actual_key == presence_key
    assert actual_presence["status"] == status
# Type for the legacy user field; the `user_id` field is intended to
# replace this and we expect to remove this once clients have migrated
# to support the modern API.
reaction_legacy_user_type = DictType(
required_keys=[
# force vertical
("email", str),
("full_name", str),
("user_id", int),
]
# We should probably declare is_mirror_dummy as an optional field here.
)
# reaction/add and reaction/remove share the same payload; only the
# "op" value differs.
reaction_add_event = event_dict_type(
required_keys=[
("type", Equals("reaction")),
("op", Equals("add")),
("message_id", int),
("emoji_name", str),
("emoji_code", str),
("reaction_type", str),
("user_id", int),
("user", reaction_legacy_user_type),
]
)
check_reaction_add = make_checker(reaction_add_event)
reaction_remove_event = event_dict_type(
required_keys=[
("type", Equals("reaction")),
("op", Equals("remove")),
("message_id", int),
("emoji_name", str),
("emoji_code", str),
("reaction_type", str),
("user_id", int),
("user", reaction_legacy_user_type),
]
)
check_reaction_remove = make_checker(reaction_remove_event)
# realm/deactivated: identifies the realm by id only.
realm_deactivated_event = event_dict_type(
required_keys=[
("type", Equals("realm")),
("op", Equals("deactivated")),
("realm_id", int),
]
)
check_realm_deactivated = make_checker(realm_deactivated_event)
# Service entry checked for outgoing webhook bots (see check_realm_bot_add).
bot_services_outgoing_type = DictType(
required_keys=[
# force vertical
("base_url", UrlType()),
("interface", int),
("token", str),
]
)
# String-to-string mapping used for an embedded bot's "config_data".
config_data_schema = StringDictType(str)
# Service entry checked for embedded bots (see check_realm_bot_add).
bot_services_embedded_type = DictType(
required_keys=[
# force vertical
("service_name", str),
("config_data", config_data_schema),
]
)
# Note that regular bots just get an empty list of services,
# so the sub_validator for ListType won't matter for them.
bot_services_type = ListType(
UnionType(
[
# force vertical
bot_services_outgoing_type,
bot_services_embedded_type,
]
),
)
# Full bot object sent in realm_bot/add events.
bot_type = DictType(
required_keys=[
("user_id", int),
("api_key", str),
("avatar_url", str),
("bot_type", int),
("default_all_public_streams", bool),
("default_events_register_stream", OptionalType(str)),
("default_sending_stream", OptionalType(str)),
("email", str),
("full_name", str),
("is_active", bool),
("owner_id", int),
("services", bot_services_type),
]
)
realm_bot_add_event = event_dict_type(
required_keys=[
# force vertical
("type", Equals("realm_bot")),
("op", Equals("add")),
("bot", bot_type),
]
)
# The public check_realm_bot_add wrapper additionally ties the shape
# of "services" to the bot's bot_type.
_check_realm_bot_add = make_checker(realm_bot_add_event)
def check_realm_bot_add(
    var_name: str,
    event: Dict[str, object],
) -> None:
    """
    Validate a realm_bot/add event, then check that the bot's
    "services" list has the shape required by its bot_type: empty for
    default bots, exactly one outgoing-webhook service for outgoing
    webhook bots, and exactly one embedded service for embedded bots.

    Raises AssertionError for any other bot_type.
    """
    _check_realm_bot_add(var_name, event)

    bot = event["bot"]
    assert isinstance(bot, dict)

    # Named bot_kind rather than bot_type so the module-level bot_type
    # schema is not shadowed inside this function.
    bot_kind = bot["bot_type"]
    services_field = f"{var_name}['bot']['services']"
    services = bot["services"]

    if bot_kind == UserProfile.DEFAULT_BOT:
        check_data(Equals([]), services_field, services)
    elif bot_kind == UserProfile.OUTGOING_WEBHOOK_BOT:
        check_data(ListType(bot_services_outgoing_type, length=1), services_field, services)
    elif bot_kind == UserProfile.EMBEDDED_BOT:
        check_data(ListType(bot_services_embedded_type, length=1), services_field, services)
    else:
        raise AssertionError(f"Unknown bot_type: {bot_kind}")
# Payload of the "bot" key in realm_bot/delete events.
bot_type_for_delete = DictType(
required_keys=[
# for legacy reasons we have a dict here
# with only one key
("user_id", int),
]
)
realm_bot_delete_event = event_dict_type(
required_keys=[
("type", Equals("realm_bot")),
("op", Equals("delete")),
("bot", bot_type_for_delete),
]
)
check_realm_bot_delete = make_checker(realm_bot_delete_event)
# Payload of the "bot" key in realm_bot/remove events.
bot_type_for_remove = DictType(
required_keys=[
# Why does remove have full_name but delete doesn't?
# Why do we have both a remove and a delete event
# for bots? I don't know the answer as I write this.
("full_name", str),
("user_id", int),
]
)
realm_bot_remove_event = event_dict_type(
required_keys=[
("type", Equals("realm_bot")),
("op", Equals("remove")),
("bot", bot_type_for_remove),
]
)
check_realm_bot_remove = make_checker(realm_bot_remove_event)
# Payload of the "bot" key in realm_bot/update events: the user_id
# plus any of the optional changed fields. check_realm_bot_update
# asserts that exactly one extra field is present.
bot_type_for_update = DictType(
required_keys=[
# force vertical
("user_id", int),
],
optional_keys=[
("api_key", str),
("avatar_url", str),
("default_all_public_streams", bool),
("default_events_register_stream", OptionalType(str)),
("default_sending_stream", OptionalType(str)),
("full_name", str),
("owner_id", int),
("services", bot_services_type),
],
)
realm_bot_update_event = event_dict_type(
required_keys=[
("type", Equals("realm_bot")),
("op", Equals("update")),
("bot", bot_type_for_update),
]
)
_check_realm_bot_update = make_checker(realm_bot_update_event)
def check_realm_bot_update(
    # Check schema plus the field.
    var_name: str,
    event: Dict[str, object],
    field: str,
) -> None:
    """
    Validate a realm_bot/update event and confirm that its "bot" dict
    contains exactly "user_id" and the one updated field.
    """
    # Check the overall schema first.
    _check_realm_bot_update(var_name, event)

    bot = event["bot"]
    assert isinstance(bot, dict)
    assert set(bot.keys()) == {"user_id", field}
# One realm domain entry, shared by the add and change events.
realm_domain_type = DictType(
required_keys=[
# force vertical
("domain", str),
("allow_subdomains", bool),
]
)
realm_domains_add_event = event_dict_type(
required_keys=[
("type", Equals("realm_domains")),
("op", Equals("add")),
("realm_domain", realm_domain_type),
]
)
check_realm_domains_add = make_checker(realm_domains_add_event)
realm_domains_change_event = event_dict_type(
required_keys=[
("type", Equals("realm_domains")),
("op", Equals("change")),
("realm_domain", realm_domain_type),
]
)
check_realm_domains_change = make_checker(realm_domains_change_event)
# The remove event only names the domain; it has no realm_domain dict.
realm_domains_remove_event = event_dict_type(
required_keys=[
("type", Equals("realm_domains")),
("op", Equals("remove")),
("domain", str),
]
)
check_realm_domains_remove = make_checker(realm_domains_remove_event)
# One code playground entry of the "realm_playgrounds" event.
realm_playground_type = DictType(
required_keys=[("id", int), ("name", str), ("pygments_language", str), ("url_prefix", str)]
)
realm_playgrounds_event = event_dict_type(
required_keys=[
("type", Equals("realm_playgrounds")),
("realm_playgrounds", ListType(realm_playground_type)),
]
)
_check_realm_playgrounds = make_checker(realm_playgrounds_event)
def check_realm_playgrounds(var_name: str, event: Dict[str, object]) -> None:
    """
    Validate a realm_playgrounds event and assert that its
    "realm_playgrounds" value is a list.
    """
    _check_realm_playgrounds(var_name, event)
    playgrounds = event["realm_playgrounds"]
    assert isinstance(playgrounds, list)
# One realm emoji. Note that "id" is a string here; the update event
# maps string keys to these dicts, and check_realm_emoji_update
# asserts that each key equals the contained "id".
realm_emoji_type = DictType(
required_keys=[
("id", str),
("name", str),
("source_url", str),
("deactivated", bool),
("author_id", int),
("still_url", OptionalType(str)),
],
)
realm_emoji_update_event = event_dict_type(
required_keys=[
("type", Equals("realm_emoji")),
("op", Equals("update")),
("realm_emoji", StringDictType(realm_emoji_type)),
]
)
_check_realm_emoji_update = make_checker(realm_emoji_update_event)
def check_realm_emoji_update(var_name: str, event: Dict[str, object]) -> None:
    """
    Validate a realm_emoji/update event.

    Realm emoji are sent in a somewhat clumsy shape: a dict keyed by
    emoji id whose values are dicts that repeat that same id. A plain
    list of dicts would be simpler (clients can build a Map
    themselves), but until then we verify that every key agrees with
    the "id" stored in its value.
    """
    _check_realm_emoji_update(var_name, event)

    emoji_by_id = event["realm_emoji"]
    assert isinstance(emoji_by_id, dict)
    for emoji_id, emoji in emoji_by_id.items():
        assert emoji["id"] == emoji_id
# One data export record. The three OptionalType fields may be None;
# check_realm_export asserts which of them are set.
export_type = DictType(
required_keys=[
("id", int),
("export_time", NumberType()),
("acting_user_id", int),
("export_url", OptionalType(str)),
("deleted_timestamp", OptionalType(NumberType())),
("failed_timestamp", OptionalType(NumberType())),
("pending", bool),
]
)
realm_export_event = event_dict_type(
required_keys=[
# force vertical
("type", Equals("realm_export")),
(
"exports",
ListType(export_type),
),
]
)
_check_realm_export = make_checker(realm_export_event)
def check_realm_export(
    var_name: str,
    event: Dict[str, object],
    has_export_url: bool,
    has_deleted_timestamp: bool,
    has_failed_timestamp: bool,
) -> None:
    """
    Validate a realm_export event containing exactly one export, and
    check which of its optional fields (export_url, deleted_timestamp,
    failed_timestamp) are non-None.
    """
    # Check the overall event first, knowing it has some
    # optional types.
    _check_realm_export(var_name, event)

    # It's possible to have multiple data exports, but the events tests do not
    # exercise that case, so we do strict validation for a single export here.
    exports = event["exports"]
    assert isinstance(exports, list)
    assert len(exports) == 1
    export = exports[0]

    # Now verify which fields have non-None values.
    expectations = (
        ("export_url", has_export_url),
        ("deleted_timestamp", has_deleted_timestamp),
        ("failed_timestamp", has_failed_timestamp),
    )
    for field_name, expected_present in expectations:
        assert expected_present == (export[field_name] is not None)
# One linkifier entry of the "realm_linkifiers" event.
realm_linkifier_type = DictType(
required_keys=[
("pattern", str),
("url_template", str),
("id", int),
]
)
# Note: the key list is passed positionally here, whereas most other
# event_dict_type calls in this module pass it as required_keys=.
realm_linkifiers_event = event_dict_type(
[
("type", Equals("realm_linkifiers")),
("realm_linkifiers", ListType(realm_linkifier_type)),
]
)
check_realm_linkifiers = make_checker(realm_linkifiers_event)
# Shape of the optional "extra_data" key of realm/update events;
# check_realm_update requires it exactly when the property is
# "plan_type".
plan_type_extra_data_type = DictType(
required_keys=[
# force vertical
("upload_quota", int),
]
)
"""
realm/update events are flexible for values;
we will use a more strict checker to check
types in a context-specific manner
"""
realm_update_event = event_dict_type(
required_keys=[
("type", Equals("realm")),
("op", Equals("update")),
("property", str),
("value", value_type),
],
optional_keys=[
# force vertical
("extra_data", plan_type_extra_data_type),
],
)
_check_realm_update = make_checker(realm_update_event)
def check_realm_update(
    var_name: str,
    event: Dict[str, object],
    prop: str,
) -> None:
    """
    Validate a realm/update event for the given property.

    Realm updates have these two fields:

        property
        value

    We check not only the basic schema, but also that the value
    actually matches the type that Realm.property_types configures
    for the property. "plan_type" is the only property that must
    carry "extra_data"; every other property must not.

    Raises AssertionError if the configured property type is not one
    this checker knows how to handle.
    """
    _check_realm_update(var_name, event)
    assert prop == event["property"]

    value = event["value"]
    has_extra_data = "extra_data" in event

    if prop == "plan_type":
        assert isinstance(value, int)
        assert has_extra_data
        return

    assert not has_extra_data

    # These properties are integers but are handled here rather than
    # being looked up in Realm.property_types.
    if prop in ("notifications_stream_id", "signup_notifications_stream_id", "org_type"):
        assert isinstance(value, int)
        return

    property_type = Realm.property_types[prop]

    if property_type in (bool, int, str):
        expected_type = property_type
    elif property_type == (int, type(None)):
        # value_type in the schema does not admit None, so only the
        # non-None alternative can appear in an event.
        expected_type = int
    elif property_type == (str, type(None)):
        expected_type = str
    else:
        raise AssertionError(f"Unexpected property type {property_type}")

    assert isinstance(value, expected_type)
# realm_user_settings_defaults/update: a property name plus a loosely
# typed value; check_realm_default_update narrows the value's type
# using RealmUserDefault.property_types.
realm_user_settings_defaults_update_event = event_dict_type(
required_keys=[
("type", Equals("realm_user_settings_defaults")),
("op", Equals("update")),
("property", str),
("value", value_type),
],
)
_check_realm_default_update = make_checker(realm_user_settings_defaults_update_event)
def check_realm_default_update(
    var_name: str,
    event: Dict[str, object],
    prop: str,
) -> None:
    """
    Validate a realm_user_settings_defaults/update event for the given
    property, including that the value has the type configured in
    RealmUserDefault.property_types. "default_language" is explicitly
    rejected by this checker.
    """
    _check_realm_default_update(var_name, event)
    assert prop == event["property"]
    assert prop != "default_language"

    known_property_types = RealmUserDefault.property_types
    assert prop in known_property_types
    expected_type = known_property_types[prop]
    assert isinstance(event["value"], expected_type)
# Map of authentication method name to whether it is enabled. Only
# these five methods are declared here.
authentication_dict = DictType(
required_keys=[
("Google", bool),
("Dev", bool),
("LDAP", bool),
("GitHub", bool),
("Email", bool),
]
)
authentication_data = DictType(
required_keys=[
# this single-key dictionary is an annoying consequence of us
# using property_type of "default" for legacy reasons
# (specifically, to avoid breaking old clients when we
# introduced the `update_dict` format).
("authentication_methods", authentication_dict),
]
)
# The remaining *_data types are the alternative shapes of the "data"
# key in realm/update_dict events; check_realm_update_dict selects the
# one that applies to a given event.
icon_data = DictType(
required_keys=[
# force vertical
("icon_url", str),
("icon_source", str),
]
)
logo_data = DictType(
required_keys=[
# force vertical
("logo_url", str),
("logo_source", str),
]
)
allow_message_editing_data = DictType(
required_keys=[
("allow_message_editing", bool),
]
)
message_content_edit_limit_seconds_data = DictType(
required_keys=[
("message_content_edit_limit_seconds", OptionalType(int)),
]
)
edit_topic_policy_data = DictType(
required_keys=[
("edit_topic_policy", int),
]
)
night_logo_data = DictType(
required_keys=[
# force vertical
("night_logo_url", str),
("night_logo_source", str),
]
)
# Union of every shape above; the base schema accepts any of them
# regardless of the event's "property".
update_dict_data = UnionType(
[
# force vertical
allow_message_editing_data,
authentication_data,
edit_topic_policy_data,
icon_data,
logo_data,
message_content_edit_limit_seconds_data,
night_logo_data,
]
)
realm_update_dict_event = event_dict_type(
required_keys=[
("type", Equals("realm")),
("op", Equals("update_dict")),
("property", EnumType(["default", "icon", "logo", "night_logo"])),
("data", update_dict_data),
]
)
_check_realm_update_dict = make_checker(realm_update_dict_event)
def check_realm_update_dict(
    # handle union types
    var_name: str,
    event: Dict[str, object],
) -> None:
    """
    Validate a realm/update_dict event, including that its "data"
    matches the specific shape implied by the event.

    The base schema only requires "data" to match some member of the
    update_dict_data union. Here we pick the one concrete type that
    applies -- by "property" for icon/logo/night_logo, and by which
    key is present in "data" for the legacy "default" property -- and
    check "data" against it.

    Raises AssertionError if the property, or the keys of a "default"
    payload, are not ones this checker knows about.
    """
    _check_realm_update_dict(var_name, event)

    if event["property"] == "default":
        assert isinstance(event["data"], dict)

        if "allow_message_editing" in event["data"]:
            sub_type = allow_message_editing_data
        elif "message_content_edit_limit_seconds" in event["data"]:
            sub_type = message_content_edit_limit_seconds_data
        elif "edit_topic_policy" in event["data"]:
            sub_type = edit_topic_policy_data
        elif "authentication_methods" in event["data"]:
            sub_type = authentication_data
        else:
            raise AssertionError("unhandled fields in data")

    elif event["property"] == "icon":
        sub_type = icon_data
    elif event["property"] == "logo":
        sub_type = logo_data
    elif event["property"] == "night_logo":
        sub_type = night_logo_data
    else:
        # This must be an f-string; without the prefix the message
        # would contain the literal placeholder text rather than the
        # offending property.
        raise AssertionError(f"unhandled property: {event['property']}")

    check_data(sub_type, f"{var_name}['data']", event["data"])
# TODO: This type is missing optional fields:
#
# * delivery_email
# * bot-related fields.
# * Users with custom profile fields, where profile_data will
# be nonempty.
#
# Only because our test_events.py tests don't cover the relevant cases.
#
# NOTE(review): "delivery_email" is in fact declared below (as a
# required OptionalType(str) key), so the first bullet above looks
# stale -- confirm and update the TODO.
realm_user_type = DictType(
required_keys=[
("user_id", int),
("email", str),
("avatar_url", OptionalType(str)),
("avatar_version", int),
("full_name", str),
("is_admin", bool),
("is_billing_admin", bool),
("is_owner", bool),
("is_bot", bool),
("is_guest", bool),
("role", EnumType(UserProfile.ROLE_TYPES)),
("is_active", bool),
("profile_data", StringDictType(dict)),
("timezone", str),
("date_joined", str),
("delivery_email", OptionalType(str)),
]
)
realm_user_add_event = event_dict_type(
required_keys=[
("type", Equals("realm_user")),
("op", Equals("add")),
("person", realm_user_type),
]
)
check_realm_user_add = make_checker(realm_user_add_event)
# Minimal person object sent when a user is removed.
removed_user_type = DictType(
required_keys=[
# force vertical
("user_id", int),
("full_name", str),
]
)
realm_user_remove_event = event_dict_type(
required_keys=[
("type", Equals("realm_user")),
("op", Equals("remove")),
("person", removed_user_type),
],
)
check_realm_user_remove = make_checker(realm_user_remove_event)
# A user's value for one custom profile field, as sent in realm_user
# update events.
# NOTE(review): this rebinds custom_profile_field_type, which earlier
# in this module names the schema of a custom profile field
# *definition*. The earlier event already captured the first object,
# so nothing breaks, but a distinct name would be clearer.
custom_profile_field_type = DictType(
required_keys=[
# vertical formatting
("id", int),
("value", str),
],
optional_keys=[
# vertical formatting
("rendered_value", str),
],
)
# This block of types, each named by the dictionary key, makes it
# possible to validate the type of all the realm_user update events.
# check_realm_user_update looks up the entry for its person_flavor
# argument in this dict.
realm_user_person_types = dict(
# Note that all flavors of person include user_id.
avatar_fields=DictType(
required_keys=[
("user_id", int),
("avatar_source", str),
("avatar_url", OptionalType(str)),
("avatar_url_medium", OptionalType(str)),
("avatar_version", int),
],
),
bot_owner_id=DictType(
required_keys=[
# vertical formatting
("user_id", int),
("bot_owner_id", int),
],
),
custom_profile_field=DictType(
required_keys=[
# vertical formatting
("user_id", int),
("custom_profile_field", custom_profile_field_type),
],
),
delivery_email=DictType(
required_keys=[
# vertical formatting
("user_id", int),
("delivery_email", OptionalType(str)),
],
),
email=DictType(
required_keys=[
("user_id", int),
("new_email", str),
],
),
full_name=DictType(
required_keys=[
# vertical formatting
("user_id", int),
("full_name", str),
],
),
is_billing_admin=DictType(
required_keys=[
# vertical formatting
("user_id", int),
("is_billing_admin", bool),
],
),
role=DictType(
required_keys=[
# vertical formatting
("user_id", int),
("role", EnumType(UserProfile.ROLE_TYPES)),
],
),
timezone=DictType(
required_keys=[
# we should probably eliminate email here
("user_id", int),
("email", str),
("timezone", str),
],
),
)
# The base schema accepts any flavor of person; the wrapper below
# pins the specific one.
realm_user_update_event = event_dict_type(
required_keys=[
("type", Equals("realm_user")),
("op", Equals("update")),
("person", UnionType(list(realm_user_person_types.values()))),
],
)
_check_realm_user_update = make_checker(realm_user_update_event)
def check_realm_user_update(
    # person_flavor tells us which extra fields we need
    var_name: str,
    event: Dict[str, object],
    person_flavor: str,
) -> None:
    """
    Validate a realm_user/update event, then check its "person" dict
    against the specific schema registered under person_flavor in
    realm_user_person_types.

    Raises KeyError if person_flavor is not a key of that dict.
    """
    _check_realm_user_update(var_name, event)

    person_schema = realm_user_person_types[person_flavor]
    person_field = f"{var_name}['person']"
    check_data(person_schema, person_field, event["person"])
# "restart" event: version/feature-level information plus the
# "immediate" flag.
restart_event = event_dict_type(
required_keys=[
("type", Equals("restart")),
("zulip_version", str),
("zulip_merge_base", str),
("zulip_feature_level", int),
("server_generation", int),
("immediate", bool),
]
)
check_restart_event = make_checker(restart_event)
# stream/create and stream/delete both carry a list of basic stream
# objects; only the "op" value differs.
stream_create_event = event_dict_type(
required_keys=[
("type", Equals("stream")),
("op", Equals("create")),
("streams", ListType(DictType(basic_stream_fields))),
]
)
check_stream_create = make_checker(stream_create_event)
stream_delete_event = event_dict_type(
required_keys=[
("type", Equals("stream")),
("op", Equals("delete")),
("streams", ListType(DictType(basic_stream_fields))),
]
)
check_stream_delete = make_checker(stream_delete_event)
# stream/update: "value" may be None here (optional_value_type). The
# optional keys accompany specific properties only; check_stream_update
# enforces which ones go with which property.
stream_update_event = event_dict_type(
required_keys=[
("type", Equals("stream")),
("op", Equals("update")),
("property", str),
("value", optional_value_type),
("name", str),
("stream_id", int),
],
optional_keys=[
("rendered_description", str),
("history_public_to_subscribers", bool),
("is_web_public", bool),
],
)
_check_stream_update = make_checker(stream_update_event)
def check_stream_update(
    var_name: str,
    event: Dict[str, object],
) -> None:
    """
    Validate a stream/update event, then check per property that the
    value has the right type and that exactly the expected optional
    keys accompany it.

    Raises AssertionError for a property this checker does not know.
    """
    _check_stream_update(var_name, event)

    prop = event["property"]
    value = event["value"]

    # Keys present on every stream/update event; anything beyond them
    # is property-specific.
    common_keys = {"id", "type", "op", "property", "value", "name", "stream_id"}
    extra_keys = set(event.keys()) - common_keys

    if prop == "description":
        assert extra_keys == {"rendered_description"}
        assert isinstance(value, str)
    elif prop == "invite_only":
        assert extra_keys == {"history_public_to_subscribers", "is_web_public"}
        assert isinstance(value, bool)
    elif prop == "message_retention_days":
        assert extra_keys == set()
        # None is a legal value for this property.
        assert value is None or isinstance(value, int)
    elif prop == "name":
        assert extra_keys == set()
        assert isinstance(value, str)
    elif prop == "stream_post_policy":
        assert extra_keys == set()
        assert value in Stream.STREAM_POST_POLICY_TYPES
    elif prop == "can_remove_subscribers_group_id":
        assert extra_keys == set()
        assert isinstance(value, int)
    else:
        raise AssertionError(f"Unknown property: {prop}")
# "submessage" event.
submessage_event = event_dict_type(
required_keys=[
("type", Equals("submessage")),
("message_id", int),
("submessage_id", int),
("sender_id", int),
("msg_type", str),
("content", str),
]
)
check_submessage = make_checker(submessage_event)
# A full subscription object: every key in subscription_fields is
# required.
single_subscription_type = DictType(
# force vertical
required_keys=subscription_fields,
)
subscription_add_event = event_dict_type(
required_keys=[
("type", Equals("subscription")),
("op", Equals("add")),
("subscriptions", ListType(single_subscription_type)),
],
)
check_subscription_add = make_checker(subscription_add_event)
# peer_add / peer_remove carry parallel lists of user ids and stream
# ids; only the "op" value differs.
subscription_peer_add_event = event_dict_type(
required_keys=[
("type", Equals("subscription")),
("op", Equals("peer_add")),
("user_ids", ListType(int)),
("stream_ids", ListType(int)),
]
)
check_subscription_peer_add = make_checker(subscription_peer_add_event)
subscription_peer_remove_event = event_dict_type(
required_keys=[
("type", Equals("subscription")),
("op", Equals("peer_remove")),
("user_ids", ListType(int)),
("stream_ids", ListType(int)),
]
)
check_subscription_peer_remove = make_checker(subscription_peer_remove_event)
# Element of the "subscriptions" list in subscription/remove events.
remove_sub_type = DictType(
required_keys=[
# We should eventually just return stream_id here.
("name", str),
("stream_id", int),
]
)
subscription_remove_event = event_dict_type(
required_keys=[
("type", Equals("subscription")),
("op", Equals("remove")),
("subscriptions", ListType(remove_sub_type)),
]
)
check_subscription_remove = make_checker(subscription_remove_event)
# TODO: Have better validation for value_type; we don't have
# test_events tests for non-bool fields like `color`.
subscription_update_event = event_dict_type(
required_keys=[
("type", Equals("subscription")),
("op", Equals("update")),
("property", str),
("stream_id", int),
("value", value_type),
]
)
_check_subscription_update = make_checker(subscription_update_event)
def check_subscription_update(
    var_name: str,
    event: Dict[str, object],
    property: str,
    value: bool,
) -> None:
    """
    Validate a subscription/update event against its schema, then
    confirm that it reports the expected property name and value.
    """
    _check_subscription_update(var_name, event)

    # Compare in the same order as the event keys are declared:
    # the property name first, then its value.
    expected = {"property": property, "value": value}
    for key, wanted in expected.items():
        assert event[key] == wanted
# Schema for a user referenced by a typing event; used for both the
# "sender" and the entries of "recipients".
typing_person_type = DictType(
required_keys=[
# we should eventually just send user_id
("email", str),
("user_id", int),
]
)
# Allowed values for the "message_type" key of typing events.
equals_private_or_stream = EnumType(
[
"private",
"stream",
]
)
# Schema and checker for "typing" events with op "start".
# NOTE(review): presumably "recipients" accompanies private messages
# and "stream_id"/"topic" accompany stream messages; the schema itself
# only marks all three as optional and does not enforce that pairing.
typing_start_event = event_dict_type(
required_keys=[
("type", Equals("typing")),
("op", Equals("start")),
("message_type", equals_private_or_stream),
("sender", typing_person_type),
],
optional_keys=[
("recipients", ListType(typing_person_type)),
("stream_id", int),
("topic", str),
],
)
check_typing_start = make_checker(typing_start_event)
# Schema and checker for "typing" events with op "stop".
# NOTE(review): presumably "recipients" accompanies private messages
# and "stream_id"/"topic" accompany stream messages; the schema itself
# only marks all three as optional and does not enforce that pairing.
typing_stop_event = event_dict_type(
required_keys=[
("type", Equals("typing")),
("op", Equals("stop")),
("message_type", equals_private_or_stream),
("sender", typing_person_type),
],
optional_keys=[
("recipients", ListType(typing_person_type)),
("stream_id", int),
("topic", str),
],
)
check_typing_stop = make_checker(typing_stop_event)
# Schema for "update_display_settings" events.  "language_name" is
# only optional at the schema level; check_update_display_settings
# requires it exactly when setting_name is "default_language".
update_display_settings_event = event_dict_type(
required_keys=[
("type", Equals("update_display_settings")),
("setting_name", str),
("setting", value_type),
("user", str),
],
optional_keys=[
# force vertical
("language_name", str),
],
)
_check_update_display_settings = make_checker(update_display_settings_event)
# Schema for "user_settings" events with op "update".  "language_name"
# is only optional at the schema level; check_user_settings_update
# requires it exactly when the property is "default_language".
user_settings_update_event = event_dict_type(
required_keys=[
("type", Equals("user_settings")),
("op", Equals("update")),
("property", str),
("value", value_type),
],
optional_keys=[
# force vertical
("language_name", str),
],
)
_check_user_settings_update = make_checker(user_settings_update_event)
def check_update_display_settings(
    var_name: str,
    event: Dict[str, object],
) -> None:
    """
    Check an update_display_settings event.

    Beyond the generic schema, the "setting" value must be an
    instance of the type that UserProfile.property_types lists
    for "setting_name" ("timezone" is special-cased as str), and
    "language_name" must be present exactly when the setting is
    "default_language".
    """
    _check_update_display_settings(var_name, event)

    name = event["setting_name"]
    assert isinstance(name, str)

    # "timezone" is special-cased instead of being looked up in
    # UserProfile.property_types.
    expected_type = str if name == "timezone" else UserProfile.property_types[name]
    assert isinstance(event["setting"], expected_type)

    has_language_name = "language_name" in event
    assert has_language_name == (name == "default_language")
def check_user_settings_update(
    var_name: str,
    event: Dict[str, object],
) -> None:
    """
    Check a user_settings/update event.

    After the generic schema check, confirm that "value" is an
    instance of the type UserProfile.property_types gives for
    "property" ("timezone" is special-cased as str), and that
    "language_name" appears exactly when the property is
    "default_language".
    """
    _check_user_settings_update(var_name, event)

    prop = event["property"]
    assert isinstance(prop, str)

    value = event["value"]
    if prop != "timezone":
        assert isinstance(value, UserProfile.property_types[prop])
    else:
        # "timezone" is special-cased rather than looked up.
        assert isinstance(value, str)

    assert ("language_name" in event) == (prop == "default_language")
# Schema for "update_global_notifications" events.  The generated
# checker is private; check_update_global_notifications wraps it and
# also validates the setting's value and type.
update_global_notifications_event = event_dict_type(
required_keys=[
("type", Equals("update_global_notifications")),
("notification_name", str),
("setting", value_type),
("user", str),
]
)
_check_update_global_notifications = make_checker(update_global_notifications_event)
def check_update_global_notifications(
    var_name: str,
    event: Dict[str, object],
    desired_val: Union[bool, int, str],
) -> None:
    """
    Check an update_global_notifications event.

    The event must match the generic schema, carry desired_val as
    its "setting", and that setting must be an instance of the type
    listed for "notification_name" in
    UserProfile.notification_settings_legacy (see that mapping for
    more details).
    """
    _check_update_global_notifications(var_name, event)

    actual_val = event["setting"]
    assert actual_val == desired_val

    notification_name = event["notification_name"]
    assert isinstance(notification_name, str)
    expected_type = UserProfile.notification_settings_legacy[notification_name]
    assert isinstance(actual_val, expected_type)
# Keys required in every update_message event.
# user_id field is null for embedded variant of update_message
update_message_required_fields = [
("type", Equals("update_message")),
("user_id", OptionalType(int)),
("edit_timestamp", int),
("message_id", int),
("flags", ListType(str)),
("message_ids", ListType(int)),
("rendering_only", bool),
]
# Expected by check_update_message only when is_stream_message is set.
update_message_stream_fields: List[Tuple[str, object]] = [
("stream_id", int),
("stream_name", str),
]
# Expected by check_update_message only when has_content is set.
update_message_content_fields: List[Tuple[str, object]] = [
("is_me_message", bool),
("orig_content", str),
("orig_rendered_content", str),
("prev_rendered_content_version", int),
]
# Expected when either has_content or is_embedded_update_only is set.
update_message_content_or_embedded_data_fields: List[Tuple[str, object]] = [
("content", str),
("rendered_content", str),
]
# Expected by check_update_message only when has_topic is set.
update_message_topic_fields = [
(TOPIC_LINKS, ListType(_check_topic_links)),
(TOPIC_NAME, str),
]
# Expected by check_update_message only when has_new_stream_id is set.
update_message_change_stream_fields: List[Tuple[str, object]] = [
("new_stream_id", int),
]
# Expected when either has_topic or has_new_stream_id is set.
update_message_change_stream_or_topic_fields: List[Tuple[str, object]] = [
(
"propagate_mode",
EnumType(
[
# The order here needs to match the OpenAPI definitions
"change_one",
"change_later",
"change_all",
]
),
),
(ORIG_TOPIC, str),
]
# Every conditional group above is optional in the base schema;
# check_update_message decides which groups a given event must carry.
update_message_optional_fields = (
update_message_stream_fields
+ update_message_content_fields
+ update_message_content_or_embedded_data_fields
+ update_message_topic_fields
+ update_message_change_stream_fields
+ update_message_change_stream_or_topic_fields
)
# The schema here includes the embedded variant of update_message
# (hence the nullable user_id among the required fields).  The
# generated checker is private; check_update_message wraps it and
# additionally verifies the exact set of keys present.
update_message_event = event_dict_type(
required_keys=update_message_required_fields,
optional_keys=update_message_optional_fields,
)
_check_update_message = make_checker(update_message_event)
def check_update_message(
    var_name: str,
    event: Dict[str, object],
    is_stream_message: bool,
    has_content: bool,
    has_topic: bool,
    has_new_stream_id: bool,
    is_embedded_update_only: bool,
) -> None:
    """
    Check an update_message event and the exact set of keys it has.

    The boolean flags describe which groups of optional fields the
    caller expects; the event must contain "id", the required
    fields, and precisely the fields of the selected groups.
    "user_id" must be None for an embedded-only update and an int
    otherwise, and "rendering_only" must equal
    is_embedded_update_only.
    """
    # Always check the basic schema first.
    _check_update_message(var_name, event)

    expected_keys = {"id"}

    def expect(*field_groups: List[Tuple[str, object]]) -> None:
        # Record the key name (first tuple element) of every field.
        for group in field_groups:
            expected_keys.update(field[0] for field in group)

    expect(update_message_required_fields)

    if is_stream_message:
        expect(update_message_stream_fields)

    if has_content:
        expect(
            update_message_content_fields,
            update_message_content_or_embedded_data_fields,
        )

    if has_topic:
        expect(
            update_message_topic_fields,
            update_message_change_stream_or_topic_fields,
        )

    if has_new_stream_id:
        expect(
            update_message_change_stream_fields,
            update_message_change_stream_or_topic_fields,
        )

    if is_embedded_update_only:
        expect(update_message_content_or_embedded_data_fields)
        assert event["user_id"] is None
    else:
        assert isinstance(event["user_id"], int)

    assert event["rendering_only"] == is_embedded_update_only
    assert expected_keys == set(event.keys())
# Schema and checker for "update_message_flags" events with op "add".
# Both "op" and "operation" are required and must equal "add"; "flag"
# is any string here.
update_message_flags_add_event = event_dict_type(
required_keys=[
("type", Equals("update_message_flags")),
("op", Equals("add")),
("operation", Equals("add")),
("flag", str),
("messages", ListType(int)),
("all", bool),
]
)
check_update_message_flags_add = make_checker(update_message_flags_add_event)
# Schema and checker for "update_message_flags" events with op
# "remove".  Both "op" and "operation" are required and must equal
# "remove"; "flag" is restricted to "read" or "starred".
update_message_flags_remove_event = event_dict_type(
required_keys=[
("type", Equals("update_message_flags")),
("op", Equals("remove")),
("operation", Equals("remove")),
("flag", EnumType(["read", "starred"])),
("messages", ListType(int)),
("all", bool),
],
optional_keys=[
# Optional per-message details; each value is a dict with a
# required "type" and several optional keys.
# NOTE(review): presumably the StringDictType keys are message
# IDs rendered as strings -- confirm against the event producer.
(
"message_details",
StringDictType(
DictType(
required_keys=[
("type", EnumType(["private", "stream"])),
],
optional_keys=[
("mentioned", bool),
("user_ids", ListType(int)),
("stream_id", int),
("topic", str),
("unmuted_stream_msg", bool),
],
)
),
)
],
)
check_update_message_flags_remove = make_checker(update_message_flags_remove_event)
# Schema for the "group" payload of a user_group/add event.
group_type = DictType(
required_keys=[
("id", int),
("name", str),
("members", ListType(int)),
("direct_subgroup_ids", ListType(int)),
("description", str),
("is_system_group", bool),
]
)
# Schema and checker for "user_group" events with op "add".
user_group_add_event = event_dict_type(
required_keys=[
("type", Equals("user_group")),
("op", Equals("add")),
("group", group_type),
]
)
check_user_group_add = make_checker(user_group_add_event)
# Schema and checker for "user_group" events with op "add_members".
user_group_add_members_event = event_dict_type(
required_keys=[
("type", Equals("user_group")),
("op", Equals("add_members")),
("group_id", int),
("user_ids", ListType(int)),
]
)
check_user_group_add_members = make_checker(user_group_add_members_event)
# Schema and checker for "user_group" events with op "remove"; only
# the group's ID is carried.
user_group_remove_event = event_dict_type(
required_keys=[
("type", Equals("user_group")),
("op", Equals("remove")),
("group_id", int),
]
)
check_user_group_remove = make_checker(user_group_remove_event)
# Schema and checker for "user_group" events with op "remove_members".
user_group_remove_members_event = event_dict_type(
required_keys=[
("type", Equals("user_group")),
("op", Equals("remove_members")),
("group_id", int),
("user_ids", ListType(int)),
]
)
check_user_group_remove_members = make_checker(user_group_remove_members_event)
# Schema for the "data" payload of a user_group/update event.  Both
# keys are optional here; check_user_group_update asserts that exactly
# the one expected field is present.
user_group_data_type = DictType(
required_keys=[],
optional_keys=[
# force vertical
("name", str),
("description", str),
],
)
# Schema for "user_group" events with op "update".  The generated
# checker is private; callers use check_user_group_update.
user_group_update_event = event_dict_type(
required_keys=[
("type", Equals("user_group")),
("op", Equals("update")),
("group_id", int),
("data", user_group_data_type),
]
)
_check_user_group_update = make_checker(user_group_update_event)
def check_user_group_update(var_name: str, event: Dict[str, object], field: str) -> None:
    """
    Check a user_group/update event and confirm that its "data"
    dict contains exactly one key, the one named by `field`.
    """
    _check_user_group_update(var_name, event)

    data = event["data"]
    assert isinstance(data, dict)

    # No keys other than the caller-named field may be present.
    assert set(data) == {field}
# Schema and checker for "user_group" events with op "add_subgroups".
user_group_add_subgroups_event = event_dict_type(
required_keys=[
("type", Equals("user_group")),
("op", Equals("add_subgroups")),
("group_id", int),
("direct_subgroup_ids", ListType(int)),
]
)
check_user_group_add_subgroups = make_checker(user_group_add_subgroups_event)
# Schema and checker for "user_group" events with op
# "remove_subgroups"; apart from the op, the keys are the same as for
# add_subgroups.
user_group_remove_subgroups_event = event_dict_type(
required_keys=[
("type", Equals("user_group")),
("op", Equals("remove_subgroups")),
("group_id", int),
("direct_subgroup_ids", ListType(int)),
]
)
check_user_group_remove_subgroups = make_checker(user_group_remove_subgroups_event)
# Schema for "user_status" events.  Every status field is optional at
# the schema level; check_user_status asserts exactly which of them a
# given event carries.
user_status_event = event_dict_type(
required_keys=[
# force vertical
("type", Equals("user_status")),
("user_id", int),
],
optional_keys=[
# force vertical
("away", bool),
("status_text", str),
("emoji_name", str),
("emoji_code", str),
("reaction_type", str),
],
)
_check_user_status = make_checker(user_status_event)
def check_user_status(var_name: str, event: Dict[str, object], fields: Set[str]) -> None:
    """
    Check a user_status event and confirm that, besides the keys
    that are always expected, it carries exactly the keys in `fields`.
    """
    _check_user_status(var_name, event)

    # "id", "type" and "user_id" are expected on every user_status event.
    always_present = {"id", "type", "user_id"}
    assert set(event.keys()) == always_present | fields