slack import: Write messages batch-wise.

Messages can be bulky, and storing them in a single
data structure can cause a memory error.

In this commit, the messages are converted and written out in
batches of `chunk_size` messages (default 800), one numbered
`messages-NNNNNN.json` file per batch, thus avoiding the memory error.
This commit is contained in:
Rhea Parekh
2018-06-17 22:46:42 +05:30
committed by Tim Abbott
parent 7f6c174099
commit 6b7b6b38ad
2 changed files with 60 additions and 24 deletions

View File

@@ -516,9 +516,9 @@ def convert_slack_workspace_messages(slack_data_dir: str, users: List[ZerverFiel
added_users: AddedUsersT, added_recipient: AddedRecipientsT,
added_channels: AddedChannelsT, realm: ZerverFieldsT,
zerver_realmemoji: List[ZerverFieldsT], domain_name: str,
output_dir: str) -> Tuple[List[ZerverFieldsT],
List[ZerverFieldsT],
List[ZerverFieldsT]]:
output_dir: str, chunk_size: int=800) -> Tuple[List[ZerverFieldsT],
List[ZerverFieldsT],
List[ZerverFieldsT]]:
"""
Returns:
1. reactions, which is a list of the reactions
@@ -533,24 +533,46 @@ def convert_slack_workspace_messages(slack_data_dir: str, users: List[ZerverFiel
logging.info('######### IMPORTING MESSAGES STARTED #########\n')
total_reactions = [] # type: List[ZerverFieldsT]
total_attachments = [] # type: List[ZerverFieldsT]
total_uploads = [] # type: List[ZerverFieldsT]
message_id = usermessage_id = reaction_id = attachment_id = 0
id_list = (message_id, usermessage_id, reaction_id, attachment_id)
zerver_message, zerver_usermessage, attachment, uploads, \
reactions, id_list = channel_message_to_zerver_message(
realm_id, users, added_users, added_recipient, all_messages,
zerver_realmemoji, realm['zerver_subscription'], added_channels,
id_list, domain_name)
# The messages are stored in batches
low_index = 0
upper_index = low_index + chunk_size
dump_file_id = 1
message_json = dict(
zerver_message=zerver_message,
zerver_usermessage=zerver_usermessage)
message_filename = os.path.join(output_dir, "messages-000001.json")
logging.info("Writing Messages to %s\n" % (message_filename,))
create_converted_data_files(message_json, output_dir, '/messages-000001.json')
while True:
message_data = all_messages[low_index:upper_index]
if len(message_data) == 0:
break
zerver_message, zerver_usermessage, attachment, uploads, \
reactions, id_list = channel_message_to_zerver_message(
realm_id, users, added_users, added_recipient, message_data,
zerver_realmemoji, realm['zerver_subscription'], added_channels,
id_list, domain_name)
message_json = dict(
zerver_message=zerver_message,
zerver_usermessage=zerver_usermessage)
message_file = "/messages-%06d.json" % (dump_file_id,)
logging.info("Writing Messages to %s\n" % (output_dir + message_file))
create_converted_data_files(message_json, output_dir, message_file)
total_reactions += reactions
total_attachments += attachment
total_uploads += uploads
low_index = upper_index
upper_index = chunk_size + low_index
dump_file_id += 1
logging.info('######### IMPORTING MESSAGES FINISHED #########\n')
return reactions, uploads, attachment
return total_reactions, total_uploads, total_attachments
def get_all_messages(slack_data_dir: str, added_channels: AddedChannelsT) -> List[ZerverFieldsT]:
all_messages = [] # type: List[ZerverFieldsT]

View File

@@ -465,8 +465,10 @@ class SlackImporter(ZulipTestCase):
@mock.patch("zerver.lib.slack_data_to_zulip_data.get_all_messages")
def test_convert_slack_workspace_messages(self, mock_get_all_messages: mock.Mock,
mock_message: mock.Mock) -> None:
os.makedirs('var/test-slack-import', exist_ok=True)
added_channels = {'random': ('c5', 1), 'general': ('c6', 2)} # type: Dict[str, Tuple[str, int]]
zerver_message = [{'id': 1}, {'id': 5}]
time = float(timezone_now().timestamp())
zerver_message = [{'id': 1, 'ts': time}, {'id': 5, 'ts': time}]
realm = {'zerver_subscription': []} # type: Dict[str, Any]
user_list = [] # type: List[Dict[str, Any]]
@@ -476,17 +478,29 @@ class SlackImporter(ZulipTestCase):
zerver_usermessage = [{'id': 3}, {'id': 5}, {'id': 6}, {'id': 9}]
mock_message.side_effect = [[zerver_message, zerver_usermessage, attachments, uploads,
reactions, id_list]]
mock_get_all_messages.side_effect = [zerver_message]
mock_message.side_effect = [[zerver_message[:1], zerver_usermessage[:2],
attachments, uploads, reactions[:1], id_list],
[zerver_message[1:2], zerver_usermessage[2:5],
attachments, uploads, reactions[1:1], id_list]]
test_reactions, uploads, zerver_attachment = convert_slack_workspace_messages(
'./random_path', user_list, 2, {}, {}, added_channels, realm, [], 'domain', 'var/test-slack-import')
messages_file = os.path.join('var', 'test-slack-import', 'messages-000001.json')
self.assertTrue(os.path.exists(messages_file))
'./random_path', user_list, 2, {}, {}, added_channels,
realm, [], 'domain', 'var/test-slack-import', chunk_size=1)
messages_file_1 = os.path.join('var', 'test-slack-import', 'messages-000001.json')
self.assertTrue(os.path.exists(messages_file_1))
messages_file_2 = os.path.join('var', 'test-slack-import', 'messages-000002.json')
self.assertTrue(os.path.exists(messages_file_2))
with open(messages_file) as f:
with open(messages_file_1) as f:
message_json = ujson.load(f)
self.assertEqual(message_json['zerver_message'], zerver_message)
self.assertEqual(message_json['zerver_usermessage'], zerver_usermessage)
self.assertEqual(message_json['zerver_message'], zerver_message[:1])
self.assertEqual(message_json['zerver_usermessage'], zerver_usermessage[:2])
with open(messages_file_2) as f:
message_json = ujson.load(f)
self.assertEqual(message_json['zerver_message'], zerver_message[1:2])
self.assertEqual(message_json['zerver_usermessage'], zerver_usermessage[2:5])
self.assertEqual(test_reactions, reactions)
@mock.patch("zerver.lib.slack_data_to_zulip_data.process_uploads", return_value = [])