From 8a01fb3416acc3df84a05067650a735f8b69e26e Mon Sep 17 00:00:00 2001 From: Even Date: Mon, 9 Feb 2026 20:43:48 +0800 Subject: [PATCH 1/3] Enhance memory listing functionality with pagination and sorting support - Updated the `list` method in various vector store classes to include optional parameters for filtering, pagination (offset and limit), and sorting (order by and order direction). - Refactored the `StorageAdapter` class to streamline memory retrieval with the new parameters. --- src/powermem/storage/adapter.py | 83 ++----------------- src/powermem/storage/base.py | 12 ++- src/powermem/storage/oceanbase/oceanbase.py | 44 +++++++--- src/powermem/storage/pgvector/pgvector.py | 46 ++++++++-- .../storage/sqlite/sqlite_vector_store.py | 18 +++- 5 files changed, 102 insertions(+), 101 deletions(-) diff --git a/src/powermem/storage/adapter.py b/src/powermem/storage/adapter.py index edd1f38..cfccf43 100644 --- a/src/powermem/storage/adapter.py +++ b/src/powermem/storage/adapter.py @@ -484,14 +484,13 @@ def get_all_memories( if run_id: filters["run_id"] = run_id - # Get memories from vector store with filters (if supported) - if filters and hasattr(self.vector_store, 'list'): - # Pass filters to vector store's list method for database-level filtering - # Request more records to support offset - results = self.vector_store.list(filters=filters, limit=limit + offset) - else: - # Fallback: get all and filter in memory - results = self.vector_store.list(limit=limit + offset) + results = self.vector_store.list( + filters=filters if filters else None, + limit=limit, + offset=offset, + order_by=sort_by, + order=order + ) # OceanBase returns [memories], SQLite/PGVector return memories directly if results and isinstance(results[0], list): @@ -549,73 +548,7 @@ def get_all_memories( memories.append(memory) - # Apply sorting if specified - if sort_by: - memories = self._sort_memories(memories, sort_by, order) - - # Apply offset and limit - return memories[offset:offset + limit] - - def _sort_memories( - self, - memories: List[Dict[str, Any]], - sort_by: str, - order: str = "desc" - ) -> List[Dict[str, Any]]: - """ - Sort memories by specified field. - - Args: - memories: List of memory dictionaries - sort_by: Field to sort by. Options: "created_at", "updated_at", "id" - order: Sort order. "desc" for descending (default), "asc" for ascending - - Returns: - Sorted list of memories - """ - if not memories or not sort_by: - return memories - - reverse = (order.lower() == "desc") - - def get_sort_key(memory: Dict[str, Any]) -> Any: - """Get the sort key value from memory.""" - if sort_by == "created_at": - created_at = memory.get("created_at") - if created_at is None: - return datetime.min if reverse else datetime.max - # Handle both datetime objects and ISO format strings - if isinstance(created_at, str): - try: - from datetime import datetime as dt - return dt.fromisoformat(created_at.replace('Z', '+00:00')) - except (ValueError, AttributeError): - return datetime.min if reverse else datetime.max - return created_at if isinstance(created_at, datetime) else datetime.min - elif sort_by == "updated_at": - updated_at = memory.get("updated_at") - if updated_at is None: - return datetime.min if reverse else datetime.max - # Handle both datetime objects and ISO format strings - if isinstance(updated_at, str): - try: - from datetime import datetime as dt - return dt.fromisoformat(updated_at.replace('Z', '+00:00')) - except (ValueError, AttributeError): - return datetime.min if reverse else datetime.max - return updated_at if isinstance(updated_at, datetime) else datetime.min - elif sort_by == "id": - return memory.get("id", 0) - else: - # Unknown sort field, return original order - return None - - try: - sorted_memories = sorted(memories, key=get_sort_key, reverse=reverse) - return sorted_memories - except Exception as e: - logger.warning(f"Failed to sort memories by {sort_by}: {e}, returning original order") - return memories + return memories def clear_memories( self, diff --git a/src/powermem/storage/base.py b/src/powermem/storage/base.py index acef81c..055f1a7 100644 --- a/src/powermem/storage/base.py +++ b/src/powermem/storage/base.py @@ -68,8 +68,16 @@ def col_info(self): pass @abstractmethod - def list(self, filters=None, limit=None): - """List all memories.""" + def list(self, filters=None, limit=None, offset=None, order_by=None, order="desc"): + """List all memories with optional filtering, pagination and sorting. + + Args: + filters: Optional filters to apply + limit: Maximum number of results to return + offset: Number of results to skip + order_by: Field to sort by (e.g., "created_at", "updated_at", "id") + order: Sort order, "desc" for descending or "asc" for ascending + """ pass @abstractmethod diff --git a/src/powermem/storage/oceanbase/oceanbase.py b/src/powermem/storage/oceanbase/oceanbase.py index 931b860..646aa95 100644 --- a/src/powermem/storage/oceanbase/oceanbase.py +++ b/src/powermem/storage/oceanbase/oceanbase.py @@ -1986,7 +1986,8 @@ def col_info(self): logger.error(f"Failed to get collection info for '{self.collection_name}': {e}", exc_info=True) raise - def list(self, filters: Optional[Dict] = None, limit: Optional[int] = None): + def list(self, filters: Optional[Dict] = None, limit: Optional[int] = None, + offset: Optional[int] = None, order_by: Optional[str] = None, order: str = "desc"): """List all memories.""" try: table = Table(self.collection_name, self.obvector.metadata_obj, autoload_with=self.obvector.engine) @@ -1995,18 +1996,38 @@ def list(self, filters: Optional[Dict] = None, limit: Optional[int] = None): where_clause = self._generate_where_clause(filters, table=table) # Build output column name list - output_columns = self._get_standard_column_names(include_vector_field=True) + output_columns_names = self._get_standard_column_names(include_vector_field=True) + + # Build select statement with columns + output_columns = [table.c[col_name] for col_name in output_columns_names if col_name in table.c] + stmt = select(*output_columns) + + # Apply WHERE clause + if where_clause is not None: + stmt = stmt.where(where_clause) + + # Apply ORDER BY clause for sorting + if order_by: + if order_by in table.c: + order_column = table.c[order_by] + if order.lower() == "desc": + stmt = stmt.order_by(order_column.desc()) + else: + stmt = stmt.order_by(order_column.asc()) + + # Apply OFFSET and LIMIT for pagination + if offset is not None: + stmt = stmt.offset(offset) + if limit is not None: + stmt = stmt.limit(limit) - # Get all records - results = self.obvector.get( - table_name=self.collection_name, - ids=None, - output_column_name=output_columns, - where_clause=where_clause - ) + # Execute query + with self.obvector.engine.connect() as conn: + results = conn.execute(stmt) + rows = results.fetchall() memories = [] - for row in results.fetchall(): + for row in rows: parsed = self._parse_row_to_dict(row, include_vector=True, extract_score=False) memories.append(self._create_output_data( @@ -2016,9 +2037,6 @@ def list(self, filters: Optional[Dict] = None, limit: Optional[int] = None): parsed["metadata"] )) - if limit: - memories = memories[:limit] - logger.debug(f"Successfully listed {len(memories)} memories from collection '{self.collection_name}'") return [memories] diff --git a/src/powermem/storage/pgvector/pgvector.py b/src/powermem/storage/pgvector/pgvector.py index 2a22539..25f95c4 100644 --- a/src/powermem/storage/pgvector/pgvector.py +++ b/src/powermem/storage/pgvector/pgvector.py @@ -365,7 +365,10 @@ def col_info(self) -> dict[str, Any]: def list( self, filters: Optional[dict] = None, - limit: Optional[int] = 100 + limit: Optional[int] = 100, + offset: Optional[int] = None, + order_by: Optional[str] = None, + order: str = "desc" ) -> List[OutputData]: """ List all vectors in a collection. @@ -373,6 +376,9 @@ def list( Args: filters (Dict, optional): Filters to apply to the list. limit (int, optional): Number of vectors to return. Defaults to 100. + offset (int, optional): Number of results to skip. + order_by (str, optional): Field to sort by (e.g., "created_at", "updated_at", "id"). + order (str, optional): Sort order, "desc" for descending or "asc" for ascending. Returns: List[OutputData]: List of vectors. @@ -386,16 +392,38 @@ def list( filter_params.extend([k, str(v)]) filter_clause = "WHERE " + " AND ".join(filter_conditions) if filter_conditions else "" - - query = f""" - SELECT id, vector, payload - FROM {self.collection_name} - {filter_clause} - LIMIT %s - """ + + # Build ORDER BY clause for sorting + order_clause = "" + if order_by: + order_upper = order.upper() + if order_by in ["created_at", "updated_at"]: + # Sort by JSON field in payload + order_clause = f"ORDER BY payload->>'{order_by}' {order_upper}" + elif order_by == "id": + # Sort by id column + order_clause = f"ORDER BY id {order_upper}" + + # Build query with all clauses + query_parts = [ + f"SELECT id, vector, payload", + f"FROM {self.collection_name}", + filter_clause, + order_clause, + ] + + # Add OFFSET and LIMIT + if offset is not None: + query_parts.append("OFFSET %s") + filter_params.append(offset) + + query_parts.append("LIMIT %s") + filter_params.append(limit) + + query = "\n".join(part for part in query_parts if part) with self._get_cursor() as cur: - cur.execute(query, (*filter_params, limit)) + cur.execute(query, tuple(filter_params)) results = cur.fetchall() return [OutputData(id=r[0], score=None, payload=r[2]) for r in results] diff --git a/src/powermem/storage/sqlite/sqlite_vector_store.py b/src/powermem/storage/sqlite/sqlite_vector_store.py index 7658409..45f4051 100644 --- a/src/powermem/storage/sqlite/sqlite_vector_store.py +++ b/src/powermem/storage/sqlite/sqlite_vector_store.py @@ -240,8 +240,8 @@ def col_info(self) -> Dict[str, Any]: "db_path": self.db_path } - def list(self, filters=None, limit=None) -> List[OutputData]: - """List all memories with optional filtering.""" + def list(self, filters=None, limit=None, offset=None, order_by=None, order="desc") -> List[OutputData]: + """List all memories with optional filtering, pagination and sorting.""" query = f"SELECT id, vector, payload FROM {self.collection_name}" query_params = [] @@ -256,8 +256,22 @@ def list(self, filters=None, limit=None) -> List[OutputData]: if conditions: query += " WHERE " + " AND ".join(conditions) + # Add ORDER BY clause for sorting + if order_by: + order_upper = order.upper() + if order_by in ["created_at", "updated_at"]: + # Sort by JSON field in payload + query += f" ORDER BY json_extract(payload, '$.{order_by}') {order_upper}" + elif order_by == "id": + # Sort by id column + query += f" ORDER BY id {order_upper}" + + # Add LIMIT and OFFSET for pagination + # Note: In SQLite, LIMIT must come after ORDER BY and before OFFSET if limit: query += f" LIMIT {limit}" + if offset: + query += f" OFFSET {offset}" results = [] with self._lock: From d562d6e1ec15f9d240e7e3ea0b1ca12b3b710a92 Mon Sep 17 00:00:00 2001 From: Even Date: Wed, 11 Feb 2026 11:27:20 +0800 Subject: [PATCH 2/3] fix search bug --- src/powermem/storage/oceanbase/oceanbase.py | 15 +++++++-------- 1 file changed, 7 insertions(+), 8 deletions(-) diff --git a/src/powermem/storage/oceanbase/oceanbase.py b/src/powermem/storage/oceanbase/oceanbase.py index 646aa95..f128805 100644 --- a/src/powermem/storage/oceanbase/oceanbase.py +++ b/src/powermem/storage/oceanbase/oceanbase.py @@ -477,7 +477,7 @@ def insert(self, logger.error(f"Failed to insert vectors into collection '{self.collection_name}': {e}", exc_info=True) raise - def _generate_where_clause(self, filters: Optional[Dict] = None, table = None) -> Optional[List]: + def _generate_where_clause(self, filters: Optional[Dict] = None, table = None): """ Generate a properly formatted where clause for OceanBase. @@ -497,7 +497,7 @@ def _generate_where_clause(self, filters: Optional[Dict] = None, table = None) - table: SQLAlchemy Table object to use for column references. If None, uses self.table. Returns: - Optional[List]: List of SQLAlchemy ColumnElement objects for where clause. + SQLAlchemy ColumnElement or None: A single SQLAlchemy expression for where clause, or None if no filters. """ # Use provided table or fall back to self.table if table is None: @@ -582,7 +582,7 @@ def process_condition(cond): # Handle complex filters with AND/OR result = process_condition(filters) - return [result] if result is not None else None + return result def _row_to_model(self, row): """ @@ -929,8 +929,8 @@ def _fulltext_search(self, query: str, limit: int = 5, filters: Optional[Dict] = # Combine FTS condition with filter conditions where_conditions = [fts_condition] - if filter_where_clause: - where_conditions.extend(filter_where_clause) + if filter_where_clause is not None: + where_conditions.append(filter_where_clause) # Build custom query to include score field try: @@ -1064,9 +1064,8 @@ def _sparse_search(self, sparse_embedding: Dict[int, float], limit: int = 5, fil stmt = select(*columns) # Add where conditions - if filter_where_clause: - for condition in filter_where_clause: - stmt = stmt.where(condition) + if filter_where_clause is not None: + stmt = stmt.where(filter_where_clause) # Order by score ASC (lower negative_inner_product means higher similarity) stmt = stmt.order_by(text('score ASC')) From f254aa2c2c903533da8be757963c82d11d94e9fc Mon Sep 17 00:00:00 2001 From: Even Date: Wed, 11 Feb 2026 14:31:17 +0800 Subject: [PATCH 3/3] Enhance PGVectorConfig for flexible database connection settings - Modified PGVectorConfig to allow for alternative environment variable aliases for database connection parameters (user, password, host, port). - Improved unit tests for memory listing to support sorting and pagination through a mock list function. --- src/powermem/storage/config/pgvector.py | 6 +- tests/unit/test_list_memories_sorting.py | 106 +++++++++++++++-------- 2 files changed, 74 insertions(+), 38 deletions(-) diff --git a/src/powermem/storage/config/pgvector.py b/src/powermem/storage/config/pgvector.py index f777c0c..bc1c8a0 100644 --- a/src/powermem/storage/config/pgvector.py +++ b/src/powermem/storage/config/pgvector.py @@ -129,8 +129,10 @@ def check_auth_and_connection(cls, values): return values if values.get("connection_string") is not None: return values - user, password = values.get("user"), values.get("password") - host, port = values.get("host"), values.get("port") + user = values.get("user") or values.get("POSTGRES_USER") + password = values.get("password") or values.get("POSTGRES_PASSWORD") + host = values.get("host") or values.get("POSTGRES_HOST") + port = values.get("port") or values.get("POSTGRES_PORT") if user is not None or password is not None: if not user or not password: raise ValueError("Both 'user' and 'password' must be provided.") diff --git a/tests/unit/test_list_memories_sorting.py b/tests/unit/test_list_memories_sorting.py index 5fddb93..e5529ea 100644 --- a/tests/unit/test_list_memories_sorting.py +++ b/tests/unit/test_list_memories_sorting.py @@ -43,6 +43,55 @@ def _create_output_data_list(self, memories_data, default_user_id="test_user"): output_data_list.append(output_data) return output_data_list + def _create_mock_list_with_sorting(self, output_data_list): + """Create a mock list function that supports sorting and pagination.""" + def list_side_effect(filters=None, limit=None, offset=None, order_by=None, order="desc"): + # Start with all data + result = output_data_list[:] + + # Apply sorting if order_by is specified + if order_by: + # Extract sort key from payload or object attributes + def get_sort_key(item): + # Special handling for 'id' field - it's on the object itself + if order_by == 'id': + value = item.id if hasattr(item, 'id') else item.get('id') + # For other fields, check payload first + elif hasattr(item, 'payload'): + value = item.payload.get(order_by) + else: + value = item.get(order_by) + + # Handle None values - put them at the end for both asc and desc + if value is None: + # Use a very large/small value to push None to the end + from datetime import datetime + if order == "desc": + return datetime.min # None goes to end (smallest) + else: + return datetime.max # None goes to end (largest) + + return value + + # Sort the results + reverse = (order == "desc") + try: + result = sorted(result, key=get_sort_key, reverse=reverse) + except Exception as e: + # If sorting fails, return unsorted + print(f"Sorting failed: {e}") + pass + + # Apply pagination (offset and limit) + if offset is not None: + result = result[offset:] + if limit is not None: + result = result[:limit] + + return result + + return list_side_effect + def test_get_all_with_sort_by_updated_at_desc(self, mock_memory): """Test get_all with sorting by updated_at in descending order.""" # Create test data with different update times @@ -71,12 +120,9 @@ def test_get_all_with_sort_by_updated_at_desc(self, mock_memory): # Mock vector_store.list to return OutputData objects # Need to handle both with filters and without filters calls output_data_list = self._create_output_data_list(test_memories_data) - - def list_side_effect(filters=None, limit=None): - # Return the mock data regardless of filters (filtering happens in get_all_memories) - return output_data_list - - mock_memory.storage.vector_store.list = MagicMock(side_effect=list_side_effect) + mock_memory.storage.vector_store.list = MagicMock( + side_effect=self._create_mock_list_with_sorting(output_data_list) + ) result = mock_memory.get_all( user_id="test_user", @@ -120,11 +166,9 @@ def test_get_all_with_sort_by_updated_at_asc(self, mock_memory): ] output_data_list = self._create_output_data_list(test_memories_data) - - def list_side_effect(filters=None, limit=None): - return output_data_list - - mock_memory.storage.vector_store.list = MagicMock(side_effect=list_side_effect) + mock_memory.storage.vector_store.list = MagicMock( + side_effect=self._create_mock_list_with_sorting(output_data_list) + ) result = mock_memory.get_all( user_id="test_user", @@ -168,11 +212,9 @@ def test_get_all_with_sort_by_created_at_desc(self, mock_memory): ] output_data_list = self._create_output_data_list(test_memories_data) - - def list_side_effect(filters=None, limit=None): - return output_data_list - - mock_memory.storage.vector_store.list = MagicMock(side_effect=list_side_effect) + mock_memory.storage.vector_store.list = MagicMock( + side_effect=self._create_mock_list_with_sorting(output_data_list) + ) result = mock_memory.get_all( user_id="test_user", @@ -199,11 +241,9 @@ def test_get_all_with_sort_by_id_desc(self, mock_memory): ] output_data_list = self._create_output_data_list(test_memories_data) - - def list_side_effect(filters=None, limit=None): - return output_data_list - - mock_memory.storage.vector_store.list = MagicMock(side_effect=list_side_effect) + mock_memory.storage.vector_store.list = MagicMock( + side_effect=self._create_mock_list_with_sorting(output_data_list) + ) result = mock_memory.get_all( user_id="test_user", @@ -230,11 +270,9 @@ def test_get_all_without_sorting(self, mock_memory): ] output_data_list = self._create_output_data_list(test_memories_data) - - def list_side_effect(filters=None, limit=None): - return output_data_list - - mock_memory.storage.vector_store.list = MagicMock(side_effect=list_side_effect) + mock_memory.storage.vector_store.list = MagicMock( + side_effect=self._create_mock_list_with_sorting(output_data_list) + ) result = mock_memory.get_all( user_id="test_user", @@ -276,11 +314,9 @@ def test_get_all_with_filtering_and_sorting(self, mock_memory): ] output_data_list = self._create_output_data_list(test_memories_data) - - def list_side_effect(filters=None, limit=None): - return output_data_list - - mock_memory.storage.vector_store.list = MagicMock(side_effect=list_side_effect) + mock_memory.storage.vector_store.list = MagicMock( + side_effect=self._create_mock_list_with_sorting(output_data_list) + ) result = mock_memory.get_all( user_id="test_user", @@ -316,11 +352,9 @@ def test_get_all_with_pagination_and_sorting(self, mock_memory): ] output_data_list = self._create_output_data_list(test_memories_data) - - def list_side_effect(filters=None, limit=None): - return output_data_list - - mock_memory.storage.vector_store.list = MagicMock(side_effect=list_side_effect) + mock_memory.storage.vector_store.list = MagicMock( + side_effect=self._create_mock_list_with_sorting(output_data_list) + ) # Get first page result1 = mock_memory.get_all(