Coverage for src / dataknobs_data / backends / duckdb.py: 15%
468 statements
« prev ^ index » next coverage.py v7.13.0, created at 2025-12-26 15:45 -0700
1"""DuckDB backend implementation for analytical workloads.
3DuckDB is an embedded columnar database optimized for analytics,
4providing 10-100x performance improvement over SQLite for
5aggregations, joins, and analytical queries.
6"""
8from __future__ import annotations
10import asyncio
11import logging
12import threading
13from concurrent.futures import ThreadPoolExecutor
14from pathlib import Path
15from typing import Any, TYPE_CHECKING
17import duckdb
18from dataknobs_config import ConfigurableBase
20from ..database import AsyncDatabase, SyncDatabase
21from ..query import Query
22from ..query_logic import ComplexQuery
23from .sql_base import SQLQueryBuilder, SQLRecordSerializer, SQLTableManager
25if TYPE_CHECKING:
26 from collections.abc import AsyncIterator, Iterator
27 from ..records import Record
28 from ..streaming import StreamConfig, StreamResult
31logger = logging.getLogger(__name__)
class AsyncDuckDBDatabase(AsyncDatabase, ConfigurableBase):  # type: ignore[misc]
    """Asynchronous DuckDB database backend for analytical workloads.

    DuckDB is an embedded columnar database optimized for analytics.
    Provides 10-100x performance improvement over SQLite for
    aggregations, joins, and analytical queries.

    DuckDB has no native async API, so every blocking call is dispatched to
    a ThreadPoolExecutor, and a single ``threading.Lock`` serializes access
    to the one shared connection.

    Features:
        - Columnar storage for fast analytical queries
        - Parallel execution for multi-threaded query processing
        - Native Parquet integration for efficient data import/export
        - Advanced analytics support (window functions, CTEs, complex aggregations)

    Usage:
        ```python
        from dataknobs_data import async_database_factory

        # File-based database
        db = async_database_factory("duckdb:///path/to/data.duckdb")

        # In-memory database
        db = async_database_factory("duckdb:///:memory:")

        async with db:
            # Perform CRUD operations
            await db.create(record)
            results = await db.search(query)
        ```
    """

    def __init__(self, config: dict[str, Any] | None = None):
        """Initialize async DuckDB database.

        Args:
            config: Configuration with the following optional keys:
                - path: Database file path (default: ":memory:")
                - table: Table name (default: "records")
                - timeout: Connection timeout in seconds (default: 5.0)
                - max_workers: Number of threads in pool (default: 4)
                - read_only: Open database in read-only mode (default: False)
        """
        super().__init__(config)
        config = config or {}
        self.db_path = config.get("path", ":memory:")
        self.table_name = config.get("table", "records")
        self.timeout = config.get("timeout", 5.0)
        self.max_workers = config.get("max_workers", 4)
        self.read_only = config.get("read_only", False)

        # Thread pool for async operations (DuckDB has no native async support)
        self.executor = ThreadPoolExecutor(max_workers=self.max_workers)

        # Reuse SQL infrastructure
        self.query_builder = SQLQueryBuilder(
            self.table_name,
            dialect="duckdb",
            param_style="qmark"  # DuckDB uses ? placeholders
        )
        self.serializer = SQLRecordSerializer()
        self.table_manager = SQLTableManager(
            self.table_name,
            dialect="duckdb"
        )

        self.conn: duckdb.DuckDBPyConnection | None = None
        self._connected = False
        self._lock = threading.Lock()  # Serializes all access to the shared connection

    @classmethod
    def from_config(cls, config: dict) -> AsyncDuckDBDatabase:
        """Create from config dictionary."""
        return cls(config)

    async def _run(self, fn: Any, *args: Any) -> Any:
        """Run a blocking callable on the thread pool from a coroutine.

        Uses ``asyncio.get_running_loop()`` — ``get_event_loop()`` is
        deprecated inside coroutines since Python 3.10 and may fail when no
        loop is set on the thread.
        """
        return await asyncio.get_running_loop().run_in_executor(
            self.executor, fn, *args
        )

    def _row_to_dict(self, row: tuple) -> dict[str, Any]:
        """Map a result tuple to {column_name: value} via cursor metadata.

        Must be called while ``self._lock`` is held, immediately after the
        statement that produced ``row``: ``conn.description`` reflects the
        *last* statement executed on the shared connection.
        """
        return {col[0]: val for col, val in zip(self.conn.description, row)}

    async def connect(self) -> None:
        """Connect to the DuckDB database (no-op if already connected)."""
        if self._connected:
            return

        # Create directory if needed for file-based database
        if self.db_path != ":memory:":
            Path(self.db_path).parent.mkdir(parents=True, exist_ok=True)

        # Connect in the thread pool since duckdb.connect() is blocking
        self.conn = await self._run(self._connect_sync)

        # Create table if it doesn't exist
        await self._ensure_table()

        self._connected = True
        logger.info(f"Connected to async DuckDB database: {self.db_path}")

    def _connect_sync(self) -> duckdb.DuckDBPyConnection:
        """Synchronous connection helper."""
        return duckdb.connect(
            self.db_path,
            read_only=self.read_only
        )

    async def close(self) -> None:
        """Close the database connection and shut down the thread pool.

        NOTE: the executor is shut down here, so an instance cannot be
        reconnected after close(); create a new instance instead.
        """
        if self.conn:
            await self._run(self.conn.close)
            self.conn = None
            self._connected = False
            logger.info(f"Disconnected from async DuckDB database: {self.db_path}")

        # Shutdown executor
        self.executor.shutdown(wait=True)

    async def _ensure_table(self) -> None:
        """Ensure the table exists.

        Raises:
            RuntimeError: If no connection has been established.
        """
        if not self.conn:
            raise RuntimeError("Database not connected. Call connect() first.")
        await self._run(self._ensure_table_sync)

    def _ensure_table_sync(self) -> None:
        """Synchronous table creation (skipped in read-only mode)."""
        if self.read_only:
            return

        with self._lock:
            self.conn.execute(self.table_manager.get_create_table_sql())

    def _check_connection(self) -> None:
        """Raise RuntimeError unless connect() has completed."""
        if not self._connected or not self.conn:
            raise RuntimeError("Database not connected. Call connect() first.")

    async def create(self, record: Record) -> str:
        """Create a new record.

        Args:
            record: The record to create

        Returns:
            The record ID

        Raises:
            ValueError: If a record with the same ID already exists.
        """
        self._check_connection()
        return await self._run(self._create_sync, record)

    def _create_sync(self, record: Record) -> str:
        """Synchronous create implementation."""
        query, params = self.query_builder.build_create_query(record)

        try:
            with self._lock:
                self.conn.execute(query, params)
            # DuckDB doesn't support RETURNING, so reuse the ID the query
            # builder generated — it is the first parameter by contract.
            return params[0]
        except duckdb.ConstraintException as e:
            raise ValueError(f"Record with ID {params[0]} already exists") from e

    async def read(self, id: str) -> Record | None:
        """Read a record by ID.

        Args:
            id: The record ID

        Returns:
            The record if found, None otherwise
        """
        self._check_connection()
        return await self._run(self._read_sync, id)

    def _read_sync(self, id: str) -> Record | None:
        """Synchronous read implementation."""
        query, params = self.query_builder.build_read_query(id)

        with self._lock:
            result = self.conn.execute(query, params).fetchone()
            if result is None:
                return None
            # Build the row dict while the lock is held: another thread could
            # otherwise run a query and clobber conn.description in between.
            row_dict = self._row_to_dict(result)
        return SQLQueryBuilder.row_to_record(row_dict)

    async def update(self, id: str, record: Record) -> bool:
        """Update an existing record.

        Args:
            id: The record ID to update
            record: The record data to update with

        Returns:
            True if the record was updated, False if no record exists
        """
        self._check_connection()
        return await self._run(self._update_sync, id, record)

    def _update_sync(self, id: str, record: Record) -> bool:
        """Synchronous update implementation."""
        query, params = self.query_builder.build_update_query(id, record)

        with self._lock:
            # Existence is checked explicitly because no affected-row count
            # is read back from the UPDATE itself.
            exists_query, exists_params = self.query_builder.build_exists_query(id)
            exists = self.conn.execute(exists_query, exists_params).fetchone() is not None

            if exists:
                self.conn.execute(query, params)
                return True

        logger.warning(f"Update affected 0 rows for id={id}. Record may not exist.")
        return False

    async def delete(self, id: str) -> bool:
        """Delete a record by ID.

        Args:
            id: The record ID

        Returns:
            True if deleted, False if not found
        """
        self._check_connection()
        return await self._run(self._delete_sync, id)

    def _delete_sync(self, id: str) -> bool:
        """Synchronous delete implementation."""
        query, params = self.query_builder.build_delete_query(id)

        with self._lock:
            # First check if the record exists so the result is meaningful
            exists_query, exists_params = self.query_builder.build_exists_query(id)
            if self.conn.execute(exists_query, exists_params).fetchone() is None:
                return False

            self.conn.execute(query, params)
            return True

    async def exists(self, id: str) -> bool:
        """Check if a record exists.

        Args:
            id: The record ID

        Returns:
            True if exists, False otherwise
        """
        self._check_connection()
        return await self._run(self._exists_sync, id)

    def _exists_sync(self, id: str) -> bool:
        """Synchronous exists implementation."""
        query, params = self.query_builder.build_exists_query(id)

        with self._lock:
            return self.conn.execute(query, params).fetchone() is not None

    async def search(self, query: Query | ComplexQuery) -> list[Record]:
        """Search for records matching a query.

        Args:
            query: The query specification

        Returns:
            List of matching records
        """
        self._check_connection()
        return await self._run(self._search_sync, query)

    def _search_sync(self, query: Query | ComplexQuery) -> list[Record]:
        """Synchronous search implementation."""
        # Handle ComplexQuery with native SQL support
        if isinstance(query, ComplexQuery):
            sql_query, params = self.query_builder.build_complex_search_query(query)
        else:
            sql_query, params = self.query_builder.build_search_query(query)

        with self._lock:
            rows = self.conn.execute(sql_query, params).fetchall()
            # Map rows while the lock is held — conn.description is only
            # valid for the statement just executed.
            row_dicts = [self._row_to_dict(row) for row in rows]

        records = []
        for row_dict in row_dicts:
            record = SQLQueryBuilder.row_to_record(row_dict)
            # Populate storage_id from database ID
            record.storage_id = str(row_dict['id'])
            records.append(record)

        # Apply field projection if specified
        if hasattr(query, 'fields') and query.fields:
            records = [r.project(query.fields) for r in records]

        return records

    async def count(self, query: Query | None = None) -> int:
        """Count records matching a query.

        Args:
            query: Optional query specification

        Returns:
            Count of matching records
        """
        self._check_connection()
        return await self._run(self._count_sync, query)

    def _count_sync(self, query: Query | None = None) -> int:
        """Synchronous count implementation."""
        sql_query, params = self.query_builder.build_count_query(query)

        with self._lock:
            result = self.conn.execute(sql_query, params).fetchone()
        return result[0] if result else 0

    async def create_batch(self, records: list[Record]) -> list[str]:
        """Create multiple records efficiently.

        Args:
            records: List of records to create

        Returns:
            List of record IDs
        """
        if not records:
            return []

        self._check_connection()
        return await self._run(self._create_batch_sync, records)

    def _create_batch_sync(self, records: list[Record]) -> list[str]:
        """Synchronous batch create implementation."""
        # Use the shared batch create query builder
        query, params, ids = self.query_builder.build_batch_create_query(records)

        # Execute the batch insert in a transaction
        with self._lock:
            try:
                self.conn.begin()
                self.conn.execute(query, params)
                self.conn.commit()
                return ids
            except Exception:
                self.conn.rollback()
                raise

    async def update_batch(self, updates: list[tuple[str, Record]]) -> list[bool]:
        """Update multiple records efficiently.

        Args:
            updates: List of (record_id, record) tuples

        Returns:
            List of success indicators
        """
        if not updates:
            return []

        self._check_connection()
        return await self._run(self._update_batch_sync, updates)

    def _update_batch_sync(self, updates: list[tuple[str, Record]]) -> list[bool]:
        """Synchronous batch update implementation."""
        # Use the shared batch update query builder
        query, params = self.query_builder.build_batch_update_query(updates)

        # Execute the batch update in a transaction
        with self._lock:
            try:
                self.conn.begin()
                self.conn.execute(query, params)
                self.conn.commit()

                # An UPDATE on a missing ID is a silent no-op, so check which
                # of the requested IDs actually exist to build the result list.
                update_ids = [record_id for record_id, _ in updates]
                placeholders = ", ".join(["?" for _ in update_ids])
                check_query = f"SELECT id FROM {self.table_name} WHERE id IN ({placeholders})"

                rows = self.conn.execute(check_query, update_ids).fetchall()
                existing_ids = {row[0] for row in rows}

                # Per-update success flag, in input order
                return [record_id in existing_ids for record_id in update_ids]
            except Exception:
                self.conn.rollback()
                raise

    async def delete_batch(self, ids: list[str]) -> list[bool]:
        """Delete multiple records efficiently.

        Args:
            ids: List of record IDs to delete

        Returns:
            List of success indicators
        """
        if not ids:
            return []

        self._check_connection()
        return await self._run(self._delete_batch_sync, ids)

    def _delete_batch_sync(self, ids: list[str]) -> list[bool]:
        """Synchronous batch delete implementation."""
        with self._lock:
            # Check which IDs exist before deletion so we can report
            # per-ID success afterwards.
            placeholders = ", ".join(["?" for _ in ids])
            check_query = f"SELECT id FROM {self.table_name} WHERE id IN ({placeholders})"

            rows = self.conn.execute(check_query, ids).fetchall()
            existing_ids = {row[0] for row in rows}

            # Use the shared batch delete query builder
            query, params = self.query_builder.build_batch_delete_query(ids)

            # Execute the batch delete in a transaction
            try:
                self.conn.begin()
                self.conn.execute(query, params)
                self.conn.commit()

                # Success flags based on which IDs existed, in input order
                return [record_id in existing_ids for record_id in ids]
            except Exception:
                self.conn.rollback()
                raise

    def _initialize(self) -> None:
        """Initialize method - connection setup handled in connect()."""
        pass

    async def _count_all(self) -> int:
        """Count all records in the database."""
        self._check_connection()
        return await self._run(self._count_all_sync)

    def _count_all_sync(self) -> int:
        """Synchronous count all implementation."""
        with self._lock:
            result = self.conn.execute(f"SELECT COUNT(*) FROM {self.table_name}").fetchone()
        return result[0] if result else 0

    async def stream_read(
        self,
        query: Query | None = None,
        config: StreamConfig | None = None
    ) -> AsyncIterator[Record]:
        """Stream records from database.

        Args:
            query: Optional query specification
            config: Stream configuration

        Yields:
            Records one at a time
        """
        from ..streaming import StreamConfig

        config = config or StreamConfig()
        query = query or Query()

        # Page through results in batches of config.batch_size, yielding
        # individual records.
        offset = 0
        while True:
            query_copy = query.copy()
            query_copy.offset(offset).limit(config.batch_size)
            batch = await self.search(query_copy)

            if not batch:
                break

            for record in batch:
                yield record

            offset += len(batch)

            # A short batch means we've exhausted the result set
            if len(batch) < config.batch_size:
                break

    async def stream_write(
        self,
        records: AsyncIterator[Record],
        config: StreamConfig | None = None
    ) -> StreamResult:
        """Stream records into database.

        Args:
            records: Async iterator of records
            config: Stream configuration

        Returns:
            Stream result with statistics
        """
        import time

        from ..streaming import StreamConfig, StreamResult

        config = config or StreamConfig()
        batch = []
        total_written = 0
        start_time = time.time()

        async for record in records:
            batch.append(record)

            if len(batch) >= config.batch_size:
                # Flush a full batch
                await self.create_batch(batch)
                total_written += len(batch)
                batch = []

        # Write any remaining records
        if batch:
            await self.create_batch(batch)
            total_written += len(batch)

        elapsed = time.time() - start_time

        return StreamResult(
            total_processed=total_written,
            successful=total_written,
            failed=0,
            duration=elapsed,
            # Ceiling division: partial final batch still counts as one
            total_batches=(total_written + config.batch_size - 1) // config.batch_size
        )
