Extract bulk_add_subs_to_db_with_logging.

This is a trivial code extraction.
This commit is contained in:
Steve Howell
2020-10-12 20:57:20 +00:00
committed by Tim Abbott
parent 3ff9ce78ea
commit 84aa1389d8

View File

@@ -2856,42 +2856,13 @@ def bulk_add_subscriptions(streams: Iterable[Stream],
subs_by_user[user_profile.id].append(sub_to_add) subs_by_user[user_profile.id].append(sub_to_add)
subs_to_add.append((sub_to_add, stream)) subs_to_add.append((sub_to_add, stream))
# TODO: XXX: This transaction really needs to be done at the serializeable new_occupied_streams = bulk_add_subs_to_db_with_logging(
# transaction isolation level. realm=realm,
with transaction.atomic():
occupied_streams_before = list(get_occupied_streams(realm))
Subscription.objects.bulk_create(sub for (sub, stream) in subs_to_add)
sub_ids = [sub.id for (sub, stream) in subs_to_activate]
Subscription.objects.filter(id__in=sub_ids).update(active=True)
occupied_streams_after = list(get_occupied_streams(realm))
# Log Subscription Activities in RealmAuditLog
event_time = timezone_now()
event_last_message_id = get_last_message_id()
all_subscription_logs: (List[RealmAuditLog]) = []
for (sub, stream) in subs_to_add:
all_subscription_logs.append(RealmAuditLog(realm=realm,
acting_user=acting_user, acting_user=acting_user,
modified_user=sub.user_profile, subs_to_add=subs_to_add,
modified_stream=stream, subs_to_activate=subs_to_activate,
event_last_message_id=event_last_message_id, )
event_type=RealmAuditLog.SUBSCRIPTION_CREATED,
event_time=event_time))
for (sub, stream) in subs_to_activate:
all_subscription_logs.append(RealmAuditLog(realm=realm,
acting_user=acting_user,
modified_user=sub.user_profile,
modified_stream=stream,
event_last_message_id=event_last_message_id,
event_type=RealmAuditLog.SUBSCRIPTION_ACTIVATED,
event_time=event_time))
# Now since we have all log objects generated we can do a bulk insert
RealmAuditLog.objects.bulk_create(all_subscription_logs)
new_occupied_streams = [stream for stream in
set(occupied_streams_after) - set(occupied_streams_before)
if not stream.invite_only]
if new_occupied_streams and not from_stream_creation: if new_occupied_streams and not from_stream_creation:
event: Dict[str, object] = dict( event: Dict[str, object] = dict(
type="stream", type="stream",
@@ -2958,6 +2929,51 @@ def bulk_add_subscriptions(streams: Iterable[Stream],
[(sub.user_profile, stream) for (sub, stream) in subs_to_activate], [(sub.user_profile, stream) for (sub, stream) in subs_to_activate],
already_subscribed) already_subscribed)
def bulk_add_subs_to_db_with_logging(
realm: Realm,
acting_user: Optional[UserProfile],
subs_to_add: List[Tuple[Subscription, Stream]],
subs_to_activate: List[Tuple[Subscription, Stream]],
) -> List[Stream]:
# TODO: XXX: This transaction really needs to be done at the serializeable
# transaction isolation level.
with transaction.atomic():
occupied_streams_before = list(get_occupied_streams(realm))
Subscription.objects.bulk_create(sub for (sub, stream) in subs_to_add)
sub_ids = [sub.id for (sub, stream) in subs_to_activate]
Subscription.objects.filter(id__in=sub_ids).update(active=True)
occupied_streams_after = list(get_occupied_streams(realm))
# Log Subscription Activities in RealmAuditLog
event_time = timezone_now()
event_last_message_id = get_last_message_id()
all_subscription_logs: (List[RealmAuditLog]) = []
for (sub, stream) in subs_to_add:
all_subscription_logs.append(RealmAuditLog(realm=realm,
acting_user=acting_user,
modified_user=sub.user_profile,
modified_stream=stream,
event_last_message_id=event_last_message_id,
event_type=RealmAuditLog.SUBSCRIPTION_CREATED,
event_time=event_time))
for (sub, stream) in subs_to_activate:
all_subscription_logs.append(RealmAuditLog(realm=realm,
acting_user=acting_user,
modified_user=sub.user_profile,
modified_stream=stream,
event_last_message_id=event_last_message_id,
event_type=RealmAuditLog.SUBSCRIPTION_ACTIVATED,
event_time=event_time))
# Now since we have all log objects generated we can do a bulk insert
RealmAuditLog.objects.bulk_create(all_subscription_logs)
new_occupied_streams = [stream for stream in
set(occupied_streams_after) - set(occupied_streams_before)
if not stream.invite_only]
return new_occupied_streams
def send_peer_add_events( def send_peer_add_events(
realm: Realm, realm: Realm,
users: List[UserProfile], users: List[UserProfile],