export: Add lib.utils.query_chunker().

This commit is contained in:
Steve Howell
2016-08-14 09:33:29 -07:00
committed by Tim Abbott
parent fd6ee7117f
commit cd2e36d66f
2 changed files with 201 additions and 2 deletions

View File

@@ -2,11 +2,13 @@
from __future__ import absolute_import
from __future__ import division
from typing import Any, Callable, Optional, Sequence, TypeVar
from typing import Any, Callable, Optional, Sequence, TypeVar, Iterable, Tuple
from six import text_type, binary_type
import base64
import errno
import hashlib
import heapq
import itertools
import os
from time import sleep
@@ -125,3 +127,60 @@ def mkdir_p(path):
pass
else:
raise
def query_chunker(queries, id_collector=None, chunk_size=1000, db_chunk_size=None):
    # type: (List[Any], Optional[Set[int]], int, Optional[int]) -> Iterable[Any]
    '''
    Merge one or more Django ascending-id queries into a generator
    that yields lists of at most chunk_size row objects, preserving
    id order across all results.

    Queries should satisfy these conditions:
        - They should be Django filters.
        - They should return Django objects with "id" attributes.
        - They should be disjoint.

    The generator also populates id_collector, which we use
    internally to enforce unique ids, but which the caller
    can pass in to us if they want the side effect of collecting
    all ids.  A caller-supplied id_collector must start out empty.

    db_chunk_size is the maximum number of rows fetched from each
    query per round trip; it defaults to chunk_size // len(queries).

    Because this is a generator, the sanity checks below only run
    once iteration starts.  AssertionError is raised if chunk_size
    or db_chunk_size is smaller than 2, if a non-empty id_collector
    is passed in, or if the same id is produced more than once
    (i.e. the queries were not disjoint).
    '''
    if db_chunk_size is None:
        # Guard the divisor so that an empty list of queries simply
        # yields nothing instead of raising ZeroDivisionError, which
        # matches what happens when db_chunk_size is given explicitly.
        db_chunk_size = chunk_size // max(len(queries), 1)

    assert db_chunk_size >= 2
    assert chunk_size >= 2

    if id_collector is not None:
        assert len(id_collector) == 0
    else:
        id_collector = set()

    def chunkify(q, i):
        # type: (Any, int) -> Iterable[Tuple[int, int, Any]]
        # Walk a single query in ascending id order, fetching at most
        # db_chunk_size rows per round trip and resuming after the
        # last id seen (keyset pagination rather than OFFSET).
        q = q.order_by('id')
        # NOTE(review): starting at -1 assumes ids are non-negative.
        min_id = -1
        while True:
            rows = list(q.filter(id__gt=min_id)[0:db_chunk_size])
            if not rows:
                break
            for row in rows:
                # Emit (id, query index, row) so that heapq.merge orders
                # by id and breaks ties on the query index before it
                # would ever need to compare the row objects themselves.
                yield (row.id, i, row)
            min_id = rows[-1].id

    iterators = [chunkify(q, i) for i, q in enumerate(queries)]
    merged_query = heapq.merge(*iterators)

    while True:
        tup_chunk = list(itertools.islice(merged_query, 0, chunk_size))
        if not tup_chunk:
            break

        # Do duplicate-id management here: ids must be unique both
        # within this chunk and against everything yielded so far.
        tup_ids = {tup[0] for tup in tup_chunk}
        assert len(tup_ids) == len(tup_chunk)
        assert len(tup_ids.intersection(id_collector)) == 0
        id_collector.update(tup_ids)

        yield [row for row_id, i, row in tup_chunk]