mirror of
				https://github.com/zulip/zulip.git
				synced 2025-10-26 09:34:02 +00:00 
			
		
		
		
	This is a preview rule, not yet enabled by default. Signed-off-by: Anders Kaseorg <anders@zulip.com>
		
			
				
	
	
		
			227 lines
		
	
	
		
			7.0 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			227 lines
		
	
	
		
			7.0 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
| import itertools
 | |
| import os
 | |
| import random
 | |
| from typing import Any
 | |
| 
 | |
| import orjson
 | |
| 
 | |
| from scripts.lib.zulip_tools import get_or_create_dev_uuid_var_path
 | |
| from zerver.lib.topic import RESOLVED_TOPIC_PREFIX
 | |
| 
 | |
| 
 | |
def load_config() -> dict[str, Any]:
    """Read and parse the JSON configuration used for test-data generation.

    The fixture path is relative, so it is resolved against the current
    working directory of the process.

    Returns:
        The decoded top-level JSON object.
    """
    config_path = "zerver/tests/fixtures/config.generate_data.json"
    with open(config_path, "rb") as config_file:
        raw = config_file.read()

    return orjson.loads(raw)
 | |
| 
 | |
| 
 | |
def generate_topics(num_topics: int) -> list[str]:
    """Build ``num_topics`` random topic names from the configured word lists.

    About a third of the topics are a single noun; the remainder are short
    phrases made of at most one word per part of speech.  A random share of
    the names is then prefixed with ``RESOLVED_TOPIC_PREFIX``.

    Args:
        num_topics: Total number of topic names to produce.

    Returns:
        The topic names, single-word topics first.
    """
    fodder = load_config()["gen_fodder"]

    # Single-word topics are the most common kind, so it is important that
    # a sizeable share (one third) of the generated topics covers them.
    single_word_count = num_topics // 3
    topics = random.choices(fodder["nouns"], k=single_word_count)

    parts_of_speech = ["adjectives", "nouns", "connectors", "verbs", "adverbs"]
    # An extra empty-string entry in every word list lets a part of speech
    # be skipped, which yields phrases of varying length.
    for part in parts_of_speech:
        fodder[part].append("")

    for _ in range(num_topics - single_word_count):
        picks = [random.choice(fodder[part]) for part in parts_of_speech]
        topics.append(" ".join(pick for pick in picks if pick != ""))

    # Usually only a small share of the topics is marked as resolved; with
    # 15% probability per call a large share is.  These topics do not get
    # the "Marked as resolved" messages, so they don't match the normal
    # user experience perfectly.
    resolved_probability = 0.5 if random.random() < 0.15 else 0.05

    def maybe_resolve(name: str) -> str:
        if random.random() < resolved_probability:
            return RESOLVED_TOPIC_PREFIX + name
        return name

    return [maybe_resolve(name) for name in topics]
 | |
| 
 | |
| 
 | |
def load_generators(config: dict[str, Any]) -> dict[str, Any]:
    """Wrap every content list under ``config["gen_fodder"]`` in an endless iterator.

    Args:
        config: Parsed configuration containing a ``"gen_fodder"`` mapping
            of content lists.

    Returns:
        A dict mapping generator name to an ``itertools.cycle`` over the
        corresponding list.
    """
    fodder = config["gen_fodder"]

    # Generator name -> key of the source list in the fodder config.  Only
    # "emojis" is stored under a different (singular) key.
    source_keys = {
        "nouns": "nouns",
        "adjectives": "adjectives",
        "connectors": "connectors",
        "verbs": "verbs",
        "adverbs": "adverbs",
        "emojis": "emoji",
        "links": "links",
        "maths": "maths",
        "inline-code": "inline-code",
        "code-blocks": "code-blocks",
        "quote-blocks": "quote-blocks",
        "images": "images",
        "lists": "lists",
    }

    return {name: itertools.cycle(fodder[key]) for name, key in source_keys.items()}
 | |
| 
 | |
| 
 | |
