zerver/webhooks: Change use of typing.Text to str.

This commit is contained in:
Aditya Bansal
2018-05-10 23:04:01 +05:30
committed by Tim Abbott
parent e8506b5020
commit 64ddfc6ac0
88 changed files with 277 additions and 319 deletions

View File

@@ -1,5 +1,5 @@
# Webhooks for external integrations.
from typing import Any, Dict, Text
from typing import Any, Dict
from django.http import HttpRequest, HttpResponse
from django.utils.translation import ugettext as _

View File

@@ -1,5 +1,4 @@
# -*- coding: utf-8 -*-
from typing import Text
from django.test import TestCase
@@ -47,7 +46,7 @@ Acme enables me to manage the flow of information quite well. I only wish I coul
content_type="application/x-www-form-urlencoded")
self.URL_TEMPLATE = original_url_template
def get_body(self, fixture_name: Text) -> Text:
def get_body(self, fixture_name: str) -> str:
return self.webhook_fixture_data("appfollow", fixture_name, file_type="json")
class ConvertMarkdownTest(TestCase):

View File

@@ -1,6 +1,6 @@
# Webhooks for external integrations.
import re
from typing import Any, Dict, Text, Optional
from typing import Any, Dict, Optional
from django.http import HttpRequest, HttpResponse
from django.utils.translation import ugettext as _
@@ -25,7 +25,7 @@ def api_appfollow_webhook(request: HttpRequest, user_profile: UserProfile,
body=convert_markdown(message))
return json_success()
def convert_markdown(text: Text) -> Text:
def convert_markdown(text: str) -> str:
# Converts Slack-style markdown to Zulip format
# Implemented mainly for AppFollow messages
# Not ready for general use as some edge-cases not handled

View File

@@ -1,5 +1,4 @@
# -*- coding: utf-8 -*-
from typing import Text
from zerver.lib.test_classes import WebhookTestCase
@@ -129,5 +128,5 @@ class BasecampHookTests(WebhookTestCase):
expected_message = u"Tomasz created the [comment](https://3.basecamp.com/3688623/buckets/2957043/todos/427055624#__recording_427058780) of the task [New task](https://3.basecamp.com/3688623/buckets/2957043/todos/427055624)"
self._send_and_test_message('comment_created', expected_message)
def _send_and_test_message(self, fixture_name: Text, expected_message: Text) -> None:
def _send_and_test_message(self, fixture_name: str, expected_message: str) -> None:
self.send_and_test_stream_message(fixture_name, self.EXPECTED_SUBJECT, expected_message)

View File

@@ -1,6 +1,6 @@
import logging
import re
from typing import Any, Dict, Text
from typing import Any, Dict
from django.http import HttpRequest, HttpResponse
@@ -54,22 +54,22 @@ def api_basecamp_webhook(request: HttpRequest, user_profile: UserProfile,
check_send_webhook_message(request, user_profile, subject, body)
return json_success()
def get_project_name(payload: Dict[str, Any]) -> Text:
def get_project_name(payload: Dict[str, Any]) -> str:
return payload['recording']['bucket']['name']
def get_event_type(payload: Dict[str, Any]) -> Text:
def get_event_type(payload: Dict[str, Any]) -> str:
return payload['kind']
def get_event_creator(payload: Dict[str, Any]) -> Text:
def get_event_creator(payload: Dict[str, Any]) -> str:
return payload['creator']['name']
def get_subject_url(payload: Dict[str, Any]) -> Text:
def get_subject_url(payload: Dict[str, Any]) -> str:
return payload['recording']['app_url']
def get_subject_title(payload: Dict[str, Any]) -> Text:
def get_subject_title(payload: Dict[str, Any]) -> str:
return payload['recording']['title']
def get_verb(event: Text, prefix: Text) -> Text:
def get_verb(event: str, prefix: str) -> str:
verb = event.replace(prefix, '')
if verb == 'active':
return 'activated'
@@ -79,10 +79,10 @@ def get_verb(event: Text, prefix: Text) -> Text:
return "changed {} of".format(matched.group('subject'))
return verb
def get_document_body(event: Text, payload: Dict[str, Any]) -> Text:
def get_document_body(event: str, payload: Dict[str, Any]) -> str:
return get_generic_body(event, payload, 'document_', DOCUMENT_TEMPLATE)
def get_questions_answer_body(event: Text, payload: Dict[str, Any]) -> Text:
def get_questions_answer_body(event: str, payload: Dict[str, Any]) -> str:
verb = get_verb(event, 'question_answer_')
question = payload['recording']['parent']
@@ -94,7 +94,7 @@ def get_questions_answer_body(event: Text, payload: Dict[str, Any]) -> Text:
question_url=question['app_url']
)
def get_comment_body(event: Text, payload: Dict[str, Any]) -> Text:
def get_comment_body(event: str, payload: Dict[str, Any]) -> str:
verb = get_verb(event, 'comment_')
task = payload['recording']['parent']
@@ -106,19 +106,19 @@ def get_comment_body(event: Text, payload: Dict[str, Any]) -> Text:
task_url=task['app_url']
)
def get_questions_body(event: Text, payload: Dict[str, Any]) -> Text:
def get_questions_body(event: str, payload: Dict[str, Any]) -> str:
return get_generic_body(event, payload, 'question_', QUESTION_TEMPLATE)
def get_message_body(event: Text, payload: Dict[str, Any]) -> Text:
def get_message_body(event: str, payload: Dict[str, Any]) -> str:
return get_generic_body(event, payload, 'message_', MESSAGE_TEMPLATE)
def get_todo_list_body(event: Text, payload: Dict[str, Any]) -> Text:
def get_todo_list_body(event: str, payload: Dict[str, Any]) -> str:
return get_generic_body(event, payload, 'todolist_', TODO_LIST_TEMPLATE)
def get_todo_body(event: Text, payload: Dict[str, Any]) -> Text:
def get_todo_body(event: str, payload: Dict[str, Any]) -> str:
return get_generic_body(event, payload, 'todo_', TODO_TEMPLATE)
def get_generic_body(event: Text, payload: Dict[str, Any], prefix: Text, template: Text) -> Text:
def get_generic_body(event: str, payload: Dict[str, Any], prefix: str, template: str) -> str:
verb = get_verb(event, prefix)
return template.format(

View File

@@ -1,5 +1,5 @@
# -*- coding: utf-8 -*-
from typing import Dict, Text
from typing import Dict
from mock import MagicMock, patch
@@ -139,5 +139,5 @@ class BeanstalkHookTests(WebhookTestCase):
self.api_stream_message(self.TEST_USER_EMAIL, 'svn_changefile', expected_subject, expected_message,
content_type=None)
def get_body(self, fixture_name: Text) -> Dict[str, Text]:
def get_body(self, fixture_name: str) -> Dict[str, str]:
return {'payload': self.webhook_fixture_data('beanstalk', fixture_name)}

View File

@@ -1,6 +1,6 @@
# -*- coding: utf-8 -*-
from unittest.mock import patch
from typing import Text, Any
from typing import Any
from zerver.lib.test_classes import WebhookTestCase
class BeeminderHookTests(WebhookTestCase):
@@ -36,5 +36,5 @@ class BeeminderHookTests(WebhookTestCase):
expected_message,
content_type="application/json")
def get_body(self, fixture_name: Text) -> Text:
def get_body(self, fixture_name: str) -> str:
return self.webhook_fixture_data("beeminder", fixture_name, file_type="json")

View File

@@ -1,5 +1,5 @@
# Webhooks for external integrations.
from typing import Text, Dict, Any
from typing import Dict, Any
from django.http import HttpRequest, HttpResponse
from zerver.decorator import api_key_only_webhook_view
from zerver.lib.request import REQ, has_request_variables

View File

@@ -1,5 +1,5 @@
# -*- coding: utf-8 -*-
from typing import Dict, Optional, Text, Union
from typing import Dict, Optional, Union
from mock import MagicMock, patch
@@ -74,5 +74,5 @@ class BitbucketHookTests(WebhookTestCase):
self.assertFalse(check_send_webhook_message_mock.called)
self.assert_json_success(result)
def get_body(self, fixture_name: Text) -> Union[Text, Dict[str, Text]]:
def get_body(self, fixture_name: str) -> Union[str, Dict[str, str]]:
return self.webhook_fixture_data(self.FIXTURE_DIR_NAME, fixture_name)

View File

@@ -1,5 +1,5 @@
# -*- coding: utf-8 -*-
from typing import Optional, Text
from typing import Optional
from mock import MagicMock, patch

View File

@@ -1,6 +1,6 @@
# Webhooks for external integrations.
from typing import Any, Dict, Text
from typing import Any, Dict
import ujson
from django.http import HttpRequest, HttpResponse
@@ -27,10 +27,10 @@ def api_circleci_webhook(request: HttpRequest, user_profile: UserProfile,
check_send_webhook_message(request, user_profile, subject, body)
return json_success()
def get_subject(payload: Dict[str, Any]) -> Text:
def get_subject(payload: Dict[str, Any]) -> str:
return CIRCLECI_SUBJECT_TEMPLATE.format(repository_name=payload['reponame'])
def get_body(payload: Dict[str, Any]) -> Text:
def get_body(payload: Dict[str, Any]) -> str:
data = {
'build_url': payload['build_url'],
'username': payload['username'],
@@ -39,7 +39,7 @@ def get_body(payload: Dict[str, Any]) -> Text:
}
return CIRCLECI_MESSAGE_TEMPLATE.format(**data)
def get_status(payload: Dict[str, Any]) -> Text:
def get_status(payload: Dict[str, Any]) -> str:
status = payload['status']
if payload['previous'] and payload['previous']['status'] == FAILED_STATUS and status == FAILED_STATUS:
return u'is still failing'

View File

@@ -1,5 +1,5 @@
# Webhooks for external integrations.
from typing import Any, Dict, Text
from typing import Any, Dict
from django.http import HttpRequest, HttpResponse
from django.utils.translation import ugettext as _

View File

@@ -1,5 +1,4 @@
# -*- coding: utf-8 -*-
from typing import Text
from zerver.lib.test_classes import WebhookTestCase
@@ -31,5 +30,5 @@ class DelightedHookTests(WebhookTestCase):
expected_message,
content_type="application/x-www-form-urlencoded")
def get_body(self, fixture_name: Text) -> Text:
def get_body(self, fixture_name: str) -> str:
return self.webhook_fixture_data("delighted", fixture_name, file_type="json")

View File

@@ -1,5 +1,4 @@
# -*- coding: utf-8 -*-
from typing import Text
from zerver.lib.test_classes import WebhookTestCase
@@ -55,5 +54,5 @@ class DeskDotComHookTests(WebhookTestCase):
self.api_stream_message(self.TEST_USER_EMAIL, 'unicode_text_japanese', expected_subject, expected_message,
content_type="application/x-www-form-urlencoded")
def get_body(self, fixture_name: Text) -> Text:
def get_body(self, fixture_name: str) -> str:
return self.webhook_fixture_data("deskdotcom", fixture_name, file_type="txt")

View File

@@ -1,5 +1,4 @@
# Webhooks for external integrations.
from typing import Text
from django.http import HttpRequest, HttpResponse
@@ -17,7 +16,7 @@ from zerver.models import UserProfile, get_client
@authenticated_rest_api_view(webhook_client_name="Desk")
@has_request_variables
def api_deskdotcom_webhook(request: HttpRequest, user_profile: UserProfile,
data: Text=REQ()) -> HttpResponse:
data: str=REQ()) -> HttpResponse:
topic = "Desk.com notification"
check_send_webhook_message(request, user_profile, topic, data)
return json_success()

View File

@@ -1,5 +1,4 @@
# -*- coding: utf-8 -*-
from typing import Text
from zerver.lib.test_classes import WebhookTestCase
class DialogflowHookTests(WebhookTestCase):
@@ -60,7 +59,7 @@ class DialogflowHookTests(WebhookTestCase):
expected_message,
content_type="application/json")
def get_body(self, fixture_name: Text) -> Text:
def get_body(self, fixture_name: str) -> str:
return self.webhook_fixture_data("dialogflow",
fixture_name,
file_type="json")

View File

@@ -1,5 +1,5 @@
# Webhooks for external integrations.
from typing import Text, Any, Dict
from typing import Any, Dict
from django.http import HttpRequest, HttpResponse
from zerver.decorator import api_key_only_webhook_view
from zerver.lib.actions import check_send_private_message

View File

@@ -1,5 +1,4 @@
# -*- coding: utf-8 -*-
from typing import Text
from zerver.lib.test_classes import WebhookTestCase
@@ -15,7 +14,7 @@ class DropboxHookTests(WebhookTestCase):
self.send_and_test_stream_message('file_updated', expected_subject, expected_message,
content_type="application/x-www-form-urlencoded")
def get_body(self, fixture_name: Text) -> Text:
def get_body(self, fixture_name: str) -> str:
return self.webhook_fixture_data("dropbox", fixture_name, file_type="json")
def test_verification_request(self) -> None:

View File

@@ -1,4 +1,3 @@
from typing import Text
from django.http import HttpRequest, HttpResponse
from zerver.lib.response import json_success
from zerver.lib.webhooks.common import check_send_webhook_message

View File

@@ -1,5 +1,4 @@
# -*- coding: utf-8 -*-
from typing import Text
from zerver.lib.test_classes import WebhookTestCase
class FlockHookTests(WebhookTestCase):
@@ -86,5 +85,5 @@ class FlockHookTests(WebhookTestCase):
expected_message,
content_type="application/json")
def get_body(self, fixture_name: Text) -> Text:
def get_body(self, fixture_name: str) -> str:
return self.webhook_fixture_data("flock", fixture_name, file_type="json")

View File

@@ -4,7 +4,7 @@ from zerver.lib.webhooks.common import check_send_webhook_message
from zerver.decorator import REQ, has_request_variables, api_key_only_webhook_view
from zerver.models import UserProfile
from django.http import HttpRequest, HttpResponse
from typing import Dict, Any, Text
from typing import Dict, Any
CHECK_IS_REPLY = "in reply to"

View File

