slack: Fetch workspace users from /users.list in the correct manner.

1. Fetching from the `/users.list` endpoint is supposed to use
   pagination. Slack will return at most 1000 results in a single
   request. This means that our Slack import system hasn't worked
   properly for workspaces with more than 1000 users. Users after the
   first 1000 would be considered by our tool as mirror dummies and thus
   created with is_active=False,is_mirror_dummy=True.
   Ref https://api.slack.com/methods/users.list

2. Workspaces with a lot of users, and therefore requiring the use of
   paginated requests to fetch them all, might also get us to run into
   Slack's rate limits, since we'll be doing repeating requests to the
   endpoint.
   Therefore, the API fetch needs to also handle rate limiting errors
   correctly.
   Per, https://api.slack.com/apis/rate-limits#headers, we can just read
   the retry-after header from the rsponse and wait the indicated number
   of seconds before repeating the requests. This is an easy approach to
   implement, so that's what we go with here.
This commit is contained in:
Mateusz Mandera
2025-01-25 01:52:26 +08:00
committed by Tim Abbott
parent 3dd3de3efa
commit f81e514d07
3 changed files with 148 additions and 11 deletions

View File

@@ -6,6 +6,7 @@ import random
import re
import secrets
import shutil
import time
import zipfile
from collections import defaultdict
from collections.abc import Iterator
@@ -1501,8 +1502,14 @@ def do_convert_directory(
raise ValueError("Import does not have the layout we expect from a Slack export!")
# We get the user data from the legacy token method of Slack API, which is depreciated
# but we use it as the user email data is provided only in this method
user_list = get_slack_api_data("https://slack.com/api/users.list", "members", token=token)
# but we use it as the user email data is provided only in this method.
# Fetching from this endpoint requires using pagination, as only a subset
# of the users might be returned in any single request.
# We use the limit value of 200, as that's suggested in Slack's documentation for this
# endpoint.
user_list = get_slack_api_data(
"https://slack.com/api/users.list", "members", token=token, pagination_limit=200
)
fetch_shared_channel_users(user_list, slack_data_dir, token)
custom_emoji_list = get_slack_api_data("https://slack.com/api/emoji.list", "emoji", token=token)
@@ -1611,16 +1618,67 @@ def check_token_access(token: str, required_scopes: set[str]) -> None:
raise Exception("Unknown token type -- must start with xoxb- or xoxp-")
def get_slack_api_data(slack_api_url: str, get_param: str, **kwargs: Any) -> Any:
def get_slack_api_data(
slack_api_url: str,
get_param: str,
*,
pagination_limit: int | None = None,
raise_if_rate_limited: bool = False,
**kwargs: Any,
) -> Any:
if not kwargs.get("token"):
raise AssertionError("Slack token missing in kwargs")
token = kwargs.pop("token")
data = requests.get(slack_api_url, headers={"Authorization": f"Bearer {token}"}, params=kwargs)
if data.status_code == requests.codes.ok:
result = data.json()
token = kwargs.pop("token")
accumulated_result = []
cursor: str | None = None
while True:
if pagination_limit is not None:
# If we're fetching with pagination, this might take a while, so we want reasonable logging to show
# progress and what's being fetched.
logging.info(
"Fetching page from %s with cursor: %s and limit: %s",
slack_api_url,
cursor,
pagination_limit,
)
params: dict[str, int | str] = {"limit": pagination_limit} if pagination_limit else {}
if cursor:
params["cursor"] = cursor
params.update(kwargs)
response = requests.get(
slack_api_url, headers={"Authorization": f"Bearer {token}"}, params=params
)
if response.status_code == 429:
if raise_if_rate_limited:
raise Exception("Exceeded Slack rate limits.")
retry_after = int(response.headers.get("retry-after", 1))
logging.info("Rate limit exceeded. Retrying in %s seconds...", retry_after)
time.sleep(retry_after)
continue
if response.status_code != requests.codes.ok:
logging.info("HTTP error: %s, Response: %s", response.status_code, response.text)
raise Exception("HTTP error accessing the Slack API.")
result = response.json()
if not result["ok"]:
raise Exception("Error accessing Slack API: {}".format(result["error"]))
return result[get_param]
raise Exception("HTTP error accessing the Slack API.")
result_data = result[get_param]
if pagination_limit is None:
# We're not using pagination, so we don't want to loop and should just return the result.
return result_data
accumulated_result.extend(result_data)
if not result.get("response_metadata", {}).get("next_cursor"):
# Everything has been fetched.
break
cursor = result["response_metadata"]["next_cursor"]
return accumulated_result

View File

@@ -147,6 +147,77 @@ def request_callback(request: PreparedRequest) -> tuple[int, dict[str, str], byt
class SlackImporter(ZulipTestCase):
@responses.activate
def test_get_slack_api_data_with_pagination(self) -> None:
token = "xoxb-valid-token"
pagination_limit = 40
api_users_list = [f"user{i}" for i in range(100)]
count = 0
def paginated_request_callback(
request: PreparedRequest,
) -> tuple[int, dict[str, str], bytes]:
"""
A callback that in a very simple way simulates Slack's /users.list API
with support for Pagination and some rate-limiting behavior.
"""
assert request.url is not None
assert request.url.startswith("https://slack.com/api/users.list")
# Otherwise mypy complains about PreparedRequest not having params attribute:
assert hasattr(request, "params")
nonlocal count
count += 1
self.assertEqual(request.params["limit"], str(pagination_limit))
cursor = int(request.params.get("cursor", 0))
next_cursor = cursor + pagination_limit
if count % 3 == 0:
# Simulate a rate limit hit on every third request.
return (
429,
{"retry-after": "30"},
orjson.dumps({"ok": False, "error": "rate_limit_hit"}),
)
result_user_data = api_users_list[cursor:next_cursor]
if next_cursor >= len(api_users_list):
# The fetch is completed.
response_metadata = {}
else:
response_metadata = {"next_cursor": str(next_cursor)}
return (
200,
{},
orjson.dumps(
{
"ok": True,
"members": result_user_data,
"response_metadata": response_metadata,
}
),
)
responses.add_callback(
responses.GET, "https://slack.com/api/users.list", callback=paginated_request_callback
)
with (
mock.patch("zerver.data_import.slack.time.sleep", return_value=None) as mock_sleep,
self.assertLogs(level="INFO") as mock_log,
):
result = get_slack_api_data(
"https://slack.com/api/users.list",
"members",
token=token,
pagination_limit=pagination_limit,
)
self.assertEqual(result, api_users_list)
self.assertEqual(mock_sleep.call_count, 1)
self.assertIn("INFO:root:Rate limit exceeded. Retrying in 30 seconds...", mock_log.output)
@responses.activate
def test_get_slack_api_data(self) -> None:
token = "xoxb-valid-token"
@@ -210,9 +281,10 @@ class SlackImporter(ZulipTestCase):
token = "xoxb-status404"
wrong_url = "https://slack.com/api/wrong"
responses.add_callback(responses.GET, wrong_url, callback=request_callback)
with self.assertRaises(Exception) as invalid:
with self.assertRaises(Exception) as invalid, self.assertLogs(level="INFO") as mock_log:
get_slack_api_data(wrong_url, "members", token=token)
self.assertEqual(invalid.exception.args, ("HTTP error accessing the Slack API.",))
self.assertEqual(mock_log.output, ["INFO:root:HTTP error: 404, Response: "])
def test_build_zerver_realm(self) -> None:
realm_id = 2

View File

@@ -48,6 +48,8 @@ def get_slack_channel_name(channel_id: str, token: str) -> str:
slack_channel_data = get_slack_api_data(
"https://slack.com/api/conversations.info",
get_param="channel",
# Sleeping is not permitted from webhook code.
raise_if_rate_limited=True,
token=token,
channel=channel_id,
)
@@ -56,7 +58,12 @@ def get_slack_channel_name(channel_id: str, token: str) -> str:
def get_slack_sender_name(user_id: str, token: str) -> str:
slack_user_data = get_slack_api_data(
"https://slack.com/api/users.info", get_param="user", token=token, user=user_id
"https://slack.com/api/users.info",
get_param="user",
# Sleeping is not permitted from webhook code.
raise_if_rate_limited=True,
token=token,
user=user_id,
)
return slack_user_data["name"]