def parse_file(config: dict[str, Any], gens: dict[str, Any], corpus_file: str) -> list[str]:
    """Turn the corpus text file into a list of decorated paragraphs.

    Args:
        config: Accepted for interface compatibility; not read by this
            function.
        gens: Content generators passed through to ``add_flair``.
        corpus_file: Path of the text file to read.

    Returns:
        One string per paragraph of the corpus, after flair has been added.
    """
    # Collapse the file into single-line paragraphs first, ...
    with open(corpus_file) as corpus:
        plain_paragraphs = remove_line_breaks(corpus)

    # ... then decorate them as needed.
    return add_flair(plain_paragraphs, gens)
 | |
| 
 | |
| 
 | |
def get_flair_gen(length: int) -> list[str]:
    """Return a shuffled list of flair keys following the configured percentages.

    Each key of ``config["dist_percentages"]`` appears
    ``int(percentage * length / 100)`` times; the list is then padded with
    the string ``"None"`` up to ``length`` entries and shuffled.

    Args:
        length: Number of paragraphs that need a flair key.
    """
    percentages = config["dist_percentages"]

    # Emitting exact counts (rather than rolling per paragraph) guarantees
    # the distribution.
    flair_keys = [
        flair
        for flair, percent in percentages.items()
        for _ in range(int(percent * length / 100))
    ]

    padding = length - len(flair_keys)
    flair_keys += ["None"] * padding

    random.shuffle(flair_keys)
    return flair_keys
 | |
| 
 | |
| 
 | |
def add_flair(paragraphs: list[str], gens: dict[str, Any]) -> list[str]:
    """Decorate each paragraph with the flair key drawn for it.

    Args:
        paragraphs: Plain-text paragraphs to decorate.
        gens: Mapping of generator name to an iterator of canned snippets
            (emojis, links, code blocks, ...); one item is consumed each
            time the matching flair is applied.

    Returns:
        A new list with one decorated (or unchanged) string per input
        paragraph, in the same order.
    """
    # Flairs that wrap a random run of words in a Markdown marker.
    emphasis_markers = {"italic": "*", "bold": "**", "strike-thru": "~~"}

    # Flairs that append the next canned snippet on a new line, mapped to
    # the name of the generator that supplies it.
    appended_snippets = {
        "quote-block": "quote-blocks",
        "inline-code": "inline-code",
        "code-block": "code-blocks",
        "math": "maths",
        "list": "lists",
        # Ideally, images would be a 2-step process that first hits the
        # `upload` endpoint and then adds that URL; this is the hacky
        # version where we just use inline image previews of files already
        # in the project (which are the only files we can link to as being
        # definitely available even when developing offline).
        "images": "images",
    }

    results = []

    # Roll the dice once up front: one flair key per paragraph.
    for paragraph, key in zip(paragraphs, get_flair_gen(len(paragraphs))):
        if key in emphasis_markers:
            txt = add_md(emphasis_markers[key], paragraph)
        elif key == "quoted":
            txt = ">" + paragraph
        elif key in appended_snippets:
            txt = paragraph + "\n" + next(gens[appended_snippets[key]])
        elif key == "emoji":
            txt = add_emoji(paragraph, next(gens["emojis"]))
        elif key == "link":
            txt = add_link(paragraph, next(gens["links"]))
        else:
            # "None", plus any key this function does not recognize: keep
            # the paragraph as-is.  Without this fallback an unknown key
            # would either hit an unbound `txt` or silently repeat the
            # previous paragraph's text.
            txt = paragraph

        results.append(txt)

    return results
 | |
| 
 | |
| 
 | |
def add_md(mode: str, text: str) -> str:
    """Wrap a random contiguous run of words in ``text`` with a Markdown marker.

    Args:
        mode: Marker placed on both sides of the run, e.g. ``"*"`` for
            italic, ``"**"`` for bold or ``"~~"`` for strike-through.
        text: Paragraph to decorate.  Runs of whitespace are collapsed to
            single spaces in the result.

    Returns:
        The re-joined text with ``mode`` prepended to the first word of the
        run and appended to its last word, or ``""`` when ``text`` contains
        no words.
    """
    words = text.split()
    if not words:
        # Nothing to emphasize; random.randrange(0) would raise ValueError.
        return ""

    # Pick the first word of the run, then a last word at or after it.
    first = random.randrange(len(words))
    last = first + random.randrange(len(words) - first)

    words[first] = mode + words[first]
    words[last] += mode

    # strip() only matters when `mode` itself carries outer whitespace.
    return " ".join(words).strip()
 | |
