message validation: Clean up extract_private_recipients.

This is mostly refactoring, but we also prevent a new
type of value error (list of non-int-or-string).  The
new test code helps enforce that.

Cleanup includes:

    - Use early-exit for email case.
    - Rename helpers to get_validate_*.
    - Avoid clumsy rebuilding of lists in helpers.
    - Avoid the confusing `recipient` name (which
      can be confused with the model by the same
      name).
    - Just delegate duplicate-id/email-removal to
      the helpers.

The cleaner structure allows us to elminate a couple
mypy workarounds.
This commit is contained in:
Steve Howell
2020-02-22 13:04:48 +00:00
committed by Tim Abbott
parent 303cd9bb9e
commit 995353fb28
2 changed files with 12 additions and 17 deletions

View File

@@ -2079,35 +2079,26 @@ def extract_private_recipients(s: str) -> Union[List[str], List[int]]:
return data
if isinstance(data[0], str):
recipients = extract_emails(data) # type: Union[List[str], List[int]]
return get_validated_emails(data)
if isinstance(data[0], int):
recipients = extract_user_ids(data)
if not isinstance(data[0], int):
raise ValueError("Invalid data type for recipients")
# Remove any duplicates.
return list(set(recipients)) # type: ignore # mypy gets confused about what's passed to set()
return get_validated_user_ids(data)
def extract_user_ids(user_ids: Iterable[int]) -> List[int]:
recipients = []
def get_validated_user_ids(user_ids: Iterable[int]) -> List[int]:
for user_id in user_ids:
if not isinstance(user_id, int):
raise TypeError("Recipient lists may contain emails or user IDs, but not both.")
recipients.append(user_id)
return list(set(user_ids))
return recipients
def extract_emails(emails: Iterable[str]) -> List[str]:
recipients = []
def get_validated_emails(emails: Iterable[str]) -> List[str]:
for email in emails:
if not isinstance(email, str):
raise TypeError("Recipient lists may contain emails or user IDs, but not both.")
email = email.strip()
if email:
recipients.append(email)
return recipients
return list(filter(bool, {email.strip() for email in emails}))
def check_send_stream_message(sender: UserProfile, client: Client, stream_name: str,
topic: str, body: str, realm: Optional[Realm]=None) -> int: