Files
zulip/zerver/webhooks/splunk/tests.py
Feorlen e0acdfde0d Add a Splunk webhook integration.
Add a webhook to create messages from Splunk search alerts. The search
alert JSON includes the first search result and a link to view the full
results. The following fields are used:

* search_name - the name of the saved search
* results_link - URL of the full search results
* host - the host the search result came from
* source - the source file on that host
* _raw - the raw text of the logged event.

The Zulip message contains:
* search name
* host
* source
* raw

The destination stream and message topic are configurable: the default
stream is "splunk" and the default topic "Splunk Alert". If the topic is
not provided in the URL, the search name is used instead (truncated if too
long). If a needed field is missing, a default value is used instead.

Example: "Missing source"

It is possible to configure a Splunk search to not include some values,
so I've provided defaults rather than return an error for missing data.
In practice, these fields are unlikely to be deliberately suppressed.

Note: alerts are only available for Splunk servers using a valid trial,
developer, or paid license.

I've added tests for the normal case of one search result, the topic from
the search name, and for a search missing one of the fields used. Tested
using Splunk Enterprise 6.5.1.

Fixes #3477
2017-03-07 21:48:26 -08:00

140 lines
8.0 KiB
Python

# -*- coding: utf-8 -*-
from typing import Text
from zerver.lib.test_classes import WebhookTestCase
class SplunkHookTests(WebhookTestCase):
    """Webhook tests for the /api/v1/external/splunk endpoint.

    Each test builds the request URL (with or without an explicit topic),
    then sends a named fixture and compares the resulting topic and
    message body against the expected values.
    """

    STREAM_NAME = 'splunk'
    TOPIC = u"Default Topic"
    URL_TEMPLATE = "/api/v1/external/splunk?api_key={api_key}&stream={stream}&topic={topic}"
    FIXTURE_DIR_NAME = 'splunk'

    def build_webhook_url(self):
        # type: () -> Text
        """Fill URL_TEMPLATE with the API key, stream and TOPIC.

        Overrides the base class behavior so TOPIC can be included. A
        template without a {topic} placeholder simply ignores that value.
        """
        url_params = dict(
            api_key=self.get_api_key(self.TEST_USER_EMAIL),
            stream=self.STREAM_NAME,
            topic=self.TOPIC,
        )
        return self.URL_TEMPLATE.format(**url_params)

    def _check_alert(self, fixture_name, expected_topic, expected_content):
        # type: (Text, Text, Text) -> None
        """Run the shared send-and-verify step for one fixture.

        Delegates to send_and_test_stream_message, always using the
        form-urlencoded content type that every test in this class uses.
        """
        self.send_and_test_stream_message(fixture_name,
                                          expected_topic,
                                          expected_content,
                                          content_type="application/x-www-form-urlencoded")

    def test_splunk_search_one_result(self):
        # type: () -> None
        """An alert with all fields present, sent with an explicit topic."""
        # the URL for this test carries its own topic
        self.TOPIC = u"New Search Alert"
        self.url = self.build_webhook_url()

        # what the resulting message should look like
        topic = u"New Search Alert"
        content = (
            u"Splunk alert from saved search\n"
            u"[sudo](http://example.com:8000/app/search/search?q=%7Cloadjob%20rt_scheduler__admin__search__sudo_at_1483557185_2.2%20%7C%20head%201%20%7C%20tail%201&earliest=0&latest=now)\n"
            u"host: myserver\n"
            u"source: /var/log/auth.log\n"
            u"\n"
            u"raw: Jan 4 11:14:32 myserver sudo: pam_unix(sudo:session): session closed for user root"
        )

        # run the test against the fixture named search_one_result
        self._check_alert('search_one_result', topic, content)

    def test_splunk_short_search_name(self):
        # type: () -> None
        """Without a topic in the URL, a short search name is the topic."""
        # leave the topic out of the URL so the search name is used instead
        self.URL_TEMPLATE = "/api/v1/external/splunk?api_key={api_key}&stream={stream}"
        self.url = self.build_webhook_url()

        topic = u"This search's name isn't that long"
        content = (
            u"Splunk alert from saved search\n"
            u"[This search's name isn't that long](http://example.com:8000/app/search/search?q=%7Cloadjob%20rt_scheduler__admin__search__sudo_at_1483557185_2.2%20%7C%20head%201%20%7C%20tail%201&earliest=0&latest=now)\n"
            u"host: myserver\n"
            u"source: /var/log/auth.log\n"
            u"\n"
            u"raw: Jan 4 11:14:32 myserver sudo: pam_unix(sudo:session): session closed for user root"
        )

        self._check_alert('short_search_name', topic, content)

    def test_splunk_long_search_name(self):
        # type: () -> None
        """Without a topic in the URL, a long search name is cut short.

        The expected topic ends in "..." while the message body still
        shows the full search name.
        """
        # leave the topic out of the URL so the search name is used instead
        self.URL_TEMPLATE = "/api/v1/external/splunk?api_key={api_key}&stream={stream}"
        self.url = self.build_webhook_url()

        topic = u"this-search's-got-47-words-37-sentences-58-words-we-wanna..."
        content = (
            u"Splunk alert from saved search\n"
            u"[this-search's-got-47-words-37-sentences-58-words-we-wanna-know-details-of-the-search-time-of-the-search-and-any-other-kind-of-thing-you-gotta-say-pertaining-to-and-about-the-search-I-want-to-know-authenticated-user's-name-and-any-other-kind-of-thing-you-gotta-say](http://example.com:8000/app/search/search?q=%7Cloadjob%20rt_scheduler__admin__search__sudo_at_1483557185_2.2%20%7C%20head%201%20%7C%20tail%201&earliest=0&latest=now)\n"
            u"host: myserver\n"
            u"source: /var/log/auth.log\n"
            u"\n"
            u"raw: Jan 4 11:14:32 myserver sudo: pam_unix(sudo:session): session closed for user root"
        )

        self._check_alert('long_search_name', topic, content)

    def test_splunk_missing_results_link(self):
        # type: () -> None
        """A missing results_link shows up as "Missing results_link"."""
        self.TOPIC = u"New Search Alert"
        self.url = self.build_webhook_url()

        topic = u"New Search Alert"
        content = (
            u"Splunk alert from saved search\n"
            u"[sudo](Missing results_link)\n"
            u"host: myserver\n"
            u"source: /var/log/auth.log\n"
            u"\n"
            u"raw: Jan 4 11:14:32 myserver sudo: pam_unix(sudo:session): session closed for user root"
        )

        self._check_alert('missing_results_link', topic, content)

    def test_splunk_missing_search_name(self):
        # type: () -> None
        """A missing search_name shows up as "Missing search_name"."""
        self.TOPIC = u"New Search Alert"
        self.url = self.build_webhook_url()

        topic = u"New Search Alert"
        content = (
            u"Splunk alert from saved search\n"
            u"[Missing search_name](http://example.com:8000/app/search/search?q=%7Cloadjob%20rt_scheduler__admin__search__sudo_at_1483557185_2.2%20%7C%20head%201%20%7C%20tail%201&earliest=0&latest=now)\n"
            u"host: myserver\n"
            u"source: /var/log/auth.log\n"
            u"\n"
            u"raw: Jan 4 11:14:32 myserver sudo: pam_unix(sudo:session): session closed for user root"
        )

        self._check_alert('missing_search_name', topic, content)

    def test_splunk_missing_host(self):
        # type: () -> None
        """A missing host shows up as "Missing host"."""
        self.TOPIC = u"New Search Alert"
        self.url = self.build_webhook_url()

        topic = u"New Search Alert"
        content = (
            u"Splunk alert from saved search\n"
            u"[sudo](http://example.com:8000/app/search/search?q=%7Cloadjob%20rt_scheduler__admin__search__sudo_at_1483557185_2.2%20%7C%20head%201%20%7C%20tail%201&earliest=0&latest=now)\n"
            u"host: Missing host\n"
            u"source: /var/log/auth.log\n"
            u"\n"
            u"raw: Jan 4 11:14:32 myserver sudo: pam_unix(sudo:session): session closed for user root"
        )

        self._check_alert('missing_host', topic, content)

    def test_splunk_missing_source(self):
        # type: () -> None
        """A missing source shows up as "Missing source"."""
        self.TOPIC = u"New Search Alert"
        self.url = self.build_webhook_url()

        topic = u"New Search Alert"
        content = (
            u"Splunk alert from saved search\n"
            u"[sudo](http://example.com:8000/app/search/search?q=%7Cloadjob%20rt_scheduler__admin__search__sudo_at_1483557185_2.2%20%7C%20head%201%20%7C%20tail%201&earliest=0&latest=now)\n"
            u"host: myserver\n"
            u"source: Missing source\n"
            u"\n"
            u"raw: Jan 4 11:14:32 myserver sudo: pam_unix(sudo:session): session closed for user root"
        )

        self._check_alert('missing_source', topic, content)

    def test_splunk_missing_raw(self):
        # type: () -> None
        """A missing _raw shows up as "Missing _raw"."""
        self.TOPIC = u"New Search Alert"
        self.url = self.build_webhook_url()

        topic = u"New Search Alert"
        content = (
            u"Splunk alert from saved search\n"
            u"[sudo](http://example.com:8000/app/search/search?q=%7Cloadjob%20rt_scheduler__admin__search__sudo_at_1483557185_2.2%20%7C%20head%201%20%7C%20tail%201&earliest=0&latest=now)\n"
            u"host: myserver\n"
            u"source: /var/log/auth.log\n"
            u"\n"
            u"raw: Missing _raw"
        )

        self._check_alert('missing_raw', topic, content)

    def get_body(self, fixture_name):
        # type: (Text) -> Text
        """Return the "splunk" fixture data for fixture_name, requested as json."""
        fixture_dir = "splunk"
        return self.fixture_data(fixture_dir, fixture_name, file_type="json")