mirror of
https://github.com/zulip/zulip.git
synced 2025-11-15 11:22:04 +00:00
email_mirror: Use .walk() to search all MIME parts for attachments.
Fixes #13416 We used to search only one level in depth through the MIME structure, and thus would miss attachments that were nested deeper (which can happen with some email clients). We can take advantage of message.walk() to iterate through each MIME part.
This commit is contained in:
committed by
Tim Abbott
parent
5d2befdc54
commit
f538f34d95
@@ -228,14 +228,9 @@ def filter_footer(text: str) -> str:
|
||||
|
||||
def extract_and_upload_attachments(message: message.Message, realm: Realm) -> str:
|
||||
user_profile = get_system_bot(settings.EMAIL_GATEWAY_BOT)
|
||||
|
||||
attachment_links = []
|
||||
|
||||
payload = message.get_payload()
|
||||
if not isinstance(payload, list):
|
||||
# This is not a multipart message, so it can't contain attachments.
|
||||
return ""
|
||||
|
||||
for part in payload:
|
||||
for part in message.walk():
|
||||
content_type = part.get_content_type()
|
||||
filename = part.get_filename()
|
||||
if filename:
|
||||
@@ -251,7 +246,7 @@ def extract_and_upload_attachments(message: message.Message, realm: Realm) -> st
|
||||
logger.warning("Payload is not bytes (invalid attachment %s in message from %s)." %
|
||||
(filename, message.get("From")))
|
||||
|
||||
return "\n".join(attachment_links)
|
||||
return '\n'.join(attachment_links)
|
||||
|
||||
def decode_stream_email_address(email: str) -> Tuple[Stream, Dict[str, bool]]:
|
||||
token, options = decode_email_address(email)
|
||||
|
||||
@@ -424,6 +424,44 @@ class TestEmailMirrorMessagesWithAttachments(ZulipTestCase):
|
||||
message = most_recent_message(user_profile)
|
||||
self.assertEqual(message.content, "Test body\n[image.png](https://test_url)")
|
||||
|
||||
def test_message_with_valid_nested_attachment(self) -> None:
|
||||
user_profile = self.example_user('hamlet')
|
||||
self.login(user_profile.email)
|
||||
self.subscribe(user_profile, "Denmark")
|
||||
stream = get_stream("Denmark", user_profile.realm)
|
||||
stream_to_address = encode_email_address(stream)
|
||||
|
||||
incoming_valid_message = MIMEMultipart()
|
||||
text_msg = MIMEText("Test body")
|
||||
incoming_valid_message.attach(text_msg)
|
||||
|
||||
nested_multipart = MIMEMultipart()
|
||||
nested_text_message = MIMEText("Nested text that should get skipped.")
|
||||
nested_multipart.attach(nested_text_message)
|
||||
with open(os.path.join(settings.DEPLOY_ROOT, "static/images/default-avatar.png"), 'rb') as f:
|
||||
image_bytes = f.read()
|
||||
|
||||
attachment_msg = MIMEImage(image_bytes)
|
||||
attachment_msg.add_header('Content-Disposition', 'attachment', filename="image.png")
|
||||
nested_multipart.attach(attachment_msg)
|
||||
incoming_valid_message.attach(nested_multipart)
|
||||
|
||||
incoming_valid_message['Subject'] = 'Subject'
|
||||
incoming_valid_message['From'] = self.example_email('hamlet')
|
||||
incoming_valid_message['To'] = stream_to_address
|
||||
incoming_valid_message['Reply-to'] = self.example_email('othello')
|
||||
|
||||
with mock.patch('zerver.lib.email_mirror.upload_message_file',
|
||||
return_value='https://test_url') as upload_message_file:
|
||||
process_message(incoming_valid_message)
|
||||
upload_message_file.assert_called_with('image.png', len(image_bytes),
|
||||
'image/png', image_bytes,
|
||||
get_system_bot(settings.EMAIL_GATEWAY_BOT),
|
||||
target_realm=user_profile.realm)
|
||||
|
||||
message = most_recent_message(user_profile)
|
||||
self.assertEqual(message.content, "Test body\n[image.png](https://test_url)")
|
||||
|
||||
def test_message_with_invalid_attachment(self) -> None:
|
||||
user_profile = self.example_user('hamlet')
|
||||
self.login(user_profile.email)
|
||||
|
||||
Reference in New Issue
Block a user