Coverage for fastblocks / actions / query / parser.py: 14%

199 statements  

« prev     ^ index     » next       coverage.py v7.12.0, created at 2025-11-26 03:58 -0800

1"""Universal Query Parser for FastBlocks. 

2 

3Converts HTTP request query parameters into ACB universal database queries. 

4Provides automatic filtering, pagination, sorting, and model lookup capabilities. 

5 

6Author: lesleslie <les@wedgwoodwebworks.com> 

7Created: 2025-01-13 

8""" 

9 

10import typing as t 

11from contextlib import suppress 

12 

13from acb.debug import debug 

14from acb.depends import Inject, depends 

15from starlette.requests import Request 

16from fastblocks.htmx import HtmxRequest 

17 

18 

class UniversalQueryParser:
    """Translate a request's query-string parameters into a model query.

    The parser reads ``request.query_params`` and splits it into three groups:

    * pagination: ``page`` and ``limit``
    * sorting: ``order_by`` and ``order_dir``
    * filters: every remaining key, either ``field=value`` (equality) or
      ``field__operator=value``

    The groups are then applied to the query builder obtained from the
    injected ``query`` object via ``query.for_model(model_class)``.
    """

    # Keys consumed by pagination/sorting; they must never become filters.
    _RESERVED_PARAMS: t.ClassVar[tuple[str, ...]] = (
        "page",
        "limit",
        "order_by",
        "order_dir",
    )

    # Operators whose builder method is called as ``method(field, value)``.
    _VALUE_FILTER_METHODS: t.ClassVar[dict[str, str]] = {
        "equals": "where",
        "gt": "where_gt",
        "gte": "where_gte",
        "lt": "where_lt",
        "lte": "where_lte",
        "in": "where_in",
        "not": "where_not",
    }

    # Operators whose value is wrapped in ``%...%`` before being passed on.
    _PATTERN_FILTER_METHODS: t.ClassVar[dict[str, str]] = {
        "contains": "where_like",
        "icontains": "where_ilike",
    }

    @depends.inject
    def __init__(
        self,
        request: HtmxRequest | Request,
        query: Inject[t.Any],
        model_class: t.Any = None,
        pattern: str = "advanced",
        default_limit: int = 10,
        max_limit: int = 100,
    ) -> None:
        """Store the request, the injected query interface and parser options.

        Args:
            request: Request whose ``query_params`` are parsed.
            query: Query interface supplied by dependency injection; it must
                expose ``for_model(model_class)``.
            model_class: Model to query. Without it no query is executed.
            pattern: ``"simple"`` selects the simple builder; any other value
                (including ``"repository"`` and ``"specification"``) uses the
                advanced builder.
            default_limit: Page size used when ``limit`` is absent or invalid.
            max_limit: Upper bound applied to the requested page size.
        """
        self.request = request
        self.model_class = model_class
        self.pattern = pattern
        self.default_limit = default_limit
        self.max_limit = max_limit
        self.query = query

    @staticmethod
    def _to_int(raw: t.Any, default: int) -> int:
        """Return ``int(raw)``, or ``default`` when ``raw`` is not an integer."""
        try:
            return int(raw)
        except (TypeError, ValueError):
            return default

    def _parse_pagination(self, params: dict[str, str]) -> tuple[int, int, int]:
        """Pop ``page``/``limit`` from ``params`` and compute the offset.

        Malformed values (for example ``?page=abc``) fall back to page 1 and
        ``default_limit`` instead of raising, because the values come straight
        from the query string. ``page`` is floored at 1 and ``limit`` is
        clamped to ``1..max_limit``.

        Returns:
            ``(page, limit, offset)`` where ``offset == (page - 1) * limit``.
        """
        page = max(1, self._to_int(params.pop("page", 1), 1))
        requested = self._to_int(
            params.pop("limit", self.default_limit), self.default_limit
        )
        limit = min(self.max_limit, max(1, requested))
        offset = (page - 1) * limit
        debug(f"Pagination: page={page}, limit={limit}, offset={offset}")
        return page, limit, offset

    def _parse_sorting(self, params: dict[str, str]) -> tuple[str | None, str]:
        """Pop ``order_by``/``order_dir`` from ``params``.

        Returns:
            ``(order_by, order_dir)``; ``order_dir`` is lower-cased and any
            value other than ``"asc"``/``"desc"`` becomes ``"asc"``.
        """
        order_by = params.pop("order_by", None)
        order_dir = params.pop("order_dir", "asc").lower()
        if order_dir not in ("asc", "desc"):
            order_dir = "asc"
        debug(f"Sorting: order_by={order_by}, order_dir={order_dir}")
        return order_by, order_dir

    def _parse_filters(self, params: dict[str, str]) -> list[tuple[str, str, t.Any]]:
        """Turn every entry of ``params`` into a ``(field, operator, value)`` tuple.

        Keys containing ``"__"`` are split on the last ``"__"`` into field and
        operator; all other keys become ``"equals"`` filters. Callers are
        expected to have removed pagination and sorting keys beforehand.
        """
        filters: list[tuple[str, str, t.Any]] = []
        for key, value in params.items():
            if "__" in key:
                field, operator = key.rsplit("__", 1)
                filters.append(
                    (field, operator, self._process_operator_value(operator, value))
                )
            else:
                filters.append((key, "equals", self._process_simple_value(value)))
        debug(f"Filters: {filters}")
        return filters

    def _process_operator_value(self, operator: str, value: str) -> t.Any:
        """Coerce the raw string ``value`` according to ``operator``.

        ``null`` yields a bool, ``in`` a list of stripped strings, the
        comparison operators a number when possible; anything else is
        returned unchanged.
        """
        if operator == "null":
            return value.lower() in ("true", "1", "yes")
        if operator == "in":
            return [item.strip() for item in value.split(",")]
        if operator in ("gt", "gte", "lt", "lte"):
            return self._convert_to_number(value)
        return value

    def _process_simple_value(self, value: str) -> t.Any:
        """Coerce an equality value: booleans, ``None``, numbers, else the string."""
        lowered = value.lower()
        if lowered in ("true", "false"):
            return lowered == "true"
        if lowered in ("null", "none"):
            return None
        return self._convert_to_number(value)

    def _convert_to_number(self, value: str) -> t.Any:
        """Return ``value`` as ``float`` (if it contains ``"."``) or ``int``.

        The original string is returned when it does not parse as a number.
        """
        with suppress(ValueError):
            if "." in value:
                return float(value)
            return int(value)
        return value

    def _apply_filters(
        self, query_builder: t.Any, filters: list[tuple[str, str, t.Any]]
    ) -> t.Any:
        """Apply each filter to ``query_builder`` and return the result.

        Unknown operators are logged and skipped. A builder that lacks the
        method for an operator (``AttributeError``) is logged and that filter
        is skipped, so one unsupported operator does not abort the query.
        """
        for field, operator, value in filters:
            try:
                if operator in self._VALUE_FILTER_METHODS:
                    method = getattr(
                        query_builder, self._VALUE_FILTER_METHODS[operator]
                    )
                    query_builder = method(field, value)
                elif operator in self._PATTERN_FILTER_METHODS:
                    method = getattr(
                        query_builder, self._PATTERN_FILTER_METHODS[operator]
                    )
                    query_builder = method(field, f"%{value}%")
                elif operator == "null":
                    # A truthy value asks for NULL rows, a falsy one for non-NULL.
                    if value:
                        query_builder = query_builder.where_null(field)
                    else:
                        query_builder = query_builder.where_not_null(field)
                else:
                    debug(
                        f"Unknown operator '{operator}' for field '{field}', skipping"
                    )
            except AttributeError as e:
                debug(
                    f"Query builder method not available for operator '{operator}': {e}"
                )

        return query_builder

    def _apply_sorting(
        self, query_builder: t.Any, order_by: str | None, order_dir: str
    ) -> t.Any:
        """Apply ordering when ``order_by`` is set; otherwise return the builder as is."""
        if not order_by:
            return query_builder
        try:
            if order_dir == "desc":
                return query_builder.order_by_desc(order_by)
            return query_builder.order_by(order_by)
        except AttributeError as e:
            debug(f"Query builder sorting method not available: {e}")
            return query_builder

    def _apply_pagination(self, query_builder: t.Any, offset: int, limit: int) -> t.Any:
        """Apply ``offset`` then ``limit``; return the builder unchanged if unsupported."""
        try:
            return query_builder.offset(offset).limit(limit)
        except AttributeError as e:
            debug(f"Query builder pagination method not available: {e}")
            return query_builder

    async def parse_and_execute(self) -> list[t.Any]:
        """Parse the request's query parameters and run the resulting query.

        Returns:
            The query results, or an empty list when the model or query
            interface is missing, no builder is available, or execution
            raises (the error is logged, not propagated).
        """
        if not self._validate_query_requirements():
            return []
        params = dict(getattr(self.request, "query_params", {}))
        debug(f"Original query params: {params}")
        # Order matters: pagination and sorting pop their keys so that only
        # filter keys remain for _parse_filters.
        _, limit, offset = self._parse_pagination(params)
        order_by, order_dir = self._parse_sorting(params)
        filters = self._parse_filters(params)
        try:
            query_builder = self._get_query_builder(filters)
            if query_builder is None:
                return []

            return await self._execute_query(
                query_builder, filters, order_by, order_dir, offset, limit
            )
        except Exception as e:
            # Deliberate best-effort boundary: a failed query yields no rows.
            debug(f"Query execution failed: {e}")
            return []

    def _validate_query_requirements(self) -> bool:
        """Return ``True`` only when both a model class and a query interface exist."""
        if not self.model_class:
            debug("No model class provided for query parsing")
            return False
        if not self.query:
            debug("Universal query interface not available")
            return False
        return True

    def _get_query_builder(self, filters: list[tuple[str, str, t.Any]]) -> t.Any:
        """Return the builder for the configured pattern.

        ``"simple"`` uses the simple builder with equality filters applied;
        ``"repository"`` and ``"specification"`` log a notice and, like every
        other value, use the advanced builder.
        """
        if self.pattern == "simple":
            return self._handle_simple_pattern(filters)
        if self.pattern in ("repository", "specification"):
            debug(
                f"{self.pattern.title()} pattern not fully implemented, falling back to advanced"
            )
        return self.query.for_model(self.model_class).advanced

    def _handle_simple_pattern(self, filters: list[tuple[str, str, t.Any]]) -> t.Any:
        """Build a simple-pattern query, applying only ``"equals"`` filters.

        Filters with any other operator are ignored. If the simple builder
        has no ``where`` method, filtering stops at the first attempt.
        """
        query_builder = self.query.for_model(self.model_class).simple
        for field, operator, value in filters:
            if operator != "equals":
                continue
            try:
                query_builder = query_builder.where(field, value)
            except AttributeError:
                debug("Simple query pattern doesn't support where clause")
                break
        return query_builder

    async def _execute_query(
        self,
        query_builder: t.Any,
        filters: list[tuple[str, str, t.Any]],
        order_by: str | None,
        order_dir: str,
        offset: int,
        limit: int,
    ) -> list[t.Any]:
        """Run the query and return its rows.

        For the ``"simple"`` pattern the builder is executed as received
        (sorting and pagination are not applied here). Otherwise filters,
        sorting and pagination are applied before ``all()`` is awaited.
        """
        if self.pattern == "simple":
            return t.cast(list[t.Any], await query_builder.all())

        query_builder = self._apply_filters(query_builder, filters)
        query_builder = self._apply_sorting(query_builder, order_by, order_dir)
        query_builder = self._apply_pagination(query_builder, offset, limit)

        # Only used for logging; a model object without __name__ must not
        # make the query itself fail.
        model_label = getattr(self.model_class, "__name__", self.model_class)
        debug(f"Executing query for model {model_label}")
        results = t.cast(list[t.Any], await query_builder.all())
        debug(f"Query returned {len(results)} results")

        return results

    async def get_count(self) -> int:
        """Count rows matching the request's filters, ignoring pagination and sorting.

        Always uses the advanced builder. Returns 0 when the model or query
        interface is missing or when the count query raises.
        """
        if not self.model_class or not self.query:
            return 0
        params = dict(getattr(self.request, "query_params", {}))
        for reserved in self._RESERVED_PARAMS:
            params.pop(reserved, None)
        filters = self._parse_filters(params)
        try:
            query_builder = self.query.for_model(self.model_class).advanced
            query_builder = self._apply_filters(query_builder, filters)

            return t.cast(int, await query_builder.count())
        except Exception as e:
            debug(f"Count query failed: {e}")
            return 0

    def get_pagination_info(self) -> dict[str, t.Any]:
        """Describe the requested page without touching the database.

        ``next_page`` is always ``page + 1`` because the total row count is
        not known here.
        """
        params = dict(getattr(self.request, "query_params", {}))
        page, limit, offset = self._parse_pagination(params)
        has_prev = page > 1

        return {
            "page": page,
            "limit": limit,
            "offset": offset,
            "has_prev": has_prev,
            "prev_page": page - 1 if has_prev else None,
            "next_page": page + 1,
        }

