mirror of
				https://github.com/zulip/zulip.git
				synced 2025-10-31 12:03:46 +00:00 
			
		
		
		
	
		
			
				
	
	
		
			174 lines
		
	
	
		
			6.0 KiB
		
	
	
	
		
			Python
		
	
	
		
			Executable File
		
	
	
	
	
			
		
		
	
	
			174 lines
		
	
	
		
			6.0 KiB
		
	
	
	
		
			Python
		
	
	
		
			Executable File
		
	
	
	
	
| #!/usr/bin/env python3
 | |
| import os
 | |
| import sys
 | |
| from email.headerregistry import Address
 | |
| from email.utils import parseaddr
 | |
| 
 | |
| sys.path.insert(0, os.path.join(os.path.dirname(__file__), ".."))
 | |
| from scripts.lib.setup_path import setup_path
 | |
| 
 | |
| setup_path()
 | |
| 
 | |
| os.environ["DJANGO_SETTINGS_MODULE"] = "zproject.settings"
 | |
| import argparse
 | |
| import secrets
 | |
| from contextlib import contextmanager
 | |
| from typing import Iterator, List, Tuple, TypedDict
 | |
| 
 | |
| import boto3.session
 | |
| import orjson
 | |
| from django.conf import settings
 | |
| from mypy_boto3_ses import SESClient
 | |
| from mypy_boto3_sns import SNSClient
 | |
| from mypy_boto3_sqs import SQSClient
 | |
| from mypy_boto3_sqs.type_defs import DeleteMessageBatchRequestEntryTypeDef
 | |
| 
 | |
| 
 | |
| class IdentityArgsDict(TypedDict, total=False):
 | |
|     default: str
 | |
|     required: bool
 | |
| 
 | |
| 
 | |
| def main() -> None:
 | |
|     session = boto3.session.Session(region_name=settings.S3_REGION)
 | |
| 
 | |
|     # Strip off the realname, if present, and extract just the domain name
 | |
|     _, from_address = parseaddr(settings.NOREPLY_EMAIL_ADDRESS)
 | |
|     from_host = Address(addr_spec=from_address).domain
 | |
| 
 | |
|     ses: SESClient = session.client("ses")
 | |
|     possible_identities = []
 | |
|     identity_paginator = ses.get_paginator("list_identities")
 | |
|     for identity_resp in identity_paginator.paginate():
 | |
|         possible_identities += identity_resp["Identities"]
 | |
| 
 | |
|     identity_args: IdentityArgsDict = {}
 | |
|     if from_host in possible_identities:
 | |
|         identity_args["default"] = from_host
 | |
|     elif from_address in possible_identities:
 | |
|         identity_args["default"] = from_address
 | |
|     else:
 | |
|         identity_args["required"] = True
 | |
| 
 | |
|     parser = argparse.ArgumentParser(description="Tail SES delivery or bounces")
 | |
|     parser.add_argument(
 | |
|         "--identity",
 | |
|         "-i",
 | |
|         choices=possible_identities,
 | |
|         help="Sending identity in SES",
 | |
|         **identity_args,
 | |
|     )
 | |
|     topic_group = parser.add_mutually_exclusive_group(required=True)
 | |
|     topic_group.add_argument("--bounces", "-b", action="store_true")
 | |
|     topic_group.add_argument("--deliveries", "-d", action="store_true")
 | |
|     topic_group.add_argument("--complaints", "-c", action="store_true")
 | |
|     args = parser.parse_args()
 | |
| 
 | |
|     sns_topic_arn = get_ses_arn(session, args)
 | |
|     with our_sqs_queue(session, sns_topic_arn) as (queue_arn, queue_url):
 | |
|         with our_sns_subscription(session, sns_topic_arn, queue_arn):
 | |
|             print_messages(session, queue_url)
 | |
| 
 | |
| 
 | |
| def get_ses_arn(session: boto3.session.Session, args: argparse.Namespace) -> str:
 | |
|     ses: SESClient = session.client("ses")
 | |
| 
 | |
|     notification_settings = ses.get_identity_notification_attributes(Identities=[args.identity])
 | |
|     settings = notification_settings["NotificationAttributes"][args.identity]
 | |
| 
 | |
|     if args.bounces:
 | |
|         return settings["BounceTopic"]
 | |
|     elif args.complaints:
 | |
|         return settings["ComplaintTopic"]
 | |
|     elif args.deliveries:
 | |
