export: Support streaming iterators when writing JSON.

Alex Vandiver
2025-08-25 14:58:23 +00:00
committed by Tim Abbott
parent 78bc17ecbe
commit 67743d150a


@@ -15,7 +15,7 @@ import secrets
 import shutil
 import subprocess
 import tempfile
-from collections.abc import Callable, Iterable, Mapping
+from collections.abc import Callable, Iterable, Iterator, Mapping
 from datetime import datetime
 from email.headerregistry import Address
 from functools import cache
@@ -117,6 +117,7 @@ class MessagePartial(TypedDict):
     realm_id: int

+ORJSON_ITERABLE_BATCH_SIZE = 1000
 MESSAGE_BATCH_CHUNK_SIZE = 1000

 ALL_ZULIP_TABLES = {
@@ -414,6 +415,43 @@ def sanity_check_output(data: TableData) -> None:
logging.warning("??? NO DATA EXPORTED FOR TABLE %s!!!", table)
def orjson_stream(
it: Iterable[Any],
options: int = orjson.OPT_INDENT_2,
indent: bytes = b"",
chunk_size: int = ORJSON_ITERABLE_BATCH_SIZE,
) -> Iterator[bytes]:
first_chunk = True
for batch in batched(it, chunk_size):
if not first_chunk:
yield b",\n" + indent
chunk = orjson.dumps(batch, option=options)
if not first_chunk:
assert chunk.startswith(b"[\n")
chunk = chunk[2:]
assert chunk.endswith(b"\n]")
chunk = chunk[:-2]
if indent != b"":
chunk = chunk.replace(b"\n", b"\n" + indent)
yield chunk
first_chunk = False
if first_chunk:
yield b"[]"
else:
yield b"\n" + indent + b"]"
def orjson_serialize_iterable(
obj: Any, options: int = orjson.OPT_INDENT_2, indent: bytes = b""
) -> orjson.Fragment:
if not isinstance(obj, Iterator):
raise TypeError
serialized = bytearray()
for byte_section in orjson_stream(obj, options, indent):
serialized.extend(byte_section)
return orjson.Fragment(bytes(serialized))
def write_data_to_file(output_file: Path, data: Any) -> None:
"""
IMPORTANT: You generally don't want to call this directly.
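
As a sanity check on the streaming logic above (an illustrative sketch, not part of the commit): concatenating everything orjson_stream yields should be byte-identical to a one-shot orjson.dumps of the materialized list, and orjson_serialize_iterable simply collects those bytes into an orjson.Fragment so they can be embedded verbatim inside a larger JSON document:

    import orjson

    rows = [{"id": i} for i in range(2500)]

    # Three batches of up to 1000 rows concatenate to the same bytes as one dump.
    streamed = b"".join(orjson_stream(iter(rows)))
    assert streamed == orjson.dumps(rows, option=orjson.OPT_INDENT_2)

    # Plain lists are rejected; only true iterators take the streaming path.
    fragment = orjson_serialize_iterable(iter(rows))
    assert isinstance(fragment, orjson.Fragment)
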
@@ -431,7 +469,14 @@ def write_data_to_file(output_file: Path, data: Any) -> None:
         # is what we want, because it helps us check that we correctly
         # post-processed them to serialize to UNIX timestamps rather than ISO
         # 8601 strings for historical reasons.
-        f.write(orjson.dumps(data, option=orjson.OPT_INDENT_2 | orjson.OPT_PASSTHROUGH_DATETIME))
+        options = orjson.OPT_INDENT_2 | orjson.OPT_PASSTHROUGH_DATETIME
+        f.write(
+            orjson.dumps(
+                data,
+                option=options,
+                default=lambda d: orjson_serialize_iterable(d, options, indent=b"  "),
+            )
+        )
     logging.info("Finished writing %s", output_file)
@@ -448,7 +493,7 @@ def write_table_data(output_file: str, data: dict[str, Any]) -> None:
     write_data_to_file(output_file, data)


-def write_records_json_file(output_dir: str, records: list[dict[str, Any]]) -> None:
+def write_records_json_file(output_dir: str, records: Iterable[dict[str, Any]]) -> None:
     # We want a somewhat deterministic sorting order here. All of our
     # versions of records.json include a "path" field in each element,
     # even though there's some variation among avatars/emoji/realm_icons/uploads
@@ -457,13 +502,17 @@ def write_records_json_file(output_dir: str, records: list[dict[str, Any]]) -> None:
     # The sorting order of paths isn't entirely sensical to humans,
     # because they include ids and even some random numbers,
     # but if you export the same realm twice, you should get identical results.
-    records.sort(key=lambda record: record["path"])
+    #
+    # If this is an iterator, it is promised to be already sorted.
+    # For lists, we sort by path here.
+    if isinstance(records, list):
+        records.sort(key=lambda record: record["path"])

     output_file = os.path.join(output_dir, "records.json")
     with open(output_file, "wb") as f:
         # For legacy reasons we allow datetime objects here, unlike
         # write_data_to_file.
-        f.write(orjson.dumps(records, option=orjson.OPT_INDENT_2))
+        f.writelines(orjson_stream(records))
     logging.info("Finished writing %s", output_file)