@@ -1,5 +1,4 @@
# -*- coding: utf-8 -*-
from typing import Text
from zerver.lib.test_classes import WebhookTestCase
@@ -49,7 +48,7 @@ Priority: **High** => **Low**"""
self.api_stream_message(self.TEST_USER_EMAIL, 'priority_changed', expected_subject, expected_message,
content_type="application/x-www-form-urlencoded")
def note_change(self, fixture: Text, note_type: Text) -> None:
def note_change(self, fixture: str, note_type: str) -> None:
"""
Messages are generated when a note gets added to a ticket through
Freshdesk's "Observer" service.
@@ -76,5 +75,5 @@ Priority: **High** => **Low**"""
self.api_stream_message(self.TEST_USER_EMAIL, "inline_images", expected_subject, expected_message,
content_type="application/x-www-form-urlencoded")
def get_body(self, fixture_name: Text) -> Text:
def get_body(self, fixture_name: str) -> str:
return self.webhook_fixture_data("freshdesk", fixture_name, file_type="json")

View File

@@ -1,7 +1,7 @@
"""Webhooks for external integrations."""
import logging
from typing import Any, Dict, List, Optional, Text, Tuple, Union
from typing import Any, Dict, List, Optional, Tuple, Union
import ujson
from django.http import HttpRequest, HttpResponse

View File

@@ -1,4 +1,3 @@
from typing import Text
import ujson
from zerver.lib.test_classes import WebhookTestCase
@@ -179,5 +178,5 @@ class FrontHookTests(WebhookTestCase):
self.assert_json_error(result, "Unknown webhook request")
def get_body(self, fixture_name: Text) -> Text:
def get_body(self, fixture_name: str) -> str:
return self.webhook_fixture_data('front', fixture_name, file_type="json")

View File

@@ -1,4 +1,4 @@
from typing import Any, Dict, Optional, Text, Tuple
from typing import Any, Dict, Optional, Tuple
from django.http import HttpRequest, HttpResponse
from django.utils.translation import ugettext as _
@@ -9,47 +9,47 @@ from zerver.lib.response import json_error, json_success
from zerver.lib.webhooks.common import check_send_webhook_message
from zerver.models import UserProfile
def get_message_data(payload: Dict[Text, Any]) -> Tuple[Text, Text, Text, Text]:
def get_message_data(payload: Dict[str, Any]) -> Tuple[str, str, str, str]:
link = "https://app.frontapp.com/open/" + payload['target']['data']['id']
outbox = payload['conversation']['recipient']['handle']
inbox = payload['source']['data'][0]['address']
subject = payload['conversation']['subject']
return link, outbox, inbox, subject
def get_source_name(payload: Dict[Text, Any]) -> Text:
def get_source_name(payload: Dict[str, Any]) -> str:
first_name = payload['source']['data']['first_name']
last_name = payload['source']['data']['last_name']
return "%s %s" % (first_name, last_name)
def get_target_name(payload: Dict[Text, Any]) -> Text:
def get_target_name(payload: Dict[str, Any]) -> str:
first_name = payload['target']['data']['first_name']
last_name = payload['target']['data']['last_name']
return "%s %s" % (first_name, last_name)
def get_inbound_message_body(payload: Dict[Text, Any]) -> Text:
def get_inbound_message_body(payload: Dict[str, Any]) -> str:
link, outbox, inbox, subject = get_message_data(payload)
return "[Inbound message]({link}) from **{outbox}** to **{inbox}**.\n" \
"```quote\n*Subject*: {subject}\n```" \
.format(link=link, outbox=outbox, inbox=inbox, subject=subject)
def get_outbound_message_body(payload: Dict[Text, Any]) -> Text:
def get_outbound_message_body(payload: Dict[str, Any]) -> str:
link, outbox, inbox, subject = get_message_data(payload)
return "[Outbound message]({link}) from **{inbox}** to **{outbox}**.\n" \
"```quote\n*Subject*: {subject}\n```" \
.format(link=link, inbox=inbox, outbox=outbox, subject=subject)
def get_outbound_reply_body(payload: Dict[Text, Any]) -> Text:
def get_outbound_reply_body(payload: Dict[str, Any]) -> str:
link, outbox, inbox, subject = get_message_data(payload)
return "[Outbound reply]({link}) from **{inbox}** to **{outbox}**." \
.format(link=link, inbox=inbox, outbox=outbox)
def get_comment_body(payload: Dict[Text, Any]) -> Text:
def get_comment_body(payload: Dict[str, Any]) -> str:
name = get_source_name(payload)
comment = payload['target']['data']['body']
return "**{name}** left a comment:\n```quote\n{comment}\n```" \
.format(name=name, comment=comment)
def get_conversation_assigned_body(payload: Dict[Text, Any]) -> Text:
def get_conversation_assigned_body(payload: Dict[str, Any]) -> str:
source_name = get_source_name(payload)
target_name = get_target_name(payload)
@@ -60,32 +60,32 @@ def get_conversation_assigned_body(payload: Dict[Text, Any]) -> Text:
return "**{source_name}** assigned **{target_name}**." \
.format(source_name=source_name, target_name=target_name)
def get_conversation_unassigned_body(payload: Dict[Text, Any]) -> Text:
def get_conversation_unassigned_body(payload: Dict[str, Any]) -> str:
name = get_source_name(payload)
return "Unassined by **{name}**.".format(name=name)
def get_conversation_archived_body(payload: Dict[Text, Any]) -> Text:
def get_conversation_archived_body(payload: Dict[str, Any]) -> str:
name = get_source_name(payload)
return "Archived by **{name}**.".format(name=name)
def get_conversation_reopened_body(payload: Dict[Text, Any]) -> Text:
def get_conversation_reopened_body(payload: Dict[str, Any]) -> str:
name = get_source_name(payload)
return "Reopened by **{name}**.".format(name=name)
def get_conversation_deleted_body(payload: Dict[Text, Any]) -> Text:
def get_conversation_deleted_body(payload: Dict[str, Any]) -> str:
name = get_source_name(payload)
return "Deleted by **{name}**.".format(name=name)
def get_conversation_restored_body(payload: Dict[Text, Any]) -> Text:
def get_conversation_restored_body(payload: Dict[str, Any]) -> str:
name = get_source_name(payload)
return "Restored by **{name}**.".format(name=name)
def get_conversation_tagged_body(payload: Dict[Text, Any]) -> Text:
def get_conversation_tagged_body(payload: Dict[str, Any]) -> str:
name = get_source_name(payload)
tag = payload['target']['data']['name']
return "**{name}** added tag **{tag}**.".format(name=name, tag=tag)
def get_conversation_untagged_body(payload: Dict[Text, Any]) -> Text:
def get_conversation_untagged_body(payload: Dict[str, Any]) -> str:
name = get_source_name(payload)
tag = payload['target']['data']['name']
return "**{name}** removed tag **{tag}**.".format(name=name, tag=tag)
@@ -106,13 +106,13 @@ EVENT_FUNCTION_MAPPER = {
'untag': get_conversation_untagged_body
}
def get_body_based_on_event(event: Text) -> Any:
def get_body_based_on_event(event: str) -> Any:
return EVENT_FUNCTION_MAPPER[event]
@api_key_only_webhook_view('Front')
@has_request_variables
def api_front_webhook(request: HttpRequest, user_profile: UserProfile,
payload: Dict[Text, Any]=REQ(argument_type='body')) -> HttpResponse:
payload: Dict[str, Any]=REQ(argument_type='body')) -> HttpResponse:
event = payload['type']
if event not in EVENT_FUNCTION_MAPPER:

View File

@@ -1,5 +1,5 @@
# -*- coding: utf-8 -*-
from typing import Optional, Text
from typing import Optional
from zerver.lib.test_classes import WebhookTestCase

View File

@@ -1,4 +1,4 @@
from typing import Any, Dict, Optional, Text
from typing import Any, Dict, Optional
from django.http import HttpRequest, HttpResponse
@@ -18,7 +18,7 @@ def build_instance_url(instance_id: str) -> str:
class UnknownEventType(Exception):
pass
def get_abandon_event_body(payload: Dict[Text, Any]) -> Text:
def get_abandon_event_body(payload: Dict[str, Any]) -> str:
return GCI_MESSAGE_TEMPLATE.format(
actor=payload['task_claimed_by'],
action='{}ed'.format(payload['event_type']),
@@ -26,7 +26,7 @@ def get_abandon_event_body(payload: Dict[Text, Any]) -> Text:
task_url=build_instance_url(payload['task_instance']),
)
def get_submit_event_body(payload: Dict[Text, Any]) -> Text:
def get_submit_event_body(payload: Dict[str, Any]) -> str:
return GCI_MESSAGE_TEMPLATE.format(
actor=payload['task_claimed_by'],
action='{}ted'.format(payload['event_type']),
@@ -34,7 +34,7 @@ def get_submit_event_body(payload: Dict[Text, Any]) -> Text:
task_url=build_instance_url(payload['task_instance']),
)
def get_comment_event_body(payload: Dict[Text, Any]) -> Text:
def get_comment_event_body(payload: Dict[str, Any]) -> str:
return GCI_MESSAGE_TEMPLATE.format(
actor=payload['author'],
action='{}ed on'.format(payload['event_type']),
@@ -42,7 +42,7 @@ def get_comment_event_body(payload: Dict[Text, Any]) -> Text:
task_url=build_instance_url(payload['task_instance']),
)
def get_claim_event_body(payload: Dict[Text, Any]) -> Text:
def get_claim_event_body(payload: Dict[str, Any]) -> str:
return GCI_MESSAGE_TEMPLATE.format(
actor=payload['task_claimed_by'],
action='{}ed'.format(payload['event_type']),
@@ -50,7 +50,7 @@ def get_claim_event_body(payload: Dict[Text, Any]) -> Text:
task_url=build_instance_url(payload['task_instance']),
)
def get_approve_event_body(payload: Dict[Text, Any]) -> Text:
def get_approve_event_body(payload: Dict[str, Any]) -> str:
return GCI_MESSAGE_TEMPLATE.format(
actor=payload['author'],
action='{}d'.format(payload['event_type']),
@@ -58,7 +58,7 @@ def get_approve_event_body(payload: Dict[Text, Any]) -> Text:
task_url=build_instance_url(payload['task_instance']),
)
def get_approve_pending_pc_event_body(payload: Dict[Text, Any]) -> Text:
def get_approve_pending_pc_event_body(payload: Dict[str, Any]) -> str:
template = "{} (pending parental consent).".format(GCI_MESSAGE_TEMPLATE.rstrip('.'))
return template.format(
actor=payload['author'],
@@ -67,7 +67,7 @@ def get_approve_pending_pc_event_body(payload: Dict[Text, Any]) -> Text:
task_url=build_instance_url(payload['task_instance']),
)
def get_needswork_event_body(payload: Dict[Text, Any]) -> Text:
def get_needswork_event_body(payload: Dict[str, Any]) -> str:
template = "{} for more work.".format(GCI_MESSAGE_TEMPLATE.rstrip('.'))
return template.format(
actor=payload['author'],
@@ -76,7 +76,7 @@ def get_needswork_event_body(payload: Dict[Text, Any]) -> Text:
task_url=build_instance_url(payload['task_instance']),
)
def get_extend_event_body(payload: Dict[Text, Any]) -> Text:
def get_extend_event_body(payload: Dict[str, Any]) -> str:
template = "{} by {days} day(s).".format(GCI_MESSAGE_TEMPLATE.rstrip('.'),
days=payload['extension_days'])
return template.format(
@@ -86,7 +86,7 @@ def get_extend_event_body(payload: Dict[Text, Any]) -> Text:
task_url=build_instance_url(payload['task_instance']),
)
def get_unassign_event_body(payload: Dict[Text, Any]) -> Text:
def get_unassign_event_body(payload: Dict[str, Any]) -> str:
return GCI_MESSAGE_TEMPLATE.format(
actor=payload['author'],
action='unassigned **{student}** from'.format(student=payload['task_claimed_by']),
@@ -94,7 +94,7 @@ def get_unassign_event_body(payload: Dict[Text, Any]) -> Text:
task_url=build_instance_url(payload['task_instance']),
)
def get_outoftime_event_body(payload: Dict[Text, Any]) -> Text:
def get_outoftime_event_body(payload: Dict[str, Any]) -> str:
return u'The deadline for the task [{task_name}]({task_url}) has passed.'.format(
task_name=payload['task_definition_name'],
task_url=build_instance_url(payload['task_instance']),
@@ -103,7 +103,7 @@ def get_outoftime_event_body(payload: Dict[Text, Any]) -> Text:
@api_key_only_webhook_view("Google-Code-In")
@has_request_variables
def api_gci_webhook(request: HttpRequest, user_profile: UserProfile,
payload: Dict[Text, Any]=REQ(argument_type='body')) -> HttpResponse:
payload: Dict[str, Any]=REQ(argument_type='body')) -> HttpResponse:
event = get_event(payload)
if event is not None:
body = get_body_based_on_event(event)(payload)
@@ -127,12 +127,12 @@ EVENTS_FUNCTION_MAPPER = {
'unassign': get_unassign_event_body,
}
def get_event(payload: Dict[Text, Any]) -> Optional[Text]:
def get_event(payload: Dict[str, Any]) -> Optional[str]:
event = payload['event_type']
if event in EVENTS_FUNCTION_MAPPER:
return event
raise UnknownEventType(u"Event '{}' is unknown and cannot be handled".format(event)) # nocoverage
def get_body_based_on_event(event: Text) -> Any:
def get_body_based_on_event(event: str) -> Any:
return EVENTS_FUNCTION_MAPPER[event]

View File

@@ -1,4 +1,4 @@
from typing import Dict, Optional, Text
from typing import Dict, Optional
import ujson
from mock import MagicMock, patch

View File

@@ -1,4 +1,4 @@
from typing import Dict, Optional, Text
from typing import Dict, Optional
import ujson
@@ -7,11 +7,11 @@ from zerver.lib.webhooks.git import COMMITS_LIMIT
from zerver.models import Message
class GithubV1HookTests(WebhookTestCase):
STREAM_NAME = None # type: Optional[Text]
STREAM_NAME = None # type: Optional[str]
URL_TEMPLATE = u"/api/v1/external/github"
FIXTURE_DIR_NAME = 'github_legacy'
SEND_STREAM = False
BRANCHES = None # type: Optional[Text]
BRANCHES = None # type: Optional[str]
push_content = u"""zbenjamin [pushed](https://github.com/zbenjamin/zulip-test/compare/4f9adc4777d5...b95449196980) 3 commits to branch master.
@@ -38,7 +38,7 @@ class GithubV1HookTests(WebhookTestCase):
after_count = Message.objects.count()
self.assertEqual(prior_count, after_count)
def get_body(self, fixture_name: Text) -> Dict[str, Text]:
def get_body(self, fixture_name: str) -> Dict[str, str]:
api_key = self.test_user.api_key
data = ujson.loads(self.webhook_fixture_data(self.FIXTURE_DIR_NAME, 'v1_' + fixture_name))
data.update({'email': self.TEST_USER_EMAIL,
@@ -51,9 +51,9 @@ class GithubV1HookTests(WebhookTestCase):
data['branches'] = self.BRANCHES
return data
def basic_test(self, fixture_name: Text, stream_name: Text,
expected_subject: Text, expected_content: Text,
send_stream: bool=False, branches: Optional[Text]=None) -> None:
def basic_test(self, fixture_name: str, stream_name: str,
expected_subject: str, expected_content: str,
send_stream: bool=False, branches: Optional[str]=None) -> None:
self.STREAM_NAME = stream_name
self.SEND_STREAM = send_stream
self.BRANCHES = branches
@@ -132,11 +132,11 @@ class GithubV1HookTests(WebhookTestCase):
"zbenjamin [commented](https://github.com/zbenjamin/zulip-test/commit/7c994678d2f98797d299abed852d3ff9d0834533#commitcomment-4252307) on [7c99467](https://github.com/zbenjamin/zulip-test/commit/7c994678d2f98797d299abed852d3ff9d0834533)\n~~~ quote\nThis line adds /unlucky/ cowbell (because of its line number). We should remove it.\n~~~")
class GithubV2HookTests(WebhookTestCase):
STREAM_NAME = None # type: Optional[Text]
STREAM_NAME = None # type: Optional[str]
URL_TEMPLATE = u"/api/v1/external/github"
FIXTURE_DIR_NAME = 'github_legacy'
SEND_STREAM = False
BRANCHES = None # type: Optional[Text]
BRANCHES = None # type: Optional[str]
push_content = """zbenjamin [pushed](https://github.com/zbenjamin/zulip-test/compare/4f9adc4777d5...b95449196980) 3 commits to branch master.
@@ -163,7 +163,7 @@ class GithubV2HookTests(WebhookTestCase):
after_count = Message.objects.count()
self.assertEqual(prior_count, after_count)
def get_body(self, fixture_name: Text) -> Dict[str, Text]:
def get_body(self, fixture_name: str) -> Dict[str, str]:
api_key = self.test_user.api_key
data = ujson.loads(self.webhook_fixture_data(self.FIXTURE_DIR_NAME, 'v2_' + fixture_name))
data.update({'email': self.TEST_USER_EMAIL,
@@ -176,9 +176,9 @@ class GithubV2HookTests(WebhookTestCase):
data['branches'] = self.BRANCHES
return data
def basic_test(self, fixture_name: Text, stream_name: Text,
expected_subject: Text, expected_content: Text,
send_stream: bool=False, branches: Optional[Text]=None) -> None:
def basic_test(self, fixture_name: str, stream_name: str,
expected_subject: str, expected_content: str,
send_stream: bool=False, branches: Optional[str]=None) -> None:
self.STREAM_NAME = stream_name
self.SEND_STREAM = send_stream
self.BRANCHES = branches

View File

@@ -1,6 +1,6 @@
import logging
import re
from typing import Any, Dict, List, Mapping, Optional, Sequence, Text, Tuple
from typing import Any, Dict, List, Mapping, Optional, Sequence, Tuple
import ujson
from django.conf import settings
@@ -22,20 +22,20 @@ from zerver.views.messages import send_message_backend
ZULIP_TEST_REPO_NAME = 'zulip-test'
ZULIP_TEST_REPO_ID = 6893087
def flexible_boolean(boolean: Text) -> bool:
def flexible_boolean(boolean: str) -> bool:
"""Returns True for any of "1", "true", or "True". Returns False otherwise."""
if boolean in ("1", "true", "True"):
return True
else:
return False
def is_test_repository(repository: Mapping[Text, Any]) -> bool:
def is_test_repository(repository: Mapping[str, Any]) -> bool:
return repository['name'] == ZULIP_TEST_REPO_NAME and repository['id'] == ZULIP_TEST_REPO_ID
class UnknownEventType(Exception):
pass
def github_pull_request_content(payload: Mapping[Text, Any]) -> Text:
def github_pull_request_content(payload: Mapping[str, Any]) -> str:
pull_request = payload['pull_request']
action = get_pull_request_or_issue_action(payload)
@@ -57,7 +57,7 @@ def github_pull_request_content(payload: Mapping[Text, Any]) -> Text:
pull_request['number']
)
def github_issues_content(payload: Mapping[Text, Any]) -> Text:
def github_issues_content(payload: Mapping[str, Any]) -> str:
issue = payload['issue']
action = get_pull_request_or_issue_action(payload)
@@ -77,7 +77,7 @@ def github_issues_content(payload: Mapping[Text, Any]) -> Text:
issue['number'],
)
def github_object_commented_content(payload: Mapping[Text, Any], type: Text) -> Text:
def github_object_commented_content(payload: Mapping[str, Any], type: str) -> str:
comment = payload['comment']
issue = payload['issue']
action = u'[commented]({}) on'.format(comment['html_url'])
@@ -91,18 +91,18 @@ def github_object_commented_content(payload: Mapping[Text, Any], type: Text) ->
type=type
)
def get_pull_request_or_issue_action(payload: Mapping[Text, Any]) -> Text:
def get_pull_request_or_issue_action(payload: Mapping[str, Any]) -> str:
return 'synchronized' if payload['action'] == 'synchronize' else payload['action']
def get_pull_request_or_issue_assignee(object_payload: Mapping[Text, Any]) -> Optional[Text]:
def get_pull_request_or_issue_assignee(object_payload: Mapping[str, Any]) -> Optional[str]:
assignee_dict = object_payload.get('assignee')
if assignee_dict:
return assignee_dict.get('login')
return None
def get_pull_request_or_issue_subject(repository: Mapping[Text, Any],
payload_object: Mapping[Text, Any],
type: Text) -> Text:
def get_pull_request_or_issue_subject(repository: Mapping[str, Any],
payload_object: Mapping[str, Any],
type: str) -> str:
return SUBJECT_WITH_PR_OR_ISSUE_INFO_TEMPLATE.format(
repo=repository['name'],
type=type,
@@ -110,16 +110,16 @@ def get_pull_request_or_issue_subject(repository: Mapping[Text, Any],
title=payload_object['title']
)
def github_generic_subject(noun: Text, topic_focus: Text, blob: Mapping[Text, Any]) -> Text:
def github_generic_subject(noun: str, topic_focus: str, blob: Mapping[str, Any]) -> str:
# issue and pull_request objects have the same fields we're interested in
return u'%s: %s %d: %s' % (topic_focus, noun, blob['number'], blob['title'])
def api_github_v1(user_profile: UserProfile,
event: Text,
payload: Mapping[Text, Any],
branches: Text,
stream: Text,
**kwargs: Any) -> Tuple[Text, Text, Text]:
event: str,
payload: Mapping[str, Any],
branches: str,
stream: str,
**kwargs: Any) -> Tuple[str, str, str]:
"""
processes github payload with version 1 field specification
`payload` comes in unmodified from github
@@ -131,9 +131,9 @@ def api_github_v1(user_profile: UserProfile,
stream, commit_stream, issue_stream, **kwargs)
def api_github_v2(user_profile: UserProfile, event: Text, payload: Mapping[Text, Any],
branches: Text, default_stream: Text, commit_stream: Text,
issue_stream: Text, topic_focus: Optional[Text]=None) -> Tuple[Text, Text, Text]:
def api_github_v2(user_profile: UserProfile, event: str, payload: Mapping[str, Any],
branches: str, default_stream: str, commit_stream: str,
issue_stream: str, topic_focus: Optional[str]=None) -> Tuple[str, str, str]:
"""
processes github payload with version 2 field specification
`payload` comes in unmodified from github
@@ -201,13 +201,13 @@ def api_github_v2(user_profile: UserProfile, event: Text, payload: Mapping[Text,
@authenticated_api_view(is_webhook=True)
@has_request_variables
def api_github_landing(request: HttpRequest, user_profile: UserProfile, event: Text=REQ(),
payload: Mapping[Text, Any]=REQ(validator=check_dict([])),
branches: Text=REQ(default=''),
stream: Text=REQ(default=''),
def api_github_landing(request: HttpRequest, user_profile: UserProfile, event: str=REQ(),
payload: Mapping[str, Any]=REQ(validator=check_dict([])),
branches: str=REQ(default=''),
stream: str=REQ(default=''),
version: int=REQ(converter=to_non_negative_int, default=1),
commit_stream: Text=REQ(default=''),
issue_stream: Text=REQ(default=''),
commit_stream: str=REQ(default=''),
issue_stream: str=REQ(default=''),
exclude_pull_requests: bool=REQ(converter=flexible_boolean, default=False),
exclude_issues: bool=REQ(converter=flexible_boolean, default=False),
exclude_commits: bool=REQ(converter=flexible_boolean, default=False),
@@ -286,11 +286,11 @@ def api_github_landing(request: HttpRequest, user_profile: UserProfile, event: T
forged=False, topic_name=subject,
message_content=content)
def build_message_from_gitlog(user_profile: UserProfile, name: Text, ref: Text,
commits: List[Dict[str, str]], before: Text, after: Text,
url: Text, pusher: Text, forced: Optional[Text]=None,
created: Optional[Text]=None, deleted: Optional[bool]=False
) -> Tuple[Text, Text]:
def build_message_from_gitlog(user_profile: UserProfile, name: str, ref: str,
commits: List[Dict[str, str]], before: str, after: str,
url: str, pusher: str, forced: Optional[str]=None,
created: Optional[str]=None, deleted: Optional[bool]=False
) -> Tuple[str, str]:
short_ref = re.sub(r'^refs/heads/', '', ref)
subject = SUBJECT_WITH_BRANCH_TEMPLATE.format(repo=name, branch=short_ref)

View File

@@ -1,5 +1,5 @@
# -*- coding: utf-8 -*-
from typing import Optional, Text
from typing import Optional
from mock import MagicMock, patch

View File

@@ -1,6 +1,5 @@
# -*- coding: utf-8 -*-
import urllib
from typing import Text
from zerver.lib.test_classes import WebhookTestCase
from zerver.models import get_realm, get_user
@@ -39,5 +38,5 @@ class GocdHookTests(WebhookTestCase):
content_type="application/x-www-form-urlencoded"
)
def get_body(self, fixture_name: Text) -> Text:
def get_body(self, fixture_name: str) -> str:
return self.webhook_fixture_data("gocd", fixture_name, file_type="json")

View File

@@ -3,7 +3,7 @@
import json
import os
from typing import Any, Dict, Iterable, Optional, Text
from typing import Any, Dict, Iterable, Optional
from django.http import HttpRequest, HttpResponse
from django.utils.translation import ugettext as _

View File

@@ -1,5 +1,5 @@
# -*- coding: utf-8 -*-
from typing import Optional, Text
from typing import Optional
from mock import MagicMock, patch

View File

@@ -1,5 +1,4 @@
# -*- coding: utf-8 -*-
from typing import Text
from zerver.lib.test_classes import WebhookTestCase
@@ -16,5 +15,5 @@ class GoSquaredHookTests(WebhookTestCase):
self.send_and_test_stream_message('traffic_spike', expected_subject, expected_message,
content_type="application/x-www-form-urlencoded")
def get_body(self, fixture_name: Text) -> Text:
def get_body(self, fixture_name: str) -> str:
return self.webhook_fixture_data("gosquared", fixture_name, file_type="json")

View File

@@ -1,4 +1,4 @@
from typing import Any, Dict, Optional, Text
from typing import Any, Dict, Optional
from django.http import HttpRequest, HttpResponse
from django.utils.translation import ugettext as _

View File

@@ -1,5 +1,4 @@
# -*- coding: utf-8 -*-
from typing import Text
from zerver.lib.test_classes import WebhookTestCase
@@ -62,5 +61,5 @@ class GreenhouseHookTests(WebhookTestCase):
expected_message,
content_type=self.CONTENT_TYPE)
def get_body(self, fixture_name: Text) -> Text:
def get_body(self, fixture_name: str) -> str:
return self.webhook_fixture_data("greenhouse", fixture_name, file_type="json")

View File

@@ -1,6 +1,4 @@
# -*- coding: utf-8 -*-
from typing import Text
from zerver.lib.test_classes import WebhookTestCase
class GrooveHookTests(WebhookTestCase):
@@ -115,5 +113,5 @@ class GrooveHookTests(WebhookTestCase):
HTTP_X_GROOVE_EVENT='ticket_started')
self.assert_json_error(result, 'Missing required data')
def get_body(self, fixture_name: Text) -> Text:
def get_body(self, fixture_name: str) -> str:
return self.webhook_fixture_data("groove", fixture_name, file_type="json")

View File

@@ -1,5 +1,5 @@
# Webhooks for external integrations.
from typing import Any, Dict, Optional, Text
from typing import Any, Dict, Optional
from django.http import HttpRequest, HttpResponse
from django.utils.translation import ugettext as _
@@ -13,12 +13,12 @@ from zerver.lib.webhooks.common import check_send_webhook_message, \
validate_extract_webhook_http_header
from zerver.models import UserProfile
def ticket_started_body(payload: Dict[str, Any]) -> Text:
def ticket_started_body(payload: Dict[str, Any]) -> str:
body = u'New ticket from {customer_name}'
body += u"\n```quote\n**[Ticket #{number}: {title}]({app_url})**\n{summary}\n```"
return body.format(**payload)
def ticket_assigned_body(payload: Dict[str, Any]) -> Optional[Text]:
def ticket_assigned_body(payload: Dict[str, Any]) -> Optional[str]:
# Take the state, assignee, and assigned group from the payload.
state = payload['state']
assignee = payload['assignee']
@@ -46,7 +46,7 @@ def ticket_assigned_body(payload: Dict[str, Any]) -> Optional[Text]:
else:
return None
def agent_replied_body(payload: Dict[str, Any]) -> Text:
def agent_replied_body(payload: Dict[str, Any]) -> str:
# Take the agent's email and the ticket number from the payload.
agent = payload['links']['author']['href'].split("http://api.groovehq.com/v1/agents/")[1]
number = payload['links']['ticket']['href'].split("http://api.groovehq.com/v1/tickets/")[1]
@@ -56,7 +56,7 @@ def agent_replied_body(payload: Dict[str, Any]) -> Text:
body += u"({app_ticket_url})**\n{plain_text_body}\n```"
return body.format(**payload)
def customer_replied_body(payload: Dict[str, Any]) -> Text:
def customer_replied_body(payload: Dict[str, Any]) -> str:
# Take the customer's email and the ticket number from the payload.
customer = payload['links']['author']['href'].split("http://api.groovehq.com/v1/customers/")[1]
number = payload['links']['ticket']['href'].split("http://api.groovehq.com/v1/tickets/")[1]
@@ -66,7 +66,7 @@ def customer_replied_body(payload: Dict[str, Any]) -> Text:
body += u"({app_ticket_url})**\n{plain_text_body}\n```"
return body.format(**payload)
def note_added_body(payload: Dict[str, Any]) -> Text:
def note_added_body(payload: Dict[str, Any]) -> str:
# Take the agent's email and the ticket number from the payload.
agent = payload['links']['author']['href'].split("http://api.groovehq.com/v1/agents/")[1]
number = payload['links']['ticket']['href'].split("http://api.groovehq.com/v1/tickets/")[1]

View File

@@ -1,5 +1,4 @@
# -*- coding: utf-8 -*-
from typing import Text
from zerver.lib.test_classes import WebhookTestCase
@@ -23,5 +22,5 @@ class HelloSignHookTests(WebhookTestCase):
self.send_and_test_stream_message('signatures_with_own_subject', expected_subject, expected_message,
content_type="application/x-www-form-urlencoded", topic=expected_subject)
def get_body(self, fixture_name: Text) -> Text:
def get_body(self, fixture_name: str) -> str:
return self.webhook_fixture_data("hellosign", fixture_name, file_type="json")

View File

@@ -1,6 +1,5 @@
# -*- coding: utf-8 -*-
from django.conf import settings
from typing import Text
from zerver.lib.test_classes import WebhookTestCase
from zerver.models import get_system_bot
@@ -56,5 +55,5 @@ class HelloWorldHookTests(WebhookTestCase):
self.send_and_test_stream_message('goodbye', expected_subject, expected_message,
content_type="application/x-www-form-urlencoded")
def get_body(self, fixture_name: Text) -> Text:
def get_body(self, fixture_name: str) -> str:
return self.webhook_fixture_data("helloworld", fixture_name, file_type="json")

View File

@@ -1,5 +1,5 @@
# Webhooks for external integrations.
from typing import Any, Dict, Iterable, Optional, Text
from typing import Any, Dict, Iterable, Optional
from django.http import HttpRequest, HttpResponse
from django.utils.translation import ugettext as _

View File

@@ -1,5 +1,4 @@
# -*- coding: utf-8 -*-
from typing import Text
from zerver.lib.test_classes import WebhookTestCase
@@ -15,5 +14,5 @@ class HerokuHookTests(WebhookTestCase):
self.send_and_test_stream_message('deploy', expected_subject, expected_message,
content_type="application/x-www-form-urlencoded")
def get_body(self, fixture_name: Text) -> Text:
def get_body(self, fixture_name: str) -> str:
return self.webhook_fixture_data("heroku", fixture_name, file_type="txt")

View File

@@ -1,5 +1,4 @@
# Webhooks for external integrations.
from typing import Text
from django.http import HttpRequest, HttpResponse
@@ -12,8 +11,8 @@ from zerver.models import UserProfile
@api_key_only_webhook_view("Heroku")
@has_request_variables
def api_heroku_webhook(request: HttpRequest, user_profile: UserProfile,
head: Text=REQ(), app: Text=REQ(), user: Text=REQ(),
url: Text=REQ(), git_log: Text=REQ()) -> HttpResponse:
head: str=REQ(), app: str=REQ(), user: str=REQ(),
url: str=REQ(), git_log: str=REQ()) -> HttpResponse:
template = "{} deployed version {} of [{}]({})\n> {}"
content = template.format(user, head, app, url, git_log)

View File

@@ -1,4 +1,3 @@
from typing import Text
from zerver.lib.test_classes import WebhookTestCase
@@ -21,5 +20,5 @@ class HomeAssistantHookTests(WebhookTestCase):
self.send_and_test_stream_message('reqwithtitle', expected_subject, expected_message,
content_type="application/x-www-form-urlencoded")
def get_body(self, fixture_name: Text) -> Text:
def get_body(self, fixture_name: str) -> str:
return self.webhook_fixture_data("homeassistant", fixture_name, file_type="json")

View File

@@ -1,4 +1,4 @@
from typing import Any, Dict, Iterable, Optional, Text
from typing import Any, Dict, Iterable, Optional
from django.http import HttpRequest, HttpResponse
from django.utils.translation import ugettext as _

View File

@@ -1,5 +1,4 @@
# -*- coding: utf-8 -*-
from typing import Text
from zerver.lib.test_classes import WebhookTestCase
@@ -31,5 +30,5 @@ class InspingHookTests(WebhookTestCase):
expected_subject, expected_message,
content_type="application/x-www-form-urlencoded")
def get_body(self, fixture_name: Text) -> Text:
def get_body(self, fixture_name: str) -> str:
return self.webhook_fixture_data("insping", fixture_name, file_type="json")

View File

@@ -8,7 +8,7 @@ from zerver.lib.validator import check_dict, check_string
from zerver.models import Client, UserProfile
from django.http import HttpRequest, HttpResponse
from typing import Dict, Any, Iterable, Optional, Text
from typing import Dict, Any, Iterable, Optional
import time

View File

@@ -1,5 +1,3 @@
from typing import Text
from zerver.lib.test_classes import WebhookTestCase
class IntercomWebHookTests(WebhookTestCase):
@@ -17,5 +15,5 @@ class IntercomWebHookTests(WebhookTestCase):
self.send_and_test_stream_message('user_created', expected_subject, expected_message,
content_type="application/x-www-form-urlencoded")
def get_body(self, fixture_name: Text) -> Text:
def get_body(self, fixture_name: str) -> str:
return self.webhook_fixture_data('intercom', fixture_name, file_type="json")

View File

@@ -1,5 +1,5 @@
import datetime
from typing import Any, Dict, Text
from typing import Any, Dict
from django.http import HttpRequest, HttpResponse

View File

@@ -1,5 +1,4 @@
# -*- coding: utf-8 -*-
from typing import Text
from zerver.lib.test_classes import WebhookTestCase
@@ -119,5 +118,5 @@ Adding a comment. Oh, what a comment it is!"""
self.send_and_test_stream_message('change_status_v1', expected_subject, expected_message)
self.send_and_test_stream_message('change_status_v2', expected_subject, expected_message)
def get_body(self, fixture_name: Text) -> Text:
def get_body(self, fixture_name: str) -> str:
return self.webhook_fixture_data('jira', fixture_name)

View File

@@ -1,7 +1,7 @@
# Webhooks for external integrations.
import logging
import re
from typing import Any, Dict, List, Optional, Text, Tuple
from typing import Any, Dict, List, Optional, Tuple
import ujson
from django.conf import settings
@@ -21,7 +21,7 @@ IGNORED_EVENTS = [
'comment_deleted', # we handle issue_update event instead
]
def guess_zulip_user_from_jira(jira_username: Text, realm: Realm) -> Optional[UserProfile]:
def guess_zulip_user_from_jira(jira_username: str, realm: Realm) -> Optional[UserProfile]:
try:
# Try to find a matching user in Zulip
# We search a user's full name, short name,
@@ -36,7 +36,7 @@ def guess_zulip_user_from_jira(jira_username: Text, realm: Realm) -> Optional[Us
except IndexError:
return None
def convert_jira_markup(content: Text, realm: Realm) -> Text:
def convert_jira_markup(content: str, realm: Realm) -> str:
# Attempt to do some simplistic conversion of JIRA
# formatting to Markdown, for consumption in Zulip
@@ -87,7 +87,7 @@ def convert_jira_markup(content: Text, realm: Realm) -> Text:
return content
def get_in(payload: Dict[str, Any], keys: List[str], default: Text='') -> Any:
def get_in(payload: Dict[str, Any], keys: List[str], default: str='') -> Any:
try:
for key in keys:
payload = payload[key]
@@ -95,7 +95,7 @@ def get_in(payload: Dict[str, Any], keys: List[str], default: Text='') -> Any:
return default
return payload
def get_issue_string(payload: Dict[str, Any], issue_id: Optional[Text]=None) -> Text:
def get_issue_string(payload: Dict[str, Any], issue_id: Optional[str]=None) -> str:
# Guess the URL as it is not specified in the payload
# We assume that there is a /browse/BUG-### page
# from the REST url of the issue itself
@@ -108,7 +108,7 @@ def get_issue_string(payload: Dict[str, Any], issue_id: Optional[Text]=None) ->
else:
return issue_id
def get_assignee_mention(assignee_email: Text, realm: Realm) -> Text:
def get_assignee_mention(assignee_email: str, realm: Realm) -> str:
if assignee_email != '':
try:
assignee_name = get_user(assignee_email, realm).full_name
@@ -117,19 +117,19 @@ def get_assignee_mention(assignee_email: Text, realm: Realm) -> Text:
return u"**{}**".format(assignee_name)
return ''
def get_issue_author(payload: Dict[str, Any]) -> Text:
def get_issue_author(payload: Dict[str, Any]) -> str:
return get_in(payload, ['user', 'displayName'])
def get_issue_id(payload: Dict[str, Any]) -> Text:
def get_issue_id(payload: Dict[str, Any]) -> str:
return get_in(payload, ['issue', 'key'])
def get_issue_title(payload: Dict[str, Any]) -> Text:
def get_issue_title(payload: Dict[str, Any]) -> str:
return get_in(payload, ['issue', 'fields', 'summary'])
def get_issue_subject(payload: Dict[str, Any]) -> Text:
def get_issue_subject(payload: Dict[str, Any]) -> str:
return u"{}: {}".format(get_issue_id(payload), get_issue_title(payload))
def get_sub_event_for_update_issue(payload: Dict[str, Any]) -> Text:
def get_sub_event_for_update_issue(payload: Dict[str, Any]) -> str:
sub_event = payload.get('issue_event_type_name', '')
if sub_event == '':
if payload.get('comment'):
@@ -138,13 +138,13 @@ def get_sub_event_for_update_issue(payload: Dict[str, Any]) -> Text:
return 'issue_transited'
return sub_event
def get_event_type(payload: Dict[str, Any]) -> Optional[Text]:
def get_event_type(payload: Dict[str, Any]) -> Optional[str]:
event = payload.get('webhookEvent')
if event is None and payload.get('transition'):
event = 'jira:issue_updated'
return event
def add_change_info(content: Text, field: Text, from_field: Text, to_field: Text) -> Text:
def add_change_info(content: str, field: str, from_field: str, to_field: str) -> str:
content += u"* Changed {}".format(field)
if from_field:
content += u" from **{}**".format(from_field)
@@ -152,7 +152,7 @@ def add_change_info(content: Text, field: Text, from_field: Text, to_field: Text
content += u" to {}\n".format(to_field)
return content
def handle_updated_issue_event(payload: Dict[str, Any], user_profile: UserProfile) -> Text:
def handle_updated_issue_event(payload: Dict[str, Any], user_profile: UserProfile) -> str:
# Reassigned, commented, reopened, and resolved events are all bundled
# into this one 'updated' event type, so we try to extract the meaningful
# event that happened
@@ -208,7 +208,7 @@ def handle_updated_issue_event(payload: Dict[str, Any], user_profile: UserProfil
return content
def handle_created_issue_event(payload: Dict[str, Any]) -> Text:
def handle_created_issue_event(payload: Dict[str, Any]) -> str:
return u"{} **created** {} priority {}, assigned to **{}**:\n\n> {}".format(
get_issue_author(payload),
get_issue_string(payload),
@@ -217,7 +217,7 @@ def handle_created_issue_event(payload: Dict[str, Any]) -> Text:
get_issue_title(payload)
)
def handle_deleted_issue_event(payload: Dict[str, Any]) -> Text:
def handle_deleted_issue_event(payload: Dict[str, Any]) -> str:
return u"{} **deleted** {}!".format(get_issue_author(payload), get_issue_string(payload))
@api_key_only_webhook_view("JIRA")

View File

@@ -1,6 +1,6 @@
# -*- coding: utf-8 -*-
import urllib
from typing import Optional, Text
from typing import Optional
from zerver.lib.test_classes import WebhookTestCase
@@ -10,7 +10,7 @@ class LibratoHookTests(WebhookTestCase):
FIXTURE_DIR_NAME = 'librato'
IS_ATTACHMENT = False
def get_body(self, fixture_name: Text) -> Text:
def get_body(self, fixture_name: str) -> str:
if self.IS_ATTACHMENT:
return self.webhook_fixture_data("librato", fixture_name, file_type='json')
return urllib.parse.urlencode({'payload': self.webhook_fixture_data("librato", fixture_name, file_type='json')})

View File

@@ -1,5 +1,5 @@
from datetime import datetime
from typing import Any, Callable, Dict, List, Optional, Text, Tuple
from typing import Any, Callable, Dict, List, Optional, Tuple
import ujson
from django.http import HttpRequest, HttpResponse
@@ -23,22 +23,22 @@ class LibratoWebhookParser:
self.payload = payload
self.attachments = attachments
def generate_alert_url(self, alert_id: int) -> Text:
def generate_alert_url(self, alert_id: int) -> str:
return self.ALERT_URL_TEMPLATE.format(alert_id=alert_id)
def parse_alert(self) -> Tuple[int, Text, Text, Text]:
def parse_alert(self) -> Tuple[int, str, str, str]:
alert = self.payload['alert']
alert_id = alert['id']
return alert_id, alert['name'], self.generate_alert_url(alert_id), alert['runbook_url']
def parse_condition(self, condition: Dict[str, Any]) -> Tuple[Text, Text, Text, Text]:
def parse_condition(self, condition: Dict[str, Any]) -> Tuple[str, str, str, str]:
summary_function = condition['summary_function']
threshold = condition.get('threshold', '')
condition_type = condition['type']
duration = condition.get('duration', '')
return summary_function, threshold, condition_type, duration
def parse_violation(self, violation: Dict[str, Any]) -> Tuple[Text, Text]:
def parse_violation(self, violation: Dict[str, Any]) -> Tuple[str, str]:
metric_name = violation['metric']
recorded_at = datetime.fromtimestamp((violation['recorded_at']),
tz=timezone_utc).strftime('%Y-%m-%d %H:%M:%S')
@@ -52,7 +52,7 @@ class LibratoWebhookParser:
violations = self.payload['violations']['test-source']
return violations
def parse_snapshot(self, snapshot: Dict[str, Any]) -> Tuple[Text, Text, Text]:
def parse_snapshot(self, snapshot: Dict[str, Any]) -> Tuple[str, str, str]:
author_name, image_url, title = snapshot['author_name'], snapshot['image_url'], snapshot['title']
return author_name, image_url, title
@@ -68,7 +68,7 @@ class LibratoWebhookHandler(LibratoWebhookParser):
SNAPSHOT: self.handle_snapshots
}
def find_handle_method(self) -> Callable[[], Text]:
def find_handle_method(self) -> Callable[[], str]:
for available_type in self.payload_available_types:
if self.payload.get(available_type):
return self.payload_available_types[available_type]
@@ -77,17 +77,17 @@ class LibratoWebhookHandler(LibratoWebhookParser):
return self.attachments_available_types[available_type]
raise Exception("Unexcepted message type")
def handle(self) -> Text:
def handle(self) -> str:
return self.find_handle_method()()
def generate_topic(self) -> Text:
def generate_topic(self) -> str:
if self.attachments:
return "Snapshots"
topic_template = "Alert {alert_name}"
alert_id, alert_name, alert_url, alert_runbook_url = self.parse_alert()
return topic_template.format(alert_name=alert_name)
def handle_alert_clear_message(self) -> Text:
def handle_alert_clear_message(self) -> str:
alert_clear_template = "Alert [alert_name]({alert_url}) has cleared at {trigger_time} UTC!"
trigger_time = datetime.fromtimestamp((self.payload['trigger_time']),
tz=timezone_utc).strftime('%Y-%m-%d %H:%M:%S')
@@ -97,19 +97,19 @@ class LibratoWebhookHandler(LibratoWebhookParser):
trigger_time=trigger_time)
return content
def handle_snapshots(self) -> Text:
def handle_snapshots(self) -> str:
content = u''
for attachment in self.attachments:
content += self.handle_snapshot(attachment)
return content
def handle_snapshot(self, snapshot: Dict[str, Any]) -> Text:
def handle_snapshot(self, snapshot: Dict[str, Any]) -> str:
snapshot_template = u"**{author_name}** sent a [snapshot]({image_url}) of [metric]({title})"
author_name, image_url, title = self.parse_snapshot(snapshot)
content = snapshot_template.format(author_name=author_name, image_url=image_url, title=title)
return content
def handle_alert_violation_message(self) -> Text:
def handle_alert_violation_message(self) -> str:
alert_violation_template = u"Alert [alert_name]({alert_url}) has triggered! "
alert_id, alert_name, alert_url, alert_runbook_url = self.parse_alert()
content = alert_violation_template.format(alert_name=alert_name, alert_url=alert_url)
@@ -119,7 +119,7 @@ class LibratoWebhookHandler(LibratoWebhookParser):
content += self.generate_conditions_and_violations()
return content
def generate_conditions_and_violations(self) -> Text:
def generate_conditions_and_violations(self) -> str:
conditions = self.parse_conditions()
violations = self.parse_violations()
content = u""
@@ -128,7 +128,7 @@ class LibratoWebhookHandler(LibratoWebhookParser):
return content
def generate_violated_metric_condition(self, violation: Dict[str, Any],
condition: Dict[str, Any]) -> Text:
condition: Dict[str, Any]) -> str:
summary_function, threshold, condition_type, duration = self.parse_condition(condition)
metric_name, recorded_at = self.parse_violation(violation)
metric_condition_template = (u"\n>Metric `{metric_name}`, {summary_function} "

View File

@@ -1,5 +1,4 @@
# -*- coding: utf-8 -*-
from typing import Text
from zerver.lib.test_classes import WebhookTestCase
@@ -21,5 +20,5 @@ class MentionHookTests(WebhookTestCase):
self.send_and_test_stream_message('webfeeds', expected_topic, expected_message,
content_type="application/x-www-form-urlencoded")
def get_body(self, fixture_name: Text) -> Text:
def get_body(self, fixture_name: str) -> str:
return self.webhook_fixture_data("mention", fixture_name, file_type="json")

View File

@@ -1,5 +1,5 @@
# Webhooks for external integrations.
from typing import Any, Dict, Iterable, Optional, Text
from typing import Any, Dict, Iterable, Optional
from django.http import HttpRequest, HttpResponse
from django.utils.translation import ugettext as _

View File

@@ -1,5 +1,4 @@
# -*- coding: utf-8 -*-
from typing import Text
from zerver.lib.test_classes import WebhookTestCase
@@ -22,5 +21,5 @@ Description sent via curl\n\nChangelog string'
self.send_and_test_stream_message('deployment', expected_subject, expected_message,
content_type="application/x-www-form-urlencoded")
def get_body(self, fixture_name: Text) -> Text:
def get_body(self, fixture_name: str) -> str:
return self.webhook_fixture_data("newrelic", fixture_name, file_type="txt")

View File

@@ -1,5 +1,4 @@
# -*- coding: utf-8 -*-
from typing import Text
from zerver.lib.test_classes import WebhookTestCase
from zerver.webhooks.opbeat.view import get_value

View File

@@ -1,5 +1,5 @@
# Webhooks for external integrations.
from typing import Text, Dict, Any, List, Tuple, Union
from typing import Dict, Any, List, Tuple, Union
from django.http import HttpRequest, HttpResponse

View File

@@ -1,5 +1,4 @@
# -*- coding: utf-8 -*-
from typing import Text
from zerver.lib.test_classes import WebhookTestCase
@@ -152,5 +151,5 @@ class OpsGenieHookTests(WebhookTestCase):
self.send_and_test_stream_message('unacknowledge', expected_subject, expected_message,
content_type="application/x-www-form-urlencoded")
def get_body(self, fixture_name: Text) -> Text:
def get_body(self, fixture_name: str) -> str:
return self.webhook_fixture_data("opsgenie", fixture_name, file_type="json")

View File

@@ -1,4 +1,4 @@
from typing import Any, Dict, Iterable, Optional, Text
from typing import Any, Dict, Iterable, Optional
from django.http import HttpRequest, HttpResponse
from django.utils.translation import ugettext as _

View File

@@ -1,5 +1,4 @@
# -*- coding: utf-8 -*-
from typing import Text
from zerver.lib.test_classes import WebhookTestCase

View File

@@ -1,7 +1,7 @@
# Webhooks for external integrations.
import pprint
from typing import Any, Dict, Iterable, Optional, Text
from typing import Any, Dict, Iterable, Optional
import ujson
from django.http import HttpRequest, HttpResponse
@@ -83,7 +83,7 @@ def send_raw_pagerduty_json(request: HttpRequest,
def send_formated_pagerduty(request: HttpRequest,
user_profile: UserProfile,
message_type: Text,
message_type: str,
format_dict: Dict[str, Any]) -> None:
if message_type in ('incident.trigger', 'incident.unacknowledge'):
template = (u':imp: Incident '

View File

@@ -1,4 +1,3 @@
from typing import Text
from zerver.lib.test_classes import WebhookTestCase
@@ -40,5 +39,5 @@ May 18 20:30:02 abc cron OR server1:
self.send_and_test_stream_message('long_post', expected_subject, expected_message,
content_type="application/x-www-form-urlencoded")
def get_body(self, fixture_name: Text) -> Text:
def get_body(self, fixture_name: str) -> str:
return self.webhook_fixture_data("papertrail", fixture_name, file_type="json")

View File

@@ -1,4 +1,4 @@
from typing import Any, Dict, Iterable, Optional, Text
from typing import Any, Dict, Iterable, Optional
from django.http import HttpRequest, HttpResponse
from django.utils.translation import ugettext as _

View File

@@ -1,5 +1,5 @@
# Webhooks pfor external integrations.
from typing import Any, Dict, Text
from typing import Any, Dict
import ujson
from django.http import HttpRequest, HttpResponse
@@ -47,11 +47,11 @@ def api_pingdom_webhook(request: HttpRequest, user_profile: UserProfile,
return json_success()
def get_subject_for_http_request(payload: Dict[str, Any]) -> Text:
def get_subject_for_http_request(payload: Dict[str, Any]) -> str:
return PINGDOM_SUBJECT_TEMPLATE.format(name=payload['check_name'])
def get_body_for_http_request(payload: Dict[str, Any]) -> Text:
def get_body_for_http_request(payload: Dict[str, Any]) -> str:
current_state = payload['current_state']
previous_state = payload['previous_state']
@@ -68,5 +68,5 @@ def get_body_for_http_request(payload: Dict[str, Any]) -> Text:
return body
def get_check_type(payload: Dict[str, Any]) -> Text:
def get_check_type(payload: Dict[str, Any]) -> str:
return payload['check_type']

View File

@@ -1,5 +1,4 @@
# -*- coding: utf-8 -*-
from typing import Text
from zerver.lib.test_classes import WebhookTestCase
@@ -69,7 +68,7 @@ class PivotalV3HookTests(WebhookTestCase):
[(view)](https://www.pivotaltracker.com/s/projects/807213/stories/48276573)'
self.send_and_test_stream_message('type_changed', expected_subject, expected_message, content_type="application/xml")
def get_body(self, fixture_name: Text) -> Text:
def get_body(self, fixture_name: str) -> str:
return self.webhook_fixture_data('pivotal', fixture_name, file_type='xml')
class PivotalV5HookTests(WebhookTestCase):
@@ -145,5 +144,5 @@ Try again next time
* type changed from **feature** to **bug**"""
self.send_and_test_stream_message('type_changed', expected_subject, expected_message, content_type="application/xml")
def get_body(self, fixture_name: Text) -> Text:
def get_body(self, fixture_name: str) -> str:
return self.webhook_fixture_data('pivotal', "v5_{}".format(fixture_name), file_type='json')

View File

@@ -2,7 +2,7 @@
import logging
import re
from typing import Any, Dict, List, Optional, Text, Tuple
from typing import Any, Dict, List, Optional, Tuple
import ujson
from defusedxml.ElementTree import fromstring as xml_fromstring
@@ -15,7 +15,7 @@ from zerver.lib.response import json_error, json_success
from zerver.lib.webhooks.common import check_send_webhook_message
from zerver.models import UserProfile
def api_pivotal_webhook_v3(request: HttpRequest, user_profile: UserProfile) -> Tuple[Text, Text]:
def api_pivotal_webhook_v3(request: HttpRequest, user_profile: UserProfile) -> Tuple[str, str]:
payload = xml_fromstring(request.body)
def get_text(attrs: List[str]) -> str:
@@ -79,7 +79,7 @@ UNSUPPORTED_EVENT_TYPES = [
"epic_update_activity",
]
def api_pivotal_webhook_v5(request: HttpRequest, user_profile: UserProfile) -> Tuple[Text, Text]:
def api_pivotal_webhook_v5(request: HttpRequest, user_profile: UserProfile) -> Tuple[str, str]:
payload = ujson.loads(request.body)
event_type = payload["kind"]
@@ -103,7 +103,7 @@ def api_pivotal_webhook_v5(request: HttpRequest, user_profile: UserProfile) -> T
content = ""
subject = "#%s: %s" % (story_id, story_name)
def extract_comment(change: Dict[str, Any]) -> Optional[Text]:
def extract_comment(change: Dict[str, Any]) -> Optional[str]:
if change.get("kind") == "comment":
return change.get("new_values", {}).get("text", None)
return None

View File

@@ -1,5 +1,4 @@
# -*- coding: utf-8 -*-
from typing import Text
from zerver.lib.test_classes import WebhookTestCase
@@ -163,5 +162,5 @@ class RaygunHookTests(WebhookTestCase):
content_type=
"application/x-www-form-urlencoded")
def get_body(self, fixture_name: Text) -> Text:
def get_body(self, fixture_name: str) -> str:
return self.webhook_fixture_data("raygun", fixture_name, file_type="json")

View File

@@ -1,4 +1,4 @@
from typing import Any, Dict, Optional, Text
from typing import Any, Dict, Optional
from django.http import HttpRequest, HttpResponse
@@ -40,7 +40,7 @@ def api_raygun_webhook(request: HttpRequest, user_profile: UserProfile,
return json_success()
def make_user_stats_chunk(error_dict: Dict[str, Any]) -> Text:
def make_user_stats_chunk(error_dict: Dict[str, Any]) -> str:
"""Creates a stat chunk about total occurrences and users affected for the
error.
@@ -59,7 +59,7 @@ def make_user_stats_chunk(error_dict: Dict[str, Any]) -> Text:
users_affected, total_occurrences)
def make_time_chunk(error_dict: Dict[str, Any]) -> Text:
def make_time_chunk(error_dict: Dict[str, Any]) -> str:
"""Creates a time message chunk.
Example: firstOccurredOn: "X", lastOccurredOn: "Y"
@@ -80,7 +80,7 @@ def make_time_chunk(error_dict: Dict[str, Any]) -> Text:
time_last)
def make_message_chunk(message: Text) -> Text:
def make_message_chunk(message: str) -> str:
"""Creates a message chunk if exists.
Example: message: "This is an example message" returns "Message: This is an
@@ -94,7 +94,7 @@ def make_message_chunk(message: Text) -> Text:
return "Message: {}\n".format(message) if message != "" else ""
def make_app_info_chunk(app_dict: Dict[str, str]) -> Text:
def make_app_info_chunk(app_dict: Dict[str, str]) -> str:
"""Creates a message chunk that contains the application info and the link
to the Raygun dashboard about the application.
@@ -106,7 +106,7 @@ def make_app_info_chunk(app_dict: Dict[str, str]) -> Text:
return "Application details: [{}]({})\n".format(app_name, app_url)
def notification_message_follow_up(payload: Dict[str, Any]) -> Text:
def notification_message_follow_up(payload: Dict[str, Any]) -> str:
"""Creates a message for a repeating error follow up
:param payload: Raygun payload
@@ -140,7 +140,7 @@ def notification_message_follow_up(payload: Dict[str, Any]) -> Text:
return message
def notification_message_error_occurred(payload: Dict[str, Any]) -> Text:
def notification_message_error_occurred(payload: Dict[str, Any]) -> str:
"""Creates a message for a new error or reoccurred error
:param payload: Raygun payload
@@ -199,7 +199,7 @@ def notification_message_error_occurred(payload: Dict[str, Any]) -> Text:
return message
def compose_notification_message(payload: Dict[str, Any]) -> Text:
def compose_notification_message(payload: Dict[str, Any]) -> str:
"""Composes a message that contains information on the error
:param payload: Raygun payload
@@ -226,7 +226,7 @@ def compose_notification_message(payload: Dict[str, Any]) -> Text:
return "Unsupported event_type type: {}".format(event_type)
def activity_message(payload: Dict[str, Any]) -> Text:
def activity_message(payload: Dict[str, Any]) -> str:
"""Creates a message from an activity that is being taken for an error
:param payload: Raygun payload
@@ -263,7 +263,7 @@ def activity_message(payload: Dict[str, Any]) -> Text:
return message
def compose_activity_message(payload: Dict[str, Any]) -> Text:
def compose_activity_message(payload: Dict[str, Any]) -> str:
"""Composes a message that contains an activity that is being taken to
an error, such as commenting, assigning an error to a user, ignoring the
error, etc.
@@ -288,7 +288,7 @@ def compose_activity_message(payload: Dict[str, Any]) -> Text:
return "Unsupported event_type type: {}".format(event_type)
def parse_time(timestamp: Text) -> Text:
def parse_time(timestamp: str) -> str:
"""Parses and returns the timestamp provided
:param timestamp: The timestamp provided by the payload

View File

@@ -1,5 +1,4 @@
# -*- coding: utf-8 -*-
from typing import Text
from zerver.lib.test_classes import WebhookTestCase
@@ -25,5 +24,5 @@ class SemaphoreHookTests(WebhookTestCase):
self.send_and_test_stream_message('deploy', expected_subject, expected_message,
content_type="application/x-www-form-urlencoded")
def get_body(self, fixture_name: Text) -> Text:
def get_body(self, fixture_name: str) -> str:
return self.webhook_fixture_data("semaphore", fixture_name, file_type="json")

View File

@@ -1,6 +1,5 @@
# -*- coding: utf-8 -*-
import urllib
from typing import Text
from zerver.lib.test_classes import WebhookTestCase
@@ -57,5 +56,5 @@ class SolanoHookTests(WebhookTestCase):
self.send_and_test_stream_message('test', expected_topic, expected_message,
content_type="application/x-www-form-urlencoded")
def get_body(self, fixture_name: Text) -> Text:
def get_body(self, fixture_name: str) -> str:
return self.webhook_fixture_data(self.FIXTURE_DIR_NAME, fixture_name, file_type="json")

View File

@@ -1,5 +1,4 @@
# -*- coding: utf-8 -*-
from typing import Text
from zerver.lib.test_classes import WebhookTestCase
@@ -104,5 +103,5 @@ class SplunkHookTests(WebhookTestCase):
expected_message,
content_type="application/x-www-form-urlencoded")
def get_body(self, fixture_name: Text) -> Text:
def get_body(self, fixture_name: str) -> str:
return self.webhook_fixture_data("splunk", fixture_name, file_type="json")

View File

@@ -1,5 +1,5 @@
# Webhooks for external integrations.
from typing import Any, Dict, Iterable, Optional, Text
from typing import Any, Dict, Iterable, Optional
from django.http import HttpRequest, HttpResponse
from django.utils.translation import ugettext as _

View File

@@ -1,5 +1,4 @@
# -*- coding: utf-8 -*-
from typing import Text
from zerver.lib.test_classes import WebhookTestCase
class StatuspageHookTests(WebhookTestCase):
@@ -34,5 +33,5 @@ from **operational** to **under_maintenance**"
expected_message,
content_type="application/x-www-form-urlencoded")
def get_body(self, fixture_name: Text) -> Text:
def get_body(self, fixture_name: str) -> str:
return self.webhook_fixture_data("statuspage", fixture_name, file_type="json")

View File

@@ -5,33 +5,33 @@ from zerver.lib.webhooks.common import check_send_webhook_message
from zerver.decorator import REQ, has_request_variables, api_key_only_webhook_view
from zerver.models import get_client, UserProfile
from django.http import HttpRequest, HttpResponse
from typing import Dict, Any, Text
from typing import Dict, Any
INCIDENT_TEMPLATE = u'**{name}** \n * State: **{state}** \n * Description: {content}'
COMPONENT_TEMPLATE = u'**{name}** has changed status from **{old_status}** to **{new_status}**'
TOPIC_TEMPLATE = u'{name}: {description}'
def get_incident_events_body(payload: Dict[Text, Any]) -> Text:
def get_incident_events_body(payload: Dict[str, Any]) -> str:
return INCIDENT_TEMPLATE.format(
name = payload["incident"]["name"],
state = payload["incident"]["status"],
content = payload["incident"]["incident_updates"][0]["body"],
)
def get_components_update_body(payload: Dict[Text, Any]) -> Text:
def get_components_update_body(payload: Dict[str, Any]) -> str:
return COMPONENT_TEMPLATE.format(
name = payload["component"]["name"],
old_status = payload["component_update"]["old_status"],
new_status = payload["component_update"]["new_status"],
)
def get_incident_topic(payload: Dict[Text, Any]) -> Text:
def get_incident_topic(payload: Dict[str, Any]) -> str:
return TOPIC_TEMPLATE.format(
name = payload["incident"]["name"],
description = payload["page"]["status_description"],
)
def get_component_topic(payload: Dict[Text, Any]) -> Text:
def get_component_topic(payload: Dict[str, Any]) -> str:
return TOPIC_TEMPLATE.format(
name = payload["component"]["name"],
description = payload["page"]["status_description"],

View File

@@ -1,5 +1,4 @@
# -*- coding: utf-8 -*-
from typing import Text
import mock
@@ -141,5 +140,5 @@ class StripeHookTests(WebhookTestCase):
self.send_and_test_stream_message('transfer_paid', expected_subject, expected_message,
content_type="application/x-www-form-urlencoded")
def get_body(self, fixture_name: Text) -> Text:
def get_body(self, fixture_name: str) -> str:
return self.webhook_fixture_data("stripe", fixture_name, file_type="json")

View File

@@ -1,7 +1,7 @@
# Webhooks for external integrations.
import time
from datetime import datetime
from typing import Any, Dict, Optional, Text
from typing import Any, Dict, Optional
from django.http import HttpRequest, HttpResponse
from django.utils.translation import ugettext as _
@@ -16,7 +16,7 @@ from zerver.models import UserProfile
@has_request_variables
def api_stripe_webhook(request: HttpRequest, user_profile: UserProfile,
payload: Dict[str, Any]=REQ(argument_type='body'),
stream: Text=REQ(default='test')) -> HttpResponse:
stream: str=REQ(default='test')) -> HttpResponse:
body = None
event_type = payload["type"]
data_object = payload["data"]["object"]

View File

@@ -1,6 +1,4 @@
# -*- coding: utf-8 -*-
from typing import Text
from zerver.lib.test_classes import WebhookTestCase
class TaigaHookTests(WebhookTestCase):

View File

@@ -18,7 +18,7 @@ value should always be in bold; otherwise the subject of US/task
should be in bold.
"""
from typing import Any, Dict, List, Mapping, Optional, Text, Tuple
from typing import Any, Dict, List, Mapping, Optional, Tuple
import ujson
from django.http import HttpRequest, HttpResponse

View File

@@ -1,5 +1,5 @@
# -*- coding: utf-8 -*-
from typing import Any, Dict, Text
from typing import Any, Dict
from zerver.lib.test_classes import WebhookTestCase
@@ -39,5 +39,5 @@ class TransifexHookTests(WebhookTestCase):
)
self.send_and_test_stream_message("", expected_subject, expected_message)
def get_body(self, fixture_name: Text) -> Dict[str, Any]:
def get_body(self, fixture_name: str) -> Dict[str, Any]:
return {}

View File

@@ -1,6 +1,5 @@
# -*- coding: utf-8 -*-
import urllib
from typing import Text
from zerver.lib.test_classes import WebhookTestCase
from zerver.models import get_realm, get_user
@@ -55,5 +54,5 @@ class TravisHookTests(WebhookTestCase):
content_type="application/x-www-form-urlencoded"
)
def get_body(self, fixture_name: Text) -> Text:
def get_body(self, fixture_name: str) -> str:
return urllib.parse.urlencode({'payload': self.webhook_fixture_data("travis", fixture_name, file_type="json")})

View File

@@ -1,6 +1,6 @@
# Webhooks for external integrations.
import ujson
from typing import Mapping, Any, Tuple, Text, Optional
from typing import Mapping, Any, Tuple, Optional
from django.utils.translation import ugettext as _
from django.http import HttpRequest, HttpResponse
from zerver.decorator import api_key_only_webhook_view, return_success_on_head_request
@@ -33,7 +33,7 @@ def api_trello_webhook(request: HttpRequest,
check_send_webhook_message(request, user_profile, subject, body)
return json_success()
def get_subject_and_body(payload: Mapping[str, Any], action_type: Text) -> Optional[Tuple[Text, Text]]:
def get_subject_and_body(payload: Mapping[str, Any], action_type: str) -> Optional[Tuple[str, str]]:
if action_type in SUPPORTED_CARD_ACTIONS:
return process_card_action(payload, action_type)
if action_type in SUPPORTED_BOARD_ACTIONS:

View File

@@ -1,4 +1,4 @@
from typing import Any, Dict, Mapping, MutableMapping, Optional, Text, Tuple
from typing import Any, Dict, Mapping, MutableMapping, Optional, Tuple
from .exceptions import UnknownUpdateBoardAction
@@ -24,13 +24,13 @@ ACTIONS_TO_MESSAGE_MAPPER = {
}
def process_board_action(payload: Mapping[str, Any],
action_type: Optional[Text]) -> Optional[Tuple[Text, Text]]:
action_type: Optional[str]) -> Optional[Tuple[str, str]]:
action_type = get_proper_action(payload, action_type)
if action_type is not None:
return get_subject(payload), get_body(payload, action_type)
return None
def get_proper_action(payload: Mapping[str, Any], action_type: Optional[Text]) -> Optional[Text]:
def get_proper_action(payload: Mapping[str, Any], action_type: Optional[str]) -> Optional[str]:
if action_type == 'updateBoard':
data = get_action_data(payload)
# we don't support events for when a board's background
@@ -42,27 +42,27 @@ def get_proper_action(payload: Mapping[str, Any], action_type: Optional[Text]) -
raise UnknownUpdateBoardAction()
return action_type
def get_subject(payload: Mapping[str, Any]) -> Text:
def get_subject(payload: Mapping[str, Any]) -> str:
return get_action_data(payload)['board']['name']
def get_body(payload: Mapping[str, Any], action_type: Text) -> Text:
def get_body(payload: Mapping[str, Any], action_type: str) -> str:
message_body = ACTIONS_TO_FILL_BODY_MAPPER[action_type](payload, action_type)
creator = payload['action']['memberCreator']['fullName']
return u'{full_name} {rest}'.format(full_name=creator, rest=message_body)
def get_managed_member_body(payload: Mapping[str, Any], action_type: Text) -> Text:
def get_managed_member_body(payload: Mapping[str, Any], action_type: str) -> str:
data = {
'member_name': payload['action']['member']['fullName'],
}
return fill_appropriate_message_content(payload, action_type, data)
def get_create_list_body(payload: Mapping[str, Any], action_type: Text) -> Text:
def get_create_list_body(payload: Mapping[str, Any], action_type: str) -> str:
data = {
'list_name': get_action_data(payload)['list']['name'],
}
return fill_appropriate_message_content(payload, action_type, data)
def get_change_name_body(payload: Mapping[str, Any], action_type: Text) -> Text:
def get_change_name_body(payload: Mapping[str, Any], action_type: str) -> str:
data = {
'old_name': get_action_data(payload)['old']['name']
}
@@ -70,24 +70,24 @@ def get_change_name_body(payload: Mapping[str, Any], action_type: Text) -> Text:
def fill_appropriate_message_content(payload: Mapping[str, Any],
action_type: Text,
data: Optional[Dict[str, Any]]=None) -> Text:
action_type: str,
data: Optional[Dict[str, Any]]=None) -> str:
data = {} if data is None else data
data['board_url_template'] = data.get('board_url_template', get_filled_board_url_template(payload))
message_body = get_message_body(action_type)
return message_body.format(**data)
def get_filled_board_url_template(payload: Mapping[str, Any]) -> Text:
def get_filled_board_url_template(payload: Mapping[str, Any]) -> str:
return TRELLO_BOARD_URL_TEMPLATE.format(board_name=get_board_name(payload),
board_url=get_board_url(payload))
def get_board_name(payload: Mapping[str, Any]) -> Text:
def get_board_name(payload: Mapping[str, Any]) -> str:
return get_action_data(payload)['board']['name']
def get_board_url(payload: Mapping[str, Any]) -> Text:
def get_board_url(payload: Mapping[str, Any]) -> str:
return u'https://trello.com/b/{}'.format(get_action_data(payload)['board']['shortLink'])
def get_message_body(action_type: Text) -> Text:
def get_message_body(action_type: str) -> str:
return ACTIONS_TO_MESSAGE_MAPPER[action_type]
def get_action_data(payload: Mapping[str, Any]) -> Mapping[str, Any]:

View File

@@ -1,5 +1,5 @@
from datetime import datetime
from typing import Any, Dict, Mapping, MutableMapping, Optional, Text, Tuple
from typing import Any, Dict, Mapping, MutableMapping, Optional, Tuple
from .exceptions import UnknownUpdateCardAction
@@ -61,13 +61,13 @@ ACTIONS_TO_MESSAGE_MAPPER = {
def prettify_date(date_string: str) -> str:
return date_string.replace('T', ' ').replace('.000', '').replace('Z', ' UTC')
def process_card_action(payload: Mapping[str, Any], action_type: Text) -> Optional[Tuple[Text, Text]]:
def process_card_action(payload: Mapping[str, Any], action_type: str) -> Optional[Tuple[str, str]]:
proper_action = get_proper_action(payload, action_type)
if proper_action is not None:
return get_subject(payload), get_body(payload, proper_action)
return None
def get_proper_action(payload: Mapping[str, Any], action_type: Text) -> Optional[Text]:
def get_proper_action(payload: Mapping[str, Any], action_type: str) -> Optional[str]:
if action_type == 'updateCard':
data = get_action_data(payload)
old_data = data['old']
@@ -102,28 +102,28 @@ def get_proper_action(payload: Mapping[str, Any], action_type: Text) -> Optional
return action_type
def get_subject(payload: Mapping[str, Any]) -> Text:
def get_subject(payload: Mapping[str, Any]) -> str:
return get_action_data(payload)['board'].get('name')
def get_body(payload: Mapping[str, Any], action_type: Text) -> Text:
def get_body(payload: Mapping[str, Any], action_type: str) -> str:
message_body = ACTIONS_TO_FILL_BODY_MAPPER[action_type](payload, action_type)
creator = payload['action']['memberCreator'].get('fullName')
return u'{full_name} {rest}'.format(full_name=creator, rest=message_body)
def get_added_checklist_body(payload: Mapping[str, Any], action_type: Text) -> Text:
def get_added_checklist_body(payload: Mapping[str, Any], action_type: str) -> str:
data = {
'checklist_name': get_action_data(payload)['checklist'].get('name'),
}
return fill_appropriate_message_content(payload, action_type, data)
def get_added_attachment_body(payload: Mapping[str, Any], action_type: Text) -> Text:
def get_added_attachment_body(payload: Mapping[str, Any], action_type: str) -> str:
data = {
'attachment_url': get_action_data(payload)['attachment'].get('url'),
'attachment_name': get_action_data(payload)['attachment'].get('name'),
}
return fill_appropriate_message_content(payload, action_type, data)
def get_updated_card_body(payload: Mapping[str, Any], action_type: Text) -> Text:
def get_updated_card_body(payload: Mapping[str, Any], action_type: str) -> str:
data = {
'card_name': get_card_name(payload),
'old_list': get_action_data(payload)['listBefore'].get('name'),
@@ -131,7 +131,7 @@ def get_updated_card_body(payload: Mapping[str, Any], action_type: Text) -> Text
}
return fill_appropriate_message_content(payload, action_type, data)
def get_renamed_card_body(payload: Mapping[str, Any], action_type: Text) -> Text:
def get_renamed_card_body(payload: Mapping[str, Any], action_type: str) -> str:
data = {
'old_name': get_action_data(payload)['old'].get('name'),
@@ -139,72 +139,72 @@ def get_renamed_card_body(payload: Mapping[str, Any], action_type: Text) -> Text
}
return fill_appropriate_message_content(payload, action_type, data)
def get_added_label_body(payload: Mapping[str, Any], action_type: Text) -> Text:
def get_added_label_body(payload: Mapping[str, Any], action_type: str) -> str:
data = {
'color': get_action_data(payload).get('value'),
'text': get_action_data(payload).get('text'),
}
return fill_appropriate_message_content(payload, action_type, data)
def get_managed_member_body(payload: Mapping[str, Any], action_type: Text) -> Text:
def get_managed_member_body(payload: Mapping[str, Any], action_type: str) -> str:
data = {
'member_name': payload['action']['member'].get('fullName')
}
return fill_appropriate_message_content(payload, action_type, data)
def get_comment_body(payload: Mapping[str, Any], action_type: Text) -> Text:
def get_comment_body(payload: Mapping[str, Any], action_type: str) -> str:
data = {
'text': get_action_data(payload)['text'],
}
return fill_appropriate_message_content(payload, action_type, data)
def get_managed_due_date_body(payload: Mapping[str, Any], action_type: Text) -> Text:
def get_managed_due_date_body(payload: Mapping[str, Any], action_type: str) -> str:
data = {
'due_date': prettify_date(get_action_data(payload)['card'].get('due'))
}
return fill_appropriate_message_content(payload, action_type, data)
def get_changed_due_date_body(payload: Mapping[str, Any], action_type: Text) -> Text:
def get_changed_due_date_body(payload: Mapping[str, Any], action_type: str) -> str:
data = {
'due_date': prettify_date(get_action_data(payload)['card'].get('due')),
'old_due_date': prettify_date(get_action_data(payload)['old'].get('due'))
}
return fill_appropriate_message_content(payload, action_type, data)
def get_managed_desc_body(payload: Mapping[str, Any], action_type: Text) -> Text:
def get_managed_desc_body(payload: Mapping[str, Any], action_type: str) -> str:
data = {
'desc': prettify_date(get_action_data(payload)['card']['desc'])
}
return fill_appropriate_message_content(payload, action_type, data)
def get_changed_desc_body(payload: Mapping[str, Any], action_type: Text) -> Text:
def get_changed_desc_body(payload: Mapping[str, Any], action_type: str) -> str:
data = {
'desc': prettify_date(get_action_data(payload)['card']['desc']),
'old_desc': prettify_date(get_action_data(payload)['old']['desc'])
}
return fill_appropriate_message_content(payload, action_type, data)
def get_body_by_action_type_without_data(payload: Mapping[str, Any], action_type: Text) -> Text:
def get_body_by_action_type_without_data(payload: Mapping[str, Any], action_type: str) -> str:
return fill_appropriate_message_content(payload, action_type)
def fill_appropriate_message_content(payload: Mapping[str, Any],
action_type: Text,
data: Optional[Dict[str, Any]]=None) -> Text:
action_type: str,
data: Optional[Dict[str, Any]]=None) -> str:
data = {} if data is None else data
data['card_url_template'] = data.get('card_url_template', get_filled_card_url_template(payload))
message_body = get_message_body(action_type)
return message_body.format(**data)
def get_filled_card_url_template(payload: Mapping[str, Any]) -> Text:
def get_filled_card_url_template(payload: Mapping[str, Any]) -> str:
return TRELLO_CARD_URL_TEMPLATE.format(card_name=get_card_name(payload), card_url=get_card_url(payload))
def get_card_url(payload: Mapping[str, Any]) -> Text:
def get_card_url(payload: Mapping[str, Any]) -> str:
return u'https://trello.com/c/{}'.format(get_action_data(payload)['card'].get('shortLink'))
def get_message_body(action_type: Text) -> Text:
def get_message_body(action_type: str) -> str:
return ACTIONS_TO_MESSAGE_MAPPER[action_type]
def get_card_name(payload: Mapping[str, Any]) -> Text:
def get_card_name(payload: Mapping[str, Any]) -> str:
return get_action_data(payload)['card'].get('name')
def get_action_data(payload: Mapping[str, Any]) -> Mapping[str, Any]:

View File

@@ -1,5 +1,5 @@
# -*- coding: utf-8 -*-
from typing import Any, Dict, Text
from typing import Any, Dict
from zerver.lib.test_classes import WebhookTestCase
@@ -21,5 +21,5 @@ class YoHookTests(WebhookTestCase):
self.send_and_test_private_message('', expected_message=expected_message,
content_type="application/x-www-form-urlencoded")
def get_body(self, fixture_name: Text) -> Dict[str, Any]:
def get_body(self, fixture_name: str) -> Dict[str, Any]:
return {}

View File

@@ -1,5 +1,5 @@
# -*- coding: utf-8 -*-
from typing import Any, Dict, Optional, Text
from typing import Any, Dict, Optional
from zerver.lib.test_classes import WebhookTestCase
@@ -16,7 +16,7 @@ class ZenDeskHookTests(WebhookTestCase):
DEFAULT_MESSAGE = 'Message'
MESSAGE = DEFAULT_MESSAGE
def get_body(self, fixture_name: Text) -> Dict[str, Any]:
def get_body(self, fixture_name: str) -> Dict[str, Any]:
return {
'ticket_title': self.TICKET_TITLE,
'ticket_id': self.TICKET_ID,
@@ -24,7 +24,7 @@ class ZenDeskHookTests(WebhookTestCase):
'stream': self.STREAM_NAME,
}
def do_test(self, expected_subject: Optional[Text]=None, expected_message: Optional[Text]=None) -> None:
def do_test(self, expected_subject: Optional[str]=None, expected_message: Optional[str]=None) -> None:
self.api_stream_message(self.TEST_USER_EMAIL, "", expected_subject, expected_message,
content_type=None)
self.TICKET_TITLE = self.DEFAULT_TICKET_TITLE

View File

@@ -1,5 +1,4 @@
# Webhooks for external integrations.
from typing import Text
from django.http import HttpRequest, HttpResponse
@@ -9,7 +8,7 @@ from zerver.lib.response import json_success
from zerver.lib.webhooks.common import check_send_webhook_message
from zerver.models import UserProfile, get_client
def truncate(string: Text, length: int) -> Text:
def truncate(string: str, length: int) -> str:
if len(string) > length:
string = string[:length-3] + '...'
return string