Files
zulip/zerver/views/streams.py
paxapy 18e43895ff streams.py: replace stream_subscribe_button with new #stream syntax.
Previously, we included a special subscribe button in new stream
notifications, but that had 2 problems:

(1) The subscribe button would render badly if the stream was renamed.
(2) There wasn't an easy way to look at the stream when deciding
whether to subscribe.

This fixes the second problem, but not really the first.
2016-12-15 22:43:14 -08:00

569 lines
26 KiB
Python

from __future__ import absolute_import
from typing import Any, Optional, Tuple, List, Set, Iterable, Mapping, Callable, Dict
from django.utils.translation import ugettext as _
from django.conf import settings
from django.db import transaction
from django.http import HttpRequest, HttpResponse
from zerver.lib.request import JsonableError, REQ, has_request_variables
from zerver.decorator import authenticated_json_post_view, \
authenticated_json_view, \
get_user_profile_by_email, require_realm_admin, to_non_negative_int
from zerver.lib.actions import bulk_remove_subscriptions, \
do_change_subscription_property, internal_prep_message, \
create_streams_if_needed, gather_subscriptions, subscribed_to_stream, \
bulk_add_subscriptions, do_send_messages, get_subscriber_emails, do_rename_stream, \
do_deactivate_stream, do_make_stream_public, do_add_default_stream, \
do_change_stream_description, do_get_streams, do_make_stream_private, \
do_remove_default_stream, get_topic_history_for_stream
from zerver.lib.response import json_success, json_error, json_response
from zerver.lib.validator import check_string, check_list, check_dict, \
check_bool, check_variable_type
from zerver.models import UserProfile, Stream, Subscription, \
Recipient, get_recipient, get_stream, bulk_get_streams, \
bulk_get_recipients, valid_stream_name, get_active_user_dicts_in_realm
from collections import defaultdict
import ujson
from six.moves import urllib
import six
from six import text_type
def is_active_subscriber(user_profile, recipient):
# type: (UserProfile, Recipient) -> bool
return Subscription.objects.filter(user_profile=user_profile,
recipient=recipient,
active=True).exists()
def list_to_streams(streams_raw, user_profile, autocreate=False):
# type: (Iterable[Mapping[str, Any]], UserProfile, Optional[bool]) -> Tuple[List[Stream], List[Stream]]
"""Converts list of dicts to a list of Streams, validating input in the process
For each stream name, we validate it to ensure it meets our
requirements for a proper stream name: that is, that it is shorter
than Stream.MAX_NAME_LENGTH characters and passes
valid_stream_name.
This function in autocreate mode should be atomic: either an exception will be raised
during a precheck, or all the streams specified will have been created if applicable.
@param streams_raw The list of stream dictionaries to process;
names should already be stripped of whitespace by the caller.
@param user_profile The user for whom we are retreiving the streams
@param autocreate Whether we should create streams if they don't already exist
"""
# Validate all streams, getting extant ones, then get-or-creating the rest.
stream_set = set(stream_dict["name"] for stream_dict in streams_raw)
for stream_name in stream_set:
# Stream names should already have been stripped by the
# caller, but it makes sense to verify anyway.
assert stream_name == stream_name.strip()
if len(stream_name) > Stream.MAX_NAME_LENGTH:
raise JsonableError(_("Stream name (%s) too long.") % (stream_name,))
if not valid_stream_name(stream_name):
raise JsonableError(_("Invalid stream name (%s).") % (stream_name,))
existing_streams = [] # type: List[Stream]
missing_stream_dicts = [] # type: List[Mapping[str, Any]]
existing_stream_map = bulk_get_streams(user_profile.realm, stream_set)
for stream_dict in streams_raw:
stream_name = stream_dict["name"]
stream = existing_stream_map.get(stream_name.lower())
if stream is None:
missing_stream_dicts.append(stream_dict)
else:
existing_streams.append(stream)
if len(missing_stream_dicts) == 0:
# This is the happy path for callers who expected all of these
# streams to exist already.
created_streams = [] # type: List[Stream]
else:
# autocreate=True path starts here
if not user_profile.can_create_streams():
raise JsonableError(_('User cannot create streams.'))
elif not autocreate:
raise JsonableError(_("Stream(s) (%s) do not exist") % ", ".join(
stream_dict["name"] for stream_dict in missing_stream_dicts))
# We already filtered out existing streams, so dup_streams
# will normally be an empty list below, but we protect against somebody
# else racing to create the same stream. (This is not an entirely
# paranoid approach, since often on Zulip two people will discuss
# creating a new stream, and both people eagerly do it.)
created_streams, dup_streams = create_streams_if_needed(realm=user_profile.realm,
stream_dicts=missing_stream_dicts)
existing_streams += dup_streams
return existing_streams, created_streams
class PrincipalError(JsonableError):
def __init__(self, principal, status_code=403):
# type: (text_type, int) -> None
self.principal = principal # type: text_type
self.status_code = status_code # type: int
def to_json_error_msg(self):
# type: () -> text_type
return ("User not authorized to execute queries on behalf of '%s'"
% (self.principal,))
def principal_to_user_profile(agent, principal):
# type: (UserProfile, text_type) -> UserProfile
principal_doesnt_exist = False
try:
principal_user_profile = get_user_profile_by_email(principal)
except UserProfile.DoesNotExist:
principal_doesnt_exist = True
if (principal_doesnt_exist
or agent.realm != principal_user_profile.realm):
# We have to make sure we don't leak information about which users
# are registered for Zulip in a different realm. We could do
# something a little more clever and check the domain part of the
# principal to maybe give a better error message
raise PrincipalError(principal)
return principal_user_profile
@require_realm_admin
def deactivate_stream_backend(request, user_profile, stream_name):
# type: (HttpRequest, UserProfile, text_type) -> HttpResponse
target = get_stream(stream_name, user_profile.realm)
if not target:
return json_error(_('No such stream name'))
if target.invite_only and not subscribed_to_stream(user_profile, target):
return json_error(_('Cannot administer invite-only streams this way'))
do_deactivate_stream(target)
return json_success()
@require_realm_admin
@has_request_variables
def add_default_stream(request, user_profile, stream_name=REQ()):
# type: (HttpRequest, UserProfile, text_type) -> HttpResponse
do_add_default_stream(user_profile.realm, stream_name)
return json_success()
@require_realm_admin
@has_request_variables
def remove_default_stream(request, user_profile, stream_name=REQ()):
# type: (HttpRequest, UserProfile, text_type) -> HttpResponse
do_remove_default_stream(user_profile.realm, stream_name)
return json_success()
@authenticated_json_post_view
@require_realm_admin
@has_request_variables
def json_make_stream_public(request, user_profile, stream_name=REQ()):
# type: (HttpRequest, UserProfile, text_type) -> HttpResponse
do_make_stream_public(user_profile, user_profile.realm, stream_name)
return json_success()
@authenticated_json_post_view
@require_realm_admin
@has_request_variables
def json_make_stream_private(request, user_profile, stream_name=REQ()):
# type: (HttpRequest, UserProfile, text_type) -> HttpResponse
do_make_stream_private(user_profile.realm, stream_name)
return json_success()
@require_realm_admin
@has_request_variables
def update_stream_backend(request, user_profile, stream_name,
description=REQ(validator=check_string, default=None),
new_name=REQ(validator=check_string, default=None)):
# type: (HttpRequest, UserProfile, text_type, Optional[text_type], Optional[text_type]) -> HttpResponse
if description is not None:
do_change_stream_description(user_profile.realm, stream_name, description)
if stream_name is not None and new_name is not None:
do_rename_stream(user_profile.realm, stream_name, new_name)
return json_success()
def list_subscriptions_backend(request, user_profile):
# type: (HttpRequest, UserProfile) -> HttpResponse
return json_success({"subscriptions": gather_subscriptions(user_profile)[0]})
FuncKwargPair = Tuple[Callable[..., HttpResponse], Dict[str, Iterable[Any]]]
@has_request_variables
def update_subscriptions_backend(request, user_profile,
delete=REQ(validator=check_list(check_string), default=[]),
add=REQ(validator=check_list(check_dict([('name', check_string)])), default=[])):
# type: (HttpRequest, UserProfile, Iterable[text_type], Iterable[Mapping[str, Any]]) -> HttpResponse
if not add and not delete:
return json_error(_('Nothing to do. Specify at least one of "add" or "delete".'))
method_kwarg_pairs = [
(add_subscriptions_backend, dict(streams_raw=add)),
(remove_subscriptions_backend, dict(streams_raw=delete))
] # type: List[FuncKwargPair]
return compose_views(request, user_profile, method_kwarg_pairs)
def compose_views(request, user_profile, method_kwarg_pairs):
# type: (HttpRequest, UserProfile, List[FuncKwargPair]) -> HttpResponse
'''
This takes a series of view methods from method_kwarg_pairs and calls
them in sequence, and it smushes all the json results into a single
response when everything goes right. (This helps clients avoid extra
latency hops.) It rolls back the transaction when things go wrong in
any one of the composed methods.
TODO: Move this a utils-like module if we end up using it more widely.
'''
json_dict = {} # type: Dict[str, Any]
with transaction.atomic():
for method, kwargs in method_kwarg_pairs:
response = method(request, user_profile, **kwargs)
if response.status_code != 200:
raise JsonableError(response.content)
json_dict.update(ujson.loads(response.content))
return json_success(json_dict)
@authenticated_json_post_view
def json_remove_subscriptions(request, user_profile):
# type: (HttpRequest, UserProfile) -> HttpResponse
return remove_subscriptions_backend(request, user_profile)
@has_request_variables
def remove_subscriptions_backend(request, user_profile,
streams_raw = REQ("subscriptions", validator=check_list(check_string)),
principals = REQ(validator=check_list(check_string), default=None)):
# type: (HttpRequest, UserProfile, Iterable[text_type], Optional[Iterable[text_type]]) -> HttpResponse
removing_someone_else = principals and \
set(principals) != set((user_profile.email,))
if removing_someone_else and not user_profile.is_realm_admin:
# You can only unsubscribe other people from a stream if you are a realm
# admin.
return json_error(_("This action requires administrative rights"))
streams_as_dict = []
for stream_name in streams_raw:
streams_as_dict.append({"name": stream_name.strip()})
streams, __ = list_to_streams(streams_as_dict, user_profile)
for stream in streams:
if removing_someone_else and stream.invite_only and \
not subscribed_to_stream(user_profile, stream):
# Even as an admin, you can't remove other people from an
# invite-only stream you're not on.
return json_error(_("Cannot administer invite-only streams this way"))
if principals:
people_to_unsub = set(principal_to_user_profile(
user_profile, principal) for principal in principals)
else:
people_to_unsub = set([user_profile])
result = dict(removed=[], not_subscribed=[]) # type: Dict[str, List[text_type]]
(removed, not_subscribed) = bulk_remove_subscriptions(people_to_unsub, streams)
for (subscriber, stream) in removed:
result["removed"].append(stream.name)
for (subscriber, stream) in not_subscribed:
result["not_subscribed"].append(stream.name)
return json_success(result)
def filter_stream_authorization(user_profile, streams):
# type: (UserProfile, Iterable[Stream]) -> Tuple[List[Stream], List[Stream]]
streams_subscribed = set() # type: Set[int]
recipients_map = bulk_get_recipients(Recipient.STREAM, [stream.id for stream in streams])
subs = Subscription.objects.filter(user_profile=user_profile,
recipient__in=list(recipients_map.values()),
active=True)
for sub in subs:
streams_subscribed.add(sub.recipient.type_id)
unauthorized_streams = [] # type: List[Stream]
for stream in streams:
# The user is authorized for his own streams
if stream.id in streams_subscribed:
continue
# The user is not authorized for invite_only streams
if stream.invite_only:
unauthorized_streams.append(stream)
authorized_streams = [stream for stream in streams if
stream.id not in set(stream.id for stream in unauthorized_streams)]
return authorized_streams, unauthorized_streams
@has_request_variables
def add_subscriptions_backend(request, user_profile,
streams_raw = REQ("subscriptions",
validator=check_list(check_dict([('name', check_string)]))),
invite_only = REQ(validator=check_bool, default=False),
announce = REQ(validator=check_bool, default=False),
principals = REQ(validator=check_list(check_string), default=None),
authorization_errors_fatal = REQ(validator=check_bool, default=True)):
# type: (HttpRequest, UserProfile, Iterable[Mapping[str, text_type]], bool, bool, Optional[List[text_type]], bool) -> HttpResponse
stream_dicts = []
for stream_dict in streams_raw:
stream_dict_copy = {} # type: Dict[str, Any]
for field in stream_dict:
stream_dict_copy[field] = stream_dict[field]
# Strip the stream name here.
stream_dict_copy['name'] = stream_dict_copy['name'].strip()
stream_dict_copy["invite_only"] = invite_only
stream_dicts.append(stream_dict_copy)
# Validation of the streams arguments, including enforcement of
# can_create_streams policy and valid_stream_name policy is inside
# list_to_streams.
existing_streams, created_streams = \
list_to_streams(stream_dicts, user_profile, autocreate=True)
authorized_streams, unauthorized_streams = \
filter_stream_authorization(user_profile, existing_streams)
if len(unauthorized_streams) > 0 and authorization_errors_fatal:
return json_error(_("Unable to access stream (%s).") % unauthorized_streams[0].name)
# Newly created streams are also authorized for the creator
streams = authorized_streams + created_streams
if principals is not None:
if user_profile.realm.is_zephyr_mirror_realm and not all(stream.invite_only for stream in streams):
return json_error(_("You can only invite other Zephyr mirroring users to invite-only streams."))
subscribers = set(principal_to_user_profile(user_profile, principal) for principal in principals)
else:
subscribers = set([user_profile])
(subscribed, already_subscribed) = bulk_add_subscriptions(streams, subscribers)
result = dict(subscribed=defaultdict(list), already_subscribed=defaultdict(list)) # type: Dict[str, Any]
for (subscriber, stream) in subscribed:
result["subscribed"][subscriber.email].append(stream.name)
for (subscriber, stream) in already_subscribed:
result["already_subscribed"][subscriber.email].append(stream.name)
private_streams = dict((stream.name, stream.invite_only) for stream in streams)
bots = dict((subscriber.email, subscriber.is_bot) for subscriber in subscribers)
# Inform the user if someone else subscribed them to stuff,
# or if a new stream was created with the "announce" option.
notifications = []
if principals and result["subscribed"]:
for email, subscriptions in six.iteritems(result["subscribed"]):
if email == user_profile.email:
# Don't send a Zulip if you invited yourself.
continue
if bots[email]:
# Don't send invitation Zulips to bots
continue
if len(subscriptions) == 1:
msg = ("Hi there! We thought you'd like to know that %s just "
"subscribed you to the%s stream #**%s**."
% (user_profile.full_name,
" **invite-only**" if private_streams[subscriptions[0]] else "",
subscriptions[0],
))
else:
msg = ("Hi there! We thought you'd like to know that %s just "
"subscribed you to the following streams: \n\n"
% (user_profile.full_name,))
for stream in subscriptions:
msg += "* #**%s**%s\n" % (
stream,
" (**invite-only**)" if private_streams[stream] else "")
if len([s for s in subscriptions if not private_streams[s]]) > 0:
msg += "\nYou can see historical content on a non-invite-only stream by narrowing to it."
notifications.append(internal_prep_message(settings.NOTIFICATION_BOT,
"private", email, "", msg))
if announce and len(created_streams) > 0:
notifications_stream = user_profile.realm.notifications_stream
if notifications_stream is not None:
if len(created_streams) > 1:
stream_msg = "the following streams: %s" % (", ".join('#**%s**' % s.name for s in created_streams))
else:
stream_msg = "a new stream #**%s**." % created_streams[0].name
msg = ("%s just created %s" % (user_profile.full_name, stream_msg))
notifications.append(
internal_prep_message(settings.NOTIFICATION_BOT,
"stream",
notifications_stream.name, "Streams", msg,
realm=notifications_stream.realm))
else:
msg = ("Hi there! %s just created a new stream #**%s**."
% (user_profile.full_name, created_streams[0].name))
for realm_user_dict in get_active_user_dicts_in_realm(user_profile.realm):
# Don't announce to yourself or to people you explicitly added
# (who will get the notification above instead).
if realm_user_dict['email'] in principals or realm_user_dict['email'] == user_profile.email:
continue
notifications.append(internal_prep_message(settings.NOTIFICATION_BOT,
"private",
realm_user_dict['email'], "", msg))
if len(notifications) > 0:
do_send_messages(notifications)
result["subscribed"] = dict(result["subscribed"])
result["already_subscribed"] = dict(result["already_subscribed"])
if not authorization_errors_fatal:
result["unauthorized"] = [stream.name for stream in unauthorized_streams]
return json_success(result)
@has_request_variables
def get_subscribers_backend(request, user_profile, stream_name=REQ('stream')):
# type: (HttpRequest, UserProfile, text_type) -> HttpResponse
stream = get_stream(stream_name, user_profile.realm)
if stream is None:
raise JsonableError(_("Stream does not exist: %s") % (stream_name,))
subscribers = get_subscriber_emails(stream, user_profile)
return json_success({'subscribers': subscribers})
@authenticated_json_post_view
def json_get_subscribers(request, user_profile):
# type: (HttpRequest, UserProfile) -> HttpResponse
return get_subscribers_backend(request, user_profile)
# By default, lists all streams that the user has access to --
# i.e. public streams plus invite-only streams that the user is on
@has_request_variables
def get_streams_backend(request, user_profile,
include_public=REQ(validator=check_bool, default=True),
include_subscribed=REQ(validator=check_bool, default=True),
include_all_active=REQ(validator=check_bool, default=False),
include_default=REQ(validator=check_bool, default=False)):
# type: (HttpRequest, UserProfile, bool, bool, bool, bool) -> HttpResponse
streams = do_get_streams(user_profile, include_public=include_public,
include_subscribed=include_subscribed,
include_all_active=include_all_active,
include_default=include_default)
return json_success({"streams": streams})
@has_request_variables
def get_topics_backend(request, user_profile,
stream_id=REQ(converter=to_non_negative_int)):
# type: (HttpRequest, UserProfile, int) -> HttpResponse
try:
stream = Stream.objects.get(pk=stream_id)
except Stream.DoesNotExist:
return json_error(_("Invalid stream id"))
if stream.realm_id != user_profile.realm_id:
return json_error(_("Invalid stream id"))
recipient = get_recipient(Recipient.STREAM, stream.id)
if not stream.is_public():
if not is_active_subscriber(user_profile=user_profile,
recipient=recipient):
return json_error(_("Invalid stream id"))
result = get_topic_history_for_stream(
user_profile=user_profile,
recipient=recipient,
)
# Our data structure here is a list of tuples of
# (topic name, unread count), and it's reverse chronological,
# so the most recent topic is the first element of the list.
return json_success(dict(topics=result))
@authenticated_json_post_view
@has_request_variables
def json_stream_exists(request, user_profile, stream=REQ(),
autosubscribe=REQ(default=False)):
# type: (HttpRequest, UserProfile, text_type, bool) -> HttpResponse
return stream_exists_backend(request, user_profile, stream, autosubscribe)
def stream_exists_backend(request, user_profile, stream_name, autosubscribe):
# type: (HttpRequest, UserProfile, text_type, bool) -> HttpResponse
if not valid_stream_name(stream_name):
return json_error(_("Invalid characters in stream name"))
stream = get_stream(stream_name, user_profile.realm)
result = {"exists": bool(stream)}
if stream is not None:
recipient = get_recipient(Recipient.STREAM, stream.id)
if autosubscribe:
bulk_add_subscriptions([stream], [user_profile])
result["subscribed"] = is_active_subscriber(
user_profile=user_profile,
recipient=recipient)
return json_success(result) # results are ignored for HEAD requests
return json_response(data=result, status=404)
def get_subscription_or_die(stream_name, user_profile):
# type: (text_type, UserProfile) -> Subscription
stream = get_stream(stream_name, user_profile.realm)
if not stream:
raise JsonableError(_("Invalid stream %s") % (stream_name,))
recipient = get_recipient(Recipient.STREAM, stream.id)
subscription = Subscription.objects.filter(user_profile=user_profile,
recipient=recipient, active=True)
if not subscription.exists():
raise JsonableError(_("Not subscribed to stream %s") % (stream_name,))
return subscription
@authenticated_json_view
@has_request_variables
def json_subscription_property(request, user_profile, subscription_data=REQ(
validator=check_list(
check_dict([("stream", check_string),
("property", check_string),
("value", check_variable_type(
[check_string, check_bool]))])))):
# type: (HttpRequest, UserProfile, List[Dict[str, Any]]) -> HttpResponse
"""
This is the entry point to changing subscription properties. This
is a bulk endpoint: requestors always provide a subscription_data
list containing dictionaries for each stream of interest.
Requests are of the form:
[{"stream": "devel", "property": "in_home_view", "value": False},
{"stream": "devel", "property": "color", "value": "#c2c2c2"}]
"""
if request.method != "POST":
return json_error(_("Invalid verb"))
property_converters = {"color": check_string, "in_home_view": check_bool,
"desktop_notifications": check_bool,
"audible_notifications": check_bool,
"pin_to_top": check_bool}
response_data = []
for change in subscription_data:
stream_name = change["stream"]
property = change["property"]
value = change["value"]
if property not in property_converters:
return json_error(_("Unknown subscription property: %s") % (property,))
sub = get_subscription_or_die(stream_name, user_profile)[0]
property_conversion = property_converters[property](property, value)
if property_conversion:
return json_error(property_conversion)
do_change_subscription_property(user_profile, sub, stream_name,
property, value)
response_data.append({'stream': stream_name,
'property': property,
'value': value})
return json_success({"subscription_data": response_data})