Mirror of https://github.com/zulip/zulip.git, synced 2025-11-03 21:43:21 +00:00.

Compare commits: 5.x-user-s ... 5.x (1 commit)

| Author | SHA1 | Date |
|---|---|---|
|  | cb8ba561d9 |  |

Relative to the `5.x-user-s` branch, this commit drops the host-map-based Tornado sharding (the generated `/etc/zulip/nginx_sharding_map.conf`, regex host matching, per-user port assignment, and cross-shard redirects) and restores the simpler `set $tornado_server`-based scheme in `/etc/zulip/nginx_sharding.conf`. Each hunk below is labeled by what it touches.
nginx include serving `/api/v1/events` long-polling:

```diff
@@ -45,13 +45,6 @@ location /api/v1/events {
     include /etc/nginx/zulip-include/proxy_longpolling;
 }
-
-# Handle X-Accel-Redirect from Tornado to Tornado
-location ~ ^/tornado/(\d+)(/.*)$ {
-    internal;
-    proxy_pass http://tornado$1$2$is_args$args;
-    include /etc/nginx/zulip-include/proxy_longpolling;
-}
 
 # Send everything else to Django via uWSGI
 location / {
     include uwsgi_params;
```
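The removed `location` block is the nginx half of a cross-shard handoff: a process that receives a request for a queue it does not own answers with an `X-Accel-Redirect: /tornado/<port>/...` header (see the `get_events` hunk near the end of this diff), and nginx replays the request internally against the `tornado<port>` upstream, invisibly to the client. A minimal sketch of the application side, assuming a Django-style view:

```python
from django.http import HttpRequest, HttpResponse

def redirect_to_shard(request: HttpRequest, user_port: int) -> HttpResponse:
    # An empty response carrying X-Accel-Redirect makes nginx replay the
    # request against the internal /tornado/<port>/... location, which
    # proxy_passes it to the matching upstream; the client never sees this.
    return HttpResponse(
        "", headers={"X-Accel-Redirect": f"/tornado/{user_port}{request.get_full_path()}"}
    )
```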
Puppet manifest for `class zulip::tornado_sharding`:

```diff
@@ -5,22 +5,15 @@ class zulip::tornado_sharding {
   # with the correct default content for the "only one shard" setup. For this
   # reason they use "replace => false", because the files are managed by
   # the sharding script afterwards and Puppet shouldn't overwrite them.
-  file { '/etc/zulip/nginx_sharding_map.conf':
+  file { '/etc/zulip/nginx_sharding.conf':
     ensure  => file,
     owner   => 'root',
     group   => 'root',
     mode    => '0644',
     notify  => Service['nginx'],
-    content => @(EOT),
-      map "" $tornado_server {
-          default http://tornado;
-      }
-      | EOT
+    content => "set \$tornado_server http://tornado;\n",
     replace => false,
   }
-  file { '/etc/zulip/nginx_sharding.conf':
-    ensure => absent,
-  }
   file { '/etc/zulip/sharding.json':
     ensure  => file,
     require => User['zulip'],
@@ -36,15 +29,14 @@ class zulip::tornado_sharding {
   exec { 'stage_updated_sharding':
     command   => "${::zulip_scripts_path}/lib/sharding.py",
     onlyif    => "${::zulip_scripts_path}/lib/sharding.py --errors-ok",
-    require   => [File['/etc/zulip/nginx_sharding_map.conf'], File['/etc/zulip/sharding.json']],
+    require   => [File['/etc/zulip/nginx_sharding.conf'], File['/etc/zulip/sharding.json']],
     logoutput => true,
     loglevel  => 'warning',
   }
 
-  # The ports of Tornado processes to run on the server, computed from
-  # the zulip.conf configuration. Default is just port 9800.
-  $tornado_groups = zulipconf_keys('tornado_sharding').map |$key| { $key.regsubst(/_regex$/, '').split('_') }.unique
-  $tornado_ports = $tornado_groups.flatten.unique
+  # The ports of Tornado processes to run on the server; defaults to
+  # 9800.
+  $tornado_ports = zulipconf_keys('tornado_sharding')
 
   file { '/etc/nginx/zulip-include/tornado-upstreams':
     require => [Package[$zulip::common::nginx], Exec['stage_updated_sharding']],
```
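The `map "" $tornado_server { default http://tornado; }` default content above is the single-shard degenerate case of nginx's `map` directive: `$tornado_server` is computed per request from `$http_host`, with exact hostnames taking precedence over `~*`-prefixed case-insensitive regexes, then the `default`. A rough Python analogue of that lookup order (illustrative only; nginx's own matching rules are the authority):

```python
import re
from typing import Dict, List, Tuple

def pick_upstream(
    http_host: str,
    exact: Dict[str, str],
    regexes: List[Tuple[re.Pattern, str]],
    default: str = "http://tornado9800",
) -> str:
    # Exact hostname entries win over regex entries, as in nginx's map.
    if http_host in exact:
        return exact[http_host]
    for pattern, upstream in regexes:
        if pattern.match(http_host):
            return upstream
    return default

print(pick_upstream("chat.example.com", {"chat.example.com": "http://tornado9801"}, []))
```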
ERB template that renders the Tornado `upstream` blocks:

```diff
@@ -5,17 +5,6 @@ upstream tornado<%= port %> {
     keepalive 10000;
 }
 <% end -%>
-<% @tornado_groups.each do |group| -%>
-<% if group.length > 1 -%>
-upstream tornado<%= group.join('_') %> {
-    random;
-<% group.each do |port| -%>
-    server 127.0.0.1:<%= port %>;
-<% end -%>
-    keepalive 10000;
-}
-<% end -%>
-<% end -%>
 <% else -%>
 upstream tornado {
     server 127.0.0.1:9800;
```
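The removed ERB loop emitted one combined `upstream tornadoA_B { random; ... }` block per multi-port group, letting nginx spread a regex-matched set of hosts across several Tornado ports. A sketch of the same rendering in Python, assuming groups shaped like the manifest's `$tornado_groups` (e.g. `["9801", "9802"]`):

```python
from typing import List

def render_group_upstream(group: List[str]) -> str:
    # Mirrors the removed template: only groups with more than one port
    # get a combined upstream, load-balanced by nginx's "random" directive.
    if len(group) <= 1:
        return ""
    servers = "".join(f"    server 127.0.0.1:{port};\n" for port in group)
    return (
        f"upstream tornado{'_'.join(group)} {{\n"
        "    random;\n"
        f"{servers}"
        "    keepalive 10000;\n"
        "}\n"
    )

print(render_group_upstream(["9801", "9802"]))
```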
nginx app site template:

```diff
@@ -13,7 +13,6 @@ server {
 <% end -%>
 
 include /etc/nginx/zulip-include/upstreams;
-include /etc/zulip/nginx_sharding_map.conf;
 
 server {
 <% if @nginx_http_only -%>
@@ -31,6 +30,7 @@ server {
         alias /home/zulip/local-static;
     }
 
+    include /etc/zulip/nginx_sharding.conf;
     include /etc/nginx/zulip-include/certbot;
     include /etc/nginx/zulip-include/app;
 }
```
nginx configuration for `zulipchat.com`:

```diff
@@ -1,5 +1,4 @@
 include /etc/nginx/zulip-include/upstreams;
-include /etc/zulip/nginx_sharding_map.conf;
 
 server {
     listen 443 ssl http2;
@@ -16,5 +15,6 @@ server {
 
     server_name zulipchat.com *.zulipchat.com;
 
+    include /etc/zulip/nginx_sharding.conf;
     include /etc/nginx/zulip-include/app;
 }
```
nginx configuration for `zulipstaging.com`:

```diff
@@ -6,7 +6,6 @@ server {
 }
 
 include /etc/nginx/zulip-include/upstreams;
-include /etc/zulip/nginx_sharding_map.conf;
 
 server {
     listen 443 ssl http2;
@@ -23,5 +22,6 @@ server {
 
     server_name zulipstaging.com;
 
+    include /etc/zulip/nginx_sharding.conf;
     include /etc/nginx/zulip-include/app;
 }
```
`lib/sharding.py` (the sharding script invoked by the Puppet `exec` above):

```diff
@@ -5,7 +5,7 @@ import json
 import os
 import subprocess
 import sys
-from typing import Dict, List, Tuple, Union
+from typing import Any, Dict
 
 BASE_DIR = os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
 sys.path.append(BASE_DIR)
@@ -16,13 +16,17 @@ setup_path()
 from scripts.lib.zulip_tools import get_config_file, get_tornado_ports
 
 
-def nginx_quote(s: str) -> str:
-    return '"' + s.replace("\\", "\\\\").replace('"', '\\"') + '"'
+def write_realm_nginx_config_line(f: Any, host: str, port: str) -> None:
+    f.write(
+        f"""if ($host = '{host}') {{
+    set $tornado_server http://tornado{port};
+}}\n"""
+    )
 
 
 # Basic system to do Tornado sharding.  Writes two output .tmp files that need
 # to be renamed to the following files to finalize the changes:
-# * /etc/zulip/nginx_sharding_map.conf; nginx needs to be reloaded after changing.
+# * /etc/zulip/nginx_sharding.conf; nginx needs to be reloaded after changing.
 # * /etc/zulip/sharding.json; supervisor Django process needs to be reloaded
 # after changing.  TODO: We can probably make this live-reload by statting the file.
 #
@@ -31,52 +35,41 @@ def write_updated_configs() -> None:
     config_file = get_config_file()
     ports = get_tornado_ports(config_file)
 
-    expected_ports = list(range(9800, ports[-1] + 1))
-    assert ports == expected_ports, f"ports ({ports}) must be contiguous, starting with 9800"
+    expected_ports = list(range(9800, max(ports) + 1))
+    assert (
+        sorted(ports) == expected_ports
+    ), f"ports ({sorted(ports)}) must be contiguous, starting with 9800"
 
-    with open("/etc/zulip/nginx_sharding_map.conf.tmp", "w") as nginx_sharding_conf_f, open(
+    with open("/etc/zulip/nginx_sharding.conf.tmp", "w") as nginx_sharding_conf_f, open(
         "/etc/zulip/sharding.json.tmp", "w"
     ) as sharding_json_f:
 
         if len(ports) == 1:
-            nginx_sharding_conf_f.write('map "" $tornado_server {\n')
-            nginx_sharding_conf_f.write("    default http://tornado;\n")
-            nginx_sharding_conf_f.write("}\n")
+            nginx_sharding_conf_f.write("set $tornado_server http://tornado;\n")
             sharding_json_f.write("{}\n")
             return
 
-        nginx_sharding_conf_f.write("map $http_host $tornado_server {\n")
-        nginx_sharding_conf_f.write("    default http://tornado9800;\n")
-        shard_map: Dict[str, Union[int, List[int]]] = {}
-        shard_regexes: List[Tuple[str, Union[int, List[int]]]] = []
+        nginx_sharding_conf_f.write("set $tornado_server http://tornado9800;\n")
+        shard_map: Dict[str, int] = {}
         external_host = subprocess.check_output(
             [os.path.join(BASE_DIR, "scripts/get-django-setting"), "EXTERNAL_HOST"],
             text=True,
         ).strip()
-        for key, shards in config_file["tornado_sharding"].items():
-            if key.endswith("_regex"):
-                ports = [int(port) for port in key[: -len("_regex")].split("_")]
-                shard_regexes.append((shards, ports[0] if len(ports) == 1 else ports))
-                nginx_sharding_conf_f.write(
-                    f"    {nginx_quote('~*' + shards)} http://tornado{'_'.join(map(str, ports))};\n"
-                )
-            else:
-                ports = [int(port) for port in key.split("_")]
-                for shard in shards.split():
+        for port in config_file["tornado_sharding"]:
+            shards = config_file["tornado_sharding"][port].strip()
+
+            if shards:
+                for shard in shards.split(" "):
                     if "." in shard:
                         host = shard
                     else:
                         host = f"{shard}.{external_host}"
                     assert host not in shard_map, f"host {host} duplicated"
-                    shard_map[host] = ports[0] if len(ports) == 1 else ports
-                    nginx_sharding_conf_f.write(
-                        f"    {nginx_quote(host)} http://tornado{'_'.join(map(str, ports))};\n"
-                    )
+                    shard_map[host] = int(port)
+                    write_realm_nginx_config_line(nginx_sharding_conf_f, host, port)
             nginx_sharding_conf_f.write("\n")
-        nginx_sharding_conf_f.write("}\n")
 
-        data = {"shard_map": shard_map, "shard_regexes": shard_regexes}
-        sharding_json_f.write(json.dumps(data) + "\n")
+        sharding_json_f.write(json.dumps(shard_map) + "\n")
 
 
 parser = argparse.ArgumentParser(
@@ -90,7 +83,7 @@ parser.add_argument(
 options = parser.parse_args()
 
 config_file_path = "/etc/zulip"
-base_files = ["nginx_sharding_map.conf", "sharding.json"]
+base_files = ["nginx_sharding.conf", "sharding.json"]
 full_real_paths = [f"{config_file_path}/{filename}" for filename in base_files]
 full_new_paths = [f"{filename}.tmp" for filename in full_real_paths]
 try:
```
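To make the two branches of `write_updated_configs()` concrete, here is a hypothetical `[tornado_sharding]` section and roughly what the removed (map-based) code would generate from it; the hostnames, the regex, and the `EXTERNAL_HOST` value are invented for illustration:

```python
# /etc/zulip/zulip.conf (hypothetical), with EXTERNAL_HOST = zulip.example.net:
#
#   [tornado_sharding]
#   9800 =                    # catch-all shard; no explicit hosts
#   9801 = chat               # bare shard name -> chat.zulip.example.net
#   9802 = example.com        # contains a "." -> used as-is
#   9803_9804_regex = ^foo    # regex hosts, spread over ports 9803 and 9804
#
# Generated /etc/zulip/nginx_sharding_map.conf.tmp (roughly):
#
#   map $http_host $tornado_server {
#       default http://tornado9800;
#       "chat.zulip.example.net" http://tornado9801;
#       "example.com" http://tornado9802;
#       "~*^foo" http://tornado9803_9804;
#   }
#
# Generated /etc/zulip/sharding.json.tmp:
#
#   {"shard_map": {"chat.zulip.example.net": 9801, "example.com": 9802},
#    "shard_regexes": [["^foo", [9803, 9804]]]}
```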
`scripts/lib/zulip_tools.py`:

```diff
@@ -595,13 +595,7 @@ def run_psql_as_postgres(
 def get_tornado_ports(config_file: configparser.RawConfigParser) -> List[int]:
     ports = []
     if config_file.has_section("tornado_sharding"):
-        ports = sorted(
-            {
-                int(port)
-                for key in config_file.options("tornado_sharding")
-                for port in (key[: -len("_regex")] if key.endswith("_regex") else key).split("_")
-            }
-        )
+        ports = [int(port) for port in config_file.options("tornado_sharding")]
     if not ports:
         ports = [9800]
     return ports
```
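The removed comprehension is what lets composite option names like `9802_9803_regex` contribute multiple ports: it strips the `_regex` suffix, splits on underscores, de-duplicates, and sorts. Restated standalone:

```python
# Standalone restatement of the removed key-parsing logic.
keys = ["9800", "9801", "9802_9803_regex"]

ports = sorted(
    {
        int(port)
        for key in keys
        for port in (key[: -len("_regex")] if key.endswith("_regex") else key).split("_")
    }
)
assert ports == [9800, 9801, 9802, 9803]
```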
Shell script that stages and applies the regenerated sharding files:

```diff
@@ -7,16 +7,16 @@ set -e
 SUPPRESS_SHARDING_NOTICE=1 "$(dirname "$0")/zulip-puppet-apply" -f
 
 # Verify, before we move them into place
-if ! [ -e /etc/zulip/nginx_sharding_map.conf.tmp ] || ! [ -e /etc/zulip/sharding.json.tmp ]; then
+if ! [ -e /etc/zulip/nginx_sharding.conf.tmp ] || ! [ -e /etc/zulip/sharding.json.tmp ]; then
     echo "No sharding updates found to apply."
     exit 1
 fi
 
-chown root:root /etc/zulip/nginx_sharding_map.conf.tmp
-chmod 644 /etc/zulip/nginx_sharding_map.conf.tmp
+chown root:root /etc/zulip/nginx_sharding.conf.tmp
+chmod 644 /etc/zulip/nginx_sharding.conf.tmp
 chown zulip:zulip /etc/zulip/sharding.json.tmp
 chmod 644 /etc/zulip/sharding.json.tmp
-mv /etc/zulip/nginx_sharding_map.conf.tmp /etc/zulip/nginx_sharding_map.conf
+mv /etc/zulip/nginx_sharding.conf.tmp /etc/zulip/nginx_sharding.conf
 mv /etc/zulip/sharding.json.tmp /etc/zulip/sharding.json
 
 # In the ordering of operations below, the crucial detail is that
```
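Both versions of the script follow the same stage-then-rename pattern: the new files are built as `.tmp` siblings, verified and given the right ownership, and only then `mv`ed into place, so nginx and the Django processes only ever see complete files (rename is atomic within a filesystem). A minimal Python sketch of the idea, with an invented path:

```python
import os

def replace_atomically(path: str, contents: str) -> None:
    # Stage next to the destination so os.rename stays on one
    # filesystem and is therefore atomic for readers.
    tmp_path = path + ".tmp"
    with open(tmp_path, "w") as f:
        f.write(contents)
    os.rename(tmp_path, path)  # readers see either the old or the new file

replace_atomically("/tmp/sharding-example.json", "{}\n")
```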
Management command that runs Tornado:

```diff
@@ -19,7 +19,6 @@ if settings.PRODUCTION:
 from zerver.lib.async_utils import NoAutoCreateEventLoopPolicy
 from zerver.lib.debug import interactive_debug_listen
 from zerver.tornado.application import create_tornado_application, setup_tornado_rabbitmq
-from zerver.tornado.descriptors import set_current_port
 from zerver.tornado.event_queue import (
     add_client_gc_hook,
     dump_event_queues,
@@ -92,7 +91,6 @@ class Command(BaseCommand):
                 )
                 await sync_to_async(add_signal_handlers, thread_sensitive=True)()
 
-                set_current_port(port)
                 translation.activate(settings.LANGUAGE_CODE)
 
                 # We pass display_num_errors=False, since Django will
```
Database migration backfilling `RealmEmoji.author`:

```diff
@@ -1,6 +1,7 @@
 from django.db import migrations
 from django.db.backends.postgresql.schema import DatabaseSchemaEditor
 from django.db.migrations.state import StateApps
+from django.db.models import OuterRef, Subquery
 
 
 def set_emoji_author(apps: StateApps, schema_editor: DatabaseSchemaEditor) -> None:
@@ -13,20 +14,15 @@ def set_emoji_author(apps: StateApps, schema_editor: DatabaseSchemaEditor) -> No
     UserProfile = apps.get_model("zerver", "UserProfile")
     ROLE_REALM_OWNER = 100
 
-    realm_emoji_to_update = []
-    for realm_emoji in RealmEmoji.objects.all():
-        if realm_emoji.author_id is None:
-            user_profile = (
-                UserProfile.objects.filter(
-                    realm_id=realm_emoji.realm_id, is_active=True, role=ROLE_REALM_OWNER
-                )
-                .order_by("id")
-                .first()
+    RealmEmoji.objects.filter(author=None).update(
+        author=Subquery(
+            UserProfile.objects.filter(
+                realm=OuterRef("realm"), is_active=True, role=ROLE_REALM_OWNER
             )
-            realm_emoji.author_id = user_profile.id
-            realm_emoji_to_update.append(realm_emoji)
-
-    RealmEmoji.objects.bulk_update(realm_emoji_to_update, ["author_id"])
+            .order_by("id")[:1]
+            .values("pk")
+        )
+    )
 
     # Previously, this also pushed `reupload_realm_emoji` events onto
     # the `deferred_work` queue; however,
```
Missed-message notification tests:

```diff
@@ -13,8 +13,8 @@ from zerver.lib.user_groups import create_user_group, remove_user_from_user_grou
 from zerver.models import Recipient, Stream, Subscription, UserProfile, get_stream
 from zerver.tornado.event_queue import (
     ClientDescriptor,
-    access_client_descriptor,
     allocate_client_descriptor,
+    get_client_descriptor,
     maybe_enqueue_notifications,
     missedmessage_hook,
     persistent_queue_filename,
@@ -173,7 +173,7 @@ class MissedMessageNotificationsTest(ZulipTestCase):
             )
             self.assert_json_success(result)
             queue_id = orjson.loads(result.content)["queue_id"]
-            return access_client_descriptor(user.id, queue_id)
+            return get_client_descriptor(queue_id)
 
         def destroy_event_queue(user: UserProfile, queue_id: str) -> None:
             result = self.tornado_call(cleanup_event_queue, user, {"queue_id": queue_id})
```
Event system tests (`GetEventsTest`):

```diff
@@ -36,7 +36,6 @@ from zerver.tornado.event_queue import (
     process_message_event,
     send_restart_events,
 )
-from zerver.tornado.exceptions import BadEventQueueIdError
 from zerver.tornado.views import get_events, get_events_backend
 from zerver.views.events_register import (
     _default_all_public_streams,
@@ -419,52 +418,6 @@ class GetEventsTest(ZulipTestCase):
         self.assertEqual(message["content"], "<p><strong>hello</strong></p>")
         self.assertEqual(message["avatar_url"], None)
 
-    def test_bogus_queue_id(self) -> None:
-        user = self.example_user("hamlet")
-
-        with self.assertRaises(BadEventQueueIdError):
-            self.tornado_call(
-                get_events,
-                user,
-                {
-                    "queue_id": "hamster",
-                    "user_client": "website",
-                    "last_event_id": -1,
-                    "dont_block": orjson.dumps(True).decode(),
-                },
-            )
-
-    def test_wrong_user_queue_id(self) -> None:
-        user = self.example_user("hamlet")
-        wrong_user = self.example_user("othello")
-
-        result = self.tornado_call(
-            get_events,
-            user,
-            {
-                "apply_markdown": orjson.dumps(True).decode(),
-                "client_gravatar": orjson.dumps(True).decode(),
-                "event_types": orjson.dumps(["message"]).decode(),
-                "user_client": "website",
-                "dont_block": orjson.dumps(True).decode(),
-            },
-        )
-        self.assert_json_success(result)
-        queue_id = orjson.loads(result.content)["queue_id"]
-
-        with self.assertLogs(level="WARNING") as cm, self.assertRaises(BadEventQueueIdError):
-            self.tornado_call(
-                get_events,
-                wrong_user,
-                {
-                    "queue_id": queue_id,
-                    "user_client": "website",
-                    "last_event_id": -1,
-                    "dont_block": orjson.dumps(True).decode(),
-                },
-            )
-        self.assertIn("not authorized for queue", cm.output[0])
-
 
 class FetchInitialStateDataTest(ZulipTestCase):
     # Non-admin users don't have access to all bots
```
`zerver/tornado/descriptors.py`:

```diff
@@ -1,7 +1,5 @@
 from typing import TYPE_CHECKING, Dict, Optional
-
-from django.conf import settings
 
 if TYPE_CHECKING:
     from zerver.tornado.event_queue import ClientDescriptor
 
@@ -18,15 +16,3 @@ def set_descriptor_by_handler_id(handler_id: int, client_descriptor: "ClientDesc
 
 def clear_descriptor_by_handler_id(handler_id: int) -> None:
     del descriptors_by_handler_id[handler_id]
-
-
-current_port: Optional[int] = None
-
-
-def is_current_port(port: int) -> Optional[int]:
-    return settings.TEST_SUITE or current_port == port
-
-
-def set_current_port(port: int) -> None:
-    global current_port
-    current_port = port
```
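The deleted trio gave each Tornado process an identity: the management command calls `set_current_port(port)` at startup, and request handlers ask `is_current_port(...)` to decide whether a queue lives in this process or must be forwarded (see the views hunks below). Stripped to its essentials, the pattern is:

```python
from typing import Optional

# Module-level identity of this process, recorded once at startup.
current_port: Optional[int] = None

def set_current_port(port: int) -> None:
    global current_port
    current_port = port

def is_current_port(port: int) -> bool:
    # The removed version also returned True under the test suite
    # (settings.TEST_SUITE), so tests need no real sharding.
    return current_port == port

set_current_port(9801)
assert is_current_port(9801) and not is_current_port(9802)
```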
Django-side helpers for talking to Tornado (`send_event`, `request_event_queue`):

```diff
@@ -1,4 +1,3 @@
-from collections import defaultdict
 from functools import lru_cache
 from typing import Any, Dict, Iterable, List, Mapping, Optional, Sequence, Tuple, Union
 from urllib.parse import urlparse
@@ -12,13 +11,7 @@ from urllib3.util import Retry
 
 from zerver.lib.queue import queue_json_publish
 from zerver.models import Client, Realm, UserProfile
-from zerver.tornado.sharding import (
-    get_realm_tornado_ports,
-    get_tornado_uri,
-    get_user_id_tornado_port,
-    get_user_tornado_port,
-    notify_tornado_queue_name,
-)
+from zerver.tornado.sharding import get_tornado_port, get_tornado_uri, notify_tornado_queue_name
 
 
 class TornadoAdapter(HTTPAdapter):
@@ -87,7 +80,7 @@ def request_event_queue(
     if not settings.USING_TORNADO:
         return None
 
-    tornado_uri = get_tornado_uri(get_user_tornado_port(user_profile))
+    tornado_uri = get_tornado_uri(user_profile.realm)
     req = {
         "dont_block": "true",
         "apply_markdown": orjson.dumps(apply_markdown),
@@ -118,7 +111,7 @@ def get_user_events(
     if not settings.USING_TORNADO:
         return []
 
-    tornado_uri = get_tornado_uri(get_user_tornado_port(user_profile))
+    tornado_uri = get_tornado_uri(user_profile.realm)
     post_data: Dict[str, Any] = {
         "queue_id": queue_id,
         "last_event_id": last_event_id,
@@ -131,7 +124,7 @@ def get_user_events(
     return resp.json()["events"]
 
 
-def send_notification_http(port: int, data: Mapping[str, Any]) -> None:
+def send_notification_http(realm: Realm, data: Mapping[str, Any]) -> None:
     if not settings.USING_TORNADO or settings.RUNNING_INSIDE_TORNADO:
         # To allow the backend test suite to not require a separate
         # Tornado process, we simply call the process_notification
@@ -146,7 +139,7 @@ def send_notification_http(port: int, data: Mapping[str, Any]) -> None:
 
         process_notification(data)
     else:
-        tornado_uri = get_tornado_uri(port)
+        tornado_uri = get_tornado_uri(realm)
         requests_client().post(
             tornado_uri + "/notify_tornado",
             data=dict(data=orjson.dumps(data), secret=settings.SHARED_SECRET),
@@ -168,18 +161,9 @@ def send_event(
 ) -> None:
     """`users` is a list of user IDs, or in some special cases like message
     send/update or embeds, dictionaries containing extra data."""
-    realm_ports = get_realm_tornado_ports(realm)
-    if len(realm_ports) == 1:
-        port_user_map = {realm_ports[0]: list(users)}
-    else:
-        port_user_map = defaultdict(list)
-        for user in users:
-            user_id = user if isinstance(user, int) else user["id"]
-            port_user_map[get_user_id_tornado_port(realm_ports, user_id)].append(user)
-
-    for port, port_users in port_user_map.items():
-        queue_json_publish(
-            notify_tornado_queue_name(port),
-            dict(event=event, users=port_users),
-            lambda *args, **kwargs: send_notification_http(port, *args, **kwargs),
-        )
+    port = get_tornado_port(realm)
+    queue_json_publish(
+        notify_tornado_queue_name(port),
+        dict(event=event, users=list(users)),
+        lambda *args, **kwargs: send_notification_http(realm, *args, **kwargs),
+    )
```
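The removed `send_event` body is the fan-out half of per-user sharding: one event's audience is bucketed by destination port, and one message is published per port so each Tornado process only hears about its own users. A worked example with invented users and ports:

```python
from collections import defaultdict
from typing import Dict, List, Union

def get_user_id_tornado_port(realm_ports: List[int], user_id: int) -> int:
    # Same rule as the removed zerver.tornado.sharding helper:
    # user IDs are spread across the realm's ports by modulo.
    return realm_ports[user_id % len(realm_ports)]

realm_ports = [9800, 9801]
users: List[Union[int, Dict[str, int]]] = [1, 2, {"id": 3}]

port_user_map: Dict[int, List[Union[int, Dict[str, int]]]] = defaultdict(list)
for user in users:
    user_id = user if isinstance(user, int) else user["id"]
    port_user_map[get_user_id_tornado_port(realm_ports, user_id)].append(user)

# One queue publish per entry: {9801: [1, {'id': 3}], 9800: [2]}
print(dict(port_user_map))
```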
`zerver/tornado/event_queue.py`:

```diff
@@ -6,7 +6,6 @@ import os
 import random
 import time
 import traceback
-import uuid
 from collections import deque
 from dataclasses import asdict
 from typing import (
@@ -420,6 +419,8 @@ realm_clients_all_streams: Dict[int, List[ClientDescriptor]] = {}
 # that is about to be deleted
 gc_hooks: List[Callable[[int, ClientDescriptor, bool], None]] = []
 
+next_queue_id = 0
+
 
 def clear_client_event_queues_for_testing() -> None:
     assert settings.TEST_SUITE
@@ -427,25 +428,19 @@ def clear_client_event_queues_for_testing() -> None:
     user_clients.clear()
     realm_clients_all_streams.clear()
     gc_hooks.clear()
+    global next_queue_id
+    next_queue_id = 0
 
 
 def add_client_gc_hook(hook: Callable[[int, ClientDescriptor, bool], None]) -> None:
     gc_hooks.append(hook)
 
 
-def access_client_descriptor(user_id: int, queue_id: str) -> ClientDescriptor:
-    client = clients.get(queue_id)
-    if client is not None:
-        if user_id == client.user_profile_id:
-            return client
-        logging.warning(
-            "User %d is not authorized for queue %s (%d via %s)",
-            user_id,
-            queue_id,
-            client.user_profile_id,
-            client.current_client_name,
-        )
-    raise BadEventQueueIdError(queue_id)
+def get_client_descriptor(queue_id: str) -> ClientDescriptor:
+    try:
+        return clients[queue_id]
+    except KeyError:
+        raise BadEventQueueIdError(queue_id)
 
 
 def get_client_descriptors_for_user(user_profile_id: int) -> List[ClientDescriptor]:
@@ -463,7 +458,9 @@ def add_to_client_dicts(client: ClientDescriptor) -> None:
 
 
 def allocate_client_descriptor(new_queue_data: MutableMapping[str, Any]) -> ClientDescriptor:
-    queue_id = str(uuid.uuid4())
+    global next_queue_id
+    queue_id = str(settings.SERVER_GENERATION) + ":" + str(next_queue_id)
+    next_queue_id += 1
     new_queue_data["event_queue"] = EventQueue(queue_id).to_dict()
     client = ClientDescriptor.from_dict(new_queue_data)
     clients[queue_id] = client
@@ -643,7 +640,9 @@ def fetch_events(query: Mapping[str, Any]) -> Dict[str, Any]:
         else:
             if last_event_id is None:
                 raise JsonableError(_("Missing 'last_event_id' argument"))
-            client = access_client_descriptor(user_profile_id, queue_id)
+            client = get_client_descriptor(queue_id)
+            if user_profile_id != client.user_profile_id:
+                raise JsonableError(_("You are not authorized to get events from this queue"))
             if (
                 client.event_queue.newest_pruned_id is not None
                 and last_event_id < client.event_queue.newest_pruned_id
@@ -1293,18 +1292,6 @@ def process_notification(notice: Mapping[str, Any]) -> None:
         process_deletion_event(event, user_ids)
     elif event["type"] == "presence":
         process_presence_event(event, cast(List[int], users))
-    elif event["type"] == "cleanup_queue":
-        # cleanup_event_queue may generate this event to forward cleanup
-        # requests to the right shard.
-        assert isinstance(users[0], int)
-        try:
-            client = access_client_descriptor(users[0], event["queue_id"])
-        except BadEventQueueIdError:
-            logging.info(
-                "Ignoring cleanup request for bad queue id %s (%d)", event["queue_id"], users[0]
-            )
-        else:
-            client.cleanup()
     else:
         process_event(event, cast(List[int], users))
     logging.debug(
```
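One subtlety in the `allocate_client_descriptor` hunk: the sharded branch switched queue IDs from `SERVER_GENERATION:counter` to `uuid.uuid4()`, presumably because a per-process counter is only unique within a single Tornado process. Both schemes side by side, as a sketch:

```python
import itertools
import time
import uuid

# Counter scheme (restored code): unique only within one process.
SERVER_GENERATION = int(time.time())  # stand-in for settings.SERVER_GENERATION
_next_queue_id = itertools.count()

def counter_queue_id() -> str:
    return f"{SERVER_GENERATION}:{next(_next_queue_id)}"

# Random scheme (sharded branch): needs no cross-process coordination.
def random_queue_id() -> str:
    return str(uuid.uuid4())

print(counter_queue_id(), random_queue_id())
```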
`zerver/tornado/sharding.py`:

```diff
@@ -1,47 +1,22 @@
 import json
 import os
-import re
-from typing import Dict, List, Pattern, Tuple, Union
 
 from django.conf import settings
 
-from zerver.models import Realm, UserProfile
+from zerver.models import Realm
 
-shard_map: Dict[str, Union[int, List[int]]] = {}
-shard_regexes: List[Tuple[Pattern[str], Union[int, List[int]]]] = []
+shard_map = {}
 if os.path.exists("/etc/zulip/sharding.json"):
     with open("/etc/zulip/sharding.json") as f:
-        data = json.loads(f.read())
-        shard_map = data.get(
-            "shard_map",
-            data,  # backwards compatibility
-        )
-        shard_regexes = [
-            (re.compile(regex, re.I), port) for regex, port in data.get("shard_regexes", [])
-        ]
+        shard_map = json.loads(f.read())
 
 
-def get_realm_tornado_ports(realm: Realm) -> List[int]:
-    if realm.host in shard_map:
-        ports = shard_map[realm.host]
-        return [ports] if isinstance(ports, int) else ports
-
-    for regex, ports in shard_regexes:
-        if regex.match(realm.host):
-            return [ports] if isinstance(ports, int) else ports
-
-    return [settings.TORNADO_PORTS[0]]
+def get_tornado_port(realm: Realm) -> int:
+    return shard_map.get(realm.host, settings.TORNADO_PORTS[0])
 
 
-def get_user_id_tornado_port(realm_ports: List[int], user_id: int) -> int:
-    return realm_ports[user_id % len(realm_ports)]
-
-
-def get_user_tornado_port(user: UserProfile) -> int:
-    return get_user_id_tornado_port(get_realm_tornado_ports(user.realm), user.id)
-
-
-def get_tornado_uri(port: int) -> str:
+def get_tornado_uri(realm: Realm) -> str:
+    port = get_tornado_port(realm)
     return f"http://127.0.0.1:{port}"
```
`zerver/tornado/views.py`:

```diff
@@ -3,13 +3,11 @@ from typing import Callable, Optional, Sequence, TypeVar
 
 import orjson
 from asgiref.sync import async_to_sync
-from django.conf import settings
 from django.http import HttpRequest, HttpResponse
 from django.utils.translation import gettext as _
 
 from zerver.decorator import internal_notify_view, process_client
 from zerver.lib.exceptions import JsonableError
-from zerver.lib.queue import get_queue_client
 from zerver.lib.request import REQ, RequestNotes, has_request_variables
 from zerver.lib.response import json_success
 from zerver.lib.validator import (
@@ -20,9 +18,8 @@ from zerver.lib.validator import (
     to_non_negative_int,
 )
 from zerver.models import Client, UserProfile, get_client, get_user_profile_by_id
-from zerver.tornado.descriptors import is_current_port
-from zerver.tornado.event_queue import access_client_descriptor, fetch_events, process_notification
-from zerver.tornado.sharding import get_user_tornado_port, notify_tornado_queue_name
+from zerver.tornado.event_queue import fetch_events, get_client_descriptor, process_notification
+from zerver.tornado.exceptions import BadEventQueueIdError
 
 T = TypeVar("T")
 
@@ -44,28 +41,14 @@ def notify(request: HttpRequest) -> HttpResponse:
 def cleanup_event_queue(
     request: HttpRequest, user_profile: UserProfile, queue_id: str = REQ()
 ) -> HttpResponse:
+    client = get_client_descriptor(str(queue_id))
+    if client is None:
+        raise BadEventQueueIdError(queue_id)
+    if user_profile.id != client.user_profile_id:
+        raise JsonableError(_("You are not authorized to access this queue"))
     log_data = RequestNotes.get_notes(request).log_data
     assert log_data is not None
     log_data["extra"] = f"[{queue_id}]"
-
-    user_port = get_user_tornado_port(user_profile)
-    if not is_current_port(user_port):
-        # X-Accel-Redirect is not supported for HTTP DELETE requests,
-        # so we notify the shard hosting the acting user's queues via
-        # enqueuing a special event.
-        #
-        # TODO: Because we return a 200 before confirming that the
-        # event queue had been actually deleted by the process hosting
-        # the queue, there's a race where a `GET /events` request can
-        # succeed after getting a 200 from this endpoint.
-        assert settings.USING_RABBITMQ
-        get_queue_client().json_publish(
-            notify_tornado_queue_name(user_port),
-            {"users": [user_profile.id], "event": {"type": "cleanup_queue", "queue_id": queue_id}},
-        )
-        return json_success(request)
-
-    client = access_client_descriptor(user_profile.id, queue_id)
     in_tornado_thread(client.cleanup)
     return json_success(request)
 
@@ -77,25 +60,11 @@ def get_events_internal(
 ) -> HttpResponse:
     user_profile = get_user_profile_by_id(user_profile_id)
     RequestNotes.get_notes(request).requestor_for_logs = user_profile.format_requestor_for_logs()
-    assert is_current_port(get_user_tornado_port(user_profile))
-
     process_client(request, user_profile, client_name="internal")
     return get_events_backend(request, user_profile)
 
 
 def get_events(request: HttpRequest, user_profile: UserProfile) -> HttpResponse:
-    user_port = get_user_tornado_port(user_profile)
-    if not is_current_port(user_port):
-        # When a single realm is split across multiple Tornado shards,
-        # any `GET /events` requests that are routed to the wrong
-        # shard are redirected to the shard hosting the relevant
-        # user's queues. We use X-Accel-Redirect for this purpose,
-        # which is efficient and keeps this redirect invisible to
-        # clients.
-        return HttpResponse(
-            "", headers={"X-Accel-Redirect": f"/tornado/{user_port}{request.get_full_path()}"}
-        )
-
     return get_events_backend(request, user_profile)
```
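Since the X-Accel-Redirect trick only applies to `GET /events`, the removed `cleanup_event_queue` path forwarded deletions to the owning shard by publishing a `cleanup_queue` event to that shard's notify queue (handled in the `process_notification` hunk earlier). The message shape, reconstructed from the removed lines, with example values:

```python
# Payload the removed code published via get_queue_client().json_publish
# to notify_tornado_queue_name(user_port); the values here are examples.
def cleanup_message(user_id: int, queue_id: str) -> dict:
    return {
        "users": [user_id],
        "event": {"type": "cleanup_queue", "queue_id": queue_id},
    }

assert cleanup_message(42, "1674000000:0") == {
    "users": [42],
    "event": {"type": "cleanup_queue", "queue_id": "1674000000:0"},
}
```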