dependencies: Remove WebSockets system for sending messages.

Zulip has had a small use of WebSockets (specifically, for the code
path of sending messages, via the webapp only) since ~2013.  We
originally added this use of WebSockets in the hope that the latency
benefits of doing so would allow us to avoid implementing a markdown
local echo; they were not.  Further, HTTP/2 may have eliminated the
latency difference we hoped to exploit by using WebSockets in any
case.

While we’d originally imagined using WebSockets for other endpoints,
there was never a good justification for moving more components to the
WebSockets system.

This WebSockets code path had a lot of downsides/complexity,
including:

* The messy hack involving constructing an emulated request object to
  hook into doing Django requests.
* The `message_senders` queue processor system, which increases RAM
  needs and must be provisioned independently from the rest of the
  server).
* A duplicate check_send_receive_time Nagios test specific to
  WebSockets.
* The requirement for users to have their firewalls/NATs allow
  WebSocket connections, and a setting to disable them for networks
  where WebSockets don’t work.
* Dependencies on the SockJS family of libraries, which has at times
  been poorly maintained, and periodically throws random JavaScript
  exceptions in our production environments without a deep enough
  traceback to effectively investigate.
* A total of about 1600 lines of our code related to the feature.
* Increased load on the Tornado system, especially around a Zulip
  server restart, and especially for large installations like
  zulipchat.com, resulting in extra delay before messages can be sent
  again.

As detailed in
https://github.com/zulip/zulip/pull/12862#issuecomment-536152397, it
appears that removing WebSockets moderately increases the time it
takes for the `send_message` API query to return from the server, but
does not significantly change the time between when a message is sent
and when it is received by clients.  We don’t understand the reason
for that change (suggesting the possibility of a measurement error),
and even if it is a real change, we consider that potential small
latency regression to be acceptable.

If we later want WebSockets, we’ll likely want to just use Django
Channels.

Signed-off-by: Anders Kaseorg <anders@zulipchat.com>
This commit is contained in:
Anders Kaseorg
2019-07-22 16:43:40 -07:00
committed by Tim Abbott
parent 6fc2a317e9
commit ea6934c26d
50 changed files with 48 additions and 4081 deletions

View File

@@ -19,8 +19,6 @@
"MessageListData": false,
"MessageListView": false,
"Plotly": false,
"SockJS": false,
"Socket": false,
"Sortable": false,
"WinChan": false,
"XDate": false,

View File

