Compare commits


1 Commit

Author: Anders Kaseorg
SHA1: cb8ba561d9
Message: migrations: Fix Python-looped SQL in 0376; don’t crash if no user found.
Signed-off-by: Anders Kaseorg <anders@zulip.com>
Date: 2022-12-06 12:13:43 -08:00
18 changed files with 90 additions and 281 deletions

View File

@@ -45,13 +45,6 @@ location /api/v1/events {
     include /etc/nginx/zulip-include/proxy_longpolling;
 }
 
-# Handle X-Accel-Redirect from Tornado to Tornado
-location ~ ^/tornado/(\d+)(/.*)$ {
-    internal;
-    proxy_pass http://tornado$1$2$is_args$args;
-    include /etc/nginx/zulip-include/proxy_longpolling;
-}
-
 # Send everything else to Django via uWSGI
 location / {
     include uwsgi_params;

View File

@@ -5,22 +5,15 @@ class zulip::tornado_sharding {
   # with the correct default content for the "only one shard" setup.  For this
   # reason they use "replace => false", because the files are managed by
   # the sharding script afterwards and Puppet shouldn't overwrite them.
-  file { '/etc/zulip/nginx_sharding_map.conf':
+  file { '/etc/zulip/nginx_sharding.conf':
     ensure  => file,
     owner   => 'root',
     group   => 'root',
     mode    => '0644',
     notify  => Service['nginx'],
-    content => @(EOT),
-      map "" $tornado_server {
-        default http://tornado;
-      }
-      | EOT
+    content => "set \$tornado_server http://tornado;\n",
     replace => false,
   }
-  file { '/etc/zulip/nginx_sharding.conf':
-    ensure => absent,
-  }
   file { '/etc/zulip/sharding.json':
     ensure  => file,
     require => User['zulip'],
@@ -36,15 +29,14 @@ class zulip::tornado_sharding {
   exec { 'stage_updated_sharding':
     command   => "${::zulip_scripts_path}/lib/sharding.py",
     onlyif    => "${::zulip_scripts_path}/lib/sharding.py --errors-ok",
-    require   => [File['/etc/zulip/nginx_sharding_map.conf'], File['/etc/zulip/sharding.json']],
+    require   => [File['/etc/zulip/nginx_sharding.conf'], File['/etc/zulip/sharding.json']],
     logoutput => true,
     loglevel  => 'warning',
   }
 
-  # The ports of Tornado processes to run on the server, computed from
-  # the zulip.conf configuration.  Default is just port 9800.
-  $tornado_groups = zulipconf_keys('tornado_sharding').map |$key| { $key.regsubst(/_regex$/, '').split('_') }.unique
-  $tornado_ports = $tornado_groups.flatten.unique
+  # The ports of Tornado processes to run on the server; defaults to
+  # 9800.
+  $tornado_ports = zulipconf_keys('tornado_sharding')
 
   file { '/etc/nginx/zulip-include/tornado-upstreams':
     require => [Package[$zulip::common::nginx], Exec['stage_updated_sharding']],
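
After this hunk, the default file Puppet seeds on a single-shard host is a one-line set directive rather than a map block. For reference, the two default file bodies written as Python string literals (paths per the hunk above):

# Before: /etc/zulip/nginx_sharding_map.conf, from the Puppet heredoc
OLD_DEFAULT = 'map "" $tornado_server {\n  default http://tornado;\n}\n'
# After: /etc/zulip/nginx_sharding.conf, from the quoted content string
NEW_DEFAULT = "set $tornado_server http://tornado;\n"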

View File

@@ -5,17 +5,6 @@ upstream tornado<%= port %> {
     keepalive 10000;
 }
 <% end -%>
-<% @tornado_groups.each do |group| -%>
-<% if group.length > 1 -%>
-upstream tornado<%= group.join('_') %> {
-    random;
-<% group.each do |port| -%>
-    server 127.0.0.1:<%= port %>;
-<% end -%>
-    keepalive 10000;
-}
-<% end -%>
-<% end -%>
 <% else -%>
 upstream tornado {
     server 127.0.0.1:9800;

View File

@@ -13,7 +13,6 @@ server {
 <% end -%>
 
 include /etc/nginx/zulip-include/upstreams;
-include /etc/zulip/nginx_sharding_map.conf;
 
 server {
 <% if @nginx_http_only -%>
@@ -31,6 +30,7 @@ server {
         alias /home/zulip/local-static;
     }
 
+    include /etc/zulip/nginx_sharding.conf;
     include /etc/nginx/zulip-include/certbot;
     include /etc/nginx/zulip-include/app;
 }
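
A note on placement, here and in the two site configs below: the nginx_sharding_map.conf include sat at http level, next to the upstreams include, because nginx's map directive is only valid in the http context; the restored nginx_sharding.conf include moves inside the server block because set must run in server (or location) context.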

View File

@@ -1,5 +1,4 @@
 include /etc/nginx/zulip-include/upstreams;
-include /etc/zulip/nginx_sharding_map.conf;
 
 server {
     listen 443 ssl http2;
@@ -16,5 +15,6 @@ server {
 
     server_name zulipchat.com *.zulipchat.com;
 
+    include /etc/zulip/nginx_sharding.conf;
     include /etc/nginx/zulip-include/app;
 }

View File

@@ -6,7 +6,6 @@ server {
 }
 
 include /etc/nginx/zulip-include/upstreams;
-include /etc/zulip/nginx_sharding_map.conf;
 
 server {
     listen 443 ssl http2;
@@ -23,5 +22,6 @@ server {
 
     server_name zulipstaging.com;
 
+    include /etc/zulip/nginx_sharding.conf;
     include /etc/nginx/zulip-include/app;
 }

View File

@@ -5,7 +5,7 @@ import json
 import os
 import subprocess
 import sys
-from typing import Dict, List, Tuple, Union
+from typing import Any, Dict
 
 BASE_DIR = os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
 sys.path.append(BASE_DIR)
@@ -16,13 +16,17 @@ setup_path()
 from scripts.lib.zulip_tools import get_config_file, get_tornado_ports
 
 
-def nginx_quote(s: str) -> str:
-    return '"' + s.replace("\\", "\\\\").replace('"', '\\"') + '"'
+def write_realm_nginx_config_line(f: Any, host: str, port: str) -> None:
+    f.write(
+        f"""if ($host = '{host}') {{
+    set $tornado_server http://tornado{port};
+}}\n"""
+    )
 
 
 # Basic system to do Tornado sharding.  Writes two output .tmp files that need
 # to be renamed to the following files to finalize the changes:
-# * /etc/zulip/nginx_sharding_map.conf; nginx needs to be reloaded after changing.
+# * /etc/zulip/nginx_sharding.conf; nginx needs to be reloaded after changing.
 # * /etc/zulip/sharding.json; supervisor Django process needs to be reloaded
 # after changing.  TODO: We can probably make this live-reload by statting the file.
 #
@@ -31,52 +35,41 @@ def write_updated_configs() -> None:
     config_file = get_config_file()
     ports = get_tornado_ports(config_file)
 
-    expected_ports = list(range(9800, ports[-1] + 1))
-    assert ports == expected_ports, f"ports ({ports}) must be contiguous, starting with 9800"
+    expected_ports = list(range(9800, max(ports) + 1))
+    assert (
+        sorted(ports) == expected_ports
+    ), f"ports ({sorted(ports)}) must be contiguous, starting with 9800"
 
-    with open("/etc/zulip/nginx_sharding_map.conf.tmp", "w") as nginx_sharding_conf_f, open(
+    with open("/etc/zulip/nginx_sharding.conf.tmp", "w") as nginx_sharding_conf_f, open(
         "/etc/zulip/sharding.json.tmp", "w"
     ) as sharding_json_f:
         if len(ports) == 1:
-            nginx_sharding_conf_f.write('map "" $tornado_server {\n')
-            nginx_sharding_conf_f.write("    default http://tornado;\n")
-            nginx_sharding_conf_f.write("}\n")
+            nginx_sharding_conf_f.write("set $tornado_server http://tornado;\n")
             sharding_json_f.write("{}\n")
             return
 
-        nginx_sharding_conf_f.write("map $http_host $tornado_server {\n")
-        nginx_sharding_conf_f.write("    default http://tornado9800;\n")
-        shard_map: Dict[str, Union[int, List[int]]] = {}
-        shard_regexes: List[Tuple[str, Union[int, List[int]]]] = []
+        nginx_sharding_conf_f.write("set $tornado_server http://tornado9800;\n")
+        shard_map: Dict[str, int] = {}
         external_host = subprocess.check_output(
             [os.path.join(BASE_DIR, "scripts/get-django-setting"), "EXTERNAL_HOST"],
             text=True,
         ).strip()
-        for key, shards in config_file["tornado_sharding"].items():
-            if key.endswith("_regex"):
-                ports = [int(port) for port in key[: -len("_regex")].split("_")]
-                shard_regexes.append((shards, ports[0] if len(ports) == 1 else ports))
-                nginx_sharding_conf_f.write(
-                    f"    {nginx_quote('~*' + shards)} http://tornado{'_'.join(map(str, ports))};\n"
-                )
-            else:
-                ports = [int(port) for port in key.split("_")]
-                for shard in shards.split():
+        for port in config_file["tornado_sharding"]:
+            shards = config_file["tornado_sharding"][port].strip()
+            if shards:
+                for shard in shards.split(" "):
                     if "." in shard:
                         host = shard
                     else:
                         host = f"{shard}.{external_host}"
                     assert host not in shard_map, f"host {host} duplicated"
-                    shard_map[host] = ports[0] if len(ports) == 1 else ports
-                    nginx_sharding_conf_f.write(
-                        f"    {nginx_quote(host)} http://tornado{'_'.join(map(str, ports))};\n"
-                    )
+                    shard_map[host] = int(port)
+                    write_realm_nginx_config_line(nginx_sharding_conf_f, host, port)
             nginx_sharding_conf_f.write("\n")
-        nginx_sharding_conf_f.write("}\n")
 
-        data = {"shard_map": shard_map, "shard_regexes": shard_regexes}
-        sharding_json_f.write(json.dumps(data) + "\n")
+        sharding_json_f.write(json.dumps(shard_map) + "\n")
 
 
 parser = argparse.ArgumentParser(
@@ -90,7 +83,7 @@ parser.add_argument(
 options = parser.parse_args()
 
 config_file_path = "/etc/zulip"
-base_files = ["nginx_sharding_map.conf", "sharding.json"]
+base_files = ["nginx_sharding.conf", "sharding.json"]
 full_real_paths = [f"{config_file_path}/{filename}" for filename in base_files]
 full_new_paths = [f"{filename}.tmp" for filename in full_real_paths]
 try:
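
For a concrete picture of what the right-hand generator emits, a minimal runnable sketch (the helper is copied from the hunk above; the host and port are hypothetical):

import io

def write_realm_nginx_config_line(f, host: str, port: str) -> None:
    f.write(
        f"""if ($host = '{host}') {{
    set $tornado_server http://tornado{port};
}}\n"""
    )

buf = io.StringIO()
write_realm_nginx_config_line(buf, "bar.chat.example.com", "9801")
print(buf.getvalue())
# if ($host = 'bar.chat.example.com') {
#     set $tornado_server http://tornado9801;
# }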

View File

@@ -595,13 +595,7 @@ def run_psql_as_postgres(
 def get_tornado_ports(config_file: configparser.RawConfigParser) -> List[int]:
     ports = []
     if config_file.has_section("tornado_sharding"):
-        ports = sorted(
-            {
-                int(port)
-                for key in config_file.options("tornado_sharding")
-                for port in (key[: -len("_regex")] if key.endswith("_regex") else key).split("_")
-            }
-        )
+        ports = [int(port) for port in config_file.options("tornado_sharding")]
     if not ports:
         ports = [9800]
     return ports
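
A minimal sketch of the simplified parser above, assuming a hypothetical zulip.conf where each option name under [tornado_sharding] is itself a port:

import configparser

config = configparser.RawConfigParser()
config.read_string(
    """
[tornado_sharding]
9800 =
9801 = foo bar
"""
)
# Option names are the ports; each value lists the shards served on that port.
ports = [int(port) for port in config.options("tornado_sharding")]
assert ports == [9800, 9801]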

View File

@@ -7,16 +7,16 @@ set -e
 SUPPRESS_SHARDING_NOTICE=1 "$(dirname "$0")/zulip-puppet-apply" -f
 
 # Verify, before we move them into place
-if ! [ -e /etc/zulip/nginx_sharding_map.conf.tmp ] || ! [ -e /etc/zulip/sharding.json.tmp ]; then
+if ! [ -e /etc/zulip/nginx_sharding.conf.tmp ] || ! [ -e /etc/zulip/sharding.json.tmp ]; then
     echo "No sharding updates found to apply."
     exit 1
 fi
 
-chown root:root /etc/zulip/nginx_sharding_map.conf.tmp
-chmod 644 /etc/zulip/nginx_sharding_map.conf.tmp
+chown root:root /etc/zulip/nginx_sharding.conf.tmp
+chmod 644 /etc/zulip/nginx_sharding.conf.tmp
 chown zulip:zulip /etc/zulip/sharding.json.tmp
 chmod 644 /etc/zulip/sharding.json.tmp
 
-mv /etc/zulip/nginx_sharding_map.conf.tmp /etc/zulip/nginx_sharding_map.conf
+mv /etc/zulip/nginx_sharding.conf.tmp /etc/zulip/nginx_sharding.conf
 mv /etc/zulip/sharding.json.tmp /etc/zulip/sharding.json
 
 # In the ordering of operations below, the crucial detail is that

View File

@@ -19,7 +19,6 @@ if settings.PRODUCTION:
 from zerver.lib.async_utils import NoAutoCreateEventLoopPolicy
 from zerver.lib.debug import interactive_debug_listen
 from zerver.tornado.application import create_tornado_application, setup_tornado_rabbitmq
-from zerver.tornado.descriptors import set_current_port
 from zerver.tornado.event_queue import (
     add_client_gc_hook,
     dump_event_queues,
@@ -92,7 +91,6 @@ class Command(BaseCommand):
             )
             await sync_to_async(add_signal_handlers, thread_sensitive=True)()
 
-            set_current_port(port)
             translation.activate(settings.LANGUAGE_CODE)
 
             # We pass display_num_errors=False, since Django will

View File

@@ -1,6 +1,7 @@
 from django.db import migrations
 from django.db.backends.postgresql.schema import DatabaseSchemaEditor
 from django.db.migrations.state import StateApps
+from django.db.models import OuterRef, Subquery
 
 
 def set_emoji_author(apps: StateApps, schema_editor: DatabaseSchemaEditor) -> None:
@@ -13,20 +14,15 @@ def set_emoji_author(apps: StateApps, schema_editor: DatabaseSchemaEditor) -> No
     UserProfile = apps.get_model("zerver", "UserProfile")
     ROLE_REALM_OWNER = 100
 
-    realm_emoji_to_update = []
-    for realm_emoji in RealmEmoji.objects.all():
-        if realm_emoji.author_id is None:
-            user_profile = (
-                UserProfile.objects.filter(
-                    realm_id=realm_emoji.realm_id, is_active=True, role=ROLE_REALM_OWNER
-                )
-                .order_by("id")
-                .first()
-            )
-            realm_emoji.author_id = user_profile.id
-            realm_emoji_to_update.append(realm_emoji)
-    RealmEmoji.objects.bulk_update(realm_emoji_to_update, ["author_id"])
+    RealmEmoji.objects.filter(author=None).update(
+        author=Subquery(
+            UserProfile.objects.filter(
+                realm=OuterRef("realm"), is_active=True, role=ROLE_REALM_OWNER
+            )
+            .order_by("id")[:1]
+            .values("pk")
+        )
+    )
 
     # Previously, this also pushed `reupload_realm_emoji` events onto
     # the `deferred_work` queue; however,
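
This hunk is the fix named in the commit message: rather than issuing a SELECT and an UPDATE per emoji from a Python loop (and crashing with an AttributeError on .first().id when a realm has no active owner), the migration now runs a single UPDATE with a correlated subquery, which simply leaves author NULL when no owner is found. Roughly the SQL produced (approximate, for illustration only):

UPDATE zerver_realmemoji
SET author_id = (
    SELECT u.id FROM zerver_userprofile u
    WHERE u.realm_id = zerver_realmemoji.realm_id
      AND u.is_active AND u.role = 100
    ORDER BY u.id LIMIT 1
)
WHERE author_id IS NULL;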

View File

@@ -13,8 +13,8 @@ from zerver.lib.user_groups import create_user_group, remove_user_from_user_grou
 from zerver.models import Recipient, Stream, Subscription, UserProfile, get_stream
 from zerver.tornado.event_queue import (
     ClientDescriptor,
-    access_client_descriptor,
     allocate_client_descriptor,
+    get_client_descriptor,
     maybe_enqueue_notifications,
     missedmessage_hook,
     persistent_queue_filename,
@@ -173,7 +173,7 @@ class MissedMessageNotificationsTest(ZulipTestCase):
             )
             self.assert_json_success(result)
             queue_id = orjson.loads(result.content)["queue_id"]
-            return access_client_descriptor(user.id, queue_id)
+            return get_client_descriptor(queue_id)
 
         def destroy_event_queue(user: UserProfile, queue_id: str) -> None:
             result = self.tornado_call(cleanup_event_queue, user, {"queue_id": queue_id})

View File

@@ -36,7 +36,6 @@ from zerver.tornado.event_queue import (
     process_message_event,
     send_restart_events,
 )
-from zerver.tornado.exceptions import BadEventQueueIdError
 from zerver.tornado.views import get_events, get_events_backend
 from zerver.views.events_register import (
     _default_all_public_streams,
@@ -419,52 +418,6 @@ class GetEventsTest(ZulipTestCase):
self.assertEqual(message["content"], "<p><strong>hello</strong></p>") self.assertEqual(message["content"], "<p><strong>hello</strong></p>")
self.assertEqual(message["avatar_url"], None) self.assertEqual(message["avatar_url"], None)
def test_bogus_queue_id(self) -> None:
user = self.example_user("hamlet")
with self.assertRaises(BadEventQueueIdError):
self.tornado_call(
get_events,
user,
{
"queue_id": "hamster",
"user_client": "website",
"last_event_id": -1,
"dont_block": orjson.dumps(True).decode(),
},
)
def test_wrong_user_queue_id(self) -> None:
user = self.example_user("hamlet")
wrong_user = self.example_user("othello")
result = self.tornado_call(
get_events,
user,
{
"apply_markdown": orjson.dumps(True).decode(),
"client_gravatar": orjson.dumps(True).decode(),
"event_types": orjson.dumps(["message"]).decode(),
"user_client": "website",
"dont_block": orjson.dumps(True).decode(),
},
)
self.assert_json_success(result)
queue_id = orjson.loads(result.content)["queue_id"]
with self.assertLogs(level="WARNING") as cm, self.assertRaises(BadEventQueueIdError):
self.tornado_call(
get_events,
wrong_user,
{
"queue_id": queue_id,
"user_client": "website",
"last_event_id": -1,
"dont_block": orjson.dumps(True).decode(),
},
)
self.assertIn("not authorized for queue", cm.output[0])
class FetchInitialStateDataTest(ZulipTestCase): class FetchInitialStateDataTest(ZulipTestCase):
# Non-admin users don't have access to all bots # Non-admin users don't have access to all bots

View File

@@ -1,7 +1,5 @@
 from typing import TYPE_CHECKING, Dict, Optional
 
-from django.conf import settings
-
 if TYPE_CHECKING:
     from zerver.tornado.event_queue import ClientDescriptor
@@ -18,15 +16,3 @@ def set_descriptor_by_handler_id(handler_id: int, client_descriptor: "ClientDesc
 def clear_descriptor_by_handler_id(handler_id: int) -> None:
     del descriptors_by_handler_id[handler_id]
-
-
-current_port: Optional[int] = None
-
-
-def is_current_port(port: int) -> Optional[int]:
-    return settings.TEST_SUITE or current_port == port
-
-
-def set_current_port(port: int) -> None:
-    global current_port
-    current_port = port

View File

@@ -1,4 +1,3 @@
-from collections import defaultdict
 from functools import lru_cache
 from typing import Any, Dict, Iterable, List, Mapping, Optional, Sequence, Tuple, Union
 from urllib.parse import urlparse
@@ -12,13 +11,7 @@ from urllib3.util import Retry
 from zerver.lib.queue import queue_json_publish
 from zerver.models import Client, Realm, UserProfile
-from zerver.tornado.sharding import (
-    get_realm_tornado_ports,
-    get_tornado_uri,
-    get_user_id_tornado_port,
-    get_user_tornado_port,
-    notify_tornado_queue_name,
-)
+from zerver.tornado.sharding import get_tornado_port, get_tornado_uri, notify_tornado_queue_name
 
 
 class TornadoAdapter(HTTPAdapter):
@@ -87,7 +80,7 @@ def request_event_queue(
     if not settings.USING_TORNADO:
         return None
 
-    tornado_uri = get_tornado_uri(get_user_tornado_port(user_profile))
+    tornado_uri = get_tornado_uri(user_profile.realm)
     req = {
         "dont_block": "true",
         "apply_markdown": orjson.dumps(apply_markdown),
@@ -118,7 +111,7 @@ def get_user_events(
     if not settings.USING_TORNADO:
         return []
 
-    tornado_uri = get_tornado_uri(get_user_tornado_port(user_profile))
+    tornado_uri = get_tornado_uri(user_profile.realm)
     post_data: Dict[str, Any] = {
         "queue_id": queue_id,
         "last_event_id": last_event_id,
@@ -131,7 +124,7 @@ def get_user_events(
     return resp.json()["events"]
 
 
-def send_notification_http(port: int, data: Mapping[str, Any]) -> None:
+def send_notification_http(realm: Realm, data: Mapping[str, Any]) -> None:
     if not settings.USING_TORNADO or settings.RUNNING_INSIDE_TORNADO:
         # To allow the backend test suite to not require a separate
         # Tornado process, we simply call the process_notification
@@ -146,7 +139,7 @@ def send_notification_http(port: int, data: Mapping[str, Any]) -> None:
         process_notification(data)
     else:
-        tornado_uri = get_tornado_uri(port)
+        tornado_uri = get_tornado_uri(realm)
         requests_client().post(
             tornado_uri + "/notify_tornado",
             data=dict(data=orjson.dumps(data), secret=settings.SHARED_SECRET),
@@ -168,18 +161,9 @@ def send_event(
 ) -> None:
     """`users` is a list of user IDs, or in some special cases like message
     send/update or embeds, dictionaries containing extra data."""
-    realm_ports = get_realm_tornado_ports(realm)
-    if len(realm_ports) == 1:
-        port_user_map = {realm_ports[0]: list(users)}
-    else:
-        port_user_map = defaultdict(list)
-        for user in users:
-            user_id = user if isinstance(user, int) else user["id"]
-            port_user_map[get_user_id_tornado_port(realm_ports, user_id)].append(user)
-
-    for port, port_users in port_user_map.items():
-        queue_json_publish(
-            notify_tornado_queue_name(port),
-            dict(event=event, users=port_users),
-            lambda *args, **kwargs: send_notification_http(port, *args, **kwargs),
-        )
+    port = get_tornado_port(realm)
+    queue_json_publish(
+        notify_tornado_queue_name(port),
+        dict(event=event, users=list(users)),
+        lambda *args, **kwargs: send_notification_http(realm, *args, **kwargs),
+    )

View File

@@ -6,7 +6,6 @@ import os
 import random
 import time
 import traceback
-import uuid
 from collections import deque
 from dataclasses import asdict
 from typing import (
@@ -420,6 +419,8 @@ realm_clients_all_streams: Dict[int, List[ClientDescriptor]] = {}
 # that is about to be deleted
 gc_hooks: List[Callable[[int, ClientDescriptor, bool], None]] = []
 
+next_queue_id = 0
+
 
 def clear_client_event_queues_for_testing() -> None:
     assert settings.TEST_SUITE
@@ -427,24 +428,18 @@ def clear_client_event_queues_for_testing() -> None:
     user_clients.clear()
     realm_clients_all_streams.clear()
     gc_hooks.clear()
+    global next_queue_id
+    next_queue_id = 0
 
 
 def add_client_gc_hook(hook: Callable[[int, ClientDescriptor, bool], None]) -> None:
     gc_hooks.append(hook)
 
 
-def access_client_descriptor(user_id: int, queue_id: str) -> ClientDescriptor:
-    client = clients.get(queue_id)
-    if client is not None:
-        if user_id == client.user_profile_id:
-            return client
-        logging.warning(
-            "User %d is not authorized for queue %s (%d via %s)",
-            user_id,
-            queue_id,
-            client.user_profile_id,
-            client.current_client_name,
-        )
-    raise BadEventQueueIdError(queue_id)
+def get_client_descriptor(queue_id: str) -> ClientDescriptor:
+    try:
+        return clients[queue_id]
+    except KeyError:
+        raise BadEventQueueIdError(queue_id)
@@ -463,7 +458,9 @@ def add_to_client_dicts(client: ClientDescriptor) -> None:
 
 
 def allocate_client_descriptor(new_queue_data: MutableMapping[str, Any]) -> ClientDescriptor:
-    queue_id = str(uuid.uuid4())
+    global next_queue_id
+    queue_id = str(settings.SERVER_GENERATION) + ":" + str(next_queue_id)
+    next_queue_id += 1
     new_queue_data["event_queue"] = EventQueue(queue_id).to_dict()
     client = ClientDescriptor.from_dict(new_queue_data)
     clients[queue_id] = client
@@ -643,7 +640,9 @@ def fetch_events(query: Mapping[str, Any]) -> Dict[str, Any]:
     else:
         if last_event_id is None:
             raise JsonableError(_("Missing 'last_event_id' argument"))
-        client = access_client_descriptor(user_profile_id, queue_id)
+        client = get_client_descriptor(queue_id)
+        if user_profile_id != client.user_profile_id:
+            raise JsonableError(_("You are not authorized to get events from this queue"))
         if (
             client.event_queue.newest_pruned_id is not None
             and last_event_id < client.event_queue.newest_pruned_id
@@ -1293,18 +1292,6 @@ def process_notification(notice: Mapping[str, Any]) -> None:
         process_deletion_event(event, user_ids)
     elif event["type"] == "presence":
         process_presence_event(event, cast(List[int], users))
-    elif event["type"] == "cleanup_queue":
-        # cleanup_event_queue may generate this event to forward cleanup
-        # requests to the right shard.
-        assert isinstance(users[0], int)
-        try:
-            client = access_client_descriptor(users[0], event["queue_id"])
-        except BadEventQueueIdError:
-            logging.info(
-                "Ignoring cleanup request for bad queue id %s (%d)", event["queue_id"], users[0]
-            )
-        else:
-            client.cleanup()
     else:
         process_event(event, cast(List[int], users))
     logging.debug(
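
Queue ids also change shape in this file, from random UUIDs to <server generation>:<counter>. A self-contained sketch of the two allocators (SERVER_GENERATION stands in for the Django setting of the same name):

import itertools
import uuid

SERVER_GENERATION = 1670357623  # stand-in for settings.SERVER_GENERATION
_counter = itertools.count()

def allocate_queue_id_old() -> str:
    # left-hand scheme: globally unique, opaque
    return str(uuid.uuid4())

def allocate_queue_id_new() -> str:
    # right-hand scheme: unique within a server generation, resettable in tests
    return f"{SERVER_GENERATION}:{next(_counter)}"

assert allocate_queue_id_new() == "1670357623:0"
assert allocate_queue_id_new() == "1670357623:1"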

View File

@@ -1,47 +1,22 @@
 import json
 import os
-import re
-from typing import Dict, List, Pattern, Tuple, Union
 
 from django.conf import settings
 
-from zerver.models import Realm, UserProfile
+from zerver.models import Realm
 
-shard_map: Dict[str, Union[int, List[int]]] = {}
-shard_regexes: List[Tuple[Pattern[str], Union[int, List[int]]]] = []
+shard_map = {}
 if os.path.exists("/etc/zulip/sharding.json"):
     with open("/etc/zulip/sharding.json") as f:
-        data = json.loads(f.read())
-        shard_map = data.get(
-            "shard_map",
-            data,  # backwards compatibility
-        )
-        shard_regexes = [
-            (re.compile(regex, re.I), port) for regex, port in data.get("shard_regexes", [])
-        ]
+        shard_map = json.loads(f.read())
 
 
-def get_realm_tornado_ports(realm: Realm) -> List[int]:
-    if realm.host in shard_map:
-        ports = shard_map[realm.host]
-        return [ports] if isinstance(ports, int) else ports
-    for regex, ports in shard_regexes:
-        if regex.match(realm.host):
-            return [ports] if isinstance(ports, int) else ports
-    return [settings.TORNADO_PORTS[0]]
-
-
-def get_user_id_tornado_port(realm_ports: List[int], user_id: int) -> int:
-    return realm_ports[user_id % len(realm_ports)]
-
-
-def get_user_tornado_port(user: UserProfile) -> int:
-    return get_user_id_tornado_port(get_realm_tornado_ports(user.realm), user.id)
-
-
-def get_tornado_uri(port: int) -> str:
+def get_tornado_port(realm: Realm) -> int:
+    return shard_map.get(realm.host, settings.TORNADO_PORTS[0])
+
+
+def get_tornado_uri(realm: Realm) -> str:
+    port = get_tornado_port(realm)
     return f"http://127.0.0.1:{port}"

View File

@@ -3,13 +3,11 @@ from typing import Callable, Optional, Sequence, TypeVar
 import orjson
 from asgiref.sync import async_to_sync
-from django.conf import settings
 from django.http import HttpRequest, HttpResponse
 from django.utils.translation import gettext as _
 
 from zerver.decorator import internal_notify_view, process_client
 from zerver.lib.exceptions import JsonableError
-from zerver.lib.queue import get_queue_client
 from zerver.lib.request import REQ, RequestNotes, has_request_variables
 from zerver.lib.response import json_success
 from zerver.lib.validator import (
@@ -20,9 +18,8 @@ from zerver.lib.validator import (
     to_non_negative_int,
 )
 from zerver.models import Client, UserProfile, get_client, get_user_profile_by_id
-from zerver.tornado.descriptors import is_current_port
-from zerver.tornado.event_queue import access_client_descriptor, fetch_events, process_notification
-from zerver.tornado.sharding import get_user_tornado_port, notify_tornado_queue_name
+from zerver.tornado.event_queue import fetch_events, get_client_descriptor, process_notification
+from zerver.tornado.exceptions import BadEventQueueIdError
 
 T = TypeVar("T")
@@ -44,28 +41,14 @@ def notify(request: HttpRequest) -> HttpResponse:
 def cleanup_event_queue(
     request: HttpRequest, user_profile: UserProfile, queue_id: str = REQ()
 ) -> HttpResponse:
+    client = get_client_descriptor(str(queue_id))
+    if client is None:
+        raise BadEventQueueIdError(queue_id)
+    if user_profile.id != client.user_profile_id:
+        raise JsonableError(_("You are not authorized to access this queue"))
     log_data = RequestNotes.get_notes(request).log_data
     assert log_data is not None
     log_data["extra"] = f"[{queue_id}]"
 
-    user_port = get_user_tornado_port(user_profile)
-    if not is_current_port(user_port):
-        # X-Accel-Redirect is not supported for HTTP DELETE requests,
-        # so we notify the shard hosting the acting user's queues via
-        # enqueuing a special event.
-        #
-        # TODO: Because we return a 200 before confirming that the
-        # event queue had been actually deleted by the process hosting
-        # the queue, there's a race where a `GET /events` request can
-        # succeed after getting a 200 from this endpoint.
-        assert settings.USING_RABBITMQ
-        get_queue_client().json_publish(
-            notify_tornado_queue_name(user_port),
-            {"users": [user_profile.id], "event": {"type": "cleanup_queue", "queue_id": queue_id}},
-        )
-        return json_success(request)
-
-    client = access_client_descriptor(user_profile.id, queue_id)
     in_tornado_thread(client.cleanup)
     return json_success(request)
@@ -77,25 +60,11 @@ def get_events_internal(
 ) -> HttpResponse:
     user_profile = get_user_profile_by_id(user_profile_id)
     RequestNotes.get_notes(request).requestor_for_logs = user_profile.format_requestor_for_logs()
-    assert is_current_port(get_user_tornado_port(user_profile))
 
     process_client(request, user_profile, client_name="internal")
     return get_events_backend(request, user_profile)
 
 
 def get_events(request: HttpRequest, user_profile: UserProfile) -> HttpResponse:
-    user_port = get_user_tornado_port(user_profile)
-    if not is_current_port(user_port):
-        # When a single realm is split across multiple Tornado shards,
-        # any `GET /events` requests that are routed to the wrong
-        # shard are redirected to the shard hosting the relevant
-        # user's queues.  We use X-Accel-Redirect for this purpose,
-        # which is efficient and keeps this redirect invisible to
-        # clients.
-        return HttpResponse(
-            "", headers={"X-Accel-Redirect": f"/tornado/{user_port}{request.get_full_path()}"}
-        )
-
     return get_events_backend(request, user_profile)
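
For reference, the cross-shard hop deleted here cooperated with the internal nginx location removed in the first file: Django answered with an X-Accel-Redirect header, and nginx re-issued the request against /tornado/<port>/..., invisibly to the client. A minimal sketch of that (now-deleted) pattern:

from django.http import HttpRequest, HttpResponse

def redirect_to_shard(request: HttpRequest, user_port: int) -> HttpResponse:
    # nginx intercepts X-Accel-Redirect on an otherwise-empty response and
    # re-proxies the request to the internal location, which forwards it
    # to the matching Tornado process.
    return HttpResponse(
        "", headers={"X-Accel-Redirect": f"/tornado/{user_port}{request.get_full_path()}"}
    )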