|         return settings["DeliveryTopic"]
 | |
|     raise AssertionError  # Unreachable
 | |
| 
 | |
| 
 | |
| @contextmanager
 | |
| def our_sqs_queue(session: boto3.session.Session, ses_topic_arn: str) -> Iterator[Tuple[str, str]]:
 | |
|     (_, _, _, region, account_id, topic_name) = ses_topic_arn.split(":")
 | |
| 
 | |
|     sqs: SQSClient = session.client("sqs")
 | |
|     queue_name = "tail-ses-" + secrets.token_hex(10)
 | |
|     try:
 | |
|         resp = sqs.create_queue(
 | |
|             QueueName=queue_name,
 | |
|             Attributes={
 | |
|                 "Policy": orjson.dumps(
 | |
|                     {
 | |
|                         "Version": "2012-10-17",
 | |
|                         "Id": secrets.token_hex(10),
 | |
|                         "Statement": [
 | |
|                             {
 | |
|                                 "Sid": "Sid" + secrets.token_hex(10),
 | |
|                                 "Effect": "Allow",
 | |
|                                 "Principal": {"AWS": "*"},
 | |
|                                 "Action": "SQS:SendMessage",
 | |
|                                 "Resource": f"arn:aws:sqs:{region}:{account_id}:{queue_name}",
 | |
|                                 "Condition": {"ArnEquals": {"aws:SourceArn": ses_topic_arn}},
 | |
|                             }
 | |
|                         ],
 | |
|                     }
 | |
|                 ).decode("UTF-8")
 | |
|             },
 | |
|         )
 | |
|         queue_url = resp["QueueUrl"]
 | |
|         yield f"arn:aws:sqs:{region}:{account_id}:{queue_name}", queue_url
 | |
|     finally:
 | |
|         if queue_url is not None:
 | |
|             print("Deleting temporary queue...", file=sys.stderr)
 | |
|             sqs.delete_queue(QueueUrl=queue_url)
 | |
| 
 | |
| 
 | |
| @contextmanager
 | |
| def our_sns_subscription(
 | |
|     session: boto3.session.Session, ses_topic_arn: str, queue_arn: str
 | |
| ) -> Iterator[str]:
 | |
|     sns: SNSClient = session.client("sns")
 | |
|     try:
 | |
|         resp = sns.subscribe(
 | |
|             TopicArn=ses_topic_arn,
 | |
|             Protocol="sqs",
 | |
|             Endpoint=queue_arn,
 | |
|             Attributes={"RawMessageDelivery": "false"},
 | |
|             ReturnSubscriptionArn=True,
 | |
|         )
 | |
|         subscription_arn = resp["SubscriptionArn"]
 | |
|         yield subscription_arn
 | |
|     finally:
 | |
|         if subscription_arn is not None:
 | |
|             print("Deleting temporary SNS subscription...", file=sys.stderr)
 | |
|             sns.unsubscribe(SubscriptionArn=subscription_arn)
 | |
| 
 | |
| 
 | |
| def print_messages(session: boto3.session.Session, queue_url: str) -> None:
 | |
|     sqs: SQSClient = session.client("sqs")
 | |
|     try:
 | |
|         while True:
 | |
|             resp = sqs.receive_message(
 | |
|                 QueueUrl=queue_url,
 | |
|                 MaxNumberOfMessages=10,
 | |
|                 WaitTimeSeconds=5,
 | |
|                 MessageAttributeNames=["All"],
 | |
|             )
 | |
|             messages = resp.get("Messages", [])
 | |
|             delete_list: List[DeleteMessageBatchRequestEntryTypeDef] = []
 | |
|             for m in messages:
 | |
|                 body = orjson.loads(m["Body"])
 | |
|                 body_message = orjson.loads(body["Message"])
 | |
|                 print(
 | |
|                     body["Timestamp"]
 | |
|                     + " "
 | |
|                     + orjson.dumps(body_message, option=orjson.OPT_INDENT_2).decode("utf-8")
 | |
|                 )
 | |
|                 delete_list.append({"Id": m["MessageId"], "ReceiptHandle": m["ReceiptHandle"]})
 | |
|             if delete_list:
 | |
|                 sqs.delete_message_batch(QueueUrl=queue_url, Entries=delete_list)
 | |
|     except KeyboardInterrupt:
 | |
|         pass
 | |
| 
 | |
| 
 | |
| if __name__ == "__main__":
 | |
|     main()
 |