From 641bb96be3f0527613df243a7108b4f7759b2da8 Mon Sep 17 00:00:00 2001 From: ixeption Date: Mon, 9 Feb 2026 23:20:07 +0100 Subject: [PATCH] feat: execution replay & notes (tasks 5+6) - Execution replay: get_execution_with_context() parses log files for full AI reasoning context - Recommendation outcomes: get_recommendation_outcomes() tracks price movement and actual trades - Notes CRUD: add/update/delete execution notes with tags - UI: execution history tab, notes interface, chart annotations in portfolio_detail.py - Tests: unit tests for replay parsing, outcomes, and notes CRUD --- src/fin_trade/pages/portfolio_detail.py | 284 +++++++++++- src/fin_trade/services/execution_log.py | 579 +++++++++++++++++++++++- tests/test_execution_log.py | 244 +++++++++- 3 files changed, 1102 insertions(+), 5 deletions(-) diff --git a/src/fin_trade/pages/portfolio_detail.py b/src/fin_trade/pages/portfolio_detail.py index 2af32ae..98059d9 100644 --- a/src/fin_trade/pages/portfolio_detail.py +++ b/src/fin_trade/pages/portfolio_detail.py @@ -8,6 +8,7 @@ from fin_trade.models import AssetClass, PortfolioConfig, PortfolioState, TradeRecommendation from fin_trade.services import PortfolioService, AgentService, SecurityService +from fin_trade.services.execution_log import ExecutionLogService from fin_trade.agents.service import ( DebateAgentService, LangGraphAgentService, @@ -68,7 +69,9 @@ def render_portfolio_detail_page( st.divider() - tab1, tab2, tab3, tab4 = st.tabs(["Holdings", "Performance", "Execute Agent", "Trade History"]) + tab1, tab2, tab3, tab4, tab5 = st.tabs( + ["Holdings", "Performance", "Execute Agent", "Trade Log", "Execution History"] + ) with tab1: _render_holdings(config, state, security_service) @@ -84,6 +87,10 @@ def render_portfolio_detail_page( with tab4: render_trade_history(state.trades, security_service, config.asset_class) + with tab5: + _render_execution_history( + portfolio_name, + ) def _render_summary( config: PortfolioConfig, @@ -397,6 +404,10 @@ def _render_performance_chart( except Exception: pass # Silently skip benchmark if unavailable + # Load notes for chart annotations + log_service = ExecutionLogService() + notes = log_service.get_notes(config.name) + # Build the interactive chart fig = _build_performance_figure( filtered_timestamps, @@ -407,6 +418,7 @@ def _render_performance_chart( initial_investment, asset_class=config.asset_class, benchmark_data=benchmark_data, + notes=_map_notes_to_points(notes, filtered_timestamps, filtered_values), ) st.plotly_chart(fig, use_container_width=True, config={"displayModeBar": True}) @@ -636,6 +648,7 @@ def _build_performance_figure( initial_amount: float, asset_class: AssetClass = AssetClass.STOCKS, benchmark_data: dict | None = None, + notes: list[dict] | None = None, ) -> go.Figure: """Build the interactive stacked area Plotly figure.""" fig = go.Figure() @@ -757,6 +770,34 @@ def _build_performance_figure( ) ) + # Note markers + if notes: + note_hover = [] + for note in notes: + tags = ", ".join(note.get("tags", [])) if note.get("tags") else "No tags" + preview = note.get("note_text", "") + if len(preview) > 140: + preview = preview[:140] + "..." + note_hover.append( + f"Note
{preview}
{tags}" + ) + fig.add_trace( + go.Scatter( + x=[note["timestamp"] for note in notes], + y=[note["value"] for note in notes], + mode="markers", + name="Notes", + marker=dict( + symbol="circle", + size=10, + color="#9C27B0", + line=dict(color="#4A148C", width=1), + ), + hovertemplate="%{customdata}", + customdata=note_hover, + ) + ) + # Initial investment line fig.add_hline( y=initial_amount, @@ -814,6 +855,247 @@ def _build_performance_figure( return fig +def _map_notes_to_points( + notes: list[dict], + timestamps: list, + values: list[float], +) -> list[dict]: + """Map notes to chart points based on closest prior timestamp.""" + if not notes or not timestamps: + return [] + + note_points = [] + ts_list = list(timestamps) + + for note in notes: + note_date = note.get("note_date") + if not note_date: + continue + + idx = None + for i in range(len(ts_list) - 1, -1, -1): + if ts_list[i].date() <= note_date: + idx = i + break + + if idx is None: + continue + + note_points.append({ + "timestamp": ts_list[idx], + "value": values[idx], + "note_text": note.get("note_text", ""), + "tags": note.get("tags", []), + "id": note.get("id"), + }) + + return note_points + + +def _render_execution_history( + portfolio_name: str, +) -> None: + """Render execution history with full context and notes.""" + import json + import pandas as pd + + st.subheader("Execution History") + + log_service = ExecutionLogService() + logs = log_service.get_logs(portfolio_name=portfolio_name, limit=50) + + if not logs: + st.info("No execution logs yet. Run the agent to generate logs.") + return + + for log in logs: + recommendations = [] + if log.recommendations_json: + recommendations = json.loads(log.recommendations_json) + + executed = set() + if log.executed_trades_json: + executed = set(json.loads(log.executed_trades_json)) + + rejected = set() + if log.rejected_trades_json: + rejected = set(json.loads(log.rejected_trades_json)) + + applied_count = len(executed) + rejected_count = len(rejected) + pending_count = len(recommendations) - applied_count - rejected_count + + outcomes = log_service.get_recommendation_outcomes(log.id) + outcome_score = None + if outcomes: + outcome_values = [o["hypothetical_pl"] for o in outcomes if o["hypothetical_pl"] is not None] + if outcome_values: + outcome_score = sum(outcome_values) + + outcome_label = "Outcome: N/A" + if outcome_score is not None: + outcome_label = f"Outcome: {'+' if outcome_score >= 0 else ''}{outcome_score:,.2f}" + + expander_title = ( + f"{log.timestamp.strftime('%Y-%m-%d %H:%M')} — {log.model} — " + f"{len(recommendations)} recs | {applied_count} applied / {rejected_count} rejected / " + f"{pending_count} pending — {outcome_label}" + ) + + with st.expander(expander_title, expanded=False): + col1, col2, col3 = st.columns(3) + with col1: + st.metric("Duration", f"{log.duration_ms}ms") + with col2: + st.metric("Total Tokens", f"{log.total_tokens:,}") + with col3: + st.metric("Agent Mode", log.agent_mode) + + if log.error_message: + st.error(log.error_message) + + # Recommendations with outcomes + st.markdown("### Recommendations & Outcomes") + if not outcomes: + st.caption("No recommendations available.") + else: + rows = [] + for outcome in outcomes: + rows.append({ + "Status": outcome["status"].capitalize(), + "Ticker": outcome["ticker"], + "Action": outcome["action"], + "Qty": outcome["recommended_quantity"], + "Rec Price": outcome["recommended_price"], + "Current/Exit": outcome["exit_price"] or outcome["current_price"], + "Hypo P/L": outcome["hypothetical_pl"], + "Actual P/L": outcome["actual_pl"], + }) + + df = pd.DataFrame(rows) + st.dataframe( + df, + column_config={ + "Rec Price": st.column_config.NumberColumn("Rec Price", format="$%.2f"), + "Current/Exit": st.column_config.NumberColumn("Current/Exit", format="$%.2f"), + "Hypo P/L": st.column_config.NumberColumn("Hypo P/L", format="$%.2f"), + "Actual P/L": st.column_config.NumberColumn("Actual P/L", format="$%.2f"), + }, + hide_index=True, + use_container_width=True, + ) + + # Full context from markdown logs + context = log_service.get_execution_with_context(log.id).get("log_context", {}) + + if context.get("analysis"): + st.markdown("### Full Agent Reasoning") + st.markdown(context["analysis"]) + elif context.get("overall_reasoning"): + st.markdown("### Full Agent Reasoning") + st.markdown(context["overall_reasoning"]) + + if context.get("research"): + with st.expander("Research", expanded=False): + st.markdown(context["research"]) + + if context.get("debate") or context.get("bull_case") or context.get("bear_case"): + with st.expander("Debate Transcript", expanded=False): + if context.get("bull_case"): + st.markdown("**Bull Case**") + st.markdown(context["bull_case"]) + if context.get("bear_case"): + st.markdown("**Bear Case**") + st.markdown(context["bear_case"]) + if context.get("neutral_analysis"): + st.markdown("**Neutral Analysis**") + st.markdown(context["neutral_analysis"]) + if context.get("debate"): + st.markdown("**Debate Rounds**") + st.markdown(context["debate"]) + if context.get("moderator_verdict"): + st.markdown("**Moderator Verdict**") + st.markdown(context["moderator_verdict"]) + + if context.get("prompt"): + with st.expander("Full Prompt", expanded=False): + st.markdown(context["prompt"]) + + # Add Note UI + st.markdown("### Add Note") + note_key = f"note_{log.id}" + note_text = st.text_area( + "Note", + key=f"{note_key}_text", + placeholder="Add your observation about this execution...", + height=120, + ) + common_tags = ["Earnings", "Fed Decision", "Market Correction", "Strategy Tweak"] + selected_common = st.multiselect( + "Quick Tags", + options=common_tags, + key=f"{note_key}_common", + ) + tags_input = st.text_input( + "Tags (comma-separated)", + key=f"{note_key}_tags", + ) + if st.button("Add Note", key=f"{note_key}_add"): + tags = [] + if tags_input: + tags.extend([t.strip() for t in tags_input.split(",") if t.strip()]) + tags.extend(selected_common) + tags = list(dict.fromkeys(tags)) + try: + log_service.add_note( + portfolio_name=portfolio_name, + note_text=note_text, + execution_id=log.id, + tags=tags, + ) + st.success("Note added.") + except Exception as e: + st.error(f"Failed to add note: {e}") + + st.divider() + _render_notes_panel(portfolio_name, log_service) + + +def _render_notes_panel( + portfolio_name: str, + log_service: ExecutionLogService, +) -> None: + """Render notes panel with filtering and search.""" + st.subheader("Notes") + + notes = log_service.get_notes(portfolio_name) + if not notes: + st.info("No notes yet. Add notes from the execution history.") + return + + all_tags = sorted({tag for note in notes for tag in note.get("tags", [])}) + tag_filter = st.selectbox( + "Filter by tag", + options=["All"] + all_tags, + index=0, + ) + search_query = st.text_input("Search notes") + + filtered = [] + for note in notes: + if tag_filter != "All" and tag_filter not in note.get("tags", []): + continue + if search_query and search_query.lower() not in note["note_text"].lower(): + continue + filtered.append(note) + + for note in filtered: + tags_label = ", ".join(note.get("tags", [])) if note.get("tags") else "No tags" + header = f"{note['note_date'].strftime('%Y-%m-%d')} — {tags_label}" + with st.expander(header, expanded=False): + st.markdown(note["note_text"]) + + def _render_agent_execution( config: PortfolioConfig, state: PortfolioState, diff --git a/src/fin_trade/services/execution_log.py b/src/fin_trade/services/execution_log.py index 9594aff..3e60d3b 100644 --- a/src/fin_trade/services/execution_log.py +++ b/src/fin_trade/services/execution_log.py @@ -2,11 +2,13 @@ import sqlite3 from dataclasses import dataclass -from datetime import datetime +from datetime import datetime, date from pathlib import Path _project_root = Path(__file__).parent.parent.parent.parent _db_path = _project_root / "data" / "state" / "execution_logs.db" +_logs_dir = _project_root / "data" / "logs" +_state_dir = _project_root / "data" / "state" @dataclass @@ -75,6 +77,18 @@ def _ensure_db(self) -> None: conn.execute("ALTER TABLE execution_logs ADD COLUMN rejected_trades_json TEXT") except sqlite3.OperationalError: pass # Column already exists + conn.execute(""" + CREATE TABLE IF NOT EXISTS execution_notes ( + id INTEGER PRIMARY KEY, + execution_id INTEGER, + portfolio_name TEXT NOT NULL, + note_date DATE NOT NULL, + note_text TEXT NOT NULL, + tags TEXT, + created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, + FOREIGN KEY (execution_id) REFERENCES execution_logs(id) + ) + """) conn.commit() def log_execution( @@ -206,6 +220,316 @@ def get_log_by_id(self, log_id: int) -> ExecutionLogEntry | None: rejected_trades_json=row[15], ) + def get_execution_with_context(self, execution_id: int) -> dict: + """Get execution record with recommendations, outcomes, and log context.""" + import json + + log = self.get_log_by_id(execution_id) + if log is None: + raise ValueError(f"Execution log not found: {execution_id}") + + recommendations = [] + if log.recommendations_json: + recommendations = json.loads(log.recommendations_json) + + executed_indices = [] + if log.executed_trades_json: + executed_indices = json.loads(log.executed_trades_json) + + rejected_indices = [] + if log.rejected_trades_json: + rejected_indices = json.loads(log.rejected_trades_json) + + pending_indices = [ + i for i in range(len(recommendations)) + if i not in executed_indices and i not in rejected_indices + ] + + portfolio_state = self._load_portfolio_state_at( + log.portfolio_name, + log.timestamp, + ) + + log_context = self._parse_log_context(log) + + return { + "log": log, + "recommendations": recommendations, + "executed_indices": executed_indices, + "rejected_indices": rejected_indices, + "pending_indices": pending_indices, + "portfolio_state": portfolio_state, + "log_context": log_context, + } + + def get_recommendation_outcomes(self, execution_id: int) -> list[dict]: + """Calculate price outcomes for recommendations in an execution.""" + import json + from fin_trade.services.stock_data import StockDataService + from fin_trade.models import Trade + + log = self.get_log_by_id(execution_id) + if log is None: + raise ValueError(f"Execution log not found: {execution_id}") + if not log.recommendations_json: + return [] + + recommendations = json.loads(log.recommendations_json) + + executed_indices = [] + if log.executed_trades_json: + executed_indices = json.loads(log.executed_trades_json) + + rejected_indices = [] + if log.rejected_trades_json: + rejected_indices = json.loads(log.rejected_trades_json) + + trades = [] + state_path = _state_dir / f"{log.portfolio_name}.json" + if state_path.exists(): + with open(state_path, "r", encoding="utf-8") as f: + state_data = json.load(f) + for t in state_data.get("trades", []): + trades.append( + Trade( + timestamp=datetime.fromisoformat(t["timestamp"]), + ticker=t.get("ticker", t.get("isin", "UNKNOWN")), + name=t.get("name", t.get("ticker", "Unknown")), + action=t["action"], + quantity=float(t["quantity"]), + price=float(t["price"]), + reasoning=t.get("reasoning", ""), + stop_loss_price=t.get("stop_loss_price"), + take_profit_price=t.get("take_profit_price"), + ) + ) + trades = sorted(trades, key=lambda t: t.timestamp) + + stock_data_service = StockDataService() + outcomes = [] + + for idx, rec in enumerate(recommendations): + ticker = rec.get("ticker") + action = rec.get("action") + rec_quantity = float(rec.get("quantity", 0)) + + status = "pending" + if idx in executed_indices: + status = "applied" + elif idx in rejected_indices: + status = "rejected" + + rec_price = self._get_price_at_time( + stock_data_service, + ticker, + log.timestamp, + ) + + current_price = None + try: + current_price = stock_data_service.get_price(ticker) + except Exception: + current_price = None + + exit_price = self._get_exit_price( + trades, + ticker, + log.timestamp, + rec_quantity, + ) + + price_for_outcome = exit_price if exit_price is not None else current_price + + hypothetical_pl = None + hypothetical_pl_pct = None + if rec_price is not None and price_for_outcome is not None and rec_quantity: + if action == "BUY": + hypothetical_pl = (price_for_outcome - rec_price) * rec_quantity + else: + hypothetical_pl = (rec_price - price_for_outcome) * rec_quantity + hypothetical_pl_pct = (hypothetical_pl / (rec_price * rec_quantity)) * 100 + + actual_execution_price = None + actual_quantity = None + actual_pl = None + actual_pl_pct = None + + if status == "applied": + actual_trade = self._find_actual_trade( + trades, + ticker, + action, + log.timestamp, + ) + if actual_trade: + actual_execution_price = actual_trade.price + actual_quantity = actual_trade.quantity + if price_for_outcome is not None and actual_quantity: + if action == "BUY": + actual_pl = (price_for_outcome - actual_execution_price) * actual_quantity + else: + actual_pl = (actual_execution_price - price_for_outcome) * actual_quantity + actual_pl_pct = ( + (actual_pl / (actual_execution_price * actual_quantity)) * 100 + ) + + outcomes.append({ + "index": idx, + "ticker": ticker, + "action": action, + "recommended_quantity": rec_quantity, + "recommended_price": rec_price, + "current_price": current_price, + "exit_price": exit_price, + "status": status, + "hypothetical_pl": hypothetical_pl, + "hypothetical_pl_pct": hypothetical_pl_pct, + "actual_execution_price": actual_execution_price, + "actual_quantity": actual_quantity, + "actual_pl": actual_pl, + "actual_pl_pct": actual_pl_pct, + }) + + return outcomes + + def add_note( + self, + portfolio_name: str, + note_text: str, + execution_id: int | None = None, + note_date: date | None = None, + tags: list[str] | None = None, + ) -> int: + """Add a note to an execution or date.""" + import json + + if not note_text or not note_text.strip(): + raise ValueError("Note text cannot be empty") + + if execution_id is not None and note_date is None: + log = self.get_log_by_id(execution_id) + if log is None: + raise ValueError(f"Execution log not found: {execution_id}") + note_date = log.timestamp.date() + + if note_date is None: + note_date = date.today() + + tags_json = json.dumps(tags) if tags else None + + with sqlite3.connect(_db_path) as conn: + cursor = conn.execute( + """ + INSERT INTO execution_notes ( + execution_id, portfolio_name, note_date, note_text, tags + ) VALUES (?, ?, ?, ?, ?) + """, + ( + execution_id, + portfolio_name, + note_date.isoformat(), + note_text.strip(), + tags_json, + ), + ) + conn.commit() + return cursor.lastrowid + + def get_notes( + self, + portfolio_name: str, + start_date: date | None = None, + end_date: date | None = None, + ) -> list[dict]: + """Get notes for a portfolio, optionally filtered by date range.""" + import json + + conditions = ["portfolio_name = ?"] + params: list = [portfolio_name] + + if start_date: + conditions.append("note_date >= ?") + params.append(start_date.isoformat()) + if end_date: + conditions.append("note_date <= ?") + params.append(end_date.isoformat()) + + where_clause = " AND ".join(conditions) + + with sqlite3.connect(_db_path) as conn: + cursor = conn.execute( + f""" + SELECT id, execution_id, portfolio_name, note_date, note_text, tags, created_at + FROM execution_notes + WHERE {where_clause} + ORDER BY note_date DESC, created_at DESC + """, + params, + ) + rows = cursor.fetchall() + + notes = [] + for row in rows: + tags = json.loads(row[5]) if row[5] else [] + notes.append({ + "id": row[0], + "execution_id": row[1], + "portfolio_name": row[2], + "note_date": date.fromisoformat(row[3]), + "note_text": row[4], + "tags": tags, + "created_at": row[6], + }) + return notes + + def update_note( + self, + note_id: int, + note_text: str | None = None, + tags: list[str] | None = None, + ) -> None: + """Update an existing note.""" + import json + + if note_text is None and tags is None: + raise ValueError("Must provide note_text or tags to update") + + updates = [] + params: list = [] + + if note_text is not None: + if not note_text.strip(): + raise ValueError("Note text cannot be empty") + updates.append("note_text = ?") + params.append(note_text.strip()) + + if tags is not None: + updates.append("tags = ?") + params.append(json.dumps(tags) if tags else None) + + params.append(note_id) + update_clause = ", ".join(updates) + + with sqlite3.connect(_db_path) as conn: + cursor = conn.execute( + f"UPDATE execution_notes SET {update_clause} WHERE id = ?", + params, + ) + conn.commit() + if cursor.rowcount == 0: + raise ValueError(f"Note not found: {note_id}") + + def delete_note(self, note_id: int) -> None: + """Delete a note.""" + with sqlite3.connect(_db_path) as conn: + cursor = conn.execute( + "DELETE FROM execution_notes WHERE id = ?", + (note_id,), + ) + conn.commit() + if cursor.rowcount == 0: + raise ValueError(f"Note not found: {note_id}") + def get_logs( self, portfolio_name: str | None = None, @@ -393,3 +717,256 @@ def get_daily_stats(self, days: int = 14) -> list[dict]: } for row in cursor.fetchall() ] + + def _load_portfolio_state_at(self, portfolio_name: str, as_of: datetime) -> dict: + """Load portfolio state at a specific timestamp by replaying trades.""" + import json + from fin_trade.services.portfolio import PortfolioService + + state_path = _state_dir / f"{portfolio_name}.json" + if not state_path.exists(): + return {"cash": None, "holdings": [], "trades": []} + + with open(state_path, "r", encoding="utf-8") as f: + data = json.load(f) + + portfolio_service = PortfolioService(state_dir=_state_dir) + try: + config, _ = portfolio_service.load_portfolio(portfolio_name) + start_cash = float(data.get("initial_investment") or config.initial_amount) + except Exception: + start_cash = float(data.get("initial_investment") or data.get("cash", 0)) + + holdings = {} + cash = start_cash + trades = [] + + trade_records = data.get("trades", []) + for t in trade_records: + trade_time = datetime.fromisoformat(t["timestamp"]) + if trade_time > as_of: + continue + + action = t["action"] + quantity = float(t["quantity"]) + price = float(t["price"]) + ticker = t.get("ticker", t.get("isin", "UNKNOWN")) + name = t.get("name", ticker) + + trade_cost = price * quantity + + if action == "BUY": + cash -= trade_cost + if ticker in holdings: + existing = holdings[ticker] + total_qty = existing["quantity"] + quantity + avg_price = ( + existing["avg_price"] * existing["quantity"] + trade_cost + ) / total_qty + holdings[ticker] = { + "ticker": ticker, + "name": name, + "quantity": total_qty, + "avg_price": avg_price, + } + else: + holdings[ticker] = { + "ticker": ticker, + "name": name, + "quantity": quantity, + "avg_price": price, + } + else: + cash += trade_cost + if ticker in holdings: + holdings[ticker]["quantity"] -= quantity + if holdings[ticker]["quantity"] <= 0: + del holdings[ticker] + + trades.append({ + "timestamp": trade_time, + "ticker": ticker, + "name": name, + "action": action, + "quantity": quantity, + "price": price, + "reasoning": t.get("reasoning", ""), + }) + + return { + "cash": cash, + "holdings": list(holdings.values()), + "trades": trades, + } + + def _find_log_file(self, log: ExecutionLogEntry) -> Path | None: + """Find the markdown log file for an execution entry.""" + if not _logs_dir.exists(): + return None + + suffix = "" + if log.agent_mode == "langgraph": + suffix = "_langgraph" + elif log.agent_mode == "debate": + suffix = "_debate" + + candidates = list(_logs_dir.glob(f"{log.portfolio_name}_*{suffix}.md")) + if not candidates: + return None + + best_match = None + best_delta = None + + for candidate in candidates: + timestamp = self._extract_log_timestamp(candidate.stem, log.portfolio_name, suffix) + if timestamp is None: + continue + delta = abs((timestamp - log.timestamp).total_seconds()) + if best_delta is None or delta < best_delta: + best_delta = delta + best_match = candidate + + return best_match + + def _extract_log_timestamp( + self, + stem: str, + portfolio_name: str, + suffix: str, + ) -> datetime | None: + """Extract timestamp from a log filename stem.""" + prefix = f"{portfolio_name}_" + if not stem.startswith(prefix): + return None + timestamp_part = stem[len(prefix):] + if suffix and timestamp_part.endswith(suffix): + timestamp_part = timestamp_part[: -len(suffix)] + timestamp_part = timestamp_part.strip("_") + try: + return datetime.strptime(timestamp_part, "%Y%m%d_%H%M%S") + except ValueError: + return None + + def _parse_log_context(self, log: ExecutionLogEntry) -> dict: + """Parse markdown log file for research, analysis, and debate sections.""" + log_file = self._find_log_file(log) + if log_file is None or not log_file.exists(): + return {"error": "Log file not found"} + + try: + content = log_file.read_text(encoding="utf-8") + except Exception as exc: + return {"error": f"Failed to read log file: {exc}"} + + sections = self._parse_markdown_sections(content) + + return { + "prompt": sections.get("Prompt") or sections.get("Prompts Sent to Agent"), + "response": sections.get("Response"), + "research": sections.get("Market Research") or sections.get("Research"), + "analysis": sections.get("Analysis"), + "bull_case": sections.get("Bull Case"), + "bear_case": sections.get("Bear Case"), + "neutral_analysis": sections.get("Neutral Analysis"), + "debate": sections.get("Debate Rounds"), + "moderator_verdict": sections.get("Moderator Verdict"), + "overall_reasoning": sections.get("Overall Reasoning"), + } + + def _parse_markdown_sections(self, content: str) -> dict: + """Parse top-level markdown sections into a dictionary.""" + sections: dict[str, list[str]] = {} + current_section = None + + for line in content.splitlines(): + if line.startswith("## "): + current_section = line[3:].strip() + sections[current_section] = [] + continue + if current_section is not None: + sections[current_section].append(line) + + return { + name: "\n".join(lines).strip() + for name, lines in sections.items() + } + + def _get_price_at_time( + self, + stock_data_service, + ticker: str, + target_time: datetime, + ) -> float | None: + """Get price closest to a specific time from history.""" + import pandas as pd + + if not ticker: + return None + + days = max(5, (datetime.now() - target_time).days + 5) + try: + history = stock_data_service.get_history(ticker, days=days) + except Exception: + return None + + if history is None or history.empty: + return None + + if not isinstance(history.index, pd.DatetimeIndex): + history.index = pd.to_datetime(history.index) + + history = history.sort_index() + history = history[history.index <= target_time] + if history.empty: + return None + return float(history["Close"].iloc[-1]) + + def _get_exit_price( + self, + trades: list, + ticker: str, + after_time: datetime, + quantity: float, + ) -> float | None: + """Get weighted exit price for a quantity if later sells occurred.""" + if not ticker or quantity <= 0: + return None + + remaining = quantity + total_proceeds = 0.0 + + for trade in trades: + if trade.ticker != ticker: + continue + if trade.timestamp <= after_time: + continue + if trade.action != "SELL": + continue + + sell_qty = min(trade.quantity, remaining) + total_proceeds += sell_qty * trade.price + remaining -= sell_qty + if remaining <= 0: + break + + if remaining > 0: + return None + return total_proceeds / quantity + + def _find_actual_trade( + self, + trades: list, + ticker: str, + action: str, + after_time: datetime, + ): + """Find the first matching trade after a timestamp.""" + for trade in trades: + if trade.timestamp <= after_time: + continue + if trade.ticker != ticker: + continue + if trade.action != action: + continue + return trade + return None diff --git a/tests/test_execution_log.py b/tests/test_execution_log.py index f5cbc02..d9a46b5 100644 --- a/tests/test_execution_log.py +++ b/tests/test_execution_log.py @@ -2,10 +2,9 @@ import json import sqlite3 -from datetime import datetime, timedelta -from unittest.mock import patch - +from datetime import datetime, timedelta, date import pytest +import pandas as pd from fin_trade.services.execution_log import ExecutionLogService, ExecutionLogEntry @@ -20,6 +19,18 @@ def temp_db(tmp_path, monkeypatch): return db_path +@pytest.fixture +def temp_dirs(tmp_path, monkeypatch): + """Create temporary logs and state directories.""" + logs_dir = tmp_path / "logs" + state_dir = tmp_path / "state" + logs_dir.mkdir(parents=True, exist_ok=True) + state_dir.mkdir(parents=True, exist_ok=True) + monkeypatch.setattr("fin_trade.services.execution_log._logs_dir", logs_dir) + monkeypatch.setattr("fin_trade.services.execution_log._state_dir", state_dir) + return logs_dir, state_dir + + class TestExecutionLogServiceInit: """Tests for ExecutionLogService initialization.""" @@ -37,6 +48,10 @@ def test_creates_database_and_table(self, temp_db): "SELECT name FROM sqlite_master WHERE type='table' AND name='execution_logs'" ) assert cursor.fetchone() is not None + cursor = conn.execute( + "SELECT name FROM sqlite_master WHERE type='table' AND name='execution_notes'" + ) + assert cursor.fetchone() is not None def test_creates_parent_directory(self, tmp_path, monkeypatch): """Test creates parent directories if needed.""" @@ -362,3 +377,226 @@ def test_handles_no_data(self, temp_db): stats = service.get_daily_stats(days=7) assert stats == [] + + +class TestExecutionContext: + """Tests for execution context and outcomes.""" + + def test_get_execution_with_context_parses_log(self, temp_db, temp_dirs): + logs_dir, state_dir = temp_dirs + service = ExecutionLogService() + + log_time = datetime(2025, 1, 2, 3, 4, 5) + log_id = service.log_execution( + portfolio_name="TestPortfolio", + agent_mode="langgraph", + model="gpt-4o", + duration_ms=1000, + input_tokens=100, + output_tokens=50, + num_trades=1, + success=True, + recommendations=[{ + "ticker": "AAPL", + "name": "Apple", + "action": "BUY", + "quantity": 1, + "reasoning": "Test", + }], + ) + + with sqlite3.connect(temp_db) as conn: + conn.execute( + "UPDATE execution_logs SET timestamp = ? WHERE id = ?", + (log_time.isoformat(), log_id), + ) + conn.commit() + + log_file = logs_dir / "TestPortfolio_20250102_030405_langgraph.md" + log_file.write_text( + """# LangGraph Agent Log - 2025-01-02T03:04:05 + +## Market Research + +Research content here. + +## Analysis + +Analysis content here. +""", + encoding="utf-8", + ) + + state_path = state_dir / "TestPortfolio.json" + state_path.write_text( + json.dumps({ + "cash": 10000, + "initial_investment": 10000, + "holdings": [], + "trades": [ + { + "timestamp": "2025-01-01T10:00:00", + "ticker": "AAPL", + "name": "Apple", + "action": "BUY", + "quantity": 1, + "price": 100, + "reasoning": "Initial buy", + } + ], + }), + encoding="utf-8", + ) + + context = service.get_execution_with_context(log_id) + assert context["recommendations"][0]["ticker"] == "AAPL" + assert context["log_context"]["research"] == "Research content here." + assert context["log_context"]["analysis"] == "Analysis content here." + assert context["portfolio_state"]["holdings"][0]["ticker"] == "AAPL" + + def test_get_recommendation_outcomes(self, temp_db, temp_dirs, monkeypatch): + _, state_dir = temp_dirs + + service = ExecutionLogService() + + log_time = datetime(2025, 1, 2, 3, 4, 5) + log_id = service.log_execution( + portfolio_name="TestPortfolio", + agent_mode="langgraph", + model="gpt-4o", + duration_ms=1000, + input_tokens=100, + output_tokens=50, + num_trades=1, + success=True, + recommendations=[{ + "ticker": "AAPL", + "name": "Apple", + "action": "BUY", + "quantity": 10, + "reasoning": "Test", + }], + ) + + with sqlite3.connect(temp_db) as conn: + conn.execute( + "UPDATE execution_logs SET timestamp = ? WHERE id = ?", + (log_time.isoformat(), log_id), + ) + conn.commit() + + service.mark_trades_executed(log_id, [0]) + + state_path = state_dir / "TestPortfolio.json" + state_path.write_text( + json.dumps({ + "cash": 9000, + "initial_investment": 10000, + "holdings": [], + "trades": [ + { + "timestamp": "2025-01-02T03:05:00", + "ticker": "AAPL", + "name": "Apple", + "action": "BUY", + "quantity": 10, + "price": 100, + "reasoning": "Executed", + }, + { + "timestamp": "2025-01-05T10:00:00", + "ticker": "AAPL", + "name": "Apple", + "action": "SELL", + "quantity": 10, + "price": 120, + "reasoning": "Exit", + }, + ], + }), + encoding="utf-8", + ) + + def fake_history(self, ticker, days=365): + dates = pd.to_datetime([ + "2025-01-01", + "2025-01-02", + ]) + return pd.DataFrame({"Close": [95, 105]}, index=dates) + + def fake_price(self, ticker): + return 130.0 + + monkeypatch.setattr( + "fin_trade.services.stock_data.StockDataService.get_history", fake_history + ) + monkeypatch.setattr( + "fin_trade.services.stock_data.StockDataService.get_price", fake_price + ) + + outcomes = service.get_recommendation_outcomes(log_id) + assert len(outcomes) == 1 + outcome = outcomes[0] + assert outcome["recommended_price"] == 105.0 + assert outcome["exit_price"] == 120.0 + assert outcome["hypothetical_pl"] == 150.0 + assert outcome["actual_pl"] == 200.0 + + +class TestExecutionNotes: + """Tests for execution note CRUD.""" + + def test_add_get_update_delete_note(self, temp_db): + service = ExecutionLogService() + + note_id = service.add_note( + portfolio_name="TestPortfolio", + note_text="Initial note", + note_date=date(2025, 1, 1), + tags=["Earnings"], + ) + + notes = service.get_notes("TestPortfolio") + assert len(notes) == 1 + assert notes[0]["id"] == note_id + assert notes[0]["tags"] == ["Earnings"] + + service.update_note(note_id, note_text="Updated note", tags=["Fed Decision"]) + updated = service.get_notes("TestPortfolio")[0] + assert updated["note_text"] == "Updated note" + assert updated["tags"] == ["Fed Decision"] + + service.delete_note(note_id) + assert service.get_notes("TestPortfolio") == [] + + def test_add_note_with_execution_id_uses_execution_date(self, temp_db): + service = ExecutionLogService() + + log_id = service.log_execution( + portfolio_name="TestPortfolio", + agent_mode="langgraph", + model="gpt-4o", + duration_ms=1000, + input_tokens=100, + output_tokens=50, + num_trades=0, + success=True, + ) + + execution_time = datetime(2025, 2, 3, 10, 0, 0) + with sqlite3.connect(temp_db) as conn: + conn.execute( + "UPDATE execution_logs SET timestamp = ? WHERE id = ?", + (execution_time.isoformat(), log_id), + ) + conn.commit() + + note_id = service.add_note( + portfolio_name="TestPortfolio", + note_text="Execution note", + execution_id=log_id, + ) + + notes = service.get_notes("TestPortfolio") + assert notes[0]["id"] == note_id + assert notes[0]["note_date"] == date(2025, 2, 3)