# This is the main code for the `./manage.py export` data export tool.
# User docs: https://zulip.readthedocs.io/en/latest/production/export-and-import.html
#
# Most developers will interact with this primarily when they add a
# new table to the schema, in which case they likely need to (1) add
# it to the lists in `ALL_ZULIP_TABLES` and similar data structures and
# (2) if it doesn't belong in NON_EXPORTED_TABLES, add a Config object for
# it to get_realm_config.
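#
# An illustrative sketch of those two steps for a hypothetical
# `zerver_widget` table (Widget is not a real model in this codebase):
# step (1) is adding 'zerver_widget' to ALL_ZULIP_TABLES, and step (2)
# is registering a Config inside get_realm_config, e.g.:
#
#     Config(
#         table='zerver_widget',
#         model=Widget,
#         normal_parent=realm_config,
#         parent_key='realm_id__in',
#     )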
import datetime
from boto.s3.connection import S3Connection
from boto.s3.key import Key  # for mypy
from django.apps import apps
from django.conf import settings
from django.forms.models import model_to_dict
from django.utils.timezone import make_aware as timezone_make_aware
from django.utils.timezone import is_naive as timezone_is_naive
import glob
import logging
import os
import ujson
import subprocess
import tempfile
import shutil
from scripts.lib.zulip_tools import overwrite_symlink
from zerver.lib.avatar_hash import user_avatar_path_from_ids
from analytics.models import RealmCount, UserCount, StreamCount
from zerver.models import UserProfile, Realm, Client, Huddle, Stream, \
    UserMessage, Subscription, Message, RealmEmoji, RealmFilter, Reaction, \
    RealmDomain, Recipient, DefaultStream, get_user_profile_by_id, \
    UserPresence, UserActivity, UserActivityInterval, CustomProfileField, \
    CustomProfileFieldValue, get_display_recipient, Attachment, get_system_bot, \
    RealmAuditLog, UserHotspot, MutedTopic, Service, UserGroup, \
    UserGroupMembership, BotStorageData, BotConfigData
from zerver.lib.parallel import run_parallel
import zerver.lib.upload
from typing import Any, Callable, Dict, List, Optional, Set, Tuple, \
    Union

# Custom mypy types follow:
Record = Dict[str, Any]
TableName = str
TableData = Dict[TableName, List[Record]]
Field = str
Path = str
Context = Dict[str, Any]
FilterArgs = Dict[str, Any]
IdSource = Tuple[TableName, Field]
SourceFilter = Callable[[Record], bool]

# These next two types are callbacks, which mypy does not
# support well, because PEP 484 says "using callbacks
# with keyword arguments is not perceived as a common use case."
# CustomFetch = Callable[[TableData, Config, Context], None]
# PostProcessData = Callable[[TableData, Config, Context], None]
CustomFetch = Any  # TODO: make more specific, see above
PostProcessData = Any  # TODO: make more specific
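
# A possible way to tighten these up (just a sketch; Protocol lives in
# typing_extensions, or in typing on Python 3.8+) would be a Protocol
# with keyword parameters, e.g.:
#
#     class CustomFetchProtocol(Protocol):
#         def __call__(self, response: TableData,
#                      config: 'Config', context: Context) -> None: ...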

# The keys of our MessageOutput variables are normally
# List[Record], but when we write partials, we can get
# lists of integers or a single integer.
# TODO: This could maybe be improved using TypedDict?
MessageOutput = Dict[str, Union[List[Record], List[int], int]]
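
# For the TODO above, a TypedDict sketch based on the keys this module
# actually writes (see write_message_partial_for_query and
# export_usermessages_batch below):
#
#     class MessageOutput(TypedDict, total=False):
#         zerver_message: List[Record]
#         zerver_usermessage: List[Record]
#         zerver_userprofile_ids: List[int]
#         realm_id: int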

MESSAGE_BATCH_CHUNK_SIZE = 1000

ALL_ZULIP_TABLES = {
    'analytics_fillstate',
    'analytics_installationcount',
    'analytics_realmcount',
    'analytics_streamcount',
    'analytics_usercount',
    'otp_static_staticdevice',
    'otp_static_statictoken',
    'otp_totp_totpdevice',
    'social_auth_association',
    'social_auth_code',
    'social_auth_nonce',
    'social_auth_partial',
    'social_auth_usersocialauth',
    'two_factor_phonedevice',
    'zerver_archivedattachment',
    'zerver_archivedattachment_messages',
    'zerver_archivedmessage',
    'zerver_archivedusermessage',
    'zerver_attachment',
    'zerver_attachment_messages',
    'zerver_archivedreaction',
    'zerver_archivedsubmessage',
    'zerver_archivetransaction',
    'zerver_botconfigdata',
    'zerver_botstoragedata',
    'zerver_client',
    'zerver_customprofilefield',
    'zerver_customprofilefieldvalue',
    'zerver_defaultstream',
    'zerver_defaultstreamgroup',
    'zerver_defaultstreamgroup_streams',
    'zerver_emailchangestatus',
    'zerver_huddle',
    'zerver_message',
    'zerver_multiuseinvite',
    'zerver_multiuseinvite_streams',
    'zerver_preregistrationuser',
    'zerver_preregistrationuser_streams',
    'zerver_pushdevicetoken',
    'zerver_reaction',
    'zerver_realm',
    'zerver_realmauditlog',
    'zerver_realmdomain',
    'zerver_realmemoji',
    'zerver_realmfilter',
    'zerver_recipient',
    'zerver_scheduledemail',
    'zerver_scheduledemail_users',
    'zerver_scheduledmessage',
    'zerver_service',
    'zerver_stream',
    'zerver_submessage',
    'zerver_subscription',
    'zerver_useractivity',
    'zerver_useractivityinterval',
    'zerver_usergroup',
    'zerver_usergroupmembership',
    'zerver_userhotspot',
    'zerver_usermessage',
    'zerver_userpresence',
    'zerver_userprofile',
    'zerver_userprofile_groups',
    'zerver_userprofile_user_permissions',
    'zerver_userstatus',
    'zerver_mutedtopic',
}

# This set contains those database tables that we expect to not be
# included in the export.  This tool does validation to ensure that
# every table in the database is either exported or listed here, to
# ensure we never accidentally fail to export a table.
NON_EXPORTED_TABLES = {
    # These invitation/confirmation flow tables don't make sense to
    # export, since invitation links will be broken by the server URL
    # change anyway:
    'zerver_emailchangestatus',
    'zerver_multiuseinvite',
    'zerver_multiuseinvite_streams',
    'zerver_preregistrationuser',
    'zerver_preregistrationuser_streams',

    # When switching servers, clients will need to re-login and
    # reregister for push notifications anyway.
    'zerver_pushdevicetoken',

    # We don't use these generated Django tables
    'zerver_userprofile_groups',
    'zerver_userprofile_user_permissions',

    # These are used for scheduling future activity; they could make
    # sense to export, but are relatively low value.
    'zerver_scheduledemail',
    'zerver_scheduledemail_users',
    'zerver_scheduledmessage',

    # These tables are related to a user's 2FA authentication
    # configuration, which will need to be set up again on the new server.
    'two_factor_phonedevice',
    'otp_static_staticdevice',
    'otp_static_statictoken',
    'otp_totp_totpdevice',

    # These archive tables should not be exported (they are to support
    # restoring content accidentally deleted due to software bugs in
    # the retention policy feature).
    'zerver_archivedmessage',
    'zerver_archivedusermessage',
    'zerver_archivedattachment',
    'zerver_archivedattachment_messages',
    'zerver_archivedreaction',
    'zerver_archivedsubmessage',
    'zerver_archivetransaction',

    # Social auth tables are not needed post-export, since we don't
    # use any of this state outside of a direct authentication flow.
    'social_auth_association',
    'social_auth_code',
    'social_auth_nonce',
    'social_auth_partial',
    'social_auth_usersocialauth',

    # We will likely never want to migrate this table, since it's a
    # total of all the realmcount values on the server.  Might need to
    # recompute it after a fillstate import.
    'analytics_installationcount',

    # Fillstate will require some cleverness to do the right partial export.
    'analytics_fillstate',

    # These are for unfinished features; we'll want to add them to the
    # export before they reach full production status.
    'zerver_defaultstreamgroup',
    'zerver_defaultstreamgroup_streams',
    'zerver_submessage',

    # This is low priority, since users can easily just reset themselves to away.
    'zerver_userstatus',

    # For any tables listed below here, it's a bug that they are not present in the export.
}

IMPLICIT_TABLES = {
    # ManyToMany relationships are exported implicitly as part of the
    # parent table's rows, and rebuilt when importing the parent table.
    'zerver_attachment_messages',
}

ATTACHMENT_TABLES = {
    'zerver_attachment',
}

MESSAGE_TABLES = {
    # message tables get special treatment, because they're by far our
    # largest tables and need to be paginated.
    'zerver_message',
    'zerver_usermessage',
    # zerver_reaction belongs here, since it's added late because it
    # has a foreign key into the Message table.
    'zerver_reaction',
}

# These get their own file as analytics data can be quite large and
# would otherwise make realm.json unpleasant to manually inspect.
ANALYTICS_TABLES = {
    'analytics_realmcount',
    'analytics_streamcount',
    'analytics_usercount',
}

# This data structure lists all the Django DateTimeField fields in the
# data model.  These are converted to floats during the export process
# via floatify_datetime_fields, and back during the import process.
#
# TODO: This data structure could likely eventually be replaced by
# inspecting the corresponding Django models
DATE_FIELDS = {
    'zerver_attachment': ['create_time'],
    'zerver_message': ['last_edit_time', 'pub_date'],
    'zerver_realm': ['date_created'],
    'zerver_stream': ['date_created'],
    'zerver_useractivity': ['last_visit'],
    'zerver_useractivityinterval': ['start', 'end'],
    'zerver_userpresence': ['timestamp'],
    'zerver_userprofile': ['date_joined', 'last_login', 'last_reminder'],
    'zerver_realmauditlog': ['event_time'],
    'zerver_userhotspot': ['timestamp'],
    'analytics_installationcount': ['end_time'],
    'analytics_realmcount': ['end_time'],
    'analytics_usercount': ['end_time'],
    'analytics_streamcount': ['end_time'],
}  # type: Dict[TableName, List[Field]]

def sanity_check_output(data: TableData) -> None:
    # First, we verify that the export tool has a declared
    # configuration for every table declared in the `models.py` files.
    target_models = (
        list(apps.get_app_config('analytics').get_models(include_auto_created=True)) +
        list(apps.get_app_config('django_otp').get_models(include_auto_created=True)) +
        list(apps.get_app_config('otp_static').get_models(include_auto_created=True)) +
        list(apps.get_app_config('otp_totp').get_models(include_auto_created=True)) +
        list(apps.get_app_config('social_django').get_models(include_auto_created=True)) +
        list(apps.get_app_config('two_factor').get_models(include_auto_created=True)) +
        list(apps.get_app_config('zerver').get_models(include_auto_created=True))
    )
    all_tables_db = set(model._meta.db_table for model in target_models)

    # These assertion statements will fire when we add a new database
    # table that is not included in Zulip's data exports.  Generally,
    # you can add your new table to `ALL_ZULIP_TABLES` and
    # `NON_EXPORTED_TABLES` during early work on a new feature so that
    # CI passes.
    #
    # We'll want to make sure we handle it for exports before
    # releasing the new feature, but doing so correctly requires some
    # expertise on this export system.
    assert ALL_ZULIP_TABLES == all_tables_db
    assert NON_EXPORTED_TABLES.issubset(ALL_ZULIP_TABLES)
    assert IMPLICIT_TABLES.issubset(ALL_ZULIP_TABLES)
    assert ATTACHMENT_TABLES.issubset(ALL_ZULIP_TABLES)
    assert ANALYTICS_TABLES.issubset(ALL_ZULIP_TABLES)

    tables = set(ALL_ZULIP_TABLES)
    tables -= NON_EXPORTED_TABLES
    tables -= IMPLICIT_TABLES
    tables -= MESSAGE_TABLES
    tables -= ATTACHMENT_TABLES
    tables -= ANALYTICS_TABLES

    for table in tables:
        if table not in data:
            logging.warning('??? NO DATA EXPORTED FOR TABLE %s!!!' % (table,))

def write_data_to_file(output_file: Path, data: Any) -> None:
    with open(output_file, "w") as f:
        f.write(ujson.dumps(data, indent=4))

def make_raw(query: Any, exclude: Optional[List[Field]]=None) -> List[Record]:
    '''
    Takes a Django query and returns a JSONable list
    of dictionaries corresponding to the database rows.
    '''
    rows = []
    for instance in query:
        data = model_to_dict(instance, exclude=exclude)
        """
        In Django 1.11.5, model_to_dict evaluates the QuerySet of
        each many-to-many field to give us a list of instances. We
        require a list of primary keys, so we get the primary keys
        from the instances below.
        """
        for field in instance._meta.many_to_many:
            value = data[field.name]
            data[field.name] = [row.id for row in value]

        rows.append(data)

    return rows
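
# Example usage (a sketch; `realm` here is an assumed Realm instance,
# mirroring how the zerver_stream Config below uses exclude):
#
#     rows = make_raw(Stream.objects.filter(realm=realm),
#                     exclude=['email_token'])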

def floatify_datetime_fields(data: TableData, table: TableName) -> None:
    for item in data[table]:
        for field in DATE_FIELDS[table]:
            orig_dt = item[field]
            if orig_dt is None:
                continue
            if timezone_is_naive(orig_dt):
                logging.warning("Naive datetime: %r" % (item,))
                dt = timezone_make_aware(orig_dt)
            else:
                dt = orig_dt
            utc_naive = dt.replace(tzinfo=None) - dt.utcoffset()
            item[field] = (utc_naive - datetime.datetime(1970, 1, 1)).total_seconds()
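
# Worked example (a sketch, mirroring the arithmetic above): an aware
# datetime of 2019-01-01 00:00:00 UTC becomes 1546300800.0, i.e.
# seconds since the Unix epoch; the import process reverses this.
#
#     dt = datetime.datetime(2019, 1, 1, tzinfo=datetime.timezone.utc)
#     utc_naive = dt.replace(tzinfo=None) - dt.utcoffset()
#     (utc_naive - datetime.datetime(1970, 1, 1)).total_seconds()
#     # => 1546300800.0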

class Config:
    '''A Config object configures a single table for exporting (and,
    maybe some day, importing as well).  This configuration defines
    what process needs to be followed to correctly extract the set of
    objects to export.

    You should never mutate Config objects as part of the export;
    instead use the data to determine how you populate other
    data structures.

    There are parent/children relationships between Config objects.
    The parent should be instantiated first.  The child will
    append itself to the parent's list of children.

    '''

    def __init__(self, table: Optional[str]=None,
                 model: Optional[Any]=None,
                 normal_parent: Optional['Config']=None,
                 virtual_parent: Optional['Config']=None,
                 filter_args: Optional[FilterArgs]=None,
                 custom_fetch: Optional[CustomFetch]=None,
                 custom_tables: Optional[List[TableName]]=None,
                 post_process_data: Optional[PostProcessData]=None,
                 concat_and_destroy: Optional[List[TableName]]=None,
                 id_source: Optional[IdSource]=None,
                 source_filter: Optional[SourceFilter]=None,
                 parent_key: Optional[Field]=None,
                 use_all: bool=False,
                 is_seeded: bool=False,
                 exclude: Optional[List[Field]]=None) -> None:
        assert table or custom_tables
        self.table = table
        self.model = model
        self.normal_parent = normal_parent
        self.virtual_parent = virtual_parent
        self.filter_args = filter_args
        self.parent_key = parent_key
        self.use_all = use_all
        self.is_seeded = is_seeded
        self.exclude = exclude
        self.custom_fetch = custom_fetch
        self.custom_tables = custom_tables
        self.post_process_data = post_process_data
        self.concat_and_destroy = concat_and_destroy
        self.id_source = id_source
        self.source_filter = source_filter
        self.children = []  # type: List[Config]

        if normal_parent is not None:
            self.parent = normal_parent  # type: Optional[Config]
        else:
            self.parent = None

        if virtual_parent is not None and normal_parent is not None:
            raise AssertionError('''
                If you specify a normal_parent, please
                do not create a virtual_parent.
                ''')

        if normal_parent is not None:
            normal_parent.children.append(self)
        elif virtual_parent is not None:
            virtual_parent.children.append(self)
        elif not is_seeded:
            raise AssertionError('''
                You must specify a parent if you are
                not using is_seeded.
                ''')

        if self.id_source is not None:
            if self.virtual_parent is None:
                raise AssertionError('''
                    You must specify a virtual_parent if you are
                    using id_source.''')
            if self.id_source[0] != self.virtual_parent.table:
                raise AssertionError('''
                    Configuration error.  To populate %s, you
                    want data from %s, but that differs from
                    the table name of your virtual parent (%s),
                    which suggests you may not have set up
                    the ordering correctly.  You may simply
                    need to assign a virtual_parent, or there
                    may be deeper issues going on.''' % (
                    self.table,
                    self.id_source[0],
                    self.virtual_parent.table))
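
# A minimal sketch of the parent/child pattern (hypothetical Blog/Article
# names, matching the analogy used in export_from_config below): the
# seeded root is exported first, then each child is fetched by filtering
# on the ids of its parent's rows.
#
#     blog_config = Config(table='zerver_blog', is_seeded=True)
#     Config(table='zerver_article', model=Article,
#            normal_parent=blog_config, parent_key='blog_id__in')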


def export_from_config(response: TableData, config: Config, seed_object: Optional[Any]=None,
                       context: Optional[Context]=None) -> None:
    table = config.table
    parent = config.parent
    model = config.model

    if context is None:
        context = {}

    if table:
        exported_tables = [table]
    else:
        if config.custom_tables is None:
            raise AssertionError('''
                You must specify config.custom_tables if you
                are not specifying config.table''')
        exported_tables = config.custom_tables

    for t in exported_tables:
        logging.info('Exporting via export_from_config:  %s' % (t,))

    rows = None
    if config.is_seeded:
        rows = [seed_object]

    elif config.custom_fetch:
        config.custom_fetch(
            response=response,
            config=config,
            context=context
        )
        if config.custom_tables:
            for t in config.custom_tables:
                if t not in response:
                    raise AssertionError('Custom fetch failed to populate %s' % (t,))

    elif config.concat_and_destroy:
        # When we concat_and_destroy, we are working with
        # temporary "tables" that are lists of records that
        # should already be ready to export.
        data = []  # type: List[Record]
        for t in config.concat_and_destroy:
            data += response[t]
            del response[t]
            logging.info('Deleted temporary %s' % (t,))
        assert table is not None
        response[table] = data

    elif config.use_all:
        assert model is not None
        query = model.objects.all()
        rows = list(query)

    elif config.normal_parent:
        # In this mode, our current model is figuratively Article,
        # and normal_parent is figuratively Blog, and
        # now we just need to get all the articles
        # contained by the blogs.
        model = config.model
        assert parent is not None
        assert parent.table is not None
        assert config.parent_key is not None
        parent_ids = [r['id'] for r in response[parent.table]]
        filter_parms = {config.parent_key: parent_ids}  # type: Dict[str, Any]
        if config.filter_args is not None:
            filter_parms.update(config.filter_args)
        assert model is not None
        query = model.objects.filter(**filter_parms)
        rows = list(query)

    elif config.id_source:
        # In this mode, we are the figurative Blog, and we now
        # need to look at the current response to get all the
        # blog ids from the Article rows we fetched previously.
        model = config.model
        assert model is not None
        # This will be a tuple of the form ('zerver_article', 'blog').
        (child_table, field) = config.id_source
        child_rows = response[child_table]
        if config.source_filter:
            child_rows = [r for r in child_rows if config.source_filter(r)]
        lookup_ids = [r[field] for r in child_rows]
        filter_parms = dict(id__in=lookup_ids)
        if config.filter_args:
            filter_parms.update(config.filter_args)
        query = model.objects.filter(**filter_parms)
        rows = list(query)

    # Post-process rows (which won't apply to custom fetches/concats)
    if rows is not None:
        assert table is not None  # Hint for mypy
        response[table] = make_raw(rows, exclude=config.exclude)
        if table in DATE_FIELDS:
            floatify_datetime_fields(response, table)

    if config.post_process_data:
        config.post_process_data(
            response=response,
            config=config,
            context=context
        )

    # Now walk our children.  It's extremely important to respect
    # the order of children here.
    for child_config in config.children:
        export_from_config(
            response=response,
            config=child_config,
            context=context,
        )
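
# How this is driven at the top level (a sketch; the actual call site is
# further down in this file, outside this excerpt):
#
#     response = {}  # type: TableData
#     export_from_config(
#         response=response,
#         config=get_realm_config(),
#         seed_object=realm,
#         context=dict(realm=realm, exportable_user_ids=None),
#     )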

def get_realm_config() -> Config:
    # This function generates the main Config object that defines how
    # to do a full-realm export of a single realm from a Zulip server.

    realm_config = Config(
        table='zerver_realm',
        is_seeded=True
    )

    Config(
        table='zerver_defaultstream',
        model=DefaultStream,
        normal_parent=realm_config,
        parent_key='realm_id__in',
    )

    Config(
        table='zerver_customprofilefield',
        model=CustomProfileField,
        normal_parent=realm_config,
        parent_key='realm_id__in',
    )

    Config(
        table='zerver_realmemoji',
        model=RealmEmoji,
        normal_parent=realm_config,
        parent_key='realm_id__in',
    )

    Config(
        table='zerver_realmdomain',
        model=RealmDomain,
        normal_parent=realm_config,
        parent_key='realm_id__in',
    )

    Config(
        table='zerver_realmfilter',
        model=RealmFilter,
        normal_parent=realm_config,
        parent_key='realm_id__in',
    )

    Config(
        table='zerver_client',
        model=Client,
        virtual_parent=realm_config,
        use_all=True
    )

    user_profile_config = Config(
        custom_tables=[
            'zerver_userprofile',
            'zerver_userprofile_mirrordummy',
        ],
        # set table for children who treat us as normal parent
        table='zerver_userprofile',
        virtual_parent=realm_config,
        custom_fetch=fetch_user_profile,
    )

    user_groups_config = Config(
        table='zerver_usergroup',
        model=UserGroup,
        normal_parent=realm_config,
        parent_key='realm__in',
    )

    Config(
        table='zerver_usergroupmembership',
        model=UserGroupMembership,
        normal_parent=user_groups_config,
        parent_key='user_group__in',
    )

    Config(
        custom_tables=[
            'zerver_userprofile_crossrealm',
        ],
        virtual_parent=user_profile_config,
        custom_fetch=fetch_user_profile_cross_realm,
    )

    Config(
        table='zerver_userpresence',
        model=UserPresence,
        normal_parent=user_profile_config,
        parent_key='user_profile__in',
    )

    Config(
        table='zerver_customprofilefieldvalue',
        model=CustomProfileFieldValue,
        normal_parent=user_profile_config,
        parent_key='user_profile__in',
    )

    Config(
        table='zerver_useractivity',
        model=UserActivity,
        normal_parent=user_profile_config,
        parent_key='user_profile__in',
    )

    Config(
        table='zerver_useractivityinterval',
        model=UserActivityInterval,
        normal_parent=user_profile_config,
        parent_key='user_profile__in',
    )

    Config(
        table='zerver_realmauditlog',
        model=RealmAuditLog,
        normal_parent=user_profile_config,
        parent_key='modified_user__in',
    )

    Config(
        table='zerver_userhotspot',
        model=UserHotspot,
        normal_parent=user_profile_config,
        parent_key='user__in',
    )

    Config(
        table='zerver_mutedtopic',
        model=MutedTopic,
        normal_parent=user_profile_config,
        parent_key='user_profile__in',
    )

    Config(
        table='zerver_service',
        model=Service,
        normal_parent=user_profile_config,
        parent_key='user_profile__in',
    )

    Config(
        table='zerver_botstoragedata',
        model=BotStorageData,
        normal_parent=user_profile_config,
        parent_key='bot_profile__in',
    )

    Config(
        table='zerver_botconfigdata',
        model=BotConfigData,
        normal_parent=user_profile_config,
        parent_key='bot_profile__in',
    )

    # Some of these tables are intermediate "tables" that we
    # create only for the export.  Think of them as similar to views.

    user_subscription_config = Config(
        table='_user_subscription',
        model=Subscription,
        normal_parent=user_profile_config,
        filter_args={'recipient__type': Recipient.PERSONAL},
        parent_key='user_profile__in',
    )

    Config(
        table='_user_recipient',
        model=Recipient,
        virtual_parent=user_subscription_config,
        id_source=('_user_subscription', 'recipient'),
    )

    #
    stream_subscription_config = Config(
        table='_stream_subscription',
        model=Subscription,
        normal_parent=user_profile_config,
        filter_args={'recipient__type': Recipient.STREAM},
        parent_key='user_profile__in',
    )

    stream_recipient_config = Config(
        table='_stream_recipient',
        model=Recipient,
        virtual_parent=stream_subscription_config,
        id_source=('_stream_subscription', 'recipient'),
    )

    Config(
        table='zerver_stream',
        model=Stream,
        virtual_parent=stream_recipient_config,
        id_source=('_stream_recipient', 'type_id'),
        source_filter=lambda r: r['type'] == Recipient.STREAM,
        exclude=['email_token'],
        post_process_data=sanity_check_stream_data
    )

    #

    Config(
        custom_tables=[
            '_huddle_recipient',
            '_huddle_subscription',
            'zerver_huddle',
        ],
        normal_parent=user_profile_config,
        custom_fetch=fetch_huddle_objects,
    )

    # Now build permanent tables from our temp tables.
    Config(
        table='zerver_recipient',
        virtual_parent=user_profile_config,
        concat_and_destroy=[
            '_user_recipient',
            '_stream_recipient',
            '_huddle_recipient',
        ],
    )

    Config(
        table='zerver_subscription',
        virtual_parent=user_profile_config,
        concat_and_destroy=[
            '_user_subscription',
            '_stream_subscription',
            '_huddle_subscription',
        ]
    )

    return realm_config
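
# Note on ordering (restating the comment in export_from_config): the
# children above are walked in registration order, so the temporary
# _user_*/_stream_*/_huddle_* tables are all populated before the
# concat_and_destroy configs assemble zerver_recipient and
# zerver_subscription from them.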

def sanity_check_stream_data(response: TableData, config: Config, context: Context) -> None:

    if context['exportable_user_ids'] is not None:
        # If we restrict which user ids are exportable,
        # the way that we find streams is a little too
        # complex to have a sanity check.
        return

    actual_streams = set([stream.name for stream in Stream.objects.filter(
        realm=response["zerver_realm"][0]['id'])])
    streams_in_response = set([stream['name'] for stream in response['zerver_stream']])

    if len(streams_in_response - actual_streams) > 0:
        print("Error: Streams not present in the realm were exported:")
        print("   ", streams_in_response - actual_streams)
        print("This is likely due to a bug in the export tool.")
        raise AssertionError("Aborting!  Please investigate.")
    if len(actual_streams - streams_in_response) > 0:
        print("Error: Some streams present in the realm were not exported:")
        print("    ", actual_streams - streams_in_response)
        print("Usually, this is caused by a stream having been created that never had subscribers.")
        print("(Due to a bug elsewhere in Zulip, not in the export tool)")
        raise AssertionError("Aborting!  Please investigate.")

def fetch_user_profile(response: TableData, config: Config, context: Context) -> None:
    realm = context['realm']
    exportable_user_ids = context['exportable_user_ids']

    query = UserProfile.objects.filter(realm_id=realm.id)
    exclude = ['password', 'api_key']
    rows = make_raw(list(query), exclude=exclude)

    normal_rows = []  # type: List[Record]
    dummy_rows = []  # type: List[Record]

    for row in rows:
        if exportable_user_ids is not None:
            if row['id'] in exportable_user_ids:
                assert not row['is_mirror_dummy']
            else:
                # Convert non-exportable users to
                # inactive is_mirror_dummy users.
                row['is_mirror_dummy'] = True
                row['is_active'] = False

        if row['is_mirror_dummy']:
            dummy_rows.append(row)
        else:
            normal_rows.append(row)

    response['zerver_userprofile'] = normal_rows
    response['zerver_userprofile_mirrordummy'] = dummy_rows

def fetch_user_profile_cross_realm(response: TableData, config: Config, context: Context) -> None:
    realm = context['realm']
    response['zerver_userprofile_crossrealm'] = []

    if realm.string_id == settings.SYSTEM_BOT_REALM:
        return

    for bot_user in [
            get_system_bot(settings.NOTIFICATION_BOT),
            get_system_bot(settings.EMAIL_GATEWAY_BOT),
            get_system_bot(settings.WELCOME_BOT),
    ]:
        recipient_id = Recipient.objects.get(type_id=bot_user.id, type=Recipient.PERSONAL).id
        response['zerver_userprofile_crossrealm'].append(dict(
            email=bot_user.email,
            id=bot_user.id,
            recipient_id=recipient_id,
        ))

def fetch_attachment_data(response: TableData, realm_id: int, message_ids: Set[int]) -> None:
    filter_args = {'realm_id': realm_id}
    query = Attachment.objects.filter(**filter_args)
    response['zerver_attachment'] = make_raw(list(query))
    floatify_datetime_fields(response, 'zerver_attachment')

    '''
    We usually export most messages for the realm, but not
    quite ALL messages for the realm.  So, we need to
    clean up our attachment data to have correct
    values for response['zerver_attachment'][<n>]['messages'].
    '''
    for row in response['zerver_attachment']:
        filtered_message_ids = set(row['messages']).intersection(message_ids)
        row['messages'] = sorted(list(filtered_message_ids))

    '''
    Attachments can be connected to multiple messages, although
    it's most common to have just one message. Regardless,
    if none of those message(s) survived the filtering above
    for a particular attachment, then we won't export the
    attachment row.
    '''
    response['zerver_attachment'] = [
        row for row in response['zerver_attachment']
        if row['messages']]
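
# E.g. (a sketch): an attachment row whose 'messages' list was
# [11, 12, 97] keeps [11, 12] if only {11, 12, 13} were exported, and a
# row whose list becomes empty is dropped entirely.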

def fetch_reaction_data(response: TableData, message_ids: Set[int]) -> None:
    query = Reaction.objects.filter(message_id__in=list(message_ids))
    response['zerver_reaction'] = make_raw(list(query))

def fetch_huddle_objects(response: TableData, config: Config, context: Context) -> None:

    realm = context['realm']
    assert config.parent is not None
    assert config.parent.table is not None
    user_profile_ids = set(r['id'] for r in response[config.parent.table])

    # First we get all huddles involving someone in the realm.
    realm_huddle_subs = Subscription.objects.select_related("recipient").filter(
        recipient__type=Recipient.HUDDLE, user_profile__in=user_profile_ids)
    realm_huddle_recipient_ids = set(sub.recipient_id for sub in realm_huddle_subs)

    # Mark all Huddles whose subscribers include a cross-realm user.
    unsafe_huddle_recipient_ids = set()
    for sub in Subscription.objects.select_related().filter(recipient__in=realm_huddle_recipient_ids):
        if sub.user_profile.realm != realm:
            # In almost every case the other realm will be zulip.com
            unsafe_huddle_recipient_ids.add(sub.recipient_id)

    # Now filter down to just those huddles that are entirely within the realm.
    #
    # This is important for ensuring that the User objects needed
    # to import it on the other end exist (since we're only
    # exporting the users from this realm), at the cost of losing
    # some of these cross-realm messages.
    huddle_subs = [sub for sub in realm_huddle_subs if sub.recipient_id not in unsafe_huddle_recipient_ids]
    huddle_recipient_ids = set(sub.recipient_id for sub in huddle_subs)
    huddle_ids = set(sub.recipient.type_id for sub in huddle_subs)

    huddle_subscription_dicts = make_raw(huddle_subs)
    huddle_recipients = make_raw(Recipient.objects.filter(id__in=huddle_recipient_ids))

    response['_huddle_recipient'] = huddle_recipients
    response['_huddle_subscription'] = huddle_subscription_dicts
    response['zerver_huddle'] = make_raw(Huddle.objects.filter(id__in=huddle_ids))

def fetch_usermessages(realm: Realm,
                       message_ids: Set[int],
                       user_profile_ids: Set[int],
                       message_filename: Path,
                       consent_message_id: Optional[int]=None) -> List[Record]:
    # UserMessage export security rule: You can export UserMessages
    # for the messages you exported for the users in your realm.
    user_message_query = UserMessage.objects.filter(user_profile__realm=realm,
                                                    message_id__in=message_ids)
    if consent_message_id is not None:
        consented_user_ids = get_consented_user_ids(consent_message_id)
        user_profile_ids = user_profile_ids & consented_user_ids
    user_message_chunk = []
    for user_message in user_message_query:
        if user_message.user_profile_id not in user_profile_ids:
            continue
        user_message_obj = model_to_dict(user_message)
        user_message_obj['flags_mask'] = user_message.flags.mask
        del user_message_obj['flags']
        user_message_chunk.append(user_message_obj)
    logging.info("Fetched UserMessages for %s" % (message_filename,))
    return user_message_chunk

def export_usermessages_batch(input_path: Path, output_path: Path,
                              consent_message_id: Optional[int]=None) -> None:
    """As part of the system for doing parallel exports, this runs on one
    batch of Message objects and adds the corresponding UserMessage
    objects. (This is called by the export_usermessage_batch
    management command)."""
    with open(input_path, "r") as input_file:
        output = ujson.loads(input_file.read())
    message_ids = [item['id'] for item in output['zerver_message']]
    user_profile_ids = set(output['zerver_userprofile_ids'])
    del output['zerver_userprofile_ids']
    realm = Realm.objects.get(id=output['realm_id'])
    del output['realm_id']
    output['zerver_usermessage'] = fetch_usermessages(realm, set(message_ids), user_profile_ids,
                                                      output_path, consent_message_id)
    write_message_export(output_path, output)
    os.unlink(input_path)

def write_message_export(message_filename: Path, output: MessageOutput) -> None:
    write_data_to_file(output_file=message_filename, data=output)
    logging.info("Dumped to %s" % (message_filename,))

def export_partial_message_files(realm: Realm,
                                 response: TableData,
                                 chunk_size: int=MESSAGE_BATCH_CHUNK_SIZE,
                                 output_dir: Optional[Path]=None,
                                 public_only: bool=False,
                                 consent_message_id: Optional[int]=None) -> Set[int]:
    if output_dir is None:
        output_dir = tempfile.mkdtemp(prefix="zulip-export")

    def get_ids(records: List[Record]) -> Set[int]:
        return set(x['id'] for x in records)

    # Basic security rule: You can export everything either...
    #   - sent by someone in your exportable_user_ids
    #        OR
    #   - received by someone in your exportable_user_ids (which
    #     equates to a recipient object we are exporting)
    #
    # TODO: In theory, you should be able to export messages in
    # cross-realm PM threads; currently, this only exports cross-realm
    # messages received by your realm that were sent by Zulip system
    # bots (e.g. emailgateway, notification-bot).

    # Here, "we" and "us" refer to the inner circle of users who
    # were specified as being allowed to be exported.  "Them"
    # refers to other users.
    user_ids_for_us = get_ids(
        response['zerver_userprofile']
    )
    ids_of_our_possible_senders = get_ids(
        response['zerver_userprofile'] +
        response['zerver_userprofile_mirrordummy'] +
        response['zerver_userprofile_crossrealm'])

    consented_user_ids = set()  # type: Set[int]
    if consent_message_id is not None:
        consented_user_ids = get_consented_user_ids(consent_message_id)

    if public_only:
        recipient_streams = Stream.objects.filter(realm=realm, invite_only=False)
        recipient_ids = Recipient.objects.filter(
            type=Recipient.STREAM, type_id__in=recipient_streams).values_list("id", flat=True)
        recipient_ids_for_us = get_ids(response['zerver_recipient']) & set(recipient_ids)
    elif consent_message_id is not None:
        public_streams = Stream.objects.filter(realm=realm, invite_only=False)
        public_stream_recipient_ids = Recipient.objects.filter(
            type=Recipient.STREAM, type_id__in=public_streams).values_list("id", flat=True)

        consented_recipient_ids = Subscription.objects.filter(user_profile__id__in=consented_user_ids). \
            values_list("recipient_id", flat=True)

        recipient_ids = set(public_stream_recipient_ids) | set(consented_recipient_ids)
        recipient_ids_for_us = get_ids(response['zerver_recipient']) & recipient_ids
    else:
        recipient_ids_for_us = get_ids(response['zerver_recipient'])
        # For a full export, we have implicit consent for all users in the export.
        consented_user_ids = user_ids_for_us

    if public_only:
        messages_we_received = Message.objects.filter(
            sender__in=ids_of_our_possible_senders,
            recipient__in=recipient_ids_for_us,
        ).order_by('id')

        # For the public stream export, we only need the messages those streams received.
        message_queries = [
            messages_we_received,
        ]
    else:
        # We capture most messages here: Messages that were sent by
        # anyone in the export and received by any of the users who we
        # have consent to export.
        messages_we_received = Message.objects.filter(
            sender__in=ids_of_our_possible_senders,
            recipient__in=recipient_ids_for_us,
        ).order_by('id')

        # The above query is missing some messages that consenting
        # users have access to, namely, PMs sent by one of the users
        # in our export to another user (since the only subscriber to
        # a Recipient object for Recipient.PERSONAL is the recipient,
        # not the sender).  The `consented_user_ids` list has
        # precisely those users whose Recipient.PERSONAL recipient ID
        # was already present in recipient_ids_for_us above.
        ids_of_non_exported_possible_recipients = ids_of_our_possible_senders - consented_user_ids

        recipients_for_them = Recipient.objects.filter(
            type=Recipient.PERSONAL,
            type_id__in=ids_of_non_exported_possible_recipients).values("id")
        recipient_ids_for_them = get_ids(recipients_for_them)

        messages_we_sent_to_them = Message.objects.filter(
            sender__in=consented_user_ids,
            recipient__in=recipient_ids_for_them,
        ).order_by('id')

        message_queries = [
            messages_we_received,
            messages_we_sent_to_them,
        ]

    all_message_ids = set()  # type: Set[int]
    dump_file_id = 1

    for message_query in message_queries:
        dump_file_id = write_message_partial_for_query(
            realm=realm,
            message_query=message_query,
            dump_file_id=dump_file_id,
            all_message_ids=all_message_ids,
            output_dir=output_dir,
            user_profile_ids=user_ids_for_us,
            chunk_size=chunk_size,
        )

    return all_message_ids

def write_message_partial_for_query(realm: Realm, message_query: Any, dump_file_id: int,
                                    all_message_ids: Set[int], output_dir: Path,
                                    user_profile_ids: Set[int],
                                    chunk_size: int=MESSAGE_BATCH_CHUNK_SIZE) -> int:
    min_id = -1

    while True:
        actual_query = message_query.filter(id__gt=min_id)[0:chunk_size]
        message_chunk = make_raw(actual_query)
        message_ids = set(m['id'] for m in message_chunk)
        assert len(message_ids.intersection(all_message_ids)) == 0

        all_message_ids.update(message_ids)

        if len(message_chunk) == 0:
            break

        # Figure out the name of our shard file.
        message_filename = os.path.join(output_dir, "messages-%06d.json" % (dump_file_id,))
        message_filename += '.partial'
        logging.info("Fetched Messages for %s" % (message_filename,))

        # Clean up our messages.
        table_data = {}  # type: TableData
        table_data['zerver_message'] = message_chunk
        floatify_datetime_fields(table_data, 'zerver_message')

        # Build up our output for the .partial file, which needs
        # a list of user_profile_ids to search for (as well as
        # the realm id).
        output = {}  # type: MessageOutput
        output['zerver_message'] = table_data['zerver_message']
        output['zerver_userprofile_ids'] = list(user_profile_ids)
        output['realm_id'] = realm.id

        # And write the data.
        write_message_export(message_filename, output)
        min_id = max(message_ids)
        dump_file_id += 1

    return dump_file_id
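
# Shard flow (a sketch of how the pieces above fit together): this
# function writes messages-000001.json.partial, messages-000002.json.partial,
# etc.; export_usermessages_batch later reads each .partial file, adds
# the zerver_usermessage data, writes the final messages-NNNNNN.json,
# and deletes the .partial input.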

def export_uploads_and_avatars(realm: Realm, output_dir: Path) -> None:
    uploads_output_dir = os.path.join(output_dir, 'uploads')
    avatars_output_dir = os.path.join(output_dir, 'avatars')
    emoji_output_dir = os.path.join(output_dir, 'emoji')

    for output_dir in (uploads_output_dir, avatars_output_dir, emoji_output_dir):
        if not os.path.exists(output_dir):
            os.makedirs(output_dir)

    if settings.LOCAL_UPLOADS_DIR:
        # Small installations and developers will usually just store files locally.
        export_uploads_from_local(realm,
                                  local_dir=os.path.join(settings.LOCAL_UPLOADS_DIR, "files"),
                                  output_dir=uploads_output_dir)
        export_avatars_from_local(realm,
                                  local_dir=os.path.join(settings.LOCAL_UPLOADS_DIR, "avatars"),
                                  output_dir=avatars_output_dir)
        export_emoji_from_local(realm,
                                local_dir=os.path.join(settings.LOCAL_UPLOADS_DIR, "avatars"),
                                output_dir=emoji_output_dir)
    else:
        # Some bigger installations will have their data stored on S3.
        export_files_from_s3(realm,
                             settings.S3_AVATAR_BUCKET,
                             output_dir=avatars_output_dir,
                             processing_avatars=True)
        export_files_from_s3(realm,
                             settings.S3_AUTH_UPLOADS_BUCKET,
                             output_dir=uploads_output_dir)
        export_files_from_s3(realm,
                             settings.S3_AVATAR_BUCKET,
                             output_dir=emoji_output_dir,
                             processing_emoji=True)
 | 
						|
 | 
						|
def _check_key_metadata(email_gateway_bot: Optional[UserProfile],
 | 
						|
                        key: Key, processing_avatars: bool,
 | 
						|
                        realm: Realm, user_ids: Set[int]) -> None:
    # Helper function for export_files_from_s3
    if 'realm_id' in key.metadata and key.metadata['realm_id'] != str(realm.id):
        if email_gateway_bot is None or key.metadata['user_profile_id'] != str(email_gateway_bot.id):
            raise AssertionError("Key metadata problem: %s %s / %s" % (key.name, key.metadata, realm.id))
        # Email gateway bot sends messages, potentially including attachments, cross-realm.
        print("File uploaded by email gateway bot: %s / %s" % (key.name, key.metadata))
    elif processing_avatars:
        if 'user_profile_id' not in key.metadata:
            raise AssertionError("Missing user_profile_id in key metadata: %s" % (key.metadata,))
        if int(key.metadata['user_profile_id']) not in user_ids:
            raise AssertionError("Wrong user_profile_id in key metadata: %s" % (key.metadata,))
    elif 'realm_id' not in key.metadata:
        raise AssertionError("Missing realm_id in key metadata: %s" % (key.metadata,))

def _get_exported_s3_record(
        bucket_name: str,
        key: Key,
        processing_avatars: bool,
        processing_emoji: bool) -> Dict[str, Union[str, int]]:
    # Helper function for export_files_from_s3
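    # Returns the flat records.json entry for this S3 key.  An entry
    # looks roughly like the following (field values are hypothetical):
    #
    #     {'s3_path': '2/ab/xyz/file.png', 'bucket': 'zulip-uploads',
    #      'size': 4096, 'last_modified': ..., 'content_type': 'image/png',
    #      'md5': ..., 'realm_id': 2, 'user_profile_id': 42,
    #      'user_profile_email': 'iago@zulip.com'}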
    record = dict(s3_path=key.name, bucket=bucket_name,
                  size=key.size, last_modified=key.last_modified,
                  content_type=key.content_type, md5=key.md5)
    record.update(key.metadata)

    if processing_emoji:
        record['file_name'] = os.path.basename(key.name)

    # A few early avatars don't have 'realm_id' on the object; fix their metadata
    user_profile = get_user_profile_by_id(record['user_profile_id'])
    if 'realm_id' not in record:
        record['realm_id'] = user_profile.realm_id
    record['user_profile_email'] = user_profile.email

    # Fix the record ids
    record['user_profile_id'] = int(record['user_profile_id'])
    record['realm_id'] = int(record['realm_id'])

    return record

def _save_s3_object_to_file(
        key: Key,
        output_dir: str,
        processing_avatars: bool,
        processing_emoji: bool) -> None:

    # Helper function for export_files_from_s3
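    # Both branches below compute the same on-disk destination; the
    # upload branch additionally validates that the key has the
    # expected three-component "a/b/c" format before writing anything.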
    if processing_avatars or processing_emoji:
        filename = os.path.join(output_dir, key.name)
    else:
        fields = key.name.split('/')
        if len(fields) != 3:
            raise AssertionError("Suspicious key with invalid format %s" % (key.name,))
        filename = os.path.join(output_dir, key.name)

    dirname = os.path.dirname(filename)
    os.makedirs(dirname, exist_ok=True)
    key.get_contents_to_filename(filename)

def export_files_from_s3(realm: Realm, bucket_name: str, output_dir: Path,
                         processing_avatars: bool=False,
                         processing_emoji: bool=False) -> None:
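    # Downloads this realm's objects from the given S3 bucket into
    # output_dir and writes a records.json index for them.  Objects
    # are listed by their "<realm_id>/..." key prefix; for avatars we
    # additionally filter the listing against the avatar paths we
    # compute for the realm's users, and every key's metadata is
    # sanity-checked before download.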
    conn = S3Connection(settings.S3_KEY, settings.S3_SECRET_KEY)
    bucket = conn.get_bucket(bucket_name, validate=True)
    records = []

    logging.info("Downloading uploaded files from %s" % (bucket_name,))

    avatar_hash_values = set()
    user_ids = set()
    if processing_avatars:
        for user_profile in UserProfile.objects.filter(realm=realm):
            avatar_path = user_avatar_path_from_ids(user_profile.id, realm.id)
            avatar_hash_values.add(avatar_path)
            avatar_hash_values.add(avatar_path + ".original")
            user_ids.add(user_profile.id)
    if processing_emoji:
        bucket_list = bucket.list(prefix="%s/emoji/images/" % (realm.id,))
    else:
        bucket_list = bucket.list(prefix="%s/" % (realm.id,))

    if settings.EMAIL_GATEWAY_BOT is not None:
        email_gateway_bot = get_system_bot(settings.EMAIL_GATEWAY_BOT)  # type: Optional[UserProfile]
    else:
        email_gateway_bot = None

    count = 0
    for bkey in bucket_list:
        if processing_avatars and bkey.name not in avatar_hash_values:
            continue
        key = bucket.get_key(bkey.name)

        # This can happen if an email address has moved realms
        _check_key_metadata(email_gateway_bot, key, processing_avatars, realm, user_ids)
        record = _get_exported_s3_record(bucket_name, key, processing_avatars, processing_emoji)

        record['path'] = key.name
        _save_s3_object_to_file(key, output_dir, processing_avatars, processing_emoji)

        records.append(record)
        count += 1

        if count % 100 == 0:
            logging.info("Finished %s" % (count,))

    with open(os.path.join(output_dir, "records.json"), "w") as records_file:
        ujson.dump(records, records_file, indent=4)

def export_uploads_from_local(realm: Realm, local_dir: Path, output_dir: Path) -> None:
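    # The Attachment table is the authoritative record of this realm's
    # uploads, so we iterate over it rather than walking the
    # filesystem.  Each file is copied to its path_id under
    # output_dir, and the records.json entries mirror the S3 export
    # format (hence the s3_path field even for local storage).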
    count = 0
    records = []
    for attachment in Attachment.objects.filter(realm_id=realm.id):
        local_path = os.path.join(local_dir, attachment.path_id)
        output_path = os.path.join(output_dir, attachment.path_id)
        os.makedirs(os.path.dirname(output_path), exist_ok=True)
        shutil.copy2(local_path, output_path)
        stat = os.stat(local_path)
        record = dict(realm_id=attachment.realm_id,
                      user_profile_id=attachment.owner.id,
                      user_profile_email=attachment.owner.email,
                      s3_path=attachment.path_id,
                      path=attachment.path_id,
                      size=stat.st_size,
                      last_modified=stat.st_mtime,
                      content_type=None)
        records.append(record)

        count += 1

        if count % 100 == 0:
            logging.info("Finished %s" % (count,))
    with open(os.path.join(output_dir, "records.json"), "w") as records_file:
        ujson.dump(records, records_file, indent=4)

def export_avatars_from_local(realm: Realm, local_dir: Path, output_dir: Path) -> None:
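    # Along with the realm's own users, we export avatars for the
    # cross-realm system bots.  Users with Gravatar-sourced avatars
    # have no stored file and are skipped.  The glob below matches
    # every stored variant of a user's avatar (typically the resized
    # image plus the ".original" file).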
    count = 0
    records = []

    users = list(UserProfile.objects.filter(realm=realm))
    users += [
        get_system_bot(settings.NOTIFICATION_BOT),
        get_system_bot(settings.EMAIL_GATEWAY_BOT),
        get_system_bot(settings.WELCOME_BOT),
    ]
    for user in users:
        if user.avatar_source == UserProfile.AVATAR_FROM_GRAVATAR:
            continue

        avatar_path = user_avatar_path_from_ids(user.id, realm.id)
        wildcard = os.path.join(local_dir, avatar_path + '.*')

        for local_path in glob.glob(wildcard):
            logging.info('Copying avatar file for user %s from %s' % (
                user.email, local_path))
            fn = os.path.relpath(local_path, local_dir)
            output_path = os.path.join(output_dir, fn)
            os.makedirs(str(os.path.dirname(output_path)), exist_ok=True)
            shutil.copy2(str(local_path), str(output_path))
            stat = os.stat(local_path)
            record = dict(realm_id=realm.id,
                          user_profile_id=user.id,
                          user_profile_email=user.email,
                          s3_path=fn,
                          path=fn,
                          size=stat.st_size,
                          last_modified=stat.st_mtime,
                          content_type=None)
            records.append(record)

            count += 1

            if count % 100 == 0:
                logging.info("Finished %s" % (count,))

    with open(os.path.join(output_dir, "records.json"), "w") as records_file:
        ujson.dump(records, records_file, indent=4)

def export_emoji_from_local(realm: Realm, local_dir: Path, output_dir: Path) -> None:
    count = 0
    records = []
    for realm_emoji in RealmEmoji.objects.filter(realm_id=realm.id):
        emoji_path = RealmEmoji.PATH_ID_TEMPLATE.format(
            realm_id=realm.id,
            emoji_file_name=realm_emoji.file_name
        )
        local_path = os.path.join(local_dir, emoji_path)
        output_path = os.path.join(output_dir, emoji_path)
        os.makedirs(os.path.dirname(output_path), exist_ok=True)
        shutil.copy2(local_path, output_path)
        # Realm Emoji author is optional.
        author = realm_emoji.author
        author_id = None
        if author:
            author_id = realm_emoji.author.id
        record = dict(realm_id=realm.id,
                      author=author_id,
                      path=emoji_path,
                      s3_path=emoji_path,
                      file_name=realm_emoji.file_name,
                      name=realm_emoji.name,
                      deactivated=realm_emoji.deactivated)
        records.append(record)

        count += 1
        if count % 100 == 0:
            logging.info("Finished %s" % (count,))
    with open(os.path.join(output_dir, "records.json"), "w") as records_file:
        ujson.dump(records, records_file, indent=4)

def do_write_stats_file_for_realm_export(output_dir: Path) -> None:
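    # stats.txt is a human-readable summary of the export: for each
    # data file, the number of rows per table (or the record count,
    # for the binary-asset indexes).  An illustrative excerpt:
    #
    #     attachment.json
    #        12 zerver_attachment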
    stats_file = os.path.join(output_dir, 'stats.txt')
    realm_file = os.path.join(output_dir, 'realm.json')
    attachment_file = os.path.join(output_dir, 'attachment.json')
    analytics_file = os.path.join(output_dir, 'analytics.json')
    message_files = glob.glob(os.path.join(output_dir, 'messages-*.json'))
    fns = sorted([analytics_file] + [attachment_file] + message_files + [realm_file])

    logging.info('Writing stats file: %s\n' % (stats_file,))
    with open(stats_file, 'w') as f:
        for fn in fns:
            f.write(os.path.basename(fn) + '\n')
            with open(fn, 'r') as data_file:
                payload = data_file.read()
            data = ujson.loads(payload)
            for k in sorted(data):
                f.write('%5d %s\n' % (len(data[k]), k))
            f.write('\n')

        avatar_file = os.path.join(output_dir, 'avatars/records.json')
        uploads_file = os.path.join(output_dir, 'uploads/records.json')

        for fn in [avatar_file, uploads_file]:
            f.write(fn + '\n')
            with open(fn, 'r') as data_file:
                payload = data_file.read()
            data = ujson.loads(payload)
            f.write('%5d records\n' % (len(data),))
            f.write('\n')

def do_export_realm(realm: Realm, output_dir: Path, threads: int,
                    exportable_user_ids: Optional[Set[int]]=None,
                    public_only: bool=False,
                    consent_message_id: Optional[int]=None) -> str:
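    # The realm export proceeds in the following order: (1) the
    # database tables described by get_realm_config(); (2) uploaded
    # files, avatars, and emoji; (3) zerver_message shards, written as
    # .partial files; (4) reactions, realm.json, analytics, and
    # attachment data; (5) parallel subprocesses that flesh out the
    # .partial files with zerver_usermessage data; and (6) stats.txt
    # plus the final tarball.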
    response = {}  # type: TableData

    # We need at least one thread running to export
    # UserMessage rows.  The management command should
    # enforce this for us.
    if not settings.TEST_SUITE:
        assert threads >= 1

    realm_config = get_realm_config()

    create_soft_link(source=output_dir, in_progress=True)

    logging.info("Exporting data from get_realm_config()...")
    export_from_config(
        response=response,
        config=realm_config,
        seed_object=realm,
        context=dict(realm=realm, exportable_user_ids=exportable_user_ids)
    )
    logging.info('...DONE with get_realm_config() data')

    sanity_check_output(response)

    logging.info("Exporting uploaded files and avatars")
    export_uploads_and_avatars(realm, output_dir)

    # We (sort of) export zerver_message rows here.  We write
    # them to .partial files that are subsequently fleshed out
    # by parallel processes to add in zerver_usermessage data.
    # This is for performance reasons, of course.  Some installations
    # have millions of messages.
    logging.info("Exporting .partial message files")
    message_ids = export_partial_message_files(realm, response, output_dir=output_dir,
                                               public_only=public_only,
                                               consent_message_id=consent_message_id)
    logging.info('%d messages were exported' % (len(message_ids),))

    # zerver_reaction
    zerver_reaction = {}  # type: TableData
    fetch_reaction_data(response=zerver_reaction, message_ids=message_ids)
    response.update(zerver_reaction)

    # Write realm data
    export_file = os.path.join(output_dir, "realm.json")
    logging.info('Writing realm data to %s' % (export_file,))
    write_data_to_file(output_file=export_file, data=response)

    # Write analytics data
    export_analytics_tables(realm=realm, output_dir=output_dir)

    # zerver_attachment
    export_attachment_table(realm=realm, output_dir=output_dir, message_ids=message_ids)

    # Start parallel jobs to export the UserMessage objects.
    launch_user_message_subprocesses(threads=threads, output_dir=output_dir,
                                     consent_message_id=consent_message_id)

    logging.info("Finished exporting %s" % (realm.string_id,))
    create_soft_link(source=output_dir, in_progress=False)

    do_write_stats_file_for_realm_export(output_dir)

    # We need to change back to the original working directory after
    # writing the tarball, since otherwise the cwd state would be
    # compromised for our unit tests.
    reset_dir = os.getcwd()
    tarball_path = output_dir.rstrip('/') + '.tar.gz'
    os.chdir(os.path.dirname(output_dir))
    subprocess.check_call(["tar", "-czf", tarball_path, os.path.basename(output_dir)])
    os.chdir(reset_dir)
    return tarball_path

def export_attachment_table(realm: Realm, output_dir: Path, message_ids: Set[int]) -> None:
    response = {}  # type: TableData
    fetch_attachment_data(response=response, realm_id=realm.id, message_ids=message_ids)
    output_file = os.path.join(output_dir, "attachment.json")
    logging.info('Writing attachment table data to %s' % (output_file,))
    write_data_to_file(output_file=output_file, data=response)

def create_soft_link(source: Path, in_progress: bool=True) -> None:
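    # We maintain two well-known symlinks so that admins can easily
    # locate exports: export-in-progress points at a running export,
    # and export-most-recent points at the last completed one.  When
    # an export finishes, the in-progress link is removed and the
    # most-recent link is updated to the new output directory.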
    is_done = not in_progress
    if settings.DEVELOPMENT:
        in_progress_link = os.path.join(settings.DEPLOY_ROOT, 'var', 'export-in-progress')
        done_link = os.path.join(settings.DEPLOY_ROOT, 'var', 'export-most-recent')
    else:
        in_progress_link = '/home/zulip/export-in-progress'
        done_link = '/home/zulip/export-most-recent'

    if in_progress:
        new_target = in_progress_link
    else:
        try:
            os.remove(in_progress_link)
        except FileNotFoundError:
            pass
        new_target = done_link

    overwrite_symlink(source, new_target)
    if is_done:
        logging.info('See %s for output files' % (new_target,))

def launch_user_message_subprocesses(threads: int, output_dir: Path,
                                     consent_message_id: Optional[int]=None) -> None:
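    # Each shard is a `manage.py export_usermessage_batch` subprocess;
    # the shards divide up the .partial message files written earlier
    # and flesh them out with zerver_usermessage data.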
    logging.info('Launching %d PARALLEL subprocesses to export UserMessage rows' % (threads,))

    def run_job(shard: str) -> int:
        arguments = [
            os.path.join(settings.DEPLOY_ROOT, "manage.py"),
            'export_usermessage_batch',
            '--path', str(output_dir),
            '--thread', shard
        ]
        if consent_message_id is not None:
            arguments.extend(['--consent-message-id', str(consent_message_id)])

        subprocess.call(arguments)
        return 0

    for (status, job) in run_parallel(run_job,
                                      [str(x) for x in range(0, threads)],
                                      threads=threads):
        print("Shard %s finished, status %s" % (job, status))

def do_export_user(user_profile: UserProfile, output_dir: Path) -> None:
    response = {}  # type: TableData

    export_single_user(user_profile, response)
    export_file = os.path.join(output_dir, "user.json")
    write_data_to_file(output_file=export_file, data=response)
    logging.info("Exporting messages")
    export_messages_single_user(user_profile, output_dir)

def export_single_user(user_profile: UserProfile, response: TableData) -> None:
    config = get_single_user_config()
    export_from_config(
        response=response,
        config=config,
        seed_object=user_profile,
    )

def get_single_user_config() -> Config:
    # This function defines the limited configuration for what data to
    # export when exporting all data that a single Zulip user has
    # access to in an organization.

    # zerver_userprofile
    user_profile_config = Config(
        table='zerver_userprofile',
        is_seeded=True,
        exclude=['password', 'api_key'],
    )

    # zerver_subscription
    subscription_config = Config(
        table='zerver_subscription',
        model=Subscription,
        normal_parent=user_profile_config,
        parent_key='user_profile__in',
    )

    # zerver_recipient
    recipient_config = Config(
        table='zerver_recipient',
        model=Recipient,
        virtual_parent=subscription_config,
        id_source=('zerver_subscription', 'recipient'),
    )

    # zerver_stream
    #
    # TODO: We currently export the existence of private streams, but
    # not their message history, in the "export with partial member
    # consent" code path.  This is consistent with our documented policy,
    # since that data is available to the organization administrator
    # who initiated the export, but unnecessary and potentially
    # confusing; it'd be better to just skip those streams from the
    # export (which would require more complex export logic for the
    # subscription/recipient/stream tables to exclude private streams
    # with no consenting subscribers).
    Config(
        table='zerver_stream',
        model=Stream,
        virtual_parent=recipient_config,
        id_source=('zerver_recipient', 'type_id'),
        source_filter=lambda r: r['type'] == Recipient.STREAM,
        exclude=['email_token'],
    )

    return user_profile_config

def export_messages_single_user(user_profile: UserProfile, output_dir: Path,
                                chunk_size: int=MESSAGE_BATCH_CHUNK_SIZE) -> None:
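    # We page through the user's messages in id order using the same
    # keyset-pagination pattern as the realm export: each iteration
    # fetches up to chunk_size UserMessage rows with id greater than
    # the largest id already seen, avoiding expensive OFFSET scans.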
    user_message_query = UserMessage.objects.filter(user_profile=user_profile).order_by("id")
    min_id = -1
    dump_file_id = 1
    while True:
        actual_query = user_message_query.select_related(
            "message", "message__sending_client").filter(id__gt=min_id)[0:chunk_size]
        user_message_chunk = [um for um in actual_query]
        user_message_ids = set(um.id for um in user_message_chunk)

        if len(user_message_chunk) == 0:
            break

        message_chunk = []
        for user_message in user_message_chunk:
            item = model_to_dict(user_message.message)
            item['flags'] = user_message.flags_list()
            item['flags_mask'] = user_message.flags.mask
            # Add a few nice, human-readable details
            item['sending_client_name'] = user_message.message.sending_client.name
            item['display_recipient'] = get_display_recipient(user_message.message.recipient)
            message_chunk.append(item)

        message_filename = os.path.join(output_dir, "messages-%06d.json" % (dump_file_id,))
        logging.info("Fetched Messages for %s" % (message_filename,))

        output = {'zerver_message': message_chunk}
        floatify_datetime_fields(output, 'zerver_message')
        message_output = dict(output)  # type: MessageOutput

        write_message_export(message_filename, message_output)
        min_id = max(user_message_ids)
        dump_file_id += 1

def export_analytics_tables(realm: Realm, output_dir: Path) -> None:
    response = {}  # type: TableData

    export_file = os.path.join(output_dir, "analytics.json")
    logging.info("Writing analytics table data to %s" % (export_file,))
    config = get_analytics_config()
    export_from_config(
        response=response,
        config=config,
        seed_object=realm,
    )
    write_data_to_file(output_file=export_file, data=response)

def get_analytics_config() -> Config:
    # This function defines what data to export for the
    # analytics.json file in a full-realm export.

    analytics_config = Config(
        table='zerver_analytics',
        is_seeded=True,
    )

    Config(
        table='analytics_realmcount',
        model=RealmCount,
        normal_parent=analytics_config,
        parent_key='realm_id__in',
    )

    Config(
        table='analytics_usercount',
        model=UserCount,
        normal_parent=analytics_config,
        parent_key='realm_id__in',
    )

    Config(
        table='analytics_streamcount',
        model=StreamCount,
        normal_parent=analytics_config,
        parent_key='realm_id__in',
    )

    return analytics_config

def get_consented_user_ids(consent_message_id: int) -> Set[int]:
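    # Users grant consent to be included in an "export with partial
    # member consent" by reacting to the designated consent message
    # with the "outbox" emoji; this returns the ids of everyone who
    # has done so.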
    return set(Reaction.objects.filter(message__id=consent_message_id,
                                       reaction_type="unicode_emoji",
                                       # outbox = 1f4e4
                                       emoji_code="1f4e4").
               values_list("user_profile", flat=True))

def export_realm_wrapper(realm: Realm, output_dir: str,
                         threads: int, upload: bool,
                         public_only: bool,
                         delete_after_upload: bool,
                         consent_message_id: Optional[int]=None) -> Optional[str]:
    tarball_path = do_export_realm(realm=realm, output_dir=output_dir,
                                   threads=threads, public_only=public_only,
                                   consent_message_id=consent_message_id)
    print("Finished exporting to %s" % (output_dir,))
    print("Tarball written to %s" % (tarball_path,))

    if not upload:
        return None

    # We upload to the `avatars` bucket because that's world-readable
    # without additional configuration.  We'll likely want to change
    # that in the future.
    print("Uploading export tarball...")
    public_url = zerver.lib.upload.upload_backend.upload_export_tarball(realm, tarball_path)
    print()
    print("Uploaded to %s" % (public_url,))

    if delete_after_upload:
        os.remove(tarball_path)
        print("Successfully deleted the tarball at %s" % (tarball_path,))
    return public_url