[puppet] Handle exceptions from queue workers.

Subclasses of QueueProcessingWorker that don't override start() now
have their consume() methods wrapped by consume_wrapper(), which
catches exceptions, logs a traceback, and appends the offending
event's data to a per-queue .errors file in QUEUE_ERROR_DIR.
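
As an illustration, a worker opting into this behavior needs nothing
beyond a consume() implementation; a minimal sketch (the 'example'
queue name and the validation logic are hypothetical, but assign_queue
and the consume() signature are as in the diff below):

    from zerver.worker import queue_processors

    @queue_processors.assign_queue('example')  # hypothetical queue name
    class ExampleWorker(queue_processors.QueueProcessingWorker):
        def consume(self, ch, method, properties, data):
            # If this raises, consume_wrapper() catches the exception,
            # logs a traceback via _log_problem(), and appends the event
            # to QUEUE_ERROR_DIR/example.errors instead of crashing.
            if not isinstance(data, dict):
                raise ValueError('unexpected event: %r' % (data,))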

We need to do a puppet apply to create /var/log/zulip/queue_error.

(imported from commit 3bd7751da5fdef449eeec3f7dd29977df11e2b9c)
Author: Steve Howell
Date:   2013-10-29 15:03:42 -04:00
parent  8e05f76511
commit  884e602185

4 changed files with 63 additions and 4 deletions


@@ -115,6 +115,13 @@ class zulip::base {
     mode   => 640,
   }
 
+  file { '/var/log/zulip/queue_error':
+    ensure => 'directory',
+    owner  => 'zulip',
+    group  => 'zulip',
+    mode   => 640,
+  }
+
   group { 'nagios':
     ensure => present,
     gid    => '1050',
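
Note that consume_wrapper() (below) also creates QUEUE_ERROR_DIR with
os.mkdir() if it is missing, so error logging degrades gracefully if
the puppet apply hasn't happened yet; in development, where puppet
doesn't manage this directory, that fallback is what creates it.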


@@ -386,6 +386,39 @@ class WorkerTest(TestCase):
         self.assertTrue(len(activity_records), 1)
         self.assertTrue(activity_records[0].count, 1)
 
+    def test_error_handling(self):
+        processed = []
+
+        @queue_processors.assign_queue('flake')
+        class FlakyWorker(queue_processors.QueueProcessingWorker):
+            def consume(self, ch, method, properties, data):
+                if data == 'freak out':
+                    raise Exception('Freaking out!')
+                processed.append(data)
+
+            def _log_problem(self):
+                # keep the tests quiet
+                pass
+
+        fake_client = self.FakeClient()
+        for msg in ['good', 'fine', 'freak out', 'back to normal']:
+            fake_client.queue.append(('flake', msg))
+
+        fn = os.path.join(settings.QUEUE_ERROR_DIR, 'flake.errors')
+        try:
+            os.remove(fn)
+        except OSError:
+            pass
+
+        with simulated_queue_client(lambda: fake_client):
+            worker = FlakyWorker()
+            worker.start()
+
+        self.assertEqual(processed, ['good', 'fine', 'back to normal'])
+        line = open(fn).readline().strip()
+        event = ujson.loads(line.split('\t')[1])
+        self.assertEqual(event, 'freak out')
+
 class ActivityTest(AuthedTestCase):
     def test_activity(self):
         self.login("hamlet@zulip.com")
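
Each line the wrapper appends is time.asctime(), a tab, then the
ujson-encoded event, which is what the test's line.split('\t')[1]
relies on. A minimal sketch of reading events back out of such a file
(the helper name is ours, not part of this commit):

    import ujson

    def read_queue_errors(fn):
        # Each line: human-readable timestamp, a tab, the JSON event.
        # time.asctime() never contains a tab, so split('\t', 1) is safe.
        events = []
        with open(fn) as f:
            for raw in f:
                when, payload = raw.strip().split('\t', 1)
                events.append((when, ujson.loads(payload)))
        return events

    # e.g.: read_queue_errors('/var/log/zulip/queue_error/flake.errors')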


@@ -5,6 +5,7 @@ from django.conf import settings
 from postmonkey import PostMonkey, MailChimpException
 from zerver.models import UserActivityInterval, get_user_profile_by_email, \
     get_user_profile_by_id, get_prereg_user_by_email, get_client
+from zerver.lib.context_managers import lockfile
 from zerver.lib.queue import SimpleQueueClient, queue_json_publish
 from zerver.lib.timestamp import timestamp_to_datetime
 from zerver.lib.actions import handle_missedmessage_emails, do_send_confirmation_email, \
@@ -47,12 +48,27 @@ class QueueProcessingWorker(object):
     def __init__(self):
         self.q = SimpleQueueClient()
 
-    def consume_and_commit(self, *args):
-        with commit_on_success():
-            self.consume(*args)
+    def consume_wrapper(self, ch, method, properties, data):
+        try:
+            with commit_on_success():
+                self.consume(ch, method, properties, data)
+        except Exception:
+            self._log_problem()
+            if not os.path.exists(settings.QUEUE_ERROR_DIR):
+                os.mkdir(settings.QUEUE_ERROR_DIR)
+            fname = '%s.errors' % (self.queue_name,)
+            fn = os.path.join(settings.QUEUE_ERROR_DIR, fname)
+            line = '%s\t%s\n' % (time.asctime(), ujson.dumps(data))
+            lock_fn = fn + '.lock'
+            with lockfile(lock_fn):
+                with open(fn, 'a') as f:
+                    f.write(line)
+
+    def _log_problem(self):
+        logging.exception("Problem handling data on queue %s" % (self.queue_name,))
 
     def start(self):
-        self.q.register_json_consumer(self.queue_name, self.consume_and_commit)
+        self.q.register_json_consumer(self.queue_name, self.consume_wrapper)
         self.q.start_consuming()
 
     def stop(self):
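
The lockfile context manager imported above lives in
zerver/lib/context_managers.py and isn't shown in this commit; the
wrapper only assumes it serializes writers on the .lock file. A
flock-based sketch of those assumed semantics:

    import fcntl
    from contextlib import contextmanager

    @contextmanager
    def lockfile(filename):
        # Sketch only -- the real implementation may differ.  Holds an
        # exclusive advisory lock so two workers appending to the same
        # .errors file can't interleave their writes.
        with open(filename, 'w') as lock:
            fcntl.flock(lock, fcntl.LOCK_EX)
            try:
                yield
            finally:
                fcntl.flock(lock, fcntl.LOCK_UN)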


@@ -511,6 +511,7 @@ if DEPLOYED:
     STATS_DIR = '/home/zulip/stats'
     PERSISTENT_QUEUE_FILENAME = "/home/zulip/tornado/event_queues.pickle"
     EMAIL_LOG_PATH = "/var/log/zulip/email-mirror.log"
+    QUEUE_ERROR_DIR = '/var/log/zulip/queue_error'
 else:
     EVENT_LOG_DIR = 'event_log'
     SERVER_LOG_PATH = "server.log"
@@ -518,6 +519,8 @@
     STATS_DIR = 'stats'
     PERSISTENT_QUEUE_FILENAME = "event_queues.pickle"
     EMAIL_LOG_PATH = "email-mirror.log"
+    QUEUE_ERROR_DIR = 'queue_error'
+
 
 if len(sys.argv) > 2 and sys.argv[0].endswith('manage.py') and sys.argv[1] == 'process_queue':
     FILE_LOG_PATH = WORKER_LOG_PATH