Coverage for fastblocks / actions / query / parser.py: 14%
199 statements
« prev ^ index » next coverage.py v7.12.0, created at 2025-11-26 03:58 -0800
« prev ^ index » next coverage.py v7.12.0, created at 2025-11-26 03:58 -0800
1"""Universal Query Parser for FastBlocks.
3Converts HTTP request query parameters into ACB universal database queries.
4Provides automatic filtering, pagination, sorting, and model lookup capabilities.
6Author: lesleslie <les@wedgwoodwebworks.com>
7Created: 2025-01-13
8"""
10import typing as t
11from contextlib import suppress
13from acb.debug import debug
14from acb.depends import Inject, depends
15from starlette.requests import Request
16from fastblocks.htmx import HtmxRequest
19class UniversalQueryParser:
20 @depends.inject
21 def __init__(
22 self,
23 request: HtmxRequest | Request,
24 query: Inject[t.Any],
25 model_class: t.Any = None,
26 pattern: str = "advanced",
27 default_limit: int = 10,
28 max_limit: int = 100,
29 ) -> None:
30 self.request = request
31 self.model_class = model_class
32 self.pattern = pattern
33 self.default_limit = default_limit
34 self.max_limit = max_limit
35 self.query = query
37 def _parse_pagination(self, params: dict[str, str]) -> tuple[int, int, int]:
38 page = max(1, int(params.pop("page", 1)))
39 limit = min(
40 self.max_limit, max(1, int(params.pop("limit", self.default_limit)))
41 )
42 offset = (page - 1) * limit
43 debug(f"Pagination: page={page}, limit={limit}, offset={offset}")
44 return page, limit, offset
46 def _parse_sorting(self, params: dict[str, str]) -> tuple[str | None, str]:
47 order_by = params.pop("order_by", None)
48 order_dir = params.pop("order_dir", "asc").lower()
49 if order_dir not in ("asc", "desc"):
50 order_dir = "asc"
51 debug(f"Sorting: order_by={order_by}, order_dir={order_dir}")
52 return order_by, order_dir
54 def _parse_filters(self, params: dict[str, str]) -> list[tuple[str, str, t.Any]]:
55 filters = []
56 for key, value in params.items():
57 if "__" in key:
58 field, operator = key.rsplit("__", 1)
59 processed_value = self._process_operator_value(operator, value)
60 filters.append((field, operator, processed_value))
61 else:
62 processed_value = self._process_simple_value(value)
63 filters.append((key, "equals", processed_value))
64 debug(f"Filters: {filters}")
65 return filters
67 def _process_operator_value(self, operator: str, value: str) -> t.Any:
68 if operator == "null":
69 return value.lower() in ("true", "1", "yes")
70 elif operator == "in":
71 return [v.strip() for v in value.split(",")]
72 elif operator in ("gt", "gte", "lt", "lte"):
73 return self._convert_to_number(value)
74 return value
76 def _process_simple_value(self, value: str) -> t.Any:
77 if value.lower() in ("true", "false"):
78 return value.lower() == "true"
79 elif value.lower() in ("null", "none"):
80 return None
81 return self._convert_to_number(value)
83 def _convert_to_number(self, value: str) -> t.Any:
84 with suppress(ValueError):
85 if "." in value:
86 return float(value)
88 return int(value)
89 return value
91 def _apply_filters( # noqa: C901
92 self, query_builder: t.Any, filters: list[tuple[str, str, t.Any]]
93 ) -> t.Any:
94 for field, operator, value in filters:
95 try:
96 if operator == "equals":
97 query_builder = query_builder.where(field, value)
98 elif operator == "gt":
99 query_builder = query_builder.where_gt(field, value)
100 elif operator == "gte":
101 query_builder = query_builder.where_gte(field, value)
102 elif operator == "lt":
103 query_builder = query_builder.where_lt(field, value)
104 elif operator == "lte":
105 query_builder = query_builder.where_lte(field, value)
106 elif operator == "contains":
107 query_builder = query_builder.where_like(field, f"%{value}%")
108 elif operator == "icontains":
109 query_builder = query_builder.where_ilike(field, f"%{value}%")
110 elif operator == "in":
111 query_builder = query_builder.where_in(field, value)
112 elif operator == "not":
113 query_builder = query_builder.where_not(field, value)
114 elif operator == "null":
115 if value:
116 query_builder = query_builder.where_null(field)
117 else:
118 query_builder = query_builder.where_not_null(field)
119 else:
120 debug(
121 f"Unknown operator '{operator}' for field '{field}', skipping"
122 )
123 except AttributeError as e:
124 debug(
125 f"Query builder method not available for operator '{operator}': {e}"
126 )
128 return query_builder
130 def _apply_sorting(
131 self, query_builder: t.Any, order_by: str | None, order_dir: str
132 ) -> t.Any:
133 if order_by:
134 try:
135 if order_dir == "desc":
136 query_builder = query_builder.order_by_desc(order_by)
137 else:
138 query_builder = query_builder.order_by(order_by)
139 except AttributeError as e:
140 debug(f"Query builder sorting method not available: {e}")
142 return query_builder
144 def _apply_pagination(self, query_builder: t.Any, offset: int, limit: int) -> t.Any:
145 try:
146 return query_builder.offset(offset).limit(limit)
147 except AttributeError as e:
148 debug(f"Query builder pagination method not available: {e}")
149 return query_builder
151 async def parse_and_execute(self) -> list[t.Any]:
152 if not self._validate_query_requirements():
153 return []
154 params = dict(getattr(self.request, "query_params", {}))
155 debug(f"Original query params: {params}")
156 _, limit, offset = self._parse_pagination(params)
157 order_by, order_dir = self._parse_sorting(params)
158 filters = self._parse_filters(params)
159 try:
160 query_builder = self._get_query_builder(filters)
161 if query_builder is None:
162 return []
164 return await self._execute_query(
165 query_builder, filters, order_by, order_dir, offset, limit
166 )
167 except Exception as e:
168 debug(f"Query execution failed: {e}")
169 return []
171 def _validate_query_requirements(self) -> bool:
172 if not self.model_class:
173 debug("No model class provided for query parsing")
174 return False
175 if not self.query:
176 debug("Universal query interface not available")
177 return False
178 return True
180 def _get_query_builder(self, filters: list[tuple[str, str, t.Any]]) -> t.Any:
181 if self.pattern == "simple":
182 return self._handle_simple_pattern(filters)
183 elif self.pattern in ("repository", "specification"):
184 debug(
185 f"{self.pattern.title()} pattern not fully implemented, falling back to advanced"
186 )
187 return self.query.for_model(self.model_class).advanced
188 return self.query.for_model(self.model_class).advanced
190 def _handle_simple_pattern(self, filters: list[tuple[str, str, t.Any]]) -> t.Any:
191 query_builder = self.query.for_model(self.model_class).simple
192 if filters:
193 for field, operator, value in filters:
194 if operator == "equals":
195 try:
196 query_builder = query_builder.where(field, value)
197 except AttributeError:
198 debug("Simple query pattern doesn't support where clause")
199 break
200 return query_builder
202 async def _execute_query(
203 self,
204 query_builder: t.Any,
205 filters: list[tuple[str, str, t.Any]],
206 order_by: str | None,
207 order_dir: str,
208 offset: int,
209 limit: int,
210 ) -> list[t.Any]:
211 if self.pattern == "simple":
212 return t.cast(list[t.Any], await query_builder.all())
214 query_builder = self._apply_filters(query_builder, filters)
215 query_builder = self._apply_sorting(query_builder, order_by, order_dir)
216 query_builder = self._apply_pagination(query_builder, offset, limit)
218 debug(f"Executing query for model {self.model_class.__name__}")
219 results = t.cast(list[t.Any], await query_builder.all())
220 debug(f"Query returned {len(results)} results")
222 return results
224 async def get_count(self) -> int:
225 if not self.model_class or not self.query:
226 return 0
227 params = dict(getattr(self.request, "query_params", {}))
228 params.pop("page", None)
229 params.pop("limit", None)
230 params.pop("order_by", None)
231 params.pop("order_dir", None)
232 filters = self._parse_filters(params)
233 try:
234 query_builder = self.query.for_model(self.model_class).advanced
235 query_builder = self._apply_filters(query_builder, filters)
237 return t.cast(int, await query_builder.count())
238 except Exception as e:
239 debug(f"Count query failed: {e}")
240 return 0
242 def get_pagination_info(self) -> dict[str, t.Any]:
243 params = dict(getattr(self.request, "query_params", {}))
244 page, limit, offset = self._parse_pagination(params)
246 return {
247 "page": page,
248 "limit": limit,
249 "offset": offset,
250 "has_prev": page > 1,
251 "prev_page": page - 1 if page > 1 else None,
252 "next_page": page + 1,
253 }
256async def get_model_for_query(model_name: str) -> t.Any | None:
257 try:
258 models = await depends.get("models")
259 if models and hasattr(models, model_name):
260 return getattr(models, model_name)
261 except Exception as e:
262 debug(f"Failed to get model '{model_name}': {e}")
264 return None
267async def create_query_context(
268 request: HtmxRequest | Request,
269 model_name: str | None = None,
270 base_context: dict[str, t.Any] | None = None,
271) -> dict[str, t.Any]:
272 if base_context is None:
273 base_context = {}
275 context = dict(base_context)
277 if not model_name:
278 query_params = getattr(request, "query_params", {})
279 model_name = query_params.get("model")
281 if not model_name:
282 return context
284 model_class = await get_model_for_query(model_name)
285 if not model_class:
286 debug(f"Model '{model_name}' not found")
287 return context
289 parser = UniversalQueryParser(request, model_class)
291 context.update(
292 {
293 f"{model_name}_parser": parser,
294 f"{model_name}_pagination": parser.get_pagination_info(),
295 "universal_query": {
296 "model_name": model_name,
297 "model_class": model_class,
298 "parser": parser,
299 },
300 }
301 )
303 return context