Remove transaction management hacks in workers.

(imported from commit 33b9349bd139907755b52474df8a0da13b73b80d)
This commit is contained in:
Tim Abbott
2013-11-01 14:02:11 -04:00
parent 2844a8fb97
commit 29aef33c53
2 changed files with 14 additions and 35 deletions

View File

@@ -196,7 +196,6 @@ def compute_mit_user_fullname(email):
 @cache_with_key(lambda realm, email, f: user_profile_by_email_cache_key(email),
                 timeout=3600*24*7)
-@transaction.commit_on_success
 def create_mirror_user_if_needed(realm, email, email_to_fullname):
     try:
         return get_user_profile_by_email(email)
@@ -207,9 +206,6 @@ def create_mirror_user_if_needed(realm, email, email_to_fullname):
                                email_to_fullname(email), email_to_username(email),
                                active=False)
     except IntegrityError:
-        # Unless we raced with another thread doing the same
-        # thing, in which case we should get the user they made
-        transaction.commit()
         return get_user_profile_by_email(email)

 def log_message(message):
@@ -1249,19 +1245,13 @@ def do_update_user_activity_interval(user_profile, log_time):
                          end=effective_end)

 @statsd_increment('user_activity')
-@transaction.commit_on_success
 def do_update_user_activity(user_profile, client, query, log_time):
-    try:
-        (activity, created) = UserActivity.objects.get_or_create(
-            user_profile = user_profile,
-            client = client,
-            query = query,
-            defaults={'last_visit': log_time, 'count': 0})
-    except IntegrityError:
-        transaction.commit()
-        activity = UserActivity.objects.get(user_profile = user_profile,
-                                            client = client,
-                                            query = query)
+    (activity, created) = UserActivity.objects.get_or_create(
+        user_profile = user_profile,
+        client = client,
+        query = query,
+        defaults={'last_visit': log_time, 'count': 0})

     activity.count += 1
     activity.last_visit = log_time
     activity.save(update_fields=["last_visit", "count"])
@@ -1275,19 +1265,12 @@ def send_presence_changed(user_profile, presence):
     tornado_callbacks.send_notification(notice)

 @statsd_increment('user_presence')
-@transaction.commit_on_success
 def do_update_user_presence(user_profile, client, log_time, status):
-    try:
-        (presence, created) = UserPresence.objects.get_or_create(
-            user_profile = user_profile,
-            client = client,
-            defaults = {'timestamp': log_time,
-                        'status': status})
-    except IntegrityError:
-        transaction.commit()
-        presence = UserPresence.objects.get(user_profile = user_profile,
-                                            client = client)
-        created = False
+    (presence, created) = UserPresence.objects.get_or_create(
+        user_profile = user_profile,
+        client = client,
+        defaults = {'timestamp': log_time,
+                    'status': status})

     stale_status = (log_time - presence.timestamp) > datetime.timedelta(minutes=1, seconds=10)
     was_idle = presence.status == UserPresence.IDLE

View File

@@ -1,6 +1,5 @@
 from __future__ import absolute_import
-from django.db.transaction import commit_on_success
 from django.conf import settings
 from postmonkey import PostMonkey, MailChimpException
 from zerver.models import UserActivityInterval, get_user_profile_by_email, \
@@ -50,8 +49,7 @@ class QueueProcessingWorker(object):
     def consume_wrapper(self, data):
         try:
-            with commit_on_success():
-                self.consume(data)
+            self.consume(data)
         except Exception:
             self._log_problem()
             if not os.path.exists(settings.QUEUE_ERROR_DIR):
@@ -180,8 +178,7 @@ class MissedMessageWorker(QueueProcessingWorker):
             by_recipient[event['user_profile_id']].append(event)

         for user_profile_id, events in by_recipient.items():
-            with commit_on_success():
-                handle_missedmessage_emails(user_profile_id, events)
+            handle_missedmessage_emails(user_profile_id, events)

         # Aggregate all messages received every 2 minutes to let someone finish sending a batch
         # of messages
@@ -225,8 +222,7 @@ class SlowQueryWorker(QueueProcessingWorker):
             for query in slow_queries:
                 content += "  %s\n" % (query,)

-            with commit_on_success():
-                internal_send_message(settings.ERROR_BOT, "stream", "logs", topic, content)
+            internal_send_message(settings.ERROR_BOT, "stream", "logs", topic, content)

             # Aggregate all slow query messages in 1-minute chunks to avoid message spam
             time.sleep(1 * 60)