254 

255 

async def get_model_for_query(model_name: str) -> t.Any | None:
    """Look up ``model_name`` as an attribute of the ``"models"`` dependency.

    Returns the attribute when the dependency is truthy and has it, otherwise
    ``None``. Any exception raised during the lookup is logged and also
    results in ``None``.
    """
    found: t.Any | None = None
    try:
        registry = await depends.get("models")
        if registry and hasattr(registry, model_name):
            found = getattr(registry, model_name)
    except Exception as exc:
        debug(f"Failed to get model '{model_name}': {exc}")

    return found

265 

266 

async def create_query_context(
    request: HtmxRequest | Request,
    model_name: str | None = None,
    base_context: dict[str, t.Any] | None = None,
) -> dict[str, t.Any]:
    """Build a template context containing a query parser for ``model_name``.

    Args:
        request: Current request; its ``query_params`` drive the parser.
        model_name: Name of the model to query. When omitted, the ``model``
            query parameter is used.
        base_context: Optional context to extend; it is copied, not mutated.

    Returns:
        A new dict. When a model is resolved it additionally holds
        ``"<model_name>_parser"``, ``"<model_name>_pagination"`` and a
        ``"universal_query"`` entry; otherwise it is just the copied
        ``base_context``.
    """
    context = dict(base_context) if base_context is not None else {}

    if not model_name:
        query_params = getattr(request, "query_params", {})
        # NOTE(review): the `model` parameter stays in the request, so the
        # parser will also treat it as an equality filter on a field named
        # `model` - confirm whether that is intended.
        model_name = query_params.get("model")

    if not model_name:
        return context

    model_class = await get_model_for_query(model_name)
    if not model_class:
        debug(f"Model '{model_name}' not found")
        return context

    # model_class must be passed by keyword: the second positional parameter
    # of UniversalQueryParser.__init__ is the injected `query`, so a
    # positional model would bind to `query` and leave model_class as None.
    parser = UniversalQueryParser(request, model_class=model_class)

    context.update(
        {
            f"{model_name}_parser": parser,
            f"{model_name}_pagination": parser.get_pagination_info(),
            "universal_query": {
                "model_name": model_name,
                "model_class": model_class,
                "parser": parser,
            },
        }
    )

    return context

303 return context