diff --git a/poetry.lock b/poetry.lock index 8b91d91..ebf74ab 100644 --- a/poetry.lock +++ b/poetry.lock @@ -1,4 +1,4 @@ -# This file is automatically @generated by Poetry 2.1.4 and should not be changed by hand. +# This file is automatically @generated by Poetry 2.3.2 and should not be changed by hand. [[package]] name = "altair" @@ -83,6 +83,34 @@ typing_extensions = {version = ">=4.5", markers = "python_version < \"3.13\""} [package.extras] trio = ["trio (>=0.31.0) ; python_version < \"3.10\"", "trio (>=0.32.0) ; python_version >= \"3.10\""] +[[package]] +name = "apscheduler" +version = "3.11.2" +description = "In-process task scheduler with Cron-like capabilities" +optional = false +python-versions = ">=3.8" +groups = ["main"] +files = [ + {file = "apscheduler-3.11.2-py3-none-any.whl", hash = "sha256:ce005177f741409db4e4dd40a7431b76feb856b9dd69d57e0da49d6715bfd26d"}, + {file = "apscheduler-3.11.2.tar.gz", hash = "sha256:2a9966b052ec805f020c8c4c3ae6e6a06e24b1bf19f2e11d91d8cca0473eef41"}, +] + +[package.dependencies] +tzlocal = ">=3.0" + +[package.extras] +doc = ["packaging", "sphinx", "sphinx-rtd-theme (>=1.3.0)"] +etcd = ["etcd3", "protobuf (<=3.21.0)"] +gevent = ["gevent"] +mongodb = ["pymongo (>=3.0)"] +redis = ["redis (>=3.0)"] +rethinkdb = ["rethinkdb (>=2.4.0)"] +sqlalchemy = ["sqlalchemy (>=1.4)"] +test = ["APScheduler[etcd,mongodb,redis,rethinkdb,sqlalchemy,tornado,zookeeper]", "PySide6 ; platform_python_implementation == \"CPython\" and python_version < \"3.14\"", "anyio (>=4.5.2)", "gevent ; python_version < \"3.14\"", "pytest", "pytest-timeout", "pytz", "twisted ; python_version < \"3.14\""] +tornado = ["tornado (>=4.3)"] +twisted = ["twisted"] +zookeeper = ["kazoo"] + [[package]] name = "attrs" version = "25.4.0" @@ -958,7 +986,7 @@ files = [ [package.dependencies] attrs = ">=22.2.0" -jsonschema-specifications = ">=2023.03.6" +jsonschema-specifications = ">=2023.3.6" referencing = ">=0.28.4" rpds-py = ">=0.25.0" @@ -2871,6 +2899,24 @@ files = [ {file = "tzdata-2025.3.tar.gz", hash = "sha256:de39c2ca5dc7b0344f2eba86f49d614019d29f060fc4ebc8a417896a620b56a7"}, ] +[[package]] +name = "tzlocal" +version = "5.3.1" +description = "tzinfo object for the local timezone" +optional = false +python-versions = ">=3.9" +groups = ["main"] +files = [ + {file = "tzlocal-5.3.1-py3-none-any.whl", hash = "sha256:eb1a66c3ef5847adf7a834f1be0800581b683b5608e74f86ecbcef8ab91bb85d"}, + {file = "tzlocal-5.3.1.tar.gz", hash = "sha256:cceffc7edecefea1f595541dbd6e990cb1ea3d19bf01b2809f362a03dd7921fd"}, +] + +[package.dependencies] +tzdata = {version = "*", markers = "platform_system == \"Windows\""} + +[package.extras] +devenv = ["check-manifest", "pytest (>=4.3)", "pytest-cov", "pytest-mock (>=3.3)", "zest.releaser"] + [[package]] name = "urllib3" version = "2.6.3" @@ -3326,9 +3372,9 @@ files = [ ] [package.extras] -cffi = ["cffi (>=1.17,<2.0) ; platform_python_implementation != \"PyPy\" and python_version < \"3.14\"", "cffi (>=2.0.0b) ; platform_python_implementation != \"PyPy\" and python_version >= \"3.14\""] +cffi = ["cffi (>=1.17,<2.0) ; platform_python_implementation != \"PyPy\" and python_version < \"3.14\"", "cffi (>=2.0.0b0) ; platform_python_implementation != \"PyPy\" and python_version >= \"3.14\""] [metadata] lock-version = "2.1" python-versions = ">=3.12" -content-hash = "2e0ca881b9868d589ca4a772e3bd3a447678f63da06b13845fc3c2ba602c0f35" +content-hash = "69ac0e6b6e4f1b978213885a07824de349e92fe4b37a99586959eb1ed82106f8" diff --git a/pyproject.toml b/pyproject.toml index 5cc30df..a60af9c 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -20,6 +20,7 @@ dependencies = [ "langgraph>=0.2.0", "langchain-openai>=0.2.0", "langchain-anthropic>=0.2.0", + "apscheduler (>=3.10.0)", ] diff --git a/src/fin_trade/app.py b/src/fin_trade/app.py index ff55c6e..2f5cef4 100644 --- a/src/fin_trade/app.py +++ b/src/fin_trade/app.py @@ -1,7 +1,13 @@ import streamlit as st from pathlib import Path -from fin_trade.services import PortfolioService, AgentService, SecurityService +from fin_trade.services import ( + PortfolioService, + AgentService, + SecurityService, + ExecutionLogService, + SchedulerService, +) from fin_trade.pages.overview import render_overview_page from fin_trade.pages.portfolio_detail import render_portfolio_detail_page from fin_trade.pages.system_health import render_system_health_page @@ -34,9 +40,16 @@ def get_services(): security_service = SecurityService() portfolio_service = PortfolioService(security_service=security_service) agent_service = AgentService(security_service=security_service) - return security_service, portfolio_service, agent_service + scheduler_service = SchedulerService( + portfolio_service=portfolio_service, + agent_service=agent_service, + execution_log_service=ExecutionLogService(), + security_service=security_service, + ) + scheduler_service.start() + return security_service, portfolio_service, agent_service, scheduler_service - security_service, portfolio_service, agent_service = get_services() + security_service, portfolio_service, agent_service, scheduler_service = get_services() # Initialize session state if "current_page" not in st.session_state: @@ -118,7 +131,7 @@ def get_services(): st.rerun() elif st.session_state.current_page == "dashboard": - render_dashboard_page(portfolio_service) + render_dashboard_page(portfolio_service, scheduler_service) elif st.session_state.current_page == "detail": if st.session_state.selected_portfolio: @@ -141,6 +154,7 @@ def on_navigate_to_portfolio(name: str): portfolio_service, agent_service, security_service, + scheduler_service, on_back=on_back, on_navigate_to_portfolio=on_navigate_to_portfolio, ) diff --git a/src/fin_trade/models/portfolio.py b/src/fin_trade/models/portfolio.py index eb129f3..e1ccc60 100644 --- a/src/fin_trade/models/portfolio.py +++ b/src/fin_trade/models/portfolio.py @@ -64,6 +64,8 @@ class PortfolioConfig: asset_class: AssetClass = AssetClass.STOCKS agent_mode: Literal["simple", "langgraph", "debate"] = "langgraph" debate_config: DebateConfig | None = None + scheduler_enabled: bool = False + auto_apply_trades: bool = False @dataclass diff --git a/src/fin_trade/pages/dashboard.py b/src/fin_trade/pages/dashboard.py index 8d0db73..91804df 100644 --- a/src/fin_trade/pages/dashboard.py +++ b/src/fin_trade/pages/dashboard.py @@ -3,14 +3,22 @@ import streamlit as st import pandas as pd import plotly.graph_objects as go -from datetime import datetime, timedelta +from datetime import datetime from fin_trade.cache import get_portfolio_metrics -from fin_trade.services import PortfolioService, AttributionService, SecurityService +from fin_trade.services import ( + PortfolioService, + AttributionService, + SecurityService, + SchedulerService, +) from fin_trade.services.attribution import SectorAttribution, HoldingAttribution -def render_dashboard_page(portfolio_service: PortfolioService) -> None: +def render_dashboard_page( + portfolio_service: PortfolioService, + scheduler_service: SchedulerService, +) -> None: """Render the summary dashboard page.""" st.title("Summary Dashboard") @@ -107,51 +115,57 @@ def render_dashboard_page(portfolio_service: PortfolioService) -> None: st.divider() + # Scheduler Status + st.subheader("Scheduler Status") + status = scheduler_service.get_status() + col1, col2, col3 = st.columns(3) + with col1: + st.metric("Scheduler", "Running" if status["running"] else "Stopped") + with col2: + st.metric("Enabled Schedules", status["enabled"]) + with col3: + st.metric("Active Jobs", status["jobs"]) + + st.divider() + # Upcoming Runs Schedule st.subheader("Upcoming Scheduled Runs") - - schedule_data = [] - for p in portfolio_metrics: - last_run = p["Last Run"] - freq = p["Frequency"] - - if not last_run: - next_run = datetime.now() # Run immediately if never run - status = "Pending (New)" - else: - if freq == "daily": - next_run = last_run + timedelta(days=1) - elif freq == "weekly": - next_run = last_run + timedelta(weeks=1) - elif freq == "monthly": - next_run = last_run + timedelta(days=30) - else: - next_run = last_run + timedelta(days=1) # Default - - if datetime.now() > next_run: - status = "Overdue" + + scheduled = scheduler_service.get_scheduled_portfolios() + if not scheduled: + st.info("No portfolios are enabled for scheduled execution.") + else: + schedule_data = [] + now = datetime.now() + + for item in scheduled: + if item.next_run is None: + status_label = "Pending" + elif item.next_run <= now: + status_label = "Due" else: - status = "Scheduled" - - schedule_data.append({ - "Strategy": p["Name"], - "Last Run": last_run, - "Next Run": next_run, - "Status": status - }) - - df_schedule = pd.DataFrame(schedule_data).sort_values("Next Run") - - st.dataframe( - df_schedule, - column_config={ - "Last Run": st.column_config.DatetimeColumn("Last Run", format="YYYY-MM-DD HH:mm"), - "Next Run": st.column_config.DatetimeColumn("Next Run", format="YYYY-MM-DD HH:mm"), - "Status": st.column_config.TextColumn("Status"), - }, - hide_index=True, - use_container_width=True - ) + status_label = "Scheduled" + + schedule_data.append({ + "Strategy": item.display_name, + "Frequency": item.frequency, + "Last Run": item.last_run, + "Next Run": item.next_run, + "Status": status_label, + }) + + df_schedule = pd.DataFrame(schedule_data).sort_values("Next Run") + + st.dataframe( + df_schedule, + column_config={ + "Last Run": st.column_config.DatetimeColumn("Last Run", format="YYYY-MM-DD HH:mm"), + "Next Run": st.column_config.DatetimeColumn("Next Run", format="YYYY-MM-DD HH:mm"), + "Status": st.column_config.TextColumn("Status"), + }, + hide_index=True, + use_container_width=True + ) # Performance Attribution Section st.divider() diff --git a/src/fin_trade/pages/portfolio_detail.py b/src/fin_trade/pages/portfolio_detail.py index 53d2b6c..238248d 100644 --- a/src/fin_trade/pages/portfolio_detail.py +++ b/src/fin_trade/pages/portfolio_detail.py @@ -7,7 +7,12 @@ import plotly.graph_objects as go from fin_trade.models import AssetClass, PortfolioConfig, PortfolioState, TradeRecommendation -from fin_trade.services import PortfolioService, AgentService, SecurityService +from fin_trade.services import ( + PortfolioService, + AgentService, + SecurityService, + SchedulerService, +) from fin_trade.agents.service import ( DebateAgentService, LangGraphAgentService, @@ -38,6 +43,7 @@ def render_portfolio_detail_page( portfolio_service: PortfolioService, agent_service: AgentService, security_service: SecurityService, + scheduler_service: SchedulerService, on_back: Callable | None = None, on_navigate_to_portfolio: Callable[[str], None] | None = None, ) -> None: @@ -64,7 +70,7 @@ def render_portfolio_detail_page( portfolio_name, config, state, portfolio_service, on_back, on_navigate_to_portfolio ) - _render_summary(config, state, portfolio_service, security_service) + _render_summary(config, state, portfolio_service, security_service, scheduler_service, portfolio_name) st.divider() @@ -90,6 +96,8 @@ def _render_summary( state: PortfolioState, portfolio_service: PortfolioService, security_service: SecurityService, + scheduler_service: SchedulerService, + portfolio_name: str, ) -> None: """Render the portfolio summary metrics.""" total_value = portfolio_service.calculate_value(state) @@ -132,6 +140,69 @@ def _render_summary( st.write(f"**LLM:** {config.llm_provider} / {config.llm_model}") st.write(f"**Agent Mode:** {getattr(config, 'agent_mode', 'simple')}") + with st.expander("Scheduling"): + scheduled = {item.name: item for item in scheduler_service.get_scheduled_portfolios()} + schedule_info = scheduled.get(portfolio_name) + + scheduler_enabled = st.toggle( + "Enable scheduled execution", + value=config.scheduler_enabled, + key=f"scheduler_enabled_{portfolio_name}", + help="Automatically run this portfolio on its configured cadence.", + ) + if scheduler_enabled != config.scheduler_enabled: + try: + if scheduler_enabled: + scheduler_service.enable_portfolio(portfolio_name) + else: + scheduler_service.disable_portfolio(portfolio_name) + st.rerun() + except Exception as exc: + st.error(f"Failed to update schedule: {exc}") + + auto_apply = st.toggle( + "Auto-apply trades", + value=config.auto_apply_trades, + key=f"auto_apply_{portfolio_name}", + help="Automatically apply trade recommendations after each run.", + ) + if auto_apply != config.auto_apply_trades: + try: + config.auto_apply_trades = auto_apply + portfolio_service.save_config(config, filename=portfolio_name) + st.rerun() + except Exception as exc: + st.error(f"Failed to update auto-apply setting: {exc}") + + last_run = schedule_info.last_run if schedule_info else state.last_execution + next_run = schedule_info.next_run if schedule_info else None + + col1, col2 = st.columns(2) + with col1: + st.caption( + f"Last Run: {last_run.strftime('%Y-%m-%d %H:%M') if last_run else 'Never'}" + ) + with col2: + if scheduler_enabled and next_run: + st.caption(f"Next Run: {next_run.strftime('%Y-%m-%d %H:%M')}") + elif scheduler_enabled: + st.caption("Next Run: Pending") + else: + st.caption("Next Run: Disabled") + + if st.button("Run Now", type="primary", key=f"run_now_{portfolio_name}"): + with st.spinner("Running scheduled execution..."): + try: + success = scheduler_service.run_portfolio_now(portfolio_name) + except Exception as exc: + success = False + st.error(f"Execution failed: {exc}") + if success: + st.success("Execution completed.") + else: + st.error("Execution did not complete successfully.") + st.rerun() + def _render_portfolio_actions( portfolio_name: str, diff --git a/src/fin_trade/services/__init__.py b/src/fin_trade/services/__init__.py index 5909a62..169d2a5 100644 --- a/src/fin_trade/services/__init__.py +++ b/src/fin_trade/services/__init__.py @@ -9,6 +9,7 @@ from fin_trade.services.market_data import MarketDataService from fin_trade.services.reflection import ReflectionService from fin_trade.services.comparison import ComparisonService, PortfolioMetrics +from fin_trade.services.scheduler import SchedulerService __all__ = [ "StockDataService", @@ -22,4 +23,5 @@ "ReflectionService", "ComparisonService", "PortfolioMetrics", + "SchedulerService", ] diff --git a/src/fin_trade/services/portfolio.py b/src/fin_trade/services/portfolio.py index 5e0e14e..e6e7c4f 100644 --- a/src/fin_trade/services/portfolio.py +++ b/src/fin_trade/services/portfolio.py @@ -79,6 +79,8 @@ def _load_config(self, name: str) -> PortfolioConfig: asset_class=AssetClass(data.get("asset_class", AssetClass.STOCKS.value)), agent_mode=data.get("agent_mode", "langgraph"), debate_config=debate_config, + scheduler_enabled=bool(data.get("scheduler_enabled", False)), + auto_apply_trades=bool(data.get("auto_apply_trades", False)), ) @staticmethod @@ -95,6 +97,10 @@ def _load_state(self, name: str, initial_amount: float) -> PortfolioState: if not state_path.exists(): return PortfolioState(cash=initial_amount) + # Handle empty or corrupt state files gracefully + if state_path.stat().st_size == 0: + return PortfolioState(cash=initial_amount) + with open(state_path, "r", encoding="utf-8") as f: data = json.load(f) @@ -155,6 +161,36 @@ def load_portfolio(self, name: str) -> tuple[PortfolioConfig, PortfolioState]: state = self._load_state(name, config.initial_amount) return config, state + def save_config(self, config: PortfolioConfig, filename: str | None = None) -> None: + """Save portfolio configuration to YAML.""" + target_name = filename or config.name + config_path = self.portfolios_dir / f"{target_name}.yaml" + + data = { + "name": config.name, + "strategy_prompt": config.strategy_prompt, + "initial_amount": float(config.initial_amount), + "num_initial_trades": int(config.num_initial_trades), + "trades_per_run": int(config.trades_per_run), + "run_frequency": config.run_frequency, + "llm_provider": config.llm_provider, + "llm_model": config.llm_model, + "ollama_base_url": config.ollama_base_url, + "asset_class": config.asset_class.value, + "agent_mode": config.agent_mode, + "scheduler_enabled": bool(config.scheduler_enabled), + "auto_apply_trades": bool(config.auto_apply_trades), + } + + if config.debate_config: + data["debate_config"] = { + "rounds": config.debate_config.rounds, + "include_neutral": config.debate_config.include_neutral, + } + + with open(config_path, "w", encoding="utf-8") as f: + yaml.dump(data, f, default_flow_style=False, allow_unicode=True) + def save_state(self, name: str, state: PortfolioState) -> None: """Save portfolio state to JSON.""" state_path = self.state_dir / f"{name}.json" diff --git a/src/fin_trade/services/scheduler.py b/src/fin_trade/services/scheduler.py new file mode 100644 index 0000000..a49e95a --- /dev/null +++ b/src/fin_trade/services/scheduler.py @@ -0,0 +1,425 @@ +"""Background scheduler for automated portfolio executions.""" + +from __future__ import annotations + +import json +import logging +import threading +import time +from dataclasses import dataclass +from datetime import datetime, timedelta +from pathlib import Path +from typing import Literal + +from apscheduler.schedulers.background import BackgroundScheduler + +from fin_trade.agents.service import DebateAgentService, LangGraphAgentService +from fin_trade.models import AgentRecommendation, PortfolioConfig, PortfolioState +from fin_trade.services.agent import AgentService +from fin_trade.services.execution_log import ExecutionLogService +from fin_trade.services.portfolio import PortfolioService +from fin_trade.services.security import SecurityService + +logger = logging.getLogger(__name__) + + +@dataclass +class ScheduledPortfolio: + """Summary of a scheduled portfolio.""" + + name: str + display_name: str + frequency: Literal["daily", "weekly", "monthly"] + enabled: bool + next_run: datetime | None + last_run: datetime | None + + +class SchedulerService: + """Singleton scheduler for automated portfolio executions.""" + + _instance: "SchedulerService | None" = None + _instance_lock = threading.Lock() + + def __new__(cls, *args, **kwargs): + with cls._instance_lock: + if cls._instance is None: + cls._instance = super().__new__(cls) + cls._instance._initialized = False + return cls._instance + + def __init__( + self, + portfolio_service: PortfolioService | None = None, + agent_service: AgentService | None = None, + execution_log_service: ExecutionLogService | None = None, + security_service: SecurityService | None = None, + scheduler: BackgroundScheduler | None = None, + state_path: Path | None = None, + ) -> None: + if self._initialized: + return + + self.portfolio_service = portfolio_service or PortfolioService() + self.security_service = security_service or SecurityService() + self.agent_service = agent_service or AgentService(security_service=self.security_service) + self.execution_log_service = execution_log_service or ExecutionLogService() + self._scheduler = scheduler or BackgroundScheduler( + daemon=True, + job_defaults={ + "coalesce": True, + "max_instances": 1, + "misfire_grace_time": 300, + }, + ) + + self._state_path = state_path or Path("data/state/scheduler.json") + self._state_lock = threading.Lock() + self._run_lock = threading.Lock() + self._portfolio_locks: dict[str, threading.Lock] = {} + + self._enabled_portfolios: set[str] = set() + self._last_run_times: dict[str, datetime] = {} + + self._load_state() + self._initialized = True + + @classmethod + def reset_instance_for_tests(cls) -> None: + """Reset singleton instance for tests.""" + with cls._instance_lock: + if cls._instance is not None: + try: + if cls._instance._scheduler.running: + cls._instance._scheduler.shutdown(wait=False) + except Exception: + pass + cls._instance = None + + def start(self) -> None: + """Start the background scheduler.""" + self._sync_enabled_from_configs() + self._schedule_enabled_portfolios() + if not self._scheduler.running: + self._scheduler.start() + + def stop(self) -> None: + """Stop the background scheduler.""" + if self._scheduler.running: + self._scheduler.shutdown(wait=False) + + def get_status(self) -> dict: + """Get scheduler status.""" + return { + "running": self._scheduler.running, + "jobs": len(self._scheduler.get_jobs()), + "enabled": len(self._enabled_portfolios), + } + + def enable_portfolio(self, name: str) -> None: + """Enable scheduling for a portfolio.""" + config, _ = self.portfolio_service.load_portfolio(name) + config.scheduler_enabled = True + self.portfolio_service.save_config(config, filename=name) + + with self._state_lock: + self._enabled_portfolios.add(name) + self._save_state_locked() + + self._schedule_portfolio(name, config) + + def disable_portfolio(self, name: str) -> None: + """Disable scheduling for a portfolio.""" + config, _ = self.portfolio_service.load_portfolio(name) + config.scheduler_enabled = False + self.portfolio_service.save_config(config, filename=name) + + with self._state_lock: + self._enabled_portfolios.discard(name) + self._save_state_locked() + + job_id = self._job_id(name) + if self._scheduler.get_job(job_id): + self._scheduler.remove_job(job_id) + + def get_scheduled_portfolios(self) -> list[ScheduledPortfolio]: + """Get scheduled portfolios with next/last run times.""" + scheduled: list[ScheduledPortfolio] = [] + + for name in sorted(self._enabled_portfolios): + try: + config, _ = self.portfolio_service.load_portfolio(name) + except Exception as exc: + logger.error("Failed to load portfolio %s: %s", name, exc) + continue + + job = self._scheduler.get_job(self._job_id(name)) + scheduled.append( + ScheduledPortfolio( + name=name, + display_name=config.name, + frequency=config.run_frequency, + enabled=True, + next_run=job.next_run_time if job else None, + last_run=self._last_run_times.get(name), + ) + ) + + return scheduled + + def run_portfolio_now(self, name: str) -> bool: + """Run a portfolio immediately.""" + return self._execute_portfolio(name) + + def _load_state(self) -> None: + if not self._state_path.exists(): + return + + try: + with open(self._state_path, "r", encoding="utf-8") as f: + data = json.load(f) + + enabled = data.get("enabled_portfolios", []) + self._enabled_portfolios = set(enabled) + + last_run_times = {} + for name, ts in data.get("last_run_times", {}).items(): + try: + last_run_times[name] = datetime.fromisoformat(ts) + except Exception: + continue + self._last_run_times = last_run_times + except Exception as exc: + logger.error("Failed to load scheduler state: %s", exc) + + def _save_state_locked(self) -> None: + self._state_path.parent.mkdir(parents=True, exist_ok=True) + data = { + "enabled_portfolios": sorted(self._enabled_portfolios), + "last_run_times": { + name: ts.isoformat() for name, ts in self._last_run_times.items() + }, + } + with open(self._state_path, "w", encoding="utf-8") as f: + json.dump(data, f, indent=2) + + def _sync_enabled_from_configs(self) -> None: + """Sync enabled portfolios from config if no state exists.""" + if self._state_path.exists(): + return + + for name in self.portfolio_service.list_portfolios(): + try: + config, _ = self.portfolio_service.load_portfolio(name) + except Exception: + continue + if config.scheduler_enabled: + self._enabled_portfolios.add(name) + + with self._state_lock: + self._save_state_locked() + + def _schedule_enabled_portfolios(self) -> None: + for name in sorted(self._enabled_portfolios): + try: + config, _ = self.portfolio_service.load_portfolio(name) + except Exception as exc: + logger.error("Failed to load portfolio %s: %s", name, exc) + continue + self._schedule_portfolio(name, config) + + def _schedule_portfolio(self, name: str, config: PortfolioConfig) -> None: + interval = self._frequency_interval(config.run_frequency) + now = datetime.now() + last_run = self._last_run_times.get(name) + if last_run is None: + next_run = now + else: + next_run = last_run + interval + if next_run < now: + next_run = now + + self._scheduler.add_job( + self._execute_portfolio, + "interval", + id=self._job_id(name), + kwargs={"name": name}, + days=interval.days, + seconds=interval.seconds, + next_run_time=next_run, + replace_existing=True, + ) + + def _execute_portfolio(self, name: str) -> bool: + lock = self._get_portfolio_lock(name) + if not lock.acquire(blocking=False): + logger.info("Portfolio %s already running; skipping", name) + return False + + try: + return self._execute_portfolio_locked(name) + finally: + lock.release() + + def _execute_portfolio_locked(self, name: str) -> bool: + try: + config, state = self.portfolio_service.load_portfolio(name) + except Exception as exc: + logger.error("Failed to load portfolio %s: %s", name, exc) + return False + + start_time = time.time() + recommendation: AgentRecommendation | None = None + metrics = None + + try: + if config.agent_mode == "debate": + debate_agent = DebateAgentService(security_service=self.security_service) + recommendation, metrics = debate_agent.execute(config, state) + elif config.agent_mode == "langgraph": + langgraph_agent = LangGraphAgentService(security_service=self.security_service) + recommendation, metrics = langgraph_agent.execute(config, state) + else: + recommendation = self.agent_service.execute(config, state) + except Exception as exc: + logger.exception("Scheduler execution failed for %s: %s", name, exc) + if config.agent_mode == "simple": + duration_ms = int((time.time() - start_time) * 1000) + self.execution_log_service.log_execution( + portfolio_name=config.name, + agent_mode="simple", + model=config.llm_model, + duration_ms=duration_ms, + input_tokens=0, + output_tokens=0, + num_trades=0, + success=False, + error_message=str(exc), + step_details={}, + recommendations=None, + ) + return False + + run_time = datetime.now() + state.last_execution = run_time + + log_id = None + if config.agent_mode in {"debate", "langgraph"}: + log_id = self._get_latest_log_id(config.name) + else: + duration_ms = int((time.time() - start_time) * 1000) + recommendations_list = self._recommendations_to_list(recommendation) + log_id = self.execution_log_service.log_execution( + portfolio_name=config.name, + agent_mode="simple", + model=config.llm_model, + duration_ms=duration_ms, + input_tokens=0, + output_tokens=0, + num_trades=len(recommendation.trades) if recommendation else 0, + success=True, + error_message=None, + step_details={}, + recommendations=recommendations_list, + ) + + if recommendation and recommendation.trades: + if config.auto_apply_trades: + executed_indices = self._apply_recommendations( + config, + state, + recommendation, + name, + ) + if log_id is not None and executed_indices: + self.execution_log_service.mark_trades_executed( + log_id, executed_indices + ) + else: + self.portfolio_service.save_state(name, state) + else: + self.portfolio_service.save_state(name, state) + + with self._state_lock: + self._last_run_times[name] = run_time + self._save_state_locked() + + return True + + def _apply_recommendations( + self, + config: PortfolioConfig, + state: PortfolioState, + recommendation: AgentRecommendation, + name: str, + ) -> list[int]: + executed_indices: list[int] = [] + + indexed_trades = list(enumerate(recommendation.trades)) + indexed_trades.sort(key=lambda item: 0 if item[1].action == "SELL" else 1) + + for index, trade in indexed_trades: + try: + state = self.portfolio_service.execute_trade( + state, + trade.ticker, + trade.action, + trade.quantity, + trade.reasoning, + stop_loss_price=trade.stop_loss_price, + take_profit_price=trade.take_profit_price, + asset_class=config.asset_class, + ) + executed_indices.append(index) + except Exception as exc: + logger.error( + "Failed to apply trade %s for %s: %s", + trade.ticker, + name, + exc, + ) + + self.portfolio_service.save_state(name, state) + return executed_indices + + def _get_latest_log_id(self, portfolio_name: str) -> int | None: + logs = self.execution_log_service.get_logs(portfolio_name=portfolio_name, limit=1) + if not logs: + return None + return logs[0].id + + @staticmethod + def _recommendations_to_list( + recommendation: AgentRecommendation | None, + ) -> list[dict] | None: + if not recommendation: + return None + return [ + { + "ticker": trade.ticker, + "name": trade.name, + "action": trade.action, + "quantity": trade.quantity, + "reasoning": trade.reasoning, + } + for trade in recommendation.trades + ] + + def _get_portfolio_lock(self, name: str) -> threading.Lock: + with self._run_lock: + if name not in self._portfolio_locks: + self._portfolio_locks[name] = threading.Lock() + return self._portfolio_locks[name] + + @staticmethod + def _frequency_interval(frequency: str) -> timedelta: + mapping = { + "daily": timedelta(days=1), + "weekly": timedelta(weeks=1), + "monthly": timedelta(days=30), + } + return mapping.get(frequency, timedelta(weeks=1)) + + @staticmethod + def _job_id(name: str) -> str: + return f"portfolio:{name}" diff --git a/tasks.md b/tasks.md index 4c5a78d..461d461 100644 --- a/tasks.md +++ b/tasks.md @@ -6,6 +6,12 @@ Detailed implementation plans for ROADMAP.md features. ## Phase 1: Experiment Infrastructure +### 1.1 Scheduled Execution + +**Status:** In Progress + +**Goal:** Automatically run portfolio agents on their configured cadence. + ### 1.2 Portfolio Cloning & Reset **Status:** Completed diff --git a/tests/test_scheduler.py b/tests/test_scheduler.py new file mode 100644 index 0000000..e49c591 --- /dev/null +++ b/tests/test_scheduler.py @@ -0,0 +1,223 @@ +"""Tests for SchedulerService.""" + +from __future__ import annotations + +import json +from datetime import datetime, timedelta +from pathlib import Path +from unittest.mock import MagicMock + +import pytest +from apscheduler.schedulers.background import BackgroundScheduler + +from fin_trade.models import AgentRecommendation, TradeRecommendation +from fin_trade.services.portfolio import PortfolioService +from fin_trade.services.scheduler import SchedulerService + + +def _write_portfolio_config( + path: Path, + *, + name: str = "Test Portfolio", + run_frequency: str = "daily", + scheduler_enabled: bool = False, + auto_apply_trades: bool = False, + agent_mode: str = "simple", +) -> None: + content = f"""name: {name} +strategy_prompt: Test strategy +initial_amount: 10000.0 +num_initial_trades: 2 +trades_per_run: 1 +run_frequency: {run_frequency} +llm_provider: openai +llm_model: gpt-4o +agent_mode: {agent_mode} +scheduler_enabled: {str(scheduler_enabled).lower()} +auto_apply_trades: {str(auto_apply_trades).lower()} +""" + path.write_text(content) + + +@pytest.fixture +def scheduler_fixture(temp_data_dir, mock_security_service): + SchedulerService.reset_instance_for_tests() + + portfolio_service = PortfolioService( + portfolios_dir=temp_data_dir["portfolios"], + state_dir=temp_data_dir["state"], + security_service=mock_security_service, + ) + agent_service = MagicMock() + execution_log_service = MagicMock() + execution_log_service.log_execution.return_value = 1 + + scheduler = SchedulerService( + portfolio_service=portfolio_service, + agent_service=agent_service, + execution_log_service=execution_log_service, + security_service=mock_security_service, + scheduler=BackgroundScheduler(daemon=True), + state_path=temp_data_dir["state"] / "scheduler.json", + ) + + yield scheduler, portfolio_service, agent_service, execution_log_service + + scheduler.stop() + SchedulerService.reset_instance_for_tests() + + +def test_scheduler_start_stop(scheduler_fixture, temp_data_dir): + scheduler, _, _, _ = scheduler_fixture + config_path = temp_data_dir["portfolios"] / "test_portfolio.yaml" + _write_portfolio_config(config_path, scheduler_enabled=True) + + scheduler.start() + status = scheduler.get_status() + assert status["running"] is True + assert status["jobs"] == 1 + + scheduler.stop() + status = scheduler.get_status() + assert status["running"] is False + + +def test_enable_disable_portfolio(scheduler_fixture, temp_data_dir): + scheduler, portfolio_service, _, _ = scheduler_fixture + config_path = temp_data_dir["portfolios"] / "test_portfolio.yaml" + _write_portfolio_config(config_path, scheduler_enabled=False) + + scheduler.start() + + scheduler.enable_portfolio("test_portfolio") + config, _ = portfolio_service.load_portfolio("test_portfolio") + assert config.scheduler_enabled is True + assert scheduler.get_status()["enabled"] == 1 + assert scheduler._scheduler.get_job("portfolio:test_portfolio") is not None + + scheduler.disable_portfolio("test_portfolio") + config, _ = portfolio_service.load_portfolio("test_portfolio") + assert config.scheduler_enabled is False + assert scheduler.get_status()["enabled"] == 0 + assert scheduler._scheduler.get_job("portfolio:test_portfolio") is None + + +def test_schedule_respects_frequency(scheduler_fixture, temp_data_dir): + scheduler, _, _, _ = scheduler_fixture + config_path = temp_data_dir["portfolios"] / "test_portfolio.yaml" + _write_portfolio_config(config_path, scheduler_enabled=True, run_frequency="weekly") + + scheduler.start() + + job = scheduler._scheduler.get_job("portfolio:test_portfolio") + assert job is not None + assert job.trigger.interval == timedelta(weeks=1) + + +def test_auto_apply_executes_trades(scheduler_fixture, temp_data_dir): + scheduler, portfolio_service, agent_service, execution_log_service = scheduler_fixture + config_path = temp_data_dir["portfolios"] / "test_portfolio.yaml" + _write_portfolio_config(config_path, scheduler_enabled=True, auto_apply_trades=True) + + recommendation = AgentRecommendation( + trades=[ + TradeRecommendation( + ticker="AAPL", + name="Apple Inc.", + action="BUY", + quantity=1, + reasoning="Test trade", + ) + ], + overall_reasoning="Test reasoning", + ) + agent_service.execute.return_value = recommendation + + success = scheduler.run_portfolio_now("test_portfolio") + assert success is True + + _, state = portfolio_service.load_portfolio("test_portfolio") + assert len(state.holdings) == 1 + assert len(state.trades) == 1 + + execution_log_service.mark_trades_executed.assert_called_once_with(1, [0]) + + +def test_queue_mode_creates_pending(scheduler_fixture, temp_data_dir): + scheduler, portfolio_service, agent_service, execution_log_service = scheduler_fixture + config_path = temp_data_dir["portfolios"] / "test_portfolio.yaml" + _write_portfolio_config(config_path, scheduler_enabled=True, auto_apply_trades=False) + + recommendation = AgentRecommendation( + trades=[ + TradeRecommendation( + ticker="AAPL", + name="Apple Inc.", + action="BUY", + quantity=1, + reasoning="Test trade", + ) + ], + overall_reasoning="Test reasoning", + ) + agent_service.execute.return_value = recommendation + + success = scheduler.run_portfolio_now("test_portfolio") + assert success is True + + _, state = portfolio_service.load_portfolio("test_portfolio") + assert state.holdings == [] + assert state.trades == [] + assert state.last_execution is not None + + execution_log_service.mark_trades_executed.assert_not_called() + + +def test_error_handling(scheduler_fixture, temp_data_dir): + scheduler, _, agent_service, execution_log_service = scheduler_fixture + config_path = temp_data_dir["portfolios"] / "test_portfolio.yaml" + _write_portfolio_config(config_path, scheduler_enabled=True) + + agent_service.execute.side_effect = RuntimeError("boom") + + success = scheduler.run_portfolio_now("test_portfolio") + assert success is False + + execution_log_service.log_execution.assert_called_once() + args, kwargs = execution_log_service.log_execution.call_args + assert kwargs["success"] is False + + +def test_state_persistence(scheduler_fixture, temp_data_dir): + scheduler, _, agent_service, _ = scheduler_fixture + config_path = temp_data_dir["portfolios"] / "test_portfolio.yaml" + _write_portfolio_config(config_path, scheduler_enabled=True) + + agent_service.execute.side_effect = None + agent_service.execute.return_value = AgentRecommendation(trades=[], overall_reasoning="") + + scheduler.start() + result = scheduler.run_portfolio_now("test_portfolio") + assert result is True, "run_portfolio_now should succeed" + + state_path = temp_data_dir["state"] / "scheduler.json" + assert state_path.exists() + assert state_path.stat().st_size > 0, "scheduler.json should not be empty" + + data = json.loads(state_path.read_text()) + assert "test_portfolio" in data["enabled_portfolios"] + assert "test_portfolio" in data["last_run_times"] + + scheduler.stop() + SchedulerService.reset_instance_for_tests() + + reloaded = SchedulerService( + portfolio_service=scheduler.portfolio_service, + agent_service=scheduler.agent_service, + execution_log_service=scheduler.execution_log_service, + security_service=scheduler.security_service, + scheduler=BackgroundScheduler(daemon=True), + state_path=state_path, + ) + assert "test_portfolio" in reloaded._enabled_portfolios + assert "test_portfolio" in reloaded._last_run_times