mirror of
				https://github.com/zulip/zulip.git
				synced 2025-10-30 19:43:47 +00:00 
			
		
		
		
	Effectively revert commit b4cf9ad777 to
work around https://code.djangoproject.com/ticket/34466.
Signed-off-by: Anders Kaseorg <anders@zulip.com>
		
	
		
			
				
	
	
		
			105 lines
		
	
	
		
			2.6 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			105 lines
		
	
	
		
			2.6 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
| import time
 | |
| from typing import (
 | |
|     Any,
 | |
|     Callable,
 | |
|     Dict,
 | |
|     Iterable,
 | |
|     List,
 | |
|     Mapping,
 | |
|     Optional,
 | |
|     Sequence,
 | |
|     TypeVar,
 | |
|     Union,
 | |
|     overload,
 | |
| )
 | |
| 
 | |
| from psycopg2.extensions import connection, cursor
 | |
| from psycopg2.sql import Composable
 | |
| 
 | |
| CursorObj = TypeVar("CursorObj", bound=cursor)
 | |
| Query = Union[str, bytes, Composable]
 | |
| Params = Union[Sequence[object], Mapping[str, object], None]
 | |
| ParamsT = TypeVar("ParamsT")
 | |
| 
 | |
| 
 | |
| # Similar to the tracking done in Django's CursorDebugWrapper, but done at the
 | |
| # psycopg2 cursor level so it works with SQLAlchemy.
 | |
| def wrapper_execute(
 | |
|     self: CursorObj, action: Callable[[Query, ParamsT], None], sql: Query, params: ParamsT
 | |
| ) -> None:
 | |
|     start = time.time()
 | |
|     try:
 | |
|         action(sql, params)
 | |
|     finally:
 | |
|         stop = time.time()
 | |
|         duration = stop - start
 | |
|         self.connection.queries.append(
 | |
|             {
 | |
|                 "time": f"{duration:.3f}",
 | |
|             }
 | |
|         )
 | |
| 
 | |
| 
 | |
| class TimeTrackingCursor(cursor):
 | |
|     """A psycopg2 cursor class that tracks the time spent executing queries."""
 | |
| 
 | |
|     def execute(self, query: Query, vars: Params = None) -> None:
 | |
|         wrapper_execute(self, super().execute, query, vars)
 | |
| 
 | |
|     def executemany(self, query: Query, vars: Iterable[Params]) -> None:  # nocoverage
 | |
|         wrapper_execute(self, super().executemany, query, vars)
 | |
| 
 | |
| 
 | |
| CursorT = TypeVar("CursorT", bound=cursor)
 | |
| 
 | |
| 
 | |
| class TimeTrackingConnection(connection):
 | |
|     """A psycopg2 connection class that uses TimeTrackingCursors."""
 | |
| 
 | |
|     def __init__(self, *args: Any, **kwargs: Any) -> None:
 | |
|         self.queries: List[Dict[str, str]] = []
 | |
|         super().__init__(*args, **kwargs)
 | |
| 
 | |
|     @overload
 | |
|     def cursor(
 | |
|         self,
 | |
|         name: Union[str, bytes, None] = ...,
 | |
|         *,
 | |
|         withhold: bool = ...,
 | |
|         scrollable: Optional[bool] = ...,
 | |
|     ) -> TimeTrackingCursor:
 | |
|         ...
 | |
| 
 | |
|     @overload
 | |
|     def cursor(
 | |
|         self,
 | |
|         name: Union[str, bytes, None] = ...,
 | |
|         *,
 | |
|         cursor_factory: Callable[..., CursorT] = ...,
 | |
|         withhold: bool = ...,
 | |
|         scrollable: Optional[bool] = ...,
 | |
|     ) -> CursorT:
 | |
|         ...
 | |
| 
 | |
|     @overload
 | |
|     def cursor(
 | |
|         self,
 | |
|         name: Union[str, bytes, None],
 | |
|         cursor_factory: Callable[..., CursorT] = ...,
 | |
|         withhold: bool = ...,
 | |
|         scrollable: Optional[bool] = ...,
 | |
|     ) -> CursorT:
 | |
|         ...
 | |
| 
 | |
|     def cursor(self, *args: Any, **kwargs: Any) -> cursor:
 | |
|         kwargs["cursor_factory"] = TimeTrackingCursor
 | |
|         return super().cursor(*args, **kwargs)
 | |
| 
 | |
| 
 | |
| def reset_queries() -> None:
 | |
|     from django.db import connections
 | |
| 
 | |
|     for conn in connections.all():
 | |
|         if conn.connection is not None:
 | |
|             conn.connection.queries = []
 |