mirror of
https://github.com/zulip/zulip.git
synced 2025-11-23 16:01:24 +00:00
outgoing webhooks: Consolidate interfaces into lib/outgoing_webhook.py
This commit is contained in:
@@ -13,7 +13,8 @@ from requests import Response
|
||||
|
||||
from django.utils.translation import ugettext as _
|
||||
|
||||
from zerver.models import Realm, UserProfile, get_realm_by_email_domain, get_user_profile_by_id, get_client
|
||||
from zerver.models import Realm, UserProfile, get_realm_by_email_domain, get_user_profile_by_id, get_client, \
|
||||
GENERIC_INTERFACE, Service, SLACK_INTERFACE, email_to_domain, get_service_profile
|
||||
from zerver.lib.actions import check_send_message
|
||||
from zerver.lib.queue import queue_json_publish
|
||||
from zerver.lib.validator import check_dict, check_string
|
||||
@@ -64,6 +65,96 @@ class OutgoingWebhookServiceInterface(object):
|
||||
# type: (Response, Dict[Text, Any]) -> Optional[str]
|
||||
raise NotImplementedError()
|
||||
|
||||
class GenericOutgoingWebhookService(OutgoingWebhookServiceInterface):
|
||||
|
||||
def process_event(self, event):
|
||||
# type: (Dict[Text, Any]) -> Tuple[Dict[str, Any], Any]
|
||||
rest_operation = {'method': 'POST',
|
||||
'relative_url_path': '',
|
||||
'base_url': self.base_url,
|
||||
'request_kwargs': {}}
|
||||
request_data = {"data": event['command'],
|
||||
"message": event['message'],
|
||||
"token": self.token}
|
||||
return rest_operation, json.dumps(request_data)
|
||||
|
||||
def process_success(self, response, event):
|
||||
# type: (Response, Dict[Text, Any]) -> Optional[str]
|
||||
response_json = json.loads(response.text)
|
||||
|
||||
if "response_not_required" in response_json and response_json['response_not_required']:
|
||||
return None
|
||||
if "response_string" in response_json:
|
||||
return str(response_json['response_string'])
|
||||
else:
|
||||
return ""
|
||||
|
||||
def process_failure(self, response, event):
|
||||
# type: (Response, Dict[Text, Any]) -> Optional[str]
|
||||
return str(response.text)
|
||||
|
||||
class SlackOutgoingWebhookService(OutgoingWebhookServiceInterface):
|
||||
|
||||
def process_event(self, event):
|
||||
# type: (Dict[Text, Any]) -> Tuple[Dict[str, Any], Any]
|
||||
rest_operation = {'method': 'POST',
|
||||
'relative_url_path': '',
|
||||
'base_url': self.base_url,
|
||||
'request_kwargs': {}}
|
||||
|
||||
if event['message']['type'] == 'private':
|
||||
raise NotImplementedError("Private messaging service not supported.")
|
||||
|
||||
service = get_service_profile(event['user_profile_id'], str(self.service_name))
|
||||
request_data = [("token", self.token),
|
||||
("team_id", event['message']['sender_realm_str']),
|
||||
("team_domain", email_to_domain(event['message']['sender_email'])),
|
||||
("channel_id", event['message']['stream_id']),
|
||||
("channel_name", event['message']['display_recipient']),
|
||||
("timestamp", event['message']['timestamp']),
|
||||
("user_id", event['message']['sender_id']),
|
||||
("user_name", event['message']['sender_full_name']),
|
||||
("text", event['command']),
|
||||
("trigger_word", event['trigger']),
|
||||
("service_id", service.id),
|
||||
]
|
||||
|
||||
return rest_operation, request_data
|
||||
|
||||
def process_success(self, response, event):
|
||||
# type: (Response, Dict[Text, Any]) -> Optional[str]
|
||||
response_json = json.loads(response.text)
|
||||
response_text = ""
|
||||
if "text" in response_json:
|
||||
response_text = response_json["text"]
|
||||
return response_text
|
||||
|
||||
def process_failure(self, response, event):
|
||||
# type: (Response, Dict[Text, Any]) -> Optional[str]
|
||||
return str(response.text)
|
||||
|
||||
AVAILABLE_OUTGOING_WEBHOOK_INTERFACES = {
|
||||
GENERIC_INTERFACE: GenericOutgoingWebhookService,
|
||||
SLACK_INTERFACE: SlackOutgoingWebhookService,
|
||||
} # type: Dict[Text, Any]
|
||||
|
||||
def get_service_interface_class(interface):
|
||||
# type: (Text) -> Any
|
||||
if interface is None or interface not in AVAILABLE_OUTGOING_WEBHOOK_INTERFACES:
|
||||
return AVAILABLE_OUTGOING_WEBHOOK_INTERFACES[GENERIC_INTERFACE]
|
||||
else:
|
||||
return AVAILABLE_OUTGOING_WEBHOOK_INTERFACES[interface]
|
||||
|
||||
def get_outgoing_webhook_service_handler(service):
|
||||
# type: (Service) -> Any
|
||||
|
||||
service_interface_class = get_service_interface_class(service.interface_name())
|
||||
service_interface = service_interface_class(base_url=service.base_url,
|
||||
token=service.token,
|
||||
user_profile=service.user_profile,
|
||||
service_name=service.name)
|
||||
return service_interface
|
||||
|
||||
def send_response_message(bot_id, message, response_message_content):
|
||||
# type: (str, Dict[str, Any], Text) -> None
|
||||
recipient_type_name = message['type']
|
||||
|
||||
Reference in New Issue
Block a user