| 
 | |
| 
 | |
def add_emoji(text: str, emoji: str) -> str:
    """Insert ``emoji`` after a randomly chosen word of ``text``.

    Args:
        text: Paragraph to decorate.  Runs of whitespace are collapsed to
            single spaces in the result.
        emoji: The emoji markup to insert.

    Returns:
        The re-joined text containing the emoji, or just ``emoji`` when
        ``text`` contains no words.
    """
    words = text.split()
    if not words:
        # No word to attach to; random.randrange(0) would raise ValueError.
        return emoji

    spot = random.randrange(len(words))

    # The inserted token carries its own surrounding spaces, so the result
    # has a doubled space after the emoji (or a trailing space when the
    # emoji lands on the last word).
    words[spot] = words[spot] + " " + emoji + " "
    return " ".join(words)
 | |
| 
 | |
| 
 | |
def add_link(text: str, link: str) -> str:
    """Insert ``link`` after a randomly chosen word of ``text``.

    Args:
        text: Paragraph to decorate.  Runs of whitespace are collapsed to
            single spaces in the result.
        link: The link text to insert.

    Returns:
        The re-joined text containing the link, or just ``link`` when
        ``text`` contains no words.
    """
    words = text.split()
    if not words:
        # No word to attach to; random.randrange(0) would raise ValueError.
        return link

    spot = random.randrange(len(words))

    # The inserted token carries its own surrounding spaces, so the result
    # has a doubled space after the link (or a trailing space when the
    # link lands on the last word).
    words[spot] = words[spot] + " " + link + " "

    return " ".join(words)
 | |
| 
 | |
| 
 | |
def remove_line_breaks(fh: Any) -> list[str]:
    """Collapse an iterable of text lines into single-line paragraphs.

    Lines are stripped of surrounding whitespace.  Each run of consecutive
    non-blank lines becomes one paragraph, joined with single spaces; blank
    lines only act as separators.

    Args:
        fh: Any iterable of strings, such as an open text file.

    Returns:
        The paragraphs in input order, without empty entries.
    """
    stripped_lines = (raw_line.strip() for raw_line in fh)

    # bool() of a stripped line is False exactly for blank lines, so
    # groupby yields alternating runs of paragraph lines and separators.
    return [
        " ".join(run)
        for has_text, run in itertools.groupby(stripped_lines, key=bool)
        if has_text
    ]
 | |
| 
 | |
| 
 | |
def write_file(paragraphs: list[str], filename: str) -> None:
    """Serialize ``paragraphs`` with orjson and write the result to ``filename``.

    The file is opened in binary write mode, so any existing content is
    replaced.
    """
    with open(filename, mode="wb") as json_file:
        serialized = orjson.dumps(paragraphs)
        json_file.write(serialized)
 | |
| 
 | |
| 
 | |
def create_test_data() -> None:
    """Generate the decorated message corpus and save it as ``test_messages.json``.

    Reads the module-level ``config`` for both the content generators and
    the corpus file name, and writes the result under the "test-backend"
    directory returned by ``get_or_create_dev_uuid_var_path``.
    """
    # A dictionary of endlessly cycling content generators.
    generators = load_generators(config)

    corpus_path = config["corpus"]["filename"]
    messages = parse_file(config, generators, corpus_path)

    output_dir = get_or_create_dev_uuid_var_path("test-backend")
    write_file(messages, os.path.join(output_dir, "test_messages.json"))
 | |
| 
 | |
| 
 | |
# Parsed once when the module is imported; get_flair_gen() and
# create_test_data() read this module-level value.
config: dict[str, Any] = load_config()

# Running the module as a script regenerates the test message corpus.
if __name__ == "__main__":
    create_test_data()
 |