@@ -120,11 +120,6 @@ Files: static/third/marked/*
Copyright: 2011-2013, Christopher Jeffrey
License: Expat
Files: static/third/sockjs/sockjs-0.3.4.js
Copyright: 2011-2012 VMware, Inc.
2012 Douglas Crockford
License: Expat and public-domain
Files: zerver/lib/bugdown/fenced_code.py
Copyright: 2006-2008 Waylan Limberg
License: BSD-3-Clause

View File

@@ -839,7 +839,6 @@ warning fsevents@1.1.1: The platform "linux" is incompatible with this module.
info "fsevents@1.1.1" is an optional dependency and failed compatibility check. Excluding it from installation.
[3/4] Linking dependencies...
[4/4] Building fresh packages...
$ browserify node_modules/sockjs-client/lib/entry.js --standalone SockJS > node_modules/sockjs-client/sockjs.js
Done in 23.50s.
```

View File

@@ -79,7 +79,7 @@ processes; see "Supervisor" below) and the nginx configuration (which
explains which HTTP requests get sent to which app server).
Tornado is an asynchronous server and is meant specifically to hold
open tens of thousands of long-lived (long-polling or websocket)
open tens of thousands of long-lived (long-polling)
connections -- that is to say, routes that maintain a persistent
connection from every running client. For this reason, it's
responsible for event (message) delivery, but not much else. We try to
@@ -94,9 +94,7 @@ use in most of our codebase using don't support that, and in any case,
our architecture doesn't require Tornado to do that).
The parts that are activated relatively rarely (e.g. when people type or
click on something) are processed by the Django application server. One
exception to this is that Zulip uses websockets through Tornado to
minimize latency on the code path for **sending** messages.
click on something) are processed by the Django application server.
There is detailed documentation on the
[real-time push and event queue system](../subsystems/events-system.md); most of
@@ -137,7 +135,7 @@ from outside.
compiles, minifies, and installs the static assets into the
`prod-static/` tree form. In development, files are served directly
from `/static/` in the git repository.
- Requests to `/json/events`, `/api/v1/events`, and `/sockjs` are
- Requests to `/json/events` and `/api/v1/events` are
sent to the Tornado server. These are requests to the real-time push
system, because the user's web browser sets up a long-lived TCP
connection with Tornado to serve as [a channel for push

View File

@@ -46,11 +46,6 @@ When everything is running as expected, you will see something like this:
```
process-fts-updates RUNNING pid 2194, uptime 1:13:11
zulip-django RUNNING pid 2192, uptime 1:13:11
zulip-senders:zulip-events-message_sender-0 RUNNING pid 2209, uptime 1:13:11
zulip-senders:zulip-events-message_sender-1 RUNNING pid 2210, uptime 1:13:11
zulip-senders:zulip-events-message_sender-2 RUNNING pid 2211, uptime 1:13:11
zulip-senders:zulip-events-message_sender-3 RUNNING pid 2212, uptime 1:13:11
zulip-senders:zulip-events-message_sender-4 RUNNING pid 2208, uptime 1:13:11
zulip-tornado RUNNING pid 2193, uptime 1:13:11
zulip-workers:zulip-events-confirmation-emails RUNNING pid 2199, uptime 1:13:11
zulip-workers:zulip-events-digest_emails RUNNING pid 2205, uptime 1:13:11

View File

@@ -23,9 +23,6 @@ used for a variety of purposes:
* Processing various errors, frontend tracebacks, and slow database
queries in a batched fashion.
* Doing markdown rendering for messages delivered to the Tornado via
websockets.
Needless to say, the RabbitMQ-based queuing system is an important
part of the overall Zulip architecture, since it's in critical code
paths for everything from signing up for account, to rendering

View File

@@ -68,8 +68,6 @@ number of purposes:
plaintext/markdown raw content or the rendered HTML (e.g. the
`apply_markdown` and `client_gravatar` features in our
[events API docs](https://zulipchat.com/api/register-queue)).
* The webapp [uses websockets](#websockets) for client/server
interaction for sending messages.
* Following our standard naming convention, input validation is done
inside the `check_message` function, which is responsible for
validating the user can send to the recipient,
@@ -101,32 +99,6 @@ number of purposes:
it makes when sending messages with large numbers of recipients,
to ensure its performance.
### Websockets
For the webapp only, we use WebSockets rather than standard HTTPS API
requests for triggering message sending. This is a design feature we
are very ambivalent about; it has some slight latency benefits, but is
also features extra complexity and some mostly-unmaintained
dependencies (e.g. `sockjs-tornado`). But in short, this system works
as follows:
* Requests are sent from the webapp to the backend via the `sockjs`
library (on the frontend) and `sockjs-tornado` (on the backend). This
ends up calling a handler in our Tornado codebase
(`zerver/tornado/socket.py`), which immediately puts the request into
the `message_sender` queue.
* The `message_sender` [queue processor](../subsystems/queuing.md)
reformats the request into a Django `HttpRequest` object with a fake
`SOCKET` HTTP method (which is why these requests appear as `SOCKET`
in our server logs), calls the Django `get_response` method on that
request, and returns the response to Tornado via the `tornado_return`
queue.
* Tornado then sends the result (success or error) back to the client
via the relevant WebSocket connection.
* `sockjs` automatically handles for us a fallback to longpolling in
the event that a WebSockets connection cannot be opened successfully
(which despite WebSockets being many years old is still a problem on
some networks today!).
## Local echo
An essential feature for a good chat is experience is local echo
@@ -204,17 +176,15 @@ implementation was under 150 lines of code.
This section just has a brief review of the sequence of steps all in
one place:
* User hits send in the compose box.
* Compose box validation runs; if passes, it locally echoes the
message and sends websocket message to Tornado
* Tornado converts websocket message to a `message_sender` queue item
* `message_sender` queue processor turns the queue item into a Django
`HttpRequest` and calls Django's main response handler
* The Django URL routes and middleware run, and eventually calls the
* Compose box validation runs; if it passes, the browser locally
echoes the message and then sends a request to the `POST /messages`
API endpoint.
* The Django URL routes and middleware run, and eventually call the
`send_message_backend` view function in `zerver/views/messages.py`.
(Alternatively, for an API request to send a message via the HTTP
API, things start here).
(Alternatively, for an API request to send a message via Zulip's
REST API, things start here).
* `send_message_backend` does some validation before triggering the
`check_message` + `do_send_messages` backend flow.
`check_message` + `do_send_messages` backend flow.
* That backend flow saves the data to the database and triggers a
`message` event in the `notify_tornado` queue (part of the events
system).
@@ -228,28 +198,11 @@ one place:
locally echoed message with the final message it received back
from the server (it indicates this to the sender by adding a
display timestamp to the message).
* For an API client, the `send_message_backend` view function returns
* The `send_message_backend` view function returns
a 200 `HTTP` response; the client receives that response and mostly
does nothing with it other than update some logging details. (This
may happen before or after the client receives the event notifying
it about the new message via its event queue.)
* For a browser (websockets sender), the client receives the
equivalent of the HTTP response via a websockets message from
Tornado (which, in turn, got that via the `tornado_return` queue).
## Error handling
When there's an error trying to send a message, it's important to not
lose the text the user had composed. Zulip handles this with a few
approaches:
* The data for a message in the process of being sent are stored in
browser local storage (see .e.g. `_save_localstorage_requests` in
`static/js/socket.js`), so that the client can retransmit as
appropriate, even if the browser reloads in the meantime.
* Additionally, Zulip renders UI for editing/retransmitting/resending
messages that had been locally echoed on top of those messages, in
red.
## Message editing

View File

@@ -6,9 +6,7 @@ set_global('document', {
location: {}, // we need this to load compose.js
});
set_global('page_params', {
use_websockets: false,
});
set_global('page_params', {});
set_global('$', global.make_zjquery());

View File

@@ -1,18 +1,11 @@
const noop = function () {};
set_global('$', global.make_zjquery());
set_global('page_params', {
use_websockets: true,
});
set_global('page_params', {});
set_global('channel', {});
set_global('navigator', {});
set_global('reload', {});
set_global('reload_state', {});
set_global('socket', {});
set_global('Socket', function () {
return global.socket;
});
set_global('sent_messages', {
start_tracking_message: noop,
report_server_ack: noop,
@@ -23,78 +16,6 @@ zrequire('people');
zrequire('util');
zrequire('transmit');
function test_with_mock_socket(test_params) {
transmit.initialize();
let socket_send_called;
const send_args = {};
global.socket.send = function (request, success, error) {
global.socket.send = undefined;
socket_send_called = true;
// Save off args for check_send_args callback.
send_args.request = request;
send_args.success = success;
send_args.error = error;
};
// Run the actual code here.
test_params.run_code();
assert(socket_send_called);
test_params.check_send_args(send_args);
}
run_test('transmit_message_sockets', () => {
page_params.use_websockets = true;
global.navigator.userAgent = 'unittest_transmit_message';
// Our request is mostly unimportant, except that the
// socket_user_agent field will be added.
const request = {foo: 'bar'};
let success_func_checked = false;
const success = function () {
success_func_checked = true;
};
// Our error function gets wrapped, so we set up a real
// function to test the wrapping mechanism.
let error_func_checked = false;
const error = function (error_msg) {
assert.equal(error_msg, 'Error sending message: simulated_error');
error_func_checked = true;
};
test_with_mock_socket({
run_code: function () {
transmit.send_message(request, success, error);
},
check_send_args: function (send_args) {
// The real code patches new data on the request, rather
// than making a copy, so we test both that it didn't
// clone the object and that it did add a field.
assert.equal(send_args.request, request);
assert.deepEqual(send_args.request, {
foo: 'bar',
socket_user_agent: 'unittest_transmit_message',
});
send_args.success({});
assert(success_func_checked);
// Our error function does get wrapped, so we test by
// using socket.send's error callback, which should
// invoke our test error function via a wrapper
// function in the real code.
send_args.error('response', {msg: 'simulated_error'});
assert(error_func_checked);
},
});
});
page_params.use_websockets = false;
run_test('transmit_message_ajax', () => {
let success_func_called;

View File

@@ -125,8 +125,6 @@ strict_optional = False
strict_optional = False
[mypy-zerver.worker.queue_processors]
strict_optional = False
[mypy-zerver.tornado.websocket_client]
strict_optional = False
[mypy-zerver.views.registration]
strict_optional = False

View File

@@ -46,10 +46,6 @@ parser.add_argument('--munin',
dest='munin',
action='store_true')
parser.add_argument('--websocket',
dest='websocket',
action='store_true')
parser.add_argument('config', nargs='?', default=None)
options = parser.parse_args()
@@ -78,7 +74,6 @@ os.environ['DJANGO_SETTINGS_MODULE'] = "zproject.settings"
django.setup()
from zerver.models import get_system_bot
from zerver.tornado.websocket_client import WebsocketClient
from django.conf import settings
states = {
@@ -93,10 +88,7 @@ def report(state, timestamp=None, msg=None):
now = int(time.time())
if msg is None:
msg = "send time was %s" % (timestamp,)
if options.websocket:
state_file_path = "/var/lib/nagios_state/check_send_receive_websockets_state"
else:
state_file_path = "/var/lib/nagios_state/check_send_receive_state"
state_file_path = "/var/lib/nagios_state/check_send_receive_state"
with open(state_file_path + ".tmp", 'w') as f:
f.write("%s|%s|%s|%s\n" % (now, states[state], state, msg))
os.rename(state_file_path + ".tmp", state_file_path)
@@ -123,10 +115,6 @@ def get_zulips():
report("CRITICAL", msg="Got heartbeat waiting for Zulip, which means get_events is hanging")
return [event['message'] for event in res['events']]
def send_message_via_websocket(websocket_client, recepient_email, content):
# type: (WebsocketClient, str, str) -> None
websocket_client.send_message('website', 'private', 'no topic', "", recepient_email, content)
if "staging" in options.site and settings.NAGIOS_STAGING_SEND_BOT is not None and \
settings.NAGIOS_STAGING_RECEIVE_BOT is not None:
sender = get_system_bot(settings.NAGIOS_STAGING_SEND_BOT)
@@ -161,19 +149,12 @@ except Exception:
msg_to_send = str(random.getrandbits(64))
time_start = time.time()
if options.websocket:
client = WebsocketClient(host_url=options.site, sockjs_url='/sockjs/366/v8nw22qe/websocket',
run_on_start=send_message_via_websocket, sender_email=sender.email,
recepient_email=recipient.email, content=msg_to_send,
validate_ssl=not options.insecure)
client.run()
else:
send_zulip(zulip_sender, {
"type": 'private',
"content": msg_to_send,
"subject": "time to send",
"to": recipient.email,
})
send_zulip(zulip_sender, {
"type": 'private',
"content": msg_to_send,
"subject": "time to send",
"to": recipient.email,
})
msg_content = [] # type: List[str]

View File

@@ -1,7 +0,0 @@
# Longpolling version needed for xhr streaming support
include /etc/nginx/zulip-include/proxy_longpolling;
proxy_set_header Upgrade $http_upgrade;
# This should override the Connection setting in zulip-include/proxy
proxy_set_header Connection $connection_upgrade;
proxy_set_header X-Real-IP $remote_addr;

View File

@@ -40,12 +40,6 @@ location /api/v1/events {
}
# Send sockjs requests to Tornado
location /sockjs {
proxy_pass http://tornado;
include /etc/nginx/zulip-include/location-sockjs;
}
# Send everything else to Django via uWSGI
location / {
include uwsgi_params;

View File

@@ -81,14 +81,10 @@ class zulip::app_frontend_base {
$queues_multiprocess = $zulip::base::total_memory_mb > 3500
$queues = $zulip::base::normal_queues
if $queues_multiprocess {
$message_sender_default_processes = 4
$uwsgi_default_processes = 6
} else {
$message_sender_default_processes = 2
$uwsgi_default_processes = 4
}
$message_sender_processes = zulipconf('application_server', 'message_sender_processes',
$message_sender_default_processes)
file { "${zulip::common::supervisor_conf_dir}/zulip.conf":
ensure => file,
require => Package[supervisor],

View File

@@ -42,12 +42,6 @@ http {
text/plain;
gzip_vary on;
# Select a Connection header for sockjs reverse-proxying
map $http_upgrade $connection_upgrade {
default upgrade;
'' close;
}
# https://wiki.mozilla.org/Security/Server_Side_TLS intermediate profile
ssl_session_timeout 1d;
ssl_session_cache shared:SSL:50m;

View File

@@ -117,22 +117,6 @@ stdout_logfile_maxbytes=20MB ; max # logfile bytes b4 rotation (default 50MB)
stdout_logfile_backups=3 ; # of stdout logfile backups (default 10)
directory=/home/zulip/deployments/current/
[program:zulip_events_message_sender]
command=/home/zulip/deployments/current/manage.py process_queue --queue_name=message_sender --worker_num=%(process_num)s
process_name=%(program_name)s-%(process_num)s
priority=350 ; the relative start priority (default 999)
autostart=true ; start at supervisord start (default: true)
autorestart=true ; whether/when to restart (default: unexpected)
stopsignal=TERM ; signal used to kill process (default TERM)
stopwaitsecs=30 ; max num secs to wait b4 SIGKILL (default 10)
user=zulip ; setuid to this UNIX account to run the program
redirect_stderr=true ; redirect proc stderr to stdout (default false)
stdout_logfile=/var/log/zulip/events_message_sender.log ; stdout log path, NONE for none; default AUTO
stdout_logfile_maxbytes=20MB ; max # logfile bytes b4 rotation (default 50MB)
stdout_logfile_backups=5 ; # of stdout logfile backups (default 10)
directory=/home/zulip/deployments/current/
numprocs=<%= @message_sender_processes %>
; The below sample group section shows all possible group values,
; create one or more 'real' group: sections to create "heterogeneous"
; process groups.
@@ -145,9 +129,6 @@ programs=zulip_deliver_enqueued_emails, zulip_deliver_scheduled_messages, <% @qu
programs=zulip_deliver_enqueued_emails, zulip_events, zulip_deliver_scheduled_messages
<% end %>
[group:zulip-senders]
programs=zulip_events_message_sender
; The [include] section can just contain the "files" setting. This
; setting can list multiple files (separated by whitespace or
; newlines). It can also contain wildcards. The filenames are

View File

@@ -4,4 +4,3 @@ USER=zulip
STATE_FILE=/var/lib/nagios_state/check_send_receive_state
* * * * * zulip /usr/lib/nagios/plugins/zulip_app_frontend/check_send_receive_time --nagios --site=https://$(/home/zulip/deployments/current/scripts/get-django-setting NAGIOS_BOT_HOST) >/dev/null
* * * * * zulip /usr/lib/nagios/plugins/zulip_app_frontend/check_send_receive_time --nagios --websocket --site=https://$(/home/zulip/deployments/current/scripts/get-django-setting NAGIOS_BOT_HOST) >/dev/null

View File

@@ -119,11 +119,6 @@ define command{
command_line /usr/lib/nagios/plugins/check_by_ssh -p $ARG1$ -l nagios -t 30 -i /var/lib/nagios/.ssh/id_rsa -H $HOSTADDRESS$ -C '/usr/lib/nagios/plugins/zulip_app_frontend/check_cron_file /var/lib/nagios_state/check_send_receive_state'
}
define command{
command_name check_send_receive_time_websockets
command_line /usr/lib/nagios/plugins/check_by_ssh -p $ARG1$ -l nagios -t 30 -i /var/lib/nagios/.ssh/id_rsa -H $HOSTADDRESS$ -C '/usr/lib/nagios/plugins/zulip_app_frontend/check_cron_file /var/lib/nagios_state/check_send_receive_websockets_state'
}
define command{
command_name check_rabbitmq_consumers
command_line /usr/lib/nagios/plugins/check_by_ssh -p 22 -l nagios -t 30 -i /var/lib/nagios/.ssh/id_rsa -H $HOSTADDRESS$ -C '/usr/lib/nagios/plugins/zulip_app_frontend/check_rabbitmq_consumers $ARG1$'

View File

@@ -143,15 +143,6 @@ define service {
contact_groups page_admins
}
define service {
use generic-service
service_description Check send receive time_websockets
check_command check_send_receive_time_websockets!22
max_check_attempts 2
hostgroup_name frontends
contact_groups page_admins
}
define service {
use generic-service
service_description Check analytics state
@@ -392,17 +383,6 @@ define service {
contact_groups page_admins
}
define service {
use generic-service
service_description Check rabbitmq tornado_return consumers
check_command check_rabbitmq_consumers!tornado_return
# Workaround weird checks 40s after first error causing alerts
# from a single failure because cron hasn't run again yet
max_check_attempts 3
hostgroup_name singletornado_frontends
contact_groups page_admins
}
define service {
use generic-service
service_description Check rabbitmq user_activity_interval consumers
@@ -480,17 +460,6 @@ define service {
contact_groups admins
}
define service {
use generic-service
service_description Check rabbitmq message sender consumers
check_command check_rabbitmq_consumers!message_sender
# Workaround weird checks 40s after first error causing alerts
# from a single failure because cron hasn't run again yet
max_check_attempts 3
hostgroup_name frontends
contact_groups page_admins
}
define service {
use generic-service
service_description Check rabbitmq missedmessage mobile notifications consumers

View File

@@ -34,11 +34,6 @@ server {
include /etc/nginx/zulip-include/proxy;
}
location /sockjs {
proxy_pass https://staging;
include /etc/nginx/zulip-include/location-sockjs;
}
# We don't need /api/v1/events/internal, because that doesn't go through the loadbalancer.
location ~ /json/events|/api/v1/events {
proxy_pass https://staging;
@@ -63,11 +58,6 @@ server {
include /etc/nginx/zulip-include/proxy;
}
location /sockjs {
proxy_pass https://prod;
include /etc/nginx/zulip-include/location-sockjs;
}
location ~ /json/events|/api/v1/events {
proxy_pass https://prod;
include /etc/nginx/zulip-include/proxy_longpolling;
@@ -91,11 +81,6 @@ server {
include /etc/nginx/zulip-include/proxy;
}
location /sockjs {
proxy_pass https://prod;
include /etc/nginx/zulip-include/location-sockjs;
}
location ~ /json/events|/api/v1/events {
proxy_pass https://prod;
include /etc/nginx/zulip-include/proxy_longpolling;
@@ -115,11 +100,6 @@ server {
include /etc/nginx/zulip-include/proxy;
}
location /sockjs {
proxy_pass https://prod;
include /etc/nginx/zulip-include/location-sockjs;
}
location ~ /json/events|/api/v1/events {
proxy_pass https://prod;
include /etc/nginx/zulip-include/proxy_longpolling;

View File

@@ -95,9 +95,6 @@ redis
# Needed for Python 2+3 compatibility
six
# Needed for Tornado websockets support
sockjs-tornado
# Needed to parse source maps for error reporting
sourcemap

View File

@@ -762,8 +762,6 @@ social-auth-core==3.2.0 \
--hash=sha256:8320666548a532eb158968eda542bbe1863682357c432d8c4e28034a7f1e3b58 \
--hash=sha256:d81ed681e3c0722300b61a0792c5db5d21206793f95ca810f010c1cc931c8d89 \
# via social-auth-app-django
sockjs-tornado==1.0.6 \
--hash=sha256:ec12b0c37723b0aac56610fb9b6aa68390720d0c9c2a10461df030c3a1d9af95
soupsieve==1.9.5 \
--hash=sha256:bdb0d917b03a1369ce964056fc195cfdff8819c40de04695a80bc813c3cfa1f5 \
--hash=sha256:e2c1c5dee4a1c36bcb790e0fabd5492d874b8ebd4617622c4f6a731701060dda \

View File

@@ -510,8 +510,6 @@ social-auth-core==3.2.0 \
--hash=sha256:8320666548a532eb158968eda542bbe1863682357c432d8c4e28034a7f1e3b58 \
--hash=sha256:d81ed681e3c0722300b61a0792c5db5d21206793f95ca810f010c1cc931c8d89 \
# via social-auth-app-django
sockjs-tornado==1.0.6 \
--hash=sha256:ec12b0c37723b0aac56610fb9b6aa68390720d0c9c2a10461df030c3a1d9af95
soupsieve==1.9.5 \
--hash=sha256:bdb0d917b03a1369ce964056fc195cfdff8819c40de04695a80bc813c3cfa1f5 \
--hash=sha256:e2c1c5dee4a1c36bcb790e0fabd5492d874b8ebd4617622c4f6a731701060dda \

View File

@@ -163,7 +163,7 @@ if not args.skip_migrations:
if os.path.exists("/etc/supervisor/conf.d/zulip_db.conf"):
subprocess.check_call(["supervisorctl", "stop", "process-fts-updates"], preexec_fn=su_to_zulip)
core_server_services = ["zulip-django", "zulip-senders:*",
core_server_services = ["zulip-django",
"zulip-tornado" if tornado_processes == 1 else "zulip-tornado:*"]
worker_services = ["zulip-workers:*"]
# Stop and start thumbor service only if thumbor is installed.

View File

@@ -55,7 +55,6 @@ queues = {
'error_reports',
'feedback_messages',
'invites',
'message_sender',
'missedmessage_emails',
'missedmessage_email_senders',
'email_senders',
@@ -68,7 +67,6 @@ queues = {
'user_presence',
# These queues may not be present if settings.TORNADO_PROCESSES > 1
'notify_tornado',
'tornado_return',
}
for queue_name in queues:
@@ -79,8 +77,6 @@ for line in output.split('\n'):
parts = line.split('\t')
if len(parts) >= 2:
queue_name = parts[0]
if queue_name.startswith("tornado_return_"):
queue_name = "tornado_return"
if queue_name.startswith("notify_tornado_"):
queue_name = "notify_tornado"
consumers[queue_name] += 1
@@ -92,7 +88,7 @@ for queue_name in consumers.keys():
state_file_tmp = state_file_path + "-tmp"
target_count = options.min_count
if queue_name in ["tornado_return", "notify_tornado"]:
if queue_name == "notify_tornado":
target_count = TORNADO_PROCESSES
if consumers[queue_name] < target_count:

View File

@@ -35,7 +35,7 @@ if args.fill_cache:
logging.info("Filling memcached caches")
subprocess.check_call(["./manage.py", "fill_memcached_caches"])
core_server_services = ["zulip-django", "zulip-senders:*"]
core_server_services = ["zulip-django"]
if os.path.exists("/etc/supervisor/conf.d/thumbor.conf"):
core_server_services.append("zulip-thumbor")

View File

@@ -7,7 +7,6 @@ import "../../third/jquery-filedrop/jquery.filedrop.js";
import "jquery-caret-plugin/src/jquery.caret.js";
import "../../third/jquery-idle/jquery.idle.js";
import "spectrum-colorpicker";
import "../../third/sockjs/sockjs-0.3.4.js";
import "../../third/marked/lib/marked.js";
import "xdate/src/xdate.js";
import "jquery-validation/dist/jquery.validate.js";
@@ -84,7 +83,6 @@ import "../fenced_code.js";
import "../markdown.js";
import "../local_message.js";
import "../echo.js";
import "../socket.js";
import "../sent_messages.js";
import "../compose_state.js";
import "../compose_actions.js";

View File

@@ -8,7 +8,6 @@ declare let Filter: any;
declare let LightboxCanvas: any;
declare let MessageListData: any;
declare let MessageListView: any;
declare let Socket: any;
declare let activity: any;
declare let admin: any;
declare let alert_words: any;

View File

@@ -2,10 +2,7 @@
$(function () {
if (util.is_mobile()) {
// if the client is mobile, disable websockets for message sending
// (it doesn't work on iOS for some reason).
page_params.use_websockets = false;
// Also disable the tutorial; it's ugly on mobile.
// Disable the tutorial; it's ugly on mobile.
page_params.needs_tutorial = false;
}
@@ -54,5 +51,4 @@ $(function () {
return $(this).is(sel) || $(this).closest(sel).length;
};
}
transmit.initialize();
});

View File

@@ -1,403 +0,0 @@
const CLOSE_REASONS = {
none_given: {code: 4000, msg: "No reason provided"},
no_heartbeat: {code: 4001, msg: "Missed too many heartbeats"},
auth_fail: {code: 4002, msg: "Authentication failed"},
ack_timeout: {code: 4003, msg: "ACK timeout"},
cant_send: {code: 4004, msg: "User attempted to send while Socket was not ready"},
unsuspend: {code: 4005, msg: "Got unsuspend event"},
};
function Socket(url) {
this.url = url;
this._is_open = false;
this._is_authenticated = false;
this._is_reconnecting = false;
this._reconnect_initiation_time = null;
this._next_req_id_counter = 0;
this._connection_failures = 0;
this._reconnect_timeout_id = null;
this._heartbeat_timeout_id = null;
this._localstorage_requests_key = 'zulip_socket_requests';
this._requests = this._localstorage_requests();
const that = this;
this._is_unloading = false;
$(window).on("unload", function () {
that._is_unloading = true;
});
$(document).on("unsuspend", function () {
that._try_to_reconnect({reason: 'unsuspend'});
});
this._supported_protocols = [
'websocket',
'xdr-streaming',
'xhr-streaming',
'xdr-polling',
'xhr-polling',
'jsonp-polling',
];
if (page_params.test_suite) {
this._supported_protocols = _.reject(this._supported_protocols,
function (x) { return x === 'xhr-streaming'; });
// Don't create the SockJS on startup when running under the test suite.
// The first XHR request gets considered part of the page load and
// therefore the PhantomJS onLoadFinished handler doesn't get called
// until the SockJS XHR finishes, which happens at the heartbeat, 25
// seconds later. The SockJS objects will be created on demand anyway.
} else {
this._create_sockjs_object();
}
}
Socket.prototype = {
_create_sockjs_object: function Socket__create_sockjs_object() {
this._sockjs = new SockJS(this.url, null, {protocols_whitelist: this._supported_protocols});
this._setup_sockjs_callbacks(this._sockjs);
},
_make_request: function Socket__make_request(type) {
return {req_id: this._get_next_req_id(),
type: type,
state: 'pending'};
},
// Note that by default messages are queued and retried across
// browser restarts if a restart takes place before a message
// is successfully transmitted.
// If that is the case, the success/error callbacks will not
// be automatically called.
send: function Socket__send(msg, success, error) {
const request = this._make_request('request');
request.msg = msg;
request.success = success;
request.error = error;
this._save_request(request);
if (!this._can_send()) {
this._try_to_reconnect({reason: 'cant_send'});
return;
}
this._do_send(request);
},
_get_next_req_id: function Socket__get_next_req_id() {
const req_id = page_params.queue_id + ':' + this._next_req_id_counter;
this._next_req_id_counter += 1;
return req_id;
},
_req_id_too_new: function Socket__req_id_too_new(req_id) {
const counter = req_id.split(':')[2];
return parseInt(counter, 10) >= this._next_req_id_counter;
},
_req_id_sorter: function Socket__req_id_sorter(req_id_a, req_id_b) {
// Sort in ascending order
const a_count = parseInt(req_id_a.split(':')[2], 10);
const b_count = parseInt(req_id_b.split(':')[2], 10);
return a_count - b_count;
},
_do_send: function Socket__do_send(request) {
const that = this;
this._requests[request.req_id].ack_timeout_id = setTimeout(function () {
blueslip.info("Timeout on ACK for request " + request.req_id);
that._try_to_reconnect({reason: 'ack_timeout'});
}, 2000);
try {
this._update_request_state(request.req_id, 'sent');
this._sockjs.send(JSON.stringify({req_id: request.req_id,
type: request.type, request: request.msg}));
} catch (e) {
this._update_request_state(request.req_id, 'pending');
if (e instanceof Error && e.message === 'INVALID_STATE_ERR') {
// The connection was somehow closed. Our on-close handler will
// be called imminently and we'll retry this request upon reconnect.
return;
} else if (e instanceof Error && e.message.indexOf("NS_ERROR_NOT_CONNECTED") !== -1) {
// This is a rarely-occurring Firefox error. I'm not sure
// whether our on-close handler will be called, so let's just
// call close() explicitly.
this._sockjs.close();
return;
}
throw e;
}
},
_can_send: function Socket__can_send() {
return this._is_open && this._is_authenticated;
},
_resend: function Socket__resend(req_id) {
const req_info = this._requests[req_id];
if (req_info.ack_timeout_id !== null) {
clearTimeout(req_info.ack_timeout_id);
req_info.ack_timeout_id = null;
}
if (req_info.type !== 'request') {
return;
}
this._do_send(req_info);
},
_process_response: function Socket__process_response(req_id, response) {
const req_info = this._requests[req_id];
if (req_info === undefined) {
if (this._req_id_too_new(req_id)) {
blueslip.error("Got a response for an unknown request",
{request_id: req_id, next_id: this._next_req_id_counter,
outstanding_ids: _.keys(this._requests)});
}
// There is a small race where we might start reauthenticating
// before one of our requests has finished but then have the request
// finish and thus receive the finish notification both from the
// status inquiry and from the normal response. Therefore, we might
// be processing the response for a request where we already got the
// response from a status inquiry. In that case, don't process the
// response twice.
return;
}
if (response.result === 'success' && req_info.success !== undefined) {
req_info.success(response);
} else if (req_info.error !== undefined) {
req_info.error('response', response);
}
this._remove_request(req_id);
},
_process_ack: function Socket__process_ack(req_id) {
const req_info = this._requests[req_id];
if (req_info === undefined) {
blueslip.error("Got an ACK for an unknown request",
{request_id: req_id, next_id: this._next_req_id_counter,
outstanding_ids: _.keys(this._requests)});
return;
}
if (req_info.ack_timeout_id !== null) {
clearTimeout(req_info.ack_timeout_id);
req_info.ack_timeout_id = null;
}
},
_setup_sockjs_callbacks: function Socket__setup_sockjs_callbacks(sockjs) {
const that = this;
sockjs.onopen = function Socket__sockjs_onopen() {
blueslip.info("Socket connected [transport=" + sockjs.protocol + "]");
if (that._reconnect_initiation_time !== null) {
// If this is a reconnect, network was probably
// recently interrupted, so we optimistically restart
// get_events
server_events.restart_get_events();
}
that._is_open = true;
// Notify listeners that we've finished the websocket handshake
$(document).trigger($.Event('websocket_postopen.zulip', {}));
const request = that._make_request('auth');
request.msg = {csrf_token: csrf_token,
queue_id: page_params.queue_id,
status_inquiries: _.keys(that._requests)};
request.success = function (resp) {
that._is_authenticated = true;
that._is_reconnecting = false;
that._reconnect_initiation_time = null;
that._connection_failures = 0;
const resend_queue = [];
_.each(resp.status_inquiries, function (status, id) {
if (status.status === 'complete') {
that._process_response(id, status.response);
} else if (status.status === 'received') {
that._update_request_state(id, 'sent');
} else if (status.status === 'not_received') {
resend_queue.push(id);
}
});
resend_queue.sort(that._req_id_sorter);
_.each(resend_queue, function (id) {
that._resend(id);
});
};
request.error = function (type, resp) {
blueslip.info("Could not authenticate with server: " + resp.msg);
that._connection_failures += 1;
that._try_to_reconnect({reason: 'auth_fail',
wait_time: that._reconnect_wait_time()});
};
that._save_request(request);
that._do_send(request);
};
sockjs.onmessage = function Socket__sockjs_onmessage(event) {
if (event.data.type === 'ack') {
that._process_ack(event.data.req_id);
} else {
that._process_response(event.data.req_id, event.data.response);
}
};
sockjs.onheartbeat = function Socket__sockjs_onheartbeat() {
if (that._heartbeat_timeout_id !== null) {
clearTimeout(that._heartbeat_timeout_id);
that._heartbeat_timeout_id = null;
}
that._heartbeat_timeout_id = setTimeout(function () {
that._heartbeat_timeout_id = null;
blueslip.info("Missed too many hearbeats");
that._try_to_reconnect({reason: 'no_heartbeat'});
}, 60000);
};
sockjs.onclose = function Socket__sockjs_onclose(event) {
if (that._is_unloading) {
return;
}
// We've failed to handshake, but notify that the attempt finished
$(document).trigger($.Event('websocket_postopen.zulip', {}));
blueslip.info("SockJS connection lost. Attempting to reconnect soon."
+ " (" + event.code.toString() + ", " + event.reason + ")");
that._connection_failures += 1;
that._is_reconnecting = false;
// We don't need to specify a reason because the Socket is already closed
that._try_to_reconnect({wait_time: that._reconnect_wait_time()});
};
},
_reconnect_wait_time: function Socket__reconnect_wait_time() {
if (this._connection_failures === 1) {
// We specify a non-zero timeout here so that we don't try to
// immediately reconnect when the page is refreshing
return 30;
}
return Math.min(90, Math.exp(this._connection_failures / 2)) * 1000;
},
_try_to_reconnect: function Socket__try_to_reconnect(opts) {
opts = _.extend({wait_time: 0, reason: 'none_given'}, opts);
const that = this;
const now = new Date().getTime();
if (this._is_reconnecting && now - this._reconnect_initiation_time < 1000) {
// Only try to reconnect once a second
return;
}
if (this._reconnect_timeout_id !== null) {
clearTimeout(this._reconnect_timeout_id);
this._reconnect_timeout_id = null;
}
if (this._heartbeat_timeout_id !== null) {
clearTimeout(that._heartbeat_timeout_id);
this._heartbeat_timeout_id = null;
}
// Cancel any pending auth requests and any timeouts for ACKs
_.each(this._requests, function (val, key) {
if (val.ack_timeout_id !== null) {
clearTimeout(val.ack_timeout_id);
val.ack_timeout_id = null;
}
if (val.type === 'auth') {
that._remove_request(key);
}
});
this._is_open = false;
this._is_authenticated = false;
this._is_reconnecting = true;
this._reconnect_initiation_time = now;
// This is a little weird because we're also called from the SockJS
// onclose handler. Fortunately, close() does nothing on an
// already-closed SockJS object. However, we do have to check that
// this._sockjs isn't undefined since it's not created immediately
// when running under the test suite.
if (this._sockjs !== undefined) {
const close_reason = CLOSE_REASONS[opts.reason];
this._sockjs.close(close_reason.code, close_reason.msg);
}
this._reconnect_timeout_id = setTimeout(function () {
that._reconnect_timeout_id = null;
blueslip.info("Attempting socket reconnect.");
that._create_sockjs_object();
}, opts.wait_time);
},
_localstorage_requests: function Socket__localstorage_requests() {
if (!localstorage.supported()) {
return {};
}
return JSON.parse(window.localStorage[this._localstorage_requests_key] || "{}");
},
_save_localstorage_requests: function Socket__save_localstorage_requests() {
if (!localstorage.supported()) {
return;
}
// Auth requests are always session-specific, so don't store them for later
const non_auth_reqs = {};
_.each(this._requests, function (val, key) {
if (val.type !== 'auth') {
non_auth_reqs[key] = val;
}
});
try {
window.localStorage[this._localstorage_requests_key] = JSON.stringify(non_auth_reqs);
} catch (e) {
// We can't catch a specific exception type, because browsers return different types
// for out of space errors. See http://chrisberkhout.com/blog/localstorage-errors/ for
// more details.
blueslip.warn("Failed to save to local storage, caught exception when saving " + e);
}
},
_save_request: function Socket__save_request(request) {
this._requests[request.req_id] = request;
if (!localstorage.supported()) {
return;
}
this._save_localstorage_requests();
},
_remove_request: function Socket__remove_request(req_id) {
delete this._requests[req_id];
if (!localstorage.supported()) {
return;
}
this._save_localstorage_requests();
},
_update_request_state: function Socket__update_request_state(req_id, state) {
this._requests[req_id].state = state;
if (!localstorage.supported()) {
return;
}
this._save_localstorage_requests();
},
};
module.exports = Socket;
window.Socket = Socket;

View File

@@ -1,25 +1,3 @@
let socket;
exports.initialize = function () {
// We initialize the socket inside a function so that this code
// runs after `csrf_token` is initialized in setup.js.
if (page_params.use_websockets) {
socket = new Socket("/sockjs");
}
// For debugging. The socket will eventually move out of this file anyway.
exports._socket = socket;
};
function send_message_socket(request, success, error) {
request.socket_user_agent = navigator.userAgent;
socket.send(request, success, function (type, resp) {
let err_msg = "Error sending message";
if (type === 'response') {
err_msg += ": " + resp.msg;
}
error(err_msg);
});
}
function send_message_ajax(request, success, error) {
channel.post({
url: '/json/messages',
@@ -52,11 +30,7 @@ exports.send_message = function (request, on_success, error) {
sent_messages.report_server_ack(request.local_id);
}
if (page_params.use_websockets) {
send_message_socket(request, success, error);
} else {
send_message_ajax(request, success, error);
}
send_message_ajax(request, success, error);
};
exports.reply_message = function (opts) {

File diff suppressed because it is too large Load Diff

View File

@@ -124,8 +124,7 @@ done
echo; echo "Now running additional Nagios tests"; echo
if ! /usr/lib/nagios/plugins/zulip_app_frontend/check_queue_worker_errors || \
! su zulip -c /usr/lib/nagios/plugins/zulip_postgres_appdb/check_fts_update_log; then # || \
# ! su zulip -c "/usr/lib/nagios/plugins/zulip_app_frontend/check_send_receive_time --site=https://127.0.0.1/api --nagios --insecure" || \
# ! su zulip -c "/usr/lib/nagios/plugins/zulip_app_frontend/check_send_receive_time --site=https://127.0.0.1/api --nagios --websocket --insecure"; then
# ! su zulip -c "/usr/lib/nagios/plugins/zulip_app_frontend/check_send_receive_time --site=https://127.0.0.1/api --nagios --insecure"; then
set +x
echo
echo "FAILURE: Nagios checks don't pass:"

View File

@@ -19,7 +19,6 @@ from tornado import httputil
from tornado import gen
from tornado import web
from tornado.ioloop import IOLoop
from tornado.websocket import WebSocketHandler, websocket_connect
from typing import Any, Callable, Generator, List, Optional
@@ -200,62 +199,12 @@ def fetch_request(url, callback, **kwargs):
callback(response)
class BaseWebsocketHandler(WebSocketHandler):
class BaseHandler(web.RequestHandler):
# target server ip
target_host = '127.0.0.1' # type: str
# target server port
target_port = None # type: int
def __init__(self, *args, **kwargs):
# type: (*Any, **Any) -> None
super().__init__(*args, **kwargs)
# define client for target websocket server
self.client = None # type: Any
def get(self, *args, **kwargs):
# type: (*Any, **Any) -> Optional[Callable[..., Any]]
# use get method from WebsocketHandler
return super().get(*args, **kwargs)
def open(self):
# type: () -> None
# setup connection with target websocket server
websocket_url = "ws://{host}:{port}{uri}".format(
host=self.target_host,
port=self.target_port,
uri=self.request.uri
)
request = httpclient.HTTPRequest(websocket_url)
request.headers = self._add_request_headers(['sec-websocket-extensions'])
websocket_connect(request, callback=self.open_callback,
on_message_callback=self.on_client_message)
def open_callback(self, future):
# type: (Any) -> None
# callback on connect with target websocket server
self.client = future.result()
def on_client_message(self, message):
# type: (str) -> None
if not message:
# if message empty -> target websocket server close connection
return self.close()
if self.ws_connection:
# send message to client if connection exists
self.write_message(message, False)
def on_message(self, message, binary=False):
# type: (str, bool) -> Optional[Callable[..., Any]]
if not self.client:
# close websocket proxy connection if no connection with target websocket server
return self.close()
self.client.write_message(message, binary)
return None
def check_origin(self, origin):
# type: (str) -> bool
return True
def _add_request_headers(self, exclude_lower_headers_list=None):
# type: (Optional[List[str]]) -> httputil.HTTPHeaders
exclude_lower_headers_list = exclude_lower_headers_list or []
@@ -265,14 +214,9 @@ class BaseWebsocketHandler(WebSocketHandler):
headers.add(header, v)
return headers
class CombineHandler(BaseWebsocketHandler):
def get(self, *args, **kwargs):
# type: (*Any, **Any) -> Optional[Callable[..., Any]]
if self.request.headers.get("Upgrade", "").lower() == 'websocket':
return super().get(*args, **kwargs)
return None
def get(self):
# type: () -> None
pass
def head(self):
# type: () -> None
@@ -321,8 +265,6 @@ class CombineHandler(BaseWebsocketHandler):
self.request.headers['X-REAL-IP'] = self.request.remote_ip
if 'X-FORWARDED_PORT' not in self.request.headers:
self.request.headers['X-FORWARDED-PORT'] = str(proxy_port)
if self.request.headers.get("Upgrade", "").lower() == 'websocket':
return super().prepare()
url = transform_url(
self.request.protocol,
self.request.path,
@@ -349,19 +291,19 @@ class CombineHandler(BaseWebsocketHandler):
self.finish()
class WebPackHandler(CombineHandler):
class WebPackHandler(BaseHandler):
target_port = webpack_port
class DjangoHandler(CombineHandler):
class DjangoHandler(BaseHandler):
target_port = django_port
class TornadoHandler(CombineHandler):
class TornadoHandler(BaseHandler):
target_port = tornado_port
class ThumborHandler(CombineHandler):
class ThumborHandler(BaseHandler):
target_port = thumbor_port
@@ -372,14 +314,13 @@ class Application(web.Application):
(r"/json/events.*", TornadoHandler),
(r"/api/v1/events.*", TornadoHandler),
(r"/webpack.*", WebPackHandler),
(r"/sockjs.*", TornadoHandler),
(r"/thumbor.*", ThumborHandler),
(r"/.*", DjangoHandler)
]
super().__init__(handlers, enable_logging=enable_logging)
def log_request(self, handler):
# type: (BaseWebsocketHandler) -> None
# type: (BaseHandler) -> None
if self.settings['enable_logging']:
super().log_request(handler)

View File

@@ -118,9 +118,7 @@ not_yet_fully_covered = {path for target in [
'zerver/tornado/handlers.py',
'zerver/tornado/ioloop_logging.py',
'zerver/tornado/sharding.py',
'zerver/tornado/socket.py',
'zerver/tornado/views.py',
'zerver/tornado/websocket_client.py',
# Data import files; relatively low priority
'zerver/data_import/hipchat*.py',
'zerver/data_import/sequencer.py',

View File

@@ -26,4 +26,4 @@ LATEST_RELEASE_ANNOUNCEMENT = "https://blog.zulip.org/2019/12/13/zulip-2-1-relea
# historical commits sharing the same major version, in which case a
# minor version bump suffices.
PROVISION_VERSION = '66.4'
PROVISION_VERSION = '67.0'

View File

@@ -120,13 +120,8 @@ def update_user_activity(request: HttpRequest, user_profile: UserProfile,
def require_post(func: ViewFuncT) -> ViewFuncT:
@wraps(func)
def wrapper(request: HttpRequest, *args: Any, **kwargs: Any) -> HttpResponse:
if (request.method != "POST" and
not (request.method == "SOCKET" and
request.META['zulip.emulated_method'] == "POST")):
if request.method == "SOCKET": # nocoverage # zulip.emulated_method is always POST
err_method = "SOCKET/%s" % (request.META['zulip.emulated_method'],)
else:
err_method = request.method
if request.method != "POST":
err_method = request.method
logging.warning('Method Not Allowed (%s): %s', err_method, request.path,
extra={'status_code': 405, 'request': request})
return HttpResponseNotAllowed(["POST"])
@@ -251,13 +246,9 @@ def validate_account_and_subdomain(request: HttpRequest, user_profile: UserProfi
if not user_profile.is_active:
raise JsonableError(_("Account is deactivated"))
# Either the subdomain matches, or processing a websockets message
# in the message_sender worker (which will have already had the
# subdomain validated), or we're accessing Tornado from and to
# localhost (aka spoofing a request as the user).
# Either the subdomain matches, or we're accessing Tornado from
# and to localhost (aka spoofing a request as the user).
if (not user_matches_subdomain(get_subdomain(request), user_profile) and
not (request.method == "SOCKET" and
request.META['SERVER_NAME'] == "127.0.0.1") and
not (settings.RUNNING_INSIDE_TORNADO and
request.META["SERVER_NAME"] == "127.0.0.1" and
request.META["REMOTE_ADDR"] == "127.0.0.1")):

View File

@@ -85,8 +85,6 @@ def rest_dispatch(request: HttpRequest, **kwargs: Any) -> HttpResponse:
method_to_use = request.method
if request.POST and 'method' in request.POST:
method_to_use = request.POST['method']
if method_to_use == "SOCKET" and "zulip.emulated_method" in request.META:
method_to_use = request.META["zulip.emulated_method"]
if method_to_use in supported_methods:
entry = supported_methods[method_to_use]

View File

@@ -32,8 +32,7 @@ class Command(BaseCommand):
else:
queue_name = options['queue_name']
if not (queue_name in get_active_worker_queues() or
queue_name.startswith("notify_tornado") or
queue_name.startswith("tornado_return")):
queue_name.startswith("notify_tornado")):
raise CommandError("Unknown queue %s" % (queue_name,))
print("Purging queue %s" % (queue_name,))

View File

@@ -24,9 +24,7 @@ from zerver.tornado.application import create_tornado_application, \
from zerver.tornado.autoreload import start as zulip_autoreload_start
from zerver.tornado.event_queue import add_client_gc_hook, \
missedmessage_hook, process_notification, setup_event_queue
from zerver.tornado.sharding import notify_tornado_queue_name, \
tornado_return_queue_name
from zerver.tornado.socket import respond_send_message
from zerver.tornado.sharding import notify_tornado_queue_name
if settings.USING_RABBITMQ:
from zerver.lib.queue import get_queue_client
@@ -93,8 +91,6 @@ class Command(BaseCommand):
# Process notifications received via RabbitMQ
queue_client.register_json_consumer(notify_tornado_queue_name(int(port)),
process_notification)
queue_client.register_json_consumer(tornado_return_queue_name(int(port)),
respond_send_message)
try:
# Application is an instance of Django's standard wsgi handler.

View File

@@ -215,7 +215,6 @@ class HomeTest(ZulipTestCase):
"unread_msgs",
"unsubscribed",
"upgrade_text_for_wide_organization_logo",
"use_websockets",
"user_id",
"user_status",
"warn_no_email",

View File

@@ -1,44 +1,24 @@
# -*- coding: utf-8 -*-
"""WebSocketBaseTestCase is based on combination of Tornado
and Django test systems. It require to use decorator '@gen.coroutine'
for each test case method( see documentation: http://www.tornadoweb.org/en/stable/testing.html).
It requires implementation of 'get_app' method to initialize tornado application and launch it.
"""
import time
import ujson
from django.conf import settings
from django.http import HttpRequest, HttpResponse
from django.db import close_old_connections
from django.core import signals
from django.test import override_settings
from tornado.httpclient import HTTPRequest, HTTPResponse
from tornado.httpclient import HTTPResponse
from zerver.lib.test_helpers import POSTRequestMock
from zerver.lib.test_classes import ZulipTestCase
from zerver.lib.topic import TOPIC_NAME
from zerver.models import UserProfile
from tornado import gen
from tornado.testing import AsyncHTTPTestCase, gen_test
from tornado.testing import AsyncHTTPTestCase
from tornado.web import Application
from tornado.websocket import websocket_connect
from zerver.tornado.application import create_tornado_application
from zerver.tornado import event_queue
from zerver.tornado.event_queue import fetch_events, \
process_event
from zerver.tornado.views import get_events
from zerver.tornado.event_queue import process_event
from http.cookies import SimpleCookie
import urllib.parse
from unittest.mock import patch
from typing import Any, Callable, Dict, Generator, Optional, List, cast
from typing import Any, Dict, Optional, List, cast
class TornadoWebTestCase(AsyncHTTPTestCase, ZulipTestCase):
def setUp(self) -> None:
@@ -139,335 +119,3 @@ class EventsTestCase(TornadoWebTestCase):
self.assertEqual(len(events), 1)
self.assertEqual(events[0]['data'], 'test data')
self.assertEqual(data['result'], 'success')
class WebSocketBaseTestCase(AsyncHTTPTestCase, ZulipTestCase):
def setUp(self) -> None:
settings.RUNNING_INSIDE_TORNADO = True
super().setUp()
def tearDown(self) -> None:
super().tearDown()
settings.RUNNING_INSIDE_TORNADO = False
@gen.coroutine
def ws_connect(self, path: str, cookie_header: str,
compression_options: Optional[Any]=None
) -> Generator[Any, Callable[[HTTPRequest, Optional[Any]], Any], None]:
request = HTTPRequest(url='ws://127.0.0.1:%d%s' % (self.get_http_port(), path))
request.headers.add('Cookie', cookie_header)
ws = yield websocket_connect(
request,
compression_options=compression_options)
raise gen.Return(ws)
@gen.coroutine
def close(self, ws: Any) -> None:
"""Close a websocket connection and wait for the server side.
"""
ws.close()
class TornadoTestCase(WebSocketBaseTestCase):
@override_settings(DEBUG=False)
def get_app(self) -> Application:
""" Return tornado app to launch for test cases
"""
return create_tornado_application(9993)
@staticmethod
def tornado_call(view_func: Callable[[HttpRequest, UserProfile], HttpResponse],
user_profile: UserProfile,
post_data: Dict[str, Any]) -> HttpResponse:
request = POSTRequestMock(post_data, user_profile)
return view_func(request, user_profile)
@staticmethod
def get_cookie_header(cookies: "SimpleCookie[str]") -> str:
return ';'.join(
["{}={}".format(name, value.value) for name, value in cookies.items()])
def _get_cookies(self, user_profile: UserProfile) -> "SimpleCookie[str]":
resp = self.login_with_return(user_profile.email)
return resp.cookies
@gen.coroutine
def _websocket_auth(self, ws: Any,
queue_events_data: Dict[str, Dict[str, str]],
cookies: "SimpleCookie[str]") -> Generator[str, str, None]:
auth_queue_id = ':'.join((queue_events_data['response']['queue_id'], '0'))
message = {
"req_id": auth_queue_id,
"type": "auth",
"request": {
"csrf_token": cookies.get('csrftoken').coded_value,
"queue_id": queue_events_data['response']['queue_id'],
"status_inquiries": []
}
}
auth_frame_str = ujson.dumps(message)
ws.write_message(ujson.dumps([auth_frame_str]))
response_ack = yield ws.read_message()
response_message = yield ws.read_message()
raise gen.Return([response_ack, response_message])
@staticmethod
def _get_queue_events_data(email: str) -> Dict[str, Dict[str, str]]:
user_profile = UserProfile.objects.filter(email=email).first()
events_query = {
'queue_id': None,
'narrow': [],
'handler_id': 0,
'all_public_streams': False,
'client_type_name': 'website',
'new_queue_data': {
'apply_markdown': True,
'client_gravatar': False,
'narrow': [],
'all_public_streams': False,
'realm_id': user_profile.realm_id,
'client_type_name': 'website',
'event_types': None,
'user_profile_id': user_profile.id,
'queue_timeout': 0,
'last_connection_time': time.time()},
'last_event_id': -1,
'event_types': None,
'user_profile_id': user_profile.id,
'dont_block': True,
'lifespan_secs': 0
}
result = fetch_events(events_query)
return result
def _check_message_sending(self, request_id: str,
ack_resp: str,
msg_resp: str,
profile: UserProfile,
queue_events_data: Dict[str, Dict[str, str]]) -> None:
self.assertEqual(ack_resp[0], 'a')
self.assertEqual(
ujson.loads(ack_resp[1:]),
[
{
"type": "ack",
"req_id": request_id
}
])
self.assertEqual(msg_resp[0], 'a')
result = self.tornado_call(get_events, profile,
{"queue_id": queue_events_data['response']['queue_id'],
"user_client": "website",
"last_event_id": -1,
"dont_block": ujson.dumps(True),
})
result_content = ujson.loads(result.content)
self.assertEqual(len(result_content['events']), 1)
message_id = result_content['events'][0]['message']['id']
self.assertEqual(
ujson.loads(msg_resp[1:]),
[
{
"type": "response",
"response":
{
"result": "success",
"id": message_id,
"msg": ""
},
"req_id": request_id
}
])
@gen_test
def test_tornado_connect(self) -> Generator[str, Any, None]:
user_profile = self.example_user('hamlet')
cookies = self._get_cookies(user_profile)
cookie_header = self.get_cookie_header(cookies)
ws = yield self.ws_connect('/sockjs/366/v8nw22qe/websocket', cookie_header=cookie_header)
response = yield ws.read_message()
self.assertEqual(response, 'o')
yield self.close(ws)
@gen_test
def test_tornado_auth(self) -> Generator[str, 'TornadoTestCase', None]:
user_profile = self.example_user('hamlet')
cookies = self._get_cookies(user_profile)
cookie_header = self.get_cookie_header(cookies)
ws = yield self.ws_connect('/sockjs/366/v8nw22qe/websocket', cookie_header=cookie_header)
yield ws.read_message()
queue_events_data = self._get_queue_events_data(user_profile.email)
request_id = ':'.join((queue_events_data['response']['queue_id'], '0'))
response = yield self._websocket_auth(ws, queue_events_data, cookies)
self.assertEqual(response[0][0], 'a')
self.assertEqual(
ujson.loads(response[0][1:]),
[
{
"type": "ack",
"req_id": request_id
}
])
self.assertEqual(response[1][0], 'a')
self.assertEqual(
ujson.loads(response[1][1:]),
[
{"req_id": request_id,
"response": {
"result": "success",
"status_inquiries": {},
"msg": ""
},
"type": "response"}
])
yield self.close(ws)
@gen_test
def test_sending_private_message(self) -> Generator[str, Any, None]:
user_profile = self.example_user('hamlet')
cookies = self._get_cookies(user_profile)
cookie_header = self.get_cookie_header(cookies)
queue_events_data = self._get_queue_events_data(user_profile.email)
ws = yield self.ws_connect('/sockjs/366/v8nw22qe/websocket', cookie_header=cookie_header)
yield ws.read_message()
yield self._websocket_auth(ws, queue_events_data, cookies)
request_id = ':'.join((queue_events_data['response']['queue_id'], '1'))
user_message = {
"req_id": request_id,
"type": "request",
"request": {
"client": "website",
"type": "private",
TOPIC_NAME: "(no topic)",
"stream": "",
"private_message_recipient": self.example_email('othello'),
"content": "hello",
"sender_id": user_profile.id,
"queue_id": queue_events_data['response']['queue_id'],
"to": ujson.dumps([self.example_email('othello')]),
"reply_to": self.example_email('hamlet'),
"local_id": -1
}
}
user_message_str = ujson.dumps(user_message)
ws.write_message(ujson.dumps([user_message_str]))
ack_resp = yield ws.read_message()
msg_resp = yield ws.read_message()
self._check_message_sending(request_id, ack_resp, msg_resp, user_profile, queue_events_data)
yield self.close(ws)
@gen_test
def test_sending_stream_message(self) -> Generator[str, Any, None]:
user_profile = self.example_user('hamlet')
cookies = self._get_cookies(user_profile)
cookie_header = self.get_cookie_header(cookies)
queue_events_data = self._get_queue_events_data(user_profile.email)
ws = yield self.ws_connect('/sockjs/366/v8nw22qe/websocket', cookie_header=cookie_header)
yield ws.read_message()
yield self._websocket_auth(ws, queue_events_data, cookies)
request_id = ':'.join((queue_events_data['response']['queue_id'], '1'))
user_message = {
"req_id": request_id,
"type": "request",
"request": {
"client": "website",
"type": "stream",
TOPIC_NAME: "Stream message",
"stream": "Denmark",
"private_message_recipient": "",
"content": "hello",
"sender_id": user_profile.id,
"queue_id": queue_events_data['response']['queue_id'],
"to": ujson.dumps(["Denmark"]),
"reply_to": self.example_email('hamlet'),
"local_id": -1
}
}
user_message_str = ujson.dumps(user_message)
ws.write_message(ujson.dumps([user_message_str]))
ack_resp = yield ws.read_message()
msg_resp = yield ws.read_message()
self._check_message_sending(request_id, ack_resp, msg_resp, user_profile, queue_events_data)
yield self.close(ws)
@gen_test
def test_sending_stream_message_from_electron(self) -> Generator[str, Any, None]:
user_profile = self.example_user('hamlet')
cookies = self._get_cookies(user_profile)
cookie_header = self.get_cookie_header(cookies)
queue_events_data = self._get_queue_events_data(user_profile.email)
ws = yield self.ws_connect('/sockjs/366/v8nw22qe/websocket', cookie_header=cookie_header)
yield ws.read_message()
yield self._websocket_auth(ws, queue_events_data, cookies)
request_id = ':'.join((queue_events_data['response']['queue_id'], '1'))
user_message = {
"req_id": request_id,
"type": "request",
"request": {
"client": "website",
"type": "stream",
TOPIC_NAME: "Stream message",
"stream": "Denmark",
"private_message_recipient": "",
"content": "hello",
"sender_id": user_profile.id,
"queue_id": queue_events_data['response']['queue_id'],
"to": ujson.dumps(["Denmark"]),
"reply_to": self.example_email('hamlet'),
"local_id": -1,
"socket_user_agent": "ZulipElectron/1.5.0"
}
}
user_message_str = ujson.dumps(user_message)
ws.write_message(ujson.dumps([user_message_str]))
ack_resp = yield ws.read_message()
msg_resp = yield ws.read_message()
self._check_message_sending(request_id, ack_resp, msg_resp, user_profile, queue_events_data)
yield self.close(ws)
@gen_test
def test_sending_message_error(self) -> Any:
user_profile = self.example_user('hamlet')
cookies = self._get_cookies(user_profile)
cookie_header = self.get_cookie_header(cookies)
queue_events_data = self._get_queue_events_data(user_profile.email)
ws = yield self.ws_connect('/sockjs/366/v8nw22qe/websocket', cookie_header=cookie_header)
yield ws.read_message()
yield self._websocket_auth(ws, queue_events_data, cookies)
request_id = ':'.join((queue_events_data['response']['queue_id'], '1'))
user_message = {
"req_id": request_id,
"type": "request",
"request": {
"client": "website",
"type": "stream",
TOPIC_NAME: "Stream message",
"stream": "Denmark",
"private_message_recipient": "",
"content": "hello",
"sender_id": user_profile.id,
"queue_id": queue_events_data['response']['queue_id'],
"to": ujson.dumps(["Denmark"]),
"reply_to": self.example_email('hamlet'),
"local_id": -1
}
}
user_message_str = ujson.dumps(user_message)
ws.write_message(ujson.dumps([user_message_str]))
def wrap_get_response(request: HttpRequest) -> HttpResponse:
request._log_data = {'bugdown_requests_start': 0,
'time_started': 0,
'bugdown_time_start': 0,
'remote_cache_time_start': 0,
'remote_cache_requests_start': 0,
'startup_time_delta': 0}
class ResponseObject(object):
def __init__(self) -> None:
self.content = '{"msg":"","id":0,"result":"error"}'.encode('utf8')
return ResponseObject()
# Simulate an error response to cover the respective code paths.
with patch('django.core.handlers.base.BaseHandler.get_response', wraps=wrap_get_response), \
patch('django.db.connection.is_usable', return_value=False), \
patch('os.kill', side_effect=OSError()):
yield ws.read_message()
yield self.close(ws)

View File

@@ -6,7 +6,6 @@ from zerver.tornado import autoreload
from zerver.lib.queue import get_queue_client
from zerver.tornado.handlers import AsyncDjangoHandler
from zerver.tornado.socket import get_sockjs_router
def setup_tornado_rabbitmq() -> None: # nocoverage
# When tornado is shut down, disconnect cleanly from rabbitmq
@@ -24,8 +23,7 @@ def create_tornado_application(port: int) -> tornado.web.Application:
)
# Application is an instance of Django's standard wsgi handler.
return tornado.web.Application(([(url, AsyncDjangoHandler) for url in urls] +
get_sockjs_router(port).urls),
return tornado.web.Application([(url, AsyncDjangoHandler) for url in urls],
debug=settings.DEBUG,
autoreload=False,
# Disable Tornado's own request logging, since we have our own

View File

@@ -20,8 +20,3 @@ def notify_tornado_queue_name(port: int) -> str:
if settings.TORNADO_PROCESSES == 1:
return "notify_tornado"
return "notify_tornado_port_%d" % (port,)
def tornado_return_queue_name(port: int) -> str:
if settings.TORNADO_PROCESSES == 1:
return "tornado_return"
return "tornado_return_port_%d" % (port,)

View File

@@ -1,281 +0,0 @@
# See https://zulip.readthedocs.io/en/latest/subsystems/sending-messages.html#websockets
# for high-level documentation on this subsystem.
from typing import Any, Dict, Mapping, Optional, Union
from django.conf import settings
from django.utils.timezone import now as timezone_now
from django.utils.translation import ugettext as _
from django.contrib.sessions.models import Session as djSession
try:
from django.middleware.csrf import _compare_salted_tokens
except ImportError:
# This function was added in Django 1.10.
def _compare_salted_tokens(token1: str, token2: str) -> bool:
return token1 == token2
import sockjs.tornado
from sockjs.tornado.session import ConnectionInfo
import tornado.ioloop
import ujson
import logging
from zerver.models import UserProfile, get_user_profile_by_id
from zerver.lib.queue import queue_json_publish
from zerver.decorator import JsonableError
from zerver.middleware import record_request_start_data, record_request_stop_data, \
record_request_restart_data, write_log_line, format_timedelta
from zerver.lib.redis_utils import get_redis_client
from zerver.lib.sessions import get_session_user
from zerver.tornado.event_queue import get_client_descriptor
from zerver.tornado.exceptions import BadEventQueueIdError
from zerver.tornado.sharding import tornado_return_queue_name
logger = logging.getLogger('zulip.socket')
def get_user_profile(session_id: Optional[str]) -> Optional[UserProfile]:
if session_id is None:
return None
try:
djsession = djSession.objects.get(expire_date__gt=timezone_now(),
session_key=session_id)
except djSession.DoesNotExist:
return None
try:
return get_user_profile_by_id(get_session_user(djsession))
except (UserProfile.DoesNotExist, KeyError):
return None
connections = dict() # type: Dict[Union[int, str], 'SocketConnection']
def get_connection(id: Union[int, str]) -> Optional['SocketConnection']:
return connections.get(id)
def register_connection(id: Union[int, str], conn: 'SocketConnection') -> None:
# Kill any old connections if they exist
if id in connections:
connections[id].close()
conn.client_id = id
connections[conn.client_id] = conn
def deregister_connection(conn: 'SocketConnection') -> None:
assert conn.client_id is not None
del connections[conn.client_id]
redis_client = get_redis_client()
def req_redis_key(req_id: str) -> str:
return 'socket_req_status:%s' % (req_id,)
class CloseErrorInfo:
def __init__(self, status_code: int, err_msg: str) -> None:
self.status_code = status_code
self.err_msg = err_msg
class SocketConnection(sockjs.tornado.SockJSConnection):
client_id = None # type: Optional[Union[int, str]]
def on_open(self, info: ConnectionInfo) -> None:
log_data = dict(extra='[transport=%s]' % (self.session.transport_name,))
record_request_start_data(log_data)
ioloop = tornado.ioloop.IOLoop.instance()
self.authenticated = False
self.session.user_profile = None
self.close_info = None # type: Optional[CloseErrorInfo]
self.did_close = False
try:
self.browser_session_id = info.get_cookie(settings.SESSION_COOKIE_NAME).value
self.csrf_token = info.get_cookie(settings.CSRF_COOKIE_NAME).value
except AttributeError:
# The request didn't contain the necessary cookie values. We can't
# close immediately because sockjs-tornado doesn't expect a close
# inside on_open(), so do it on the next tick.
self.close_info = CloseErrorInfo(403, "Initial cookie lacked required values")
ioloop.add_callback(self.close)
return
def auth_timeout() -> None:
self.close_info = CloseErrorInfo(408, "Timeout while waiting for authentication")
self.close()
self.timeout_handle = ioloop.call_later(10, auth_timeout)
write_log_line(log_data, path='/socket/open', method='SOCKET',
remote_ip=info.ip, email='unknown', client_name='?')
def authenticate_client(self, msg: Dict[str, Any]) -> None:
if self.authenticated:
self.session.send_message({'req_id': msg['req_id'], 'type': 'response',
'response': {'result': 'error',
'msg': 'Already authenticated'}})
return
user_profile = get_user_profile(self.browser_session_id)
if user_profile is None:
raise JsonableError(_('Unknown or missing session'))
self.session.user_profile = user_profile
if 'csrf_token' not in msg['request']:
# Debugging code to help with understanding #6961
logging.error("CSRF token missing from websockets auth request: %s" % (msg['request'],))
raise JsonableError(_('CSRF token entry missing from request'))
if not _compare_salted_tokens(msg['request']['csrf_token'], self.csrf_token):
raise JsonableError(_('CSRF token does not match that in cookie'))
if 'queue_id' not in msg['request']:
raise JsonableError(_("Missing 'queue_id' argument"))
queue_id = msg['request']['queue_id']
client = get_client_descriptor(queue_id)
if client is None:
raise BadEventQueueIdError(queue_id)
if user_profile.id != client.user_profile_id:
raise JsonableError(_("You are not the owner of the queue with id '%s'") % (queue_id,))
self.authenticated = True
register_connection(queue_id, self)
response = {'req_id': msg['req_id'], 'type': 'response',
'response': {'result': 'success', 'msg': ''}}
status_inquiries = msg['request'].get('status_inquiries')
if status_inquiries is not None:
results = {} # type: Dict[str, Dict[str, str]]
for inquiry in status_inquiries:
status = redis_client.hgetall(req_redis_key(inquiry)) # type: Dict[bytes, bytes]
if len(status) == 0:
result = {'status': 'not_received'}
elif b'response' not in status:
result = {'status': status[b'status'].decode('utf-8')}
else:
result = {'status': status[b'status'].decode('utf-8'),
'response': ujson.loads(status[b'response'])}
results[str(inquiry)] = result
response['response']['status_inquiries'] = results
self.session.send_message(response)
ioloop = tornado.ioloop.IOLoop.instance()
ioloop.remove_timeout(self.timeout_handle)
def on_message(self, msg_raw: str) -> None:
log_data = dict(extra='[transport=%s' % (self.session.transport_name,))
record_request_start_data(log_data)
msg = ujson.loads(msg_raw)
if self.did_close:
user_email = 'unknown'
if self.session.user_profile is not None:
user_email = self.session.user_profile.delivery_email
logger.info("Received message on already closed socket! transport=%s user=%s client_id=%s"
% (self.session.transport_name,
user_email,
self.client_id))
self.session.send_message({'req_id': msg['req_id'], 'type': 'ack'})
if msg['type'] == 'auth':
log_data['extra'] += ']'
try:
self.authenticate_client(msg)
# TODO: Fill in the correct client
write_log_line(log_data, path='/socket/auth', method='SOCKET',
remote_ip=self.session.conn_info.ip,
email=self.session.user_profile.delivery_email,
client_name='?')
except JsonableError as e:
response = e.to_json()
self.session.send_message({'req_id': msg['req_id'], 'type': 'response',
'response': response})
write_log_line(log_data, path='/socket/auth', method='SOCKET',
remote_ip=self.session.conn_info.ip,
email='unknown', client_name='?',
status_code=403, error_content=ujson.dumps(response))
return
else:
if not self.authenticated:
response = {'result': 'error', 'msg': "Not yet authenticated"}
self.session.send_message({'req_id': msg['req_id'], 'type': 'response',
'response': response})
write_log_line(log_data, path='/socket/service_request', method='SOCKET',
remote_ip=self.session.conn_info.ip,
email='unknown', client_name='?',
status_code=403, error_content=ujson.dumps(response))
return
redis_key = req_redis_key(msg['req_id'])
with redis_client.pipeline() as pipeline:
pipeline.hmset(redis_key, {'status': 'received'})
pipeline.expire(redis_key, 60 * 60 * 24)
pipeline.execute()
record_request_stop_data(log_data)
request_environ = dict(REMOTE_ADDR=self.session.conn_info.ip)
queue_json_publish("message_sender",
dict(request=msg['request'],
req_id=msg['req_id'],
server_meta=dict(user_id=self.session.user_profile.id,
client_id=self.client_id,
return_queue=tornado_return_queue_name(self.port),
log_data=log_data,
request_environ=request_environ)))
def on_close(self) -> None:
log_data = dict(extra='[transport=%s]' % (self.session.transport_name,))
record_request_start_data(log_data)
if self.close_info is not None:
write_log_line(log_data, path='/socket/close', method='SOCKET',
remote_ip=self.session.conn_info.ip, email='unknown',
client_name='?', status_code=self.close_info.status_code,
error_content=self.close_info.err_msg)
else:
deregister_connection(self)
email = self.session.user_profile.delivery_email \
if self.session.user_profile is not None else 'unknown'
write_log_line(log_data, path='/socket/close', method='SOCKET',
remote_ip=self.session.conn_info.ip, email=email,
client_name='?')
self.did_close = True
def respond_send_message(data: Mapping[str, Any]) -> None:
log_data = data['server_meta']['log_data']
record_request_restart_data(log_data)
worker_log_data = data['server_meta']['worker_log_data']
forward_queue_delay = worker_log_data['time_started'] - log_data['time_stopped']
return_queue_delay = log_data['time_restarted'] - data['server_meta']['time_request_finished']
service_time = data['server_meta']['time_request_finished'] - worker_log_data['time_started']
log_data['extra'] += ', queue_delay: %s/%s, service_time: %s]' % (
format_timedelta(forward_queue_delay), format_timedelta(return_queue_delay),
format_timedelta(service_time))
client_id = data['server_meta']['client_id']
connection = get_connection(client_id)
if connection is None:
logger.info("Could not find connection to send response to! client_id=%s" % (client_id,))
else:
connection.session.send_message({'req_id': data['req_id'], 'type': 'response',
'response': data['response']})
# TODO: Fill in client name
# TODO: Maybe fill in the status code correctly
write_log_line(log_data, path='/socket/service_request', method='SOCKET',
remote_ip=connection.session.conn_info.ip,
email=connection.session.user_profile.delivery_email, client_name='?')
# We disable the eventsource and htmlfile transports because they cannot
# securely send us the zulip.com cookie, which we use as part of our
# authentication scheme.
sockjs_url = '%s/static/third/sockjs/sockjs-0.3.4.js' % (settings.ROOT_DOMAIN_URI,)
sockjs_router = sockjs.tornado.SockJSRouter(SocketConnection, "/sockjs",
{'sockjs_url': sockjs_url,
'disabled_transports': ['eventsource', 'htmlfile']})
def get_sockjs_router(port: int) -> sockjs.tornado.SockJSRouter:
sockjs_router._connection.port = port
return sockjs_router

View File

@@ -1,134 +0,0 @@
import logging
import requests
import ujson
from django.conf import settings
from django.contrib.auth import SESSION_KEY, BACKEND_SESSION_KEY, HASH_SESSION_KEY
from django.middleware.csrf import _get_new_csrf_token
from importlib import import_module
from tornado.ioloop import IOLoop
from tornado import gen
from tornado.httpclient import HTTPRequest
from tornado.websocket import websocket_connect, WebSocketClientConnection
from urllib.parse import urlparse, urljoin
from http.cookies import SimpleCookie
from zerver.models import get_system_bot
from typing import Any, Callable, Dict, Generator, Iterable, Optional
class WebsocketClient:
def __init__(self, host_url: str, sockjs_url: str, sender_email: str,
run_on_start: Callable[..., None], validate_ssl: bool=True,
**run_kwargs: Any) -> None:
# NOTE: Callable should take a WebsocketClient & kwargs, but this is not standardised
self.validate_ssl = validate_ssl
self.auth_email = sender_email
self.user_profile = get_system_bot(sender_email)
self.request_id_number = 0
self.parsed_host_url = urlparse(host_url)
self.sockjs_url = sockjs_url
self.cookie_dict = self._login()
self.cookie_str = self._get_cookie_header(self.cookie_dict)
self.events_data = self._get_queue_events(self.cookie_str)
self.ioloop_instance = IOLoop.instance()
self.run_on_start = run_on_start
self.run_kwargs = run_kwargs
self.scheme_dict = {'http': 'ws', 'https': 'wss'}
self.ws = None # type: Optional[WebSocketClientConnection]
def _login(self) -> Dict[str, str]:
# Ideally, we'd migrate this to use API auth instead of
# stealing cookies, but this works for now.
auth_backend = settings.AUTHENTICATION_BACKENDS[0]
session_auth_hash = self.user_profile.get_session_auth_hash()
engine = import_module(settings.SESSION_ENGINE)
session = engine.SessionStore() # type: ignore # import_module
session[SESSION_KEY] = self.user_profile._meta.pk.value_to_string(self.user_profile)
session[BACKEND_SESSION_KEY] = auth_backend
session[HASH_SESSION_KEY] = session_auth_hash
session.save()
return {
settings.SESSION_COOKIE_NAME: session.session_key,
settings.CSRF_COOKIE_NAME: _get_new_csrf_token()}
def _get_cookie_header(self, cookies: Dict[Any, Any]) -> str:
return ';'.join(
["{}={}".format(name, value) for name, value in cookies.items()])
@gen.coroutine
def _websocket_auth(self, queue_events_data: Dict[str, Dict[str, str]],
cookies: "SimpleCookie[str]") -> Generator[str, str, None]:
message = {
"req_id": self._get_request_id(),
"type": "auth",
"request": {
"csrf_token": cookies.get(settings.CSRF_COOKIE_NAME),
"queue_id": queue_events_data['queue_id'],
"status_inquiries": []
}
}
auth_frame_str = ujson.dumps(message)
self.ws.write_message(ujson.dumps([auth_frame_str]))
response_ack = yield self.ws.read_message()
response_message = yield self.ws.read_message()
raise gen.Return([response_ack, response_message])
def _get_queue_events(self, cookies_header: str) -> Dict[str, str]:
url = urljoin(self.parsed_host_url.geturl(), '/json/events?dont_block=true')
response = requests.get(url, headers={'Cookie': cookies_header}, verify=self.validate_ssl)
return response.json()
@gen.engine
def connect(self) -> Generator[str, WebSocketClientConnection, None]:
try:
request = HTTPRequest(url=self._get_websocket_url(), validate_cert=self.validate_ssl)
request.headers.add('Cookie', self.cookie_str)
self.ws = yield websocket_connect(request)
yield self.ws.read_message()
yield self._websocket_auth(self.events_data, self.cookie_dict)
self.run_on_start(self, **self.run_kwargs)
except Exception as e:
logging.exception(str(e))
IOLoop.instance().stop()
IOLoop.instance().stop()
@gen.coroutine
def send_message(self, client: str, type: str, subject: str, stream: str,
private_message_recepient: str,
content: str="") -> Generator[str, WebSocketClientConnection, None]:
user_message = {
"req_id": self._get_request_id(),
"type": "request",
"request": {
"client": client,
"type": type,
"subject": subject,
"stream": stream,
"private_message_recipient": private_message_recepient,
"content": content,
"sender_id": self.user_profile.id,
"queue_id": self.events_data['queue_id'],
"to": ujson.dumps([private_message_recepient]),
"reply_to": self.user_profile.email,
"local_id": -1
}
}
self.ws.write_message(ujson.dumps([ujson.dumps(user_message)]))
response_ack = yield self.ws.read_message()
response_message = yield self.ws.read_message()
raise gen.Return([response_ack, response_message])
def run(self) -> None:
self.ioloop_instance.add_callback(self.connect)
self.ioloop_instance.start()
def _get_websocket_url(self) -> str:
return '{}://{}{}'.format(self.scheme_dict[self.parsed_host_url.scheme],
self.parsed_host_url.netloc, self.sockjs_url)
def _get_request_id(self) -> Iterable[str]:
self.request_id_number += 1
return ':'.join((self.events_data['queue_id'], str(self.request_id_number)))

View File

@@ -187,7 +187,6 @@ def home_real(request: HttpRequest) -> HttpResponse:
max_file_upload_size = settings.MAX_FILE_UPLOAD_SIZE,
max_avatar_file_size = settings.MAX_AVATAR_FILE_SIZE,
server_generation = settings.SERVER_GENERATION,
use_websockets = settings.USE_WEBSOCKETS,
save_stacktraces = settings.SAVE_FRONTEND_STACKTRACES,
warn_no_email = settings.WARN_NO_EMAIL,
server_inline_image_preview = settings.INLINE_IMAGE_PREVIEW,

View File

@@ -13,8 +13,6 @@ import socket
from django.conf import settings
from django.db import connection
from django.core.handlers.wsgi import WSGIRequest
from django.core.handlers.base import BaseHandler
from zerver.models import \
get_client, get_system_bot, PreregistrationUser, \
get_user_profile_by_id, Message, Realm, UserMessage, UserProfile, \
@@ -22,7 +20,7 @@ from zerver.models import \
from zerver.lib.context_managers import lockfile
from zerver.lib.error_notify import do_report_error
from zerver.lib.feedback import handle_feedback
from zerver.lib.queue import SimpleQueueClient, queue_json_publish, retry_event
from zerver.lib.queue import SimpleQueueClient, retry_event
from zerver.lib.timestamp import timestamp_to_datetime
from zerver.lib.email_notifications import handle_missedmessage_emails
from zerver.lib.push_notifications import handle_push_notification, handle_remove_push_notification, \
@@ -38,9 +36,7 @@ from zerver.lib.send_email import send_future_email, send_email_from_dict, \
from zerver.lib.email_mirror import process_message as mirror_email, rate_limit_mirror_by_realm, \
is_missed_message_address, decode_stream_email_address
from zerver.lib.streams import access_stream_by_id
from zerver.tornado.socket import req_redis_key, respond_send_message
from zerver.lib.db import reset_queries
from zerver.lib.redis_utils import get_redis_client
from zerver.context_processors import common_context
from zerver.lib.outgoing_webhook import do_rest_call, get_outgoing_webhook_service_handler
from zerver.models import get_bot_services, RealmAuditLog
@@ -51,7 +47,6 @@ from zerver.lib.export import export_realm_wrapper
from zerver.lib.remote_server import PushNotificationBouncerRetryLaterError
import os
import sys
import ujson
from collections import defaultdict
import email
@@ -59,7 +54,6 @@ import time
import datetime
import logging
import requests
from io import StringIO
import urllib
logger = logging.getLogger(__name__)
@@ -484,66 +478,6 @@ class SlowQueryWorker(LoopQueueProcessingWorker):
internal_send_message(error_bot_realm, settings.ERROR_BOT,
"stream", settings.SLOW_QUERY_LOGS_STREAM, topic, content)
@assign_queue("message_sender")
class MessageSenderWorker(QueueProcessingWorker):
def __init__(self) -> None:
super().__init__()
self.redis_client = get_redis_client()
self.handler = BaseHandler()
self.handler.load_middleware()
def consume(self, event: Mapping[str, Any]) -> None:
server_meta = event['server_meta']
environ = {
'REQUEST_METHOD': 'SOCKET',
'SCRIPT_NAME': '',
'PATH_INFO': '/json/messages',
'SERVER_NAME': '127.0.0.1',
'SERVER_PORT': 9993,
'SERVER_PROTOCOL': 'ZULIP_SOCKET/1.0',
'wsgi.version': (1, 0),
'wsgi.input': StringIO(),
'wsgi.errors': sys.stderr,
'wsgi.multithread': False,
'wsgi.multiprocess': True,
'wsgi.run_once': False,
'zulip.emulated_method': 'POST'
}
if 'socket_user_agent' in event['request']:
environ['HTTP_USER_AGENT'] = event['request']['socket_user_agent']
del event['request']['socket_user_agent']
# We're mostly using a WSGIRequest for convenience
environ.update(server_meta['request_environ'])
request = WSGIRequest(environ)
# Note: If we ever support non-POST methods, we'll need to change this.
request._post = event['request']
request.csrf_processing_done = True
user_profile = get_user_profile_by_id(server_meta['user_id'])
request._cached_user = user_profile
resp = self.handler.get_response(request)
server_meta['time_request_finished'] = time.time()
server_meta['worker_log_data'] = request._log_data
resp_content = resp.content.decode('utf-8')
response_data = ujson.loads(resp_content)
if response_data['result'] == 'error':
check_and_send_restart_signal()
result = {'response': response_data, 'req_id': event['req_id'],
'server_meta': server_meta}
redis_key = req_redis_key(event['req_id'])
self.redis_client.hmset(redis_key, {'status': 'complete',
'response': resp_content})
queue_json_publish(server_meta['return_queue'], result,
respond_send_message)
@assign_queue('digest_emails')
class DigestWorker(QueueProcessingWorker): # nocoverage
# Who gets a digest is entirely determined by the enqueue_digest_emails

View File

@@ -306,11 +306,6 @@ CONFIRMATION_LINK_DEFAULT_VALIDITY_DAYS = 1
INVITATION_LINK_VALIDITY_DAYS = 10
REALM_CREATION_LINK_VALIDITY_DAYS = 7
# By default, Zulip uses websockets to send messages. In some
# networks, websockets don't work. One can configure Zulip to
# not use websockets here.
USE_WEBSOCKETS = True
# Version number for ToS. Change this if you want to force every
# user to click through to re-accept terms of service before using
# Zulip again on the web.

View File

@@ -841,17 +841,6 @@ LOGGING = {
'requests': {
'level': 'WARNING',
},
'tornado.general': {
# sockjs.tornado sends a lot of ERROR level logs to this
# logger. These should not result in error emails/Zulips.
#
# TODO: Ideally, we'd do something that just filters the
# sockjs.tornado logging entirely, since other Tornado
# logging may be of interest. Might require patching
# sockjs.tornado to do this correctly :(.
'handlers': ['console', 'file'],
'propagate': False,
},
# our own loggers, alphabetized
'zerver.lib.digest': {