Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -25,26 +25,33 @@ class ConnectionManager:

def __init__(
self,
*,
timeout: float = 5.0,
check_interval: float = 10.0,
offline_mode: bool = False,
) -> None:
"""Initialize the connection manager.

Args:
emitter: Event emitter for broadcasting connection state
config_manager: Config to resolve for Connection Manager
timeout: Timeout in seconds for connectivity checks
check_interval: Seconds between connectivity checks
offline_mode: Daemon is in offline mode; skip connectivity checks
"""
self._timeout = timeout
self._check_interval = check_interval
self._is_connected = False
self._running = False
self._checker_thread: threading.Thread | None = None
self._offline_mode = offline_mode

emitter.emit(Emitter.IS_CONNECTED, self._is_connected)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This assumes that StateManager init before ConnectionManager. This is a race condition risk. The pyee.EventEmitter is a standard pub/sub pattern - events are NOT queued. If no listener is registered when the event fires, it's lost.

We need to make sure that StateManager gets the right state at init/start, can even directly read from config, if event is lost no problem.


def start(self) -> None:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We just have to remember to execute start during integration

"""Start the connection monitoring thread."""
if self._offline_mode:
logger.info("ConnectionManager in offline mode")
return
if self._running:
logger.warning("ConnectionManager already running")
return
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit or lets note down or create a task in backlog
Since we're moving data daemon to asyncio, should this use asyncio.create_task() instead of spawning a thread.

Expand All @@ -64,6 +71,9 @@ def stop(self, timeout: float = 5.0) -> None:
Args:
timeout: Maximum time to wait for thread to stop
"""
if self._offline_mode:
logger.info("ConnectionManager in offline mode")
return
if not self._running:
logger.warning("ConnectionManager not running")
return
Expand Down
11 changes: 6 additions & 5 deletions neuracore/data_daemon/upload_management/upload_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@

from neuracore_types import DataType, RecordingDataTraceStatus

from neuracore.data_daemon.config_manager.daemon_config import DaemonConfig
from neuracore.data_daemon.event_emitter import Emitter, emitter
from neuracore.data_daemon.models import TraceErrorCode, TraceStatus, get_content_type
from neuracore.data_daemon.upload_management.trace_manager import TraceManager
Expand All @@ -32,12 +31,14 @@ class UploadManager(TraceManager):
Uploads are triggered via READY_FOR_UPLOAD events from state manager.
"""

def __init__(self, config: DaemonConfig):
"""Initialize the upload manager."""
self._config = config
def __init__(self, num_threads: int = 4):
"""Initialize the upload manager.

Args:
num_threads: Number of concurrent upload threads
"""
# Threading
self._num_threads = self._config.num_threads or 4
self._num_threads = num_threads
self._executor = ThreadPoolExecutor(
max_workers=self._num_threads,
thread_name_prefix="uploader",
Expand Down
Loading
Loading