From fa65590bb76550f0e855fdb108e0e553a96d7a64 Mon Sep 17 00:00:00 2001 From: vringar Date: Thu, 19 Feb 2026 23:43:16 +0100 Subject: [PATCH 1/3] feat: add response writing capability to StorageController handler After processing a finalize message, the StorageController now sends an ack message back on the StreamWriter to the client that sent it, provided the client requested one by setting want_ack in the finalize message. This enables bidirectional communication. --- openwpm/socket_interface.py | 50 +++++++++++++++++++++++++++ openwpm/storage/storage_controller.py | 28 ++++++++++++--- 2 files changed, 73 insertions(+), 5 deletions(-) diff --git a/openwpm/socket_interface.py b/openwpm/socket_interface.py index a4f8941ca..f61acb692 100644 --- a/openwpm/socket_interface.py +++ b/openwpm/socket_interface.py @@ -159,6 +159,44 @@ def send(self, msg): raise RuntimeError("socket connection broken") totalsent = totalsent + sent + def receive(self, timeout: float = 5.0) -> Any: + """Receive a single message from the server. + + Uses the same wire format as send() (4-byte length + 1-byte + serialization type + payload). Returns the deserialized message. + + Parameters + ---------- + timeout : float + Socket timeout in seconds. Returns None if no data arrives + within the timeout. 
+ """ + old_timeout = self.sock.gettimeout() + self.sock.settimeout(timeout) + try: + header = self._recv_exactly(5) + if header is None: + return None + msglen, serialization = struct.unpack(">Lc", header) + payload = self._recv_exactly(msglen) + if payload is None: + return None + return _parse(serialization, payload) + except socket.timeout: + return None + finally: + self.sock.settimeout(old_timeout) + + def _recv_exactly(self, n: int) -> Any: + """Receive exactly n bytes from the socket.""" + data = b"" + while len(data) < n: + chunk = self.sock.recv(n - len(data)) + if not chunk: + return None + data += chunk + return data + def close(self): self.sock.close() @@ -184,6 +222,18 @@ async def get_message_from_reader(reader: asyncio.StreamReader) -> Any: return _parse(serialization, msg) +async def send_to_writer(writer: asyncio.StreamWriter, msg: Any) -> None: + """Send a JSON-serialized message to an asyncio StreamWriter. + + Uses the same wire format as ClientSocket.send() so that + ClientSocket can receive responses using the same protocol. 
+ """ + encoded = json.dumps(msg).encode("utf-8") + header = struct.pack(">Lc", len(encoded), b"j") + writer.write(header + encoded) + await writer.drain() + + def _parse(serialization: bytes, msg: bytes) -> Any: if serialization == b"n": return msg diff --git a/openwpm/storage/storage_controller.py b/openwpm/storage/storage_controller.py index 947ca51b3..c7164094a 100644 --- a/openwpm/storage/storage_controller.py +++ b/openwpm/storage/storage_controller.py @@ -15,7 +15,7 @@ from openwpm.utilities.multiprocess_utils import Process from ..config import BrowserParamsInternal, ManagerParamsInternal -from ..socket_interface import ClientSocket, get_message_from_reader +from ..socket_interface import ClientSocket, get_message_from_reader, send_to_writer from ..types import BrowserId, VisitId from .storage_providers import ( StructuredStorageProvider, @@ -101,7 +101,7 @@ async def _handler( await writer.wait_closed() async def handler( - self, reader: asyncio.StreamReader, _: asyncio.StreamWriter + self, reader: asyncio.StreamReader, writer: asyncio.StreamWriter ) -> None: """Created for every new connection to the Server""" client_name = await get_message_from_reader(reader) @@ -109,7 +109,7 @@ async def handler( while True: try: record: Tuple[str, Any] = await get_message_from_reader(reader) - except IncompleteReadError: + except (IncompleteReadError, OSError): self.logger.info( f"Terminating handler for {client_name}, because the underlying socket closed" ) @@ -153,7 +153,7 @@ async def handler( visit_id = VisitId(data["visit_id"]) if record_type == RECORD_TYPE_META: - await self._handle_meta(visit_id, data) + await self._handle_meta(visit_id, data, writer) continue table_name = TableName(record_type) @@ -174,7 +174,12 @@ async def store_record( ) ) - async def _handle_meta(self, visit_id: VisitId, data: Dict[str, Any]) -> None: + async def _handle_meta( + self, + visit_id: VisitId, + data: Dict[str, Any], + writer: asyncio.StreamWriter, + ) -> None: """ Messages for 
the table RECORD_TYPE_SPECIAL are meta information communicated to the storage controller @@ -192,6 +197,19 @@ async def _handle_meta(self, visit_id: VisitId, data: Dict[str, Any]) -> None: success: bool = data["success"] completion_token = await self.finalize_visit_id(visit_id, success) self.finalize_tasks.append((visit_id, completion_token, success)) + # Send ack back only if the client requested it. + # Writing to a closed connection poisons the asyncio transport, + # preventing any further reads on the same connection. + if data.get("want_ack"): + try: + await send_to_writer( + writer, + {"action": "finalize_ack", "visit_id": visit_id}, + ) + except Exception: + self.logger.debug( + "Failed to send finalize ack for visit_id %d", visit_id + ) else: raise ValueError("Unexpected action: %s", action) From ffc4a40e87f39f7bb476c790e0ef91772e72cdd1 Mon Sep 17 00:00:00 2001 From: vringar Date: Thu, 19 Feb 2026 23:44:21 +0100 Subject: [PATCH 2/3] feat: add finalize_visit_id_with_ack to DataSocket DataSocket.finalize_visit_id_with_ack() sends a finalize message and waits for an acknowledgment from the StorageController. This replaces the fixed sleep in FinalizeCommand with event-driven confirmation that data has been processed. --- openwpm/storage/storage_controller.py | 34 +++++++++++++++++++++++++++ 1 file changed, 34 insertions(+) diff --git a/openwpm/storage/storage_controller.py b/openwpm/storage/storage_controller.py index c7164094a..9045021e8 100644 --- a/openwpm/storage/storage_controller.py +++ b/openwpm/storage/storage_controller.py @@ -431,6 +431,40 @@ def finalize_visit_id(self, visit_id: VisitId, success: bool) -> None: ) ) + def finalize_visit_id_with_ack( + self, visit_id: VisitId, success: bool, timeout: float = 10.0 + ) -> bool: + """Send finalize and wait for acknowledgment from StorageController. + + Returns True if ack was received, False on timeout. + Falls back gracefully - the finalize is still sent even if + the ack is not received. 
+ """ + self.socket.send( + ( + RECORD_TYPE_META, + { + "action": ACTION_TYPE_FINALIZE, + "visit_id": visit_id, + "success": success, + "want_ack": True, + }, + ) + ) + ack = self.socket.receive(timeout=timeout) + if ( + ack is not None + and isinstance(ack, dict) + and ack.get("action") == "finalize_ack" + ): + return True + self.logger.debug( + "Did not receive finalize ack for visit_id %d (got: %r)", + visit_id, + ack, + ) + return False + def close(self) -> None: self.socket.close() From 2bc92b9a8a5f22e6f4e552be9459f8bcab3a9dcc Mon Sep 17 00:00:00 2001 From: vringar Date: Thu, 19 Feb 2026 23:45:50 +0100 Subject: [PATCH 3/3] feat: use finalize_visit_id_with_ack in TaskManager TaskManager.finalize_visit_id now waits for a StorageController ack, confirming data has been processed before returning. Falls back gracefully on timeout. --- openwpm/task_manager.py | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/openwpm/task_manager.py b/openwpm/task_manager.py index 9eb5fcced..825680c5d 100644 --- a/openwpm/task_manager.py +++ b/openwpm/task_manager.py @@ -339,8 +339,12 @@ def store_record( self.sock.store_record(table, visit_id, data) def finalize_visit_id(self, visit_id: VisitId, success: bool) -> None: - """Signal that all data for a visit_id has been sent.""" - self.sock.finalize_visit_id(visit_id, success) + """Signal that all data for a visit_id has been sent. + + Waits for acknowledgment from StorageController to confirm + the data has been processed. Falls back gracefully on timeout. + """ + self.sock.finalize_visit_id_with_ack(visit_id, success) def _check_failure_status(self) -> None: """Check the status of command failures. Raise exceptions as necessary