Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion openwpm/browser_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -504,7 +504,8 @@ def execute_command_sequence(
)
# Sleep after executing CommandSequence to provide extra time for
# internal buffers to drain. Stopgap in support of #135
time.sleep(2)
drain_sleep = 0.1 if self.manager_params.testing else 2
time.sleep(drain_sleep)

if context.closing:
return
Expand Down
3 changes: 2 additions & 1 deletion openwpm/commands/browser_commands.py
Original file line number Diff line number Diff line change
Expand Up @@ -484,7 +484,8 @@ def execute(
tab_restart_browser(webdriver)
# This doesn't immediately stop data saving from the current
# visit so we sleep briefly before unsetting the visit_id.
time.sleep(self.sleep)
sleep = 0.1 if manager_params.testing else self.sleep
time.sleep(sleep)
msg = {"action": "Finalize", "visit_id": self.visit_id}
extension_socket.send(msg)

Expand Down
2 changes: 1 addition & 1 deletion openwpm/storage/storage_controller.py
Original file line number Diff line number Diff line change
Expand Up @@ -336,7 +336,7 @@ async def update_completion_queue(self) -> None:
else:
new_finalize_tasks.append((visit_id, token, success))
self.finalize_tasks = new_finalize_tasks
await asyncio.sleep(5)
await asyncio.sleep(0.5)

async def _run(self) -> None:
await self.structured_storage.init()
Expand Down
24 changes: 20 additions & 4 deletions openwpm/task_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -401,18 +401,34 @@ def _start_thread(
return thread

def _mark_command_sequences_complete(self) -> None:
"""Polls the storage controller for saved records
and calls their callbacks
"""Waits for completed visits from the storage controller
and invokes their callbacks.

Uses blocking queue.get() with a 1s timeout instead of polling
with sleep(1), so callbacks fire as soon as data is available.
"""
while True:
if self.closing and not self.unsaved_command_sequences:
# we're shutting down and have no unprocessed callbacks
break

# First drain any already-available completions
visit_id_list = self.storage_controller_handle.get_new_completed_visits()

if not visit_id_list:
time.sleep(1)
continue
# Block waiting for the next completion, with timeout as fallback
try:
item = self.storage_controller_handle.completion_queue.get(
block=True, timeout=1
)
visit_id_list = [item]
# Drain any additional completions that arrived
visit_id_list.extend(
self.storage_controller_handle.get_new_completed_visits()
)
except Exception:
# queue.Empty or other exceptions - just loop back
continue

for visit_id, successful in visit_id_list:
self.logger.debug("Invoking callback of visit_id %d", visit_id)
Expand Down
Loading