class SyncDuckDBDatabase(SyncDatabase, ConfigurableBase):  # type: ignore[misc]
    """Synchronous DuckDB database backend for analytical workloads.

    DuckDB is an embedded columnar database optimized for analytics.
    Provides 10-100x performance improvement over SQLite for
    aggregations, joins, and analytical queries.

    Features:
        - Columnar storage for fast analytical queries
        - Native Parquet integration for efficient data import/export
        - Advanced analytics support (window functions, CTEs, complex aggregations)

    Usage:
        ```python
        from dataknobs_data.backends.duckdb import SyncDuckDBDatabase

        # File-based database
        db = SyncDuckDBDatabase({"path": "/path/to/data.duckdb"})

        # In-memory database
        db = SyncDuckDBDatabase({"path": ":memory:"})

        with db:
            # Perform CRUD operations
            db.create(record)
            results = db.search(query)
        ```
    """

    def __init__(self, config: dict[str, Any] | None = None):
        """Initialize sync DuckDB database.

        Args:
            config: Configuration with the following optional keys:
                - path: Database file path (default: ":memory:")
                - table: Table name (default: "records")
                - timeout: Connection timeout in seconds (default: 5.0)
                - read_only: Open database in read-only mode (default: False)
        """
        super().__init__(config)
        config = config or {}
        self.db_path = config.get("path", ":memory:")
        self.table_name = config.get("table", "records")
        self.timeout = config.get("timeout", 5.0)
        self.read_only = config.get("read_only", False)

        # Reuse SQL infrastructure
        self.query_builder = SQLQueryBuilder(
            self.table_name,
            dialect="duckdb",
            param_style="qmark"  # DuckDB uses ? placeholders
        )
        self.serializer = SQLRecordSerializer()
        self.table_manager = SQLTableManager(
            self.table_name,
            dialect="duckdb"
        )

        self.conn: duckdb.DuckDBPyConnection | None = None
        self._connected = False

    @classmethod
    def from_config(cls, config: dict) -> SyncDuckDBDatabase:
        """Create from config dictionary."""
        return cls(config)

    def _row_to_dict(self, row: tuple) -> dict[str, Any]:
        """Map a result tuple to {column_name: value} via cursor metadata.

        Call immediately after the statement that produced ``row``:
        ``conn.description`` reflects the last statement executed.
        """
        return {col[0]: val for col, val in zip(self.conn.description, row)}

    def connect(self) -> None:
        """Connect to the DuckDB database (no-op if already connected)."""
        if self._connected:
            return

        # Create directory if needed for file-based database
        if self.db_path != ":memory:":
            Path(self.db_path).parent.mkdir(parents=True, exist_ok=True)

        # Connect to database
        self.conn = duckdb.connect(
            self.db_path,
            read_only=self.read_only
        )

        # Create table if it doesn't exist
        self._ensure_table()

        self._connected = True
        logger.info(f"Connected to sync DuckDB database: {self.db_path}")

    def close(self) -> None:
        """Close the database connection."""
        if self.conn:
            self.conn.close()
            self.conn = None
            self._connected = False
            logger.info(f"Disconnected from sync DuckDB database: {self.db_path}")

    def _ensure_table(self) -> None:
        """Ensure the table exists.

        Raises:
            RuntimeError: If no connection has been established.
        """
        if not self.conn:
            raise RuntimeError("Database not connected. Call connect() first.")

        # Skip table creation in read-only mode
        if self.read_only:
            return

        self.conn.execute(self.table_manager.get_create_table_sql())

    def _check_connection(self) -> None:
        """Raise RuntimeError unless connect() has completed."""
        if not self._connected or not self.conn:
            raise RuntimeError("Database not connected. Call connect() first.")

    def create(self, record: Record) -> str:
        """Create a new record.

        Args:
            record: The record to create

        Returns:
            The record ID

        Raises:
            ValueError: If a record with the same ID already exists.
        """
        self._check_connection()
        query, params = self.query_builder.build_create_query(record)

        try:
            self.conn.execute(query, params)
            # DuckDB doesn't support RETURNING, so reuse the ID the query
            # builder generated — it is the first parameter by contract.
            return params[0]
        except duckdb.ConstraintException as e:
            raise ValueError(f"Record with ID {params[0]} already exists") from e

    def read(self, id: str) -> Record | None:
        """Read a record by ID.

        Args:
            id: The record ID

        Returns:
            The record if found, None otherwise
        """
        self._check_connection()
        query, params = self.query_builder.build_read_query(id)

        result = self.conn.execute(query, params).fetchone()

        if result is None:
            return None
        return SQLQueryBuilder.row_to_record(self._row_to_dict(result))

    def update(self, id: str, record: Record) -> bool:
        """Update an existing record.

        Args:
            id: The record ID to update
            record: The record data to update with

        Returns:
            True if the record was updated, False if no record exists
        """
        self._check_connection()
        query, params = self.query_builder.build_update_query(id, record)

        # Existence is checked explicitly because no affected-row count
        # is read back from the UPDATE itself.
        exists_query, exists_params = self.query_builder.build_exists_query(id)
        exists = self.conn.execute(exists_query, exists_params).fetchone() is not None

        if exists:
            self.conn.execute(query, params)
            return True

        logger.warning(f"Update affected 0 rows for id={id}. Record may not exist.")
        return False

    def delete(self, id: str) -> bool:
        """Delete a record by ID.

        Args:
            id: The record ID

        Returns:
            True if deleted, False if not found
        """
        self._check_connection()
        query, params = self.query_builder.build_delete_query(id)

        # First check if the record exists so the result is meaningful
        exists_query, exists_params = self.query_builder.build_exists_query(id)
        if self.conn.execute(exists_query, exists_params).fetchone() is None:
            return False

        self.conn.execute(query, params)
        return True

    def exists(self, id: str) -> bool:
        """Check if a record exists.

        Args:
            id: The record ID

        Returns:
            True if exists, False otherwise
        """
        self._check_connection()
        query, params = self.query_builder.build_exists_query(id)

        return self.conn.execute(query, params).fetchone() is not None

    def search(self, query: Query | ComplexQuery) -> list[Record]:
        """Search for records matching a query.

        Args:
            query: The query specification

        Returns:
            List of matching records
        """
        self._check_connection()

        # Handle ComplexQuery with native SQL support
        if isinstance(query, ComplexQuery):
            sql_query, params = self.query_builder.build_complex_search_query(query)
        else:
            sql_query, params = self.query_builder.build_search_query(query)

        rows = self.conn.execute(sql_query, params).fetchall()

        records = []
        for row in rows:
            row_dict = self._row_to_dict(row)
            record = SQLQueryBuilder.row_to_record(row_dict)
            # Populate storage_id from database ID
            record.storage_id = str(row_dict['id'])
            records.append(record)

        # Apply field projection if specified
        if hasattr(query, 'fields') and query.fields:
            records = [r.project(query.fields) for r in records]

        return records

    def count(self, query: Query | None = None) -> int:
        """Count records matching a query.

        Args:
            query: Optional query specification

        Returns:
            Count of matching records
        """
        self._check_connection()
        sql_query, params = self.query_builder.build_count_query(query)

        result = self.conn.execute(sql_query, params).fetchone()
        return result[0] if result else 0

    def create_batch(self, records: list[Record]) -> list[str]:
        """Create multiple records efficiently.

        Args:
            records: List of records to create

        Returns:
            List of record IDs
        """
        if not records:
            return []

        self._check_connection()
        # Use the shared batch create query builder
        query, params, ids = self.query_builder.build_batch_create_query(records)

        # Execute the batch insert in a transaction
        try:
            self.conn.begin()
            self.conn.execute(query, params)
            self.conn.commit()
            return ids
        except Exception:
            self.conn.rollback()
            raise

    def update_batch(self, updates: list[tuple[str, Record]]) -> list[bool]:
        """Update multiple records efficiently.

        Args:
            updates: List of (record_id, record) tuples

        Returns:
            List of success indicators
        """
        if not updates:
            return []

        self._check_connection()
        # Use the shared batch update query builder
        query, params = self.query_builder.build_batch_update_query(updates)

        # Execute the batch update in a transaction
        try:
            self.conn.begin()
            self.conn.execute(query, params)
            self.conn.commit()

            # An UPDATE on a missing ID is a silent no-op, so check which of
            # the requested IDs actually exist to build the result list.
            update_ids = [record_id for record_id, _ in updates]
            placeholders = ", ".join(["?" for _ in update_ids])
            check_query = f"SELECT id FROM {self.table_name} WHERE id IN ({placeholders})"

            rows = self.conn.execute(check_query, update_ids).fetchall()
            existing_ids = {row[0] for row in rows}

            # Per-update success flag, in input order
            return [record_id in existing_ids for record_id in update_ids]
        except Exception:
            self.conn.rollback()
            raise

    def delete_batch(self, ids: list[str]) -> list[bool]:
        """Delete multiple records efficiently.

        Args:
            ids: List of record IDs to delete

        Returns:
            List of success indicators
        """
        if not ids:
            return []

        self._check_connection()

        # Check which IDs exist before deletion so we can report per-ID success
        placeholders = ", ".join(["?" for _ in ids])
        check_query = f"SELECT id FROM {self.table_name} WHERE id IN ({placeholders})"

        rows = self.conn.execute(check_query, ids).fetchall()
        existing_ids = {row[0] for row in rows}

        # Use the shared batch delete query builder
        query, params = self.query_builder.build_batch_delete_query(ids)

        # Execute the batch delete in a transaction
        try:
            self.conn.begin()
            self.conn.execute(query, params)
            self.conn.commit()

            # Success flags based on which IDs existed, in input order
            return [record_id in existing_ids for record_id in ids]
        except Exception:
            self.conn.rollback()
            raise

    def _initialize(self) -> None:
        """Initialize method - connection setup handled in connect()."""
        pass

    def _count_all(self) -> int:
        """Count all records in the database."""
        self._check_connection()

        result = self.conn.execute(f"SELECT COUNT(*) FROM {self.table_name}").fetchone()
        return result[0] if result else 0

    def stream_read(
        self,
        query: Query | None = None,
        config: StreamConfig | None = None
    ) -> Iterator[Record]:
        """Stream records from database.

        Args:
            query: Optional query specification
            config: Stream configuration

        Yields:
            Records one at a time
        """
        from ..streaming import StreamConfig

        config = config or StreamConfig()
        query = query or Query()

        # Page through results in batches of config.batch_size
        offset = 0
        while True:
            query_copy = query.copy()
            query_copy.offset(offset).limit(config.batch_size)
            batch = self.search(query_copy)

            if not batch:
                break

            for record in batch:
                yield record

            offset += len(batch)

            # A short batch means we've exhausted the result set
            if len(batch) < config.batch_size:
                break

    def stream_write(
        self,
        records: Iterator[Record],
        config: StreamConfig | None = None
    ) -> StreamResult:
        """Stream records into database.

        Args:
            records: Iterator of records
            config: Stream configuration

        Returns:
            Stream result with statistics
        """
        import time

        from ..streaming import StreamConfig, StreamResult

        config = config or StreamConfig()
        batch = []
        total_written = 0
        start_time = time.time()

        for record in records:
            batch.append(record)

            if len(batch) >= config.batch_size:
                # Flush a full batch
                self.create_batch(batch)
                total_written += len(batch)
                batch = []

        # Write any remaining records
        if batch:
            self.create_batch(batch)
            total_written += len(batch)

        elapsed = time.time() - start_time

        return StreamResult(
            total_processed=total_written,
            successful=total_written,
            failed=0,
            duration=elapsed,
            # Ceiling division: partial final batch still counts as one
            total_batches=(total_written + config.batch_size - 1) // config.batch_size
        )