Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Empty file.
308 changes: 308 additions & 0 deletions langbuilder/src/backend/base/langflow/api/v1/usage/router.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,308 @@
"""Usage & Cost Tracking API endpoints.

Implements four endpoints:
GET /api/v1/usage/ — aggregated usage summary
GET /api/v1/usage/{flow_id}/runs — per-run detail for a flow
POST /api/v1/usage/settings/langwatch-key — save/validate LangWatch key (admin)
GET /api/v1/usage/settings/langwatch-key/status — key status
"""
from __future__ import annotations

from typing import TYPE_CHECKING, Annotated
from uuid import UUID

from fastapi import APIRouter, Depends, HTTPException, Query
from sqlmodel import select

from langflow.api.utils import CurrentActiveUser, DbSession
from langflow.services.auth.utils import get_current_active_superuser

if TYPE_CHECKING:
from sqlmodel.ext.asyncio.session import AsyncSession

from langflow.services.database.models.flow.model import Flow
from langflow.services.database.models.user.model import User
from langflow.services.langwatch.exceptions import (
LangWatchConnectionError,
LangWatchError,
LangWatchInsufficientCreditsError,
LangWatchInvalidKeyError,
LangWatchKeyNotConfiguredError,
LangWatchTimeoutError,
LangWatchUnavailableError,
)
from langflow.services.langwatch.schemas import (
FlowRunsQueryParams,
FlowRunsResponse,
KeyStatusResponse,
SaveKeyResponse,
SaveLangWatchKeyRequest,
UsageQueryParams,
UsageResponse,
)
from langflow.services.langwatch.service import LangWatchService, get_langwatch_service

router = APIRouter(prefix="/usage", tags=["Usage & Cost Tracking"])


CurrentSuperUser = Annotated[User, Depends(get_current_active_superuser)]
LangWatchDep = Annotated[LangWatchService, Depends(get_langwatch_service)]


# ── Helpers ───────────────────────────────────────────────────────────────────


async def _get_flow_ids_for_user(
db: AsyncSession,
user_id: UUID | None,
) -> set[UUID]:
"""Return the set of flow IDs owned by user_id, or all flow IDs if user_id is None."""
if user_id is not None:
result = await db.execute(select(Flow.id).where(Flow.user_id == user_id))
else:
result = await db.execute(select(Flow.id))
return {row[0] for row in result.fetchall()}


async def _get_stored_key_or_raise(langwatch: LangWatchService) -> str:
"""Retrieve stored LangWatch API key or raise 503 KEY_NOT_CONFIGURED."""
api_key = await langwatch.get_stored_key()
if not api_key:
raise HTTPException(
status_code=503,
detail={
"code": "KEY_NOT_CONFIGURED",
"message": "LangWatch API key not configured. Admin setup required.",
"retryable": False,
},
)
return api_key


def _raise_langwatch_http_error(exc: Exception) -> None:
"""Map LangWatch service exceptions to structured HTTP errors."""
if isinstance(exc, LangWatchKeyNotConfiguredError):
raise HTTPException(
status_code=503,
detail={
"code": "KEY_NOT_CONFIGURED",
"message": "LangWatch API key not configured. Admin setup required.",
"retryable": False,
},
)
if isinstance(exc, LangWatchTimeoutError):
raise HTTPException(
status_code=503,
detail={
"code": "LANGWATCH_TIMEOUT",
"message": "LangWatch did not respond within the allowed time. Please try again.",
"retryable": True,
},
)
if isinstance(exc, (LangWatchUnavailableError, LangWatchConnectionError)):
raise HTTPException(
status_code=503,
detail={
"code": "LANGWATCH_UNAVAILABLE",
"message": "LangWatch is temporarily unavailable. Please try again.",
"retryable": True,
},
)
if isinstance(exc, LangWatchInvalidKeyError):
raise HTTPException(
status_code=422,
detail={
"code": "INVALID_KEY",
"message": "Invalid API key. Please check your LangWatch account settings and try again.",
},
)
if isinstance(exc, LangWatchInsufficientCreditsError):
raise HTTPException(
status_code=422,
detail={
"code": "INSUFFICIENT_CREDITS",
"message": "Your LangWatch account has insufficient credits. Please upgrade your plan at langwatch.ai.",
},
)
raise exc


def _empty_summary(params: UsageQueryParams) -> UsageResponse:
"""Return an empty UsageResponse for the given query params."""
from langflow.services.langwatch.schemas import DateRange, UsageSummary

return UsageResponse(
summary=UsageSummary(
total_cost_usd=0.0,
total_invocations=0,
avg_cost_per_invocation_usd=0.0,
active_flow_count=0,
date_range=DateRange(from_=params.from_date, to=params.to_date),
),
flows=[],
)


# ── Endpoint 1: GET /usage/ ───────────────────────────────────────────────────


@router.get("/", response_model=UsageResponse)
async def get_usage_summary(
current_user: CurrentActiveUser,
db: DbSession,
langwatch: LangWatchDep,
from_date: Annotated[str | None, Query(description="ISO 8601 start date (YYYY-MM-DD)")] = None,
to_date: Annotated[str | None, Query(description="ISO 8601 end date (YYYY-MM-DD)")] = None,
user_id: Annotated[str | None, Query(description="Admin only: filter by user UUID")] = None,
sub_view: Annotated[str, Query(description="flows | mcp")] = "flows",
) -> UsageResponse:
"""Return aggregated cost and invocation data.

Non-admin users receive only their own flows (user_id param silently ignored).
Admins can filter by user_id or retrieve all flows.
"""
params = UsageQueryParams(
from_date=from_date,
to_date=to_date,
user_id=user_id,
sub_view=sub_view,
)

# ── Ownership Filter Logic ────────────────────────────────────────────────
# Non-admins: always own flows only (params.user_id silently ignored)
# Admin with user_id: filter to that user's flows
# Admin without user_id: all flows
if current_user.is_superuser and params.user_id:
effective_user_id: UUID | None = params.user_id
elif current_user.is_superuser:
effective_user_id = None # Admin sees all
else:
effective_user_id = current_user.id # Non-admin: own flows only

allowed_flow_ids = await _get_flow_ids_for_user(db, effective_user_id)

api_key = await _get_stored_key_or_raise(langwatch)
org_id = "default" # Single-org deployment — cache shared across users of same org

try:
return await langwatch.get_usage_summary(
params, allowed_flow_ids, api_key, org_id,
is_admin=current_user.is_superuser,
)
except LangWatchError as exc:
_raise_langwatch_http_error(exc)
return _empty_summary(params) # pragma: no cover — unreachable, satisfies type checker


# ── Endpoint 2: GET /usage/{flow_id}/runs ────────────────────────────────────


@router.get("/{flow_id}/runs", response_model=FlowRunsResponse)
async def get_flow_runs(
flow_id: UUID,
current_user: CurrentActiveUser,
db: DbSession,
langwatch: LangWatchDep,
from_date: Annotated[str | None, Query(description="ISO 8601 start date")] = None,
to_date: Annotated[str | None, Query(description="ISO 8601 end date")] = None,
limit: Annotated[int, Query(ge=1, le=50, description="Max number of runs to return")] = 10,
) -> FlowRunsResponse:
"""Return per-run detail for a specific flow.

Non-admins can only access flows they own (returns 403 otherwise).
"""
query = FlowRunsQueryParams(from_date=from_date, to_date=to_date, limit=limit)

# Ownership check — look up flow in DB
result = await db.execute(select(Flow.id, Flow.name, Flow.user_id).where(Flow.id == flow_id))
row = result.fetchone()

if row is None:
raise HTTPException(
status_code=404,
detail={
"code": "FLOW_NOT_FOUND",
"message": "No usage data found for this flow in the selected period.",
},
)

flow_name: str = row[1]
flow_owner_id: UUID = row[2]

# Non-admin accessing another user's flow → 403
if not current_user.is_superuser and flow_owner_id != current_user.id:
raise HTTPException(
status_code=403,
detail={
"code": "FORBIDDEN",
"message": "You do not have permission to view this flow's usage data.",
},
)

api_key = await _get_stored_key_or_raise(langwatch)

try:
return await langwatch.fetch_flow_runs(
flow_id=flow_id,
flow_name=flow_name,
query=query,
api_key=api_key,
)
except LangWatchError as exc:
_raise_langwatch_http_error(exc)
# pragma: no cover — unreachable
return FlowRunsResponse(flow_id=flow_id, flow_name=flow_name, runs=[], total_runs_in_period=0)


# ── Endpoint 3: POST /usage/settings/langwatch-key ───────────────────────────


@router.post("/settings/langwatch-key", response_model=SaveKeyResponse)
async def save_langwatch_key(
body: SaveLangWatchKeyRequest,
current_user: CurrentSuperUser,
langwatch: LangWatchDep,
) -> SaveKeyResponse:
"""Validate the provided LangWatch API key and store it.

Admin only. Returns 403 if the requesting user is not a superuser.
"""
api_key = body.api_key.strip()

# Validate key against LangWatch before saving
try:
is_valid = await langwatch.validate_key(api_key)
except LangWatchConnectionError as exc:
_raise_langwatch_http_error(exc)
return SaveKeyResponse(success=False, key_preview="", message="") # pragma: no cover

if not is_valid:
raise HTTPException(
status_code=422,
detail={
"code": "INVALID_KEY",
"message": "Invalid API key. Please check your LangWatch account settings and try again.",
},
)

await langwatch.save_key(api_key, current_user.id)

_preview_len = 3
preview = f"****{api_key[-_preview_len:]}" if len(api_key) > _preview_len else "****"
return SaveKeyResponse(
success=True,
key_preview=preview,
message="LangWatch API key validated and saved successfully.",
)


# ── Endpoint 4: GET /usage/settings/langwatch-key/status ─────────────────────


@router.get("/settings/langwatch-key/status", response_model=KeyStatusResponse)
async def get_langwatch_key_status(
_current_user: CurrentSuperUser,
langwatch: LangWatchDep,
) -> KeyStatusResponse:
"""Return whether a LangWatch API key is configured. Admin only."""
return await langwatch.get_key_status()
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
from datetime import datetime, timezone
from uuid import UUID, uuid4

from sqlmodel import Field, SQLModel


class GlobalSettings(SQLModel, table=True): # type: ignore[call-arg]
"""Org-level (deployment-wide) key/value configuration store.

Used to store system-level settings such as API keys that are scoped
to the entire deployment rather than per-user. The LangWatch API key
is the first entry. is_encrypted=True indicates Fernet-encrypted values.
"""

__tablename__ = "global_settings"

id: UUID = Field(default_factory=uuid4, primary_key=True)
key: str = Field(index=True, unique=True, max_length=100)
value: str = Field() # Fernet-encrypted for sensitive values
is_encrypted: bool = Field(default=False)
created_at: datetime = Field(default_factory=lambda: datetime.now(tz=timezone.utc))
updated_at: datetime = Field(default_factory=lambda: datetime.now(tz=timezone.utc))
Comment on lines +21 to +22
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

🧩 Analysis chain

🏁 Script executed:

cat -n langbuilder/src/backend/base/langflow/services/database/models/global_settings.py

Repository: CloudGeometry/langbuilder

Length of output: 1305


🏁 Script executed:

# Find all SQLModel/Pydantic models with updated_at fields to see existing patterns
rg "updated_at.*Field" langbuilder/src/backend/base/langflow/services/database/models/ -A 1

Repository: CloudGeometry/langbuilder

Length of output: 2287


🏁 Script executed:

# Check for imports in the file to understand dependencies
head -30 langbuilder/src/backend/base/langflow/services/database/models/global_settings.py

Repository: CloudGeometry/langbuilder

Length of output: 1116


🏁 Script executed:

# Search for any onupdate patterns in the codebase
rg "onupdate" langbuilder/src/backend/base/langflow/services/database/models/ -A 2 -B 2

Repository: CloudGeometry/langbuilder

Length of output: 51


🏁 Script executed:

# Look for sa_column_kwargs usage patterns
rg "sa_column_kwargs" langbuilder/src/backend/ -A 2 -B 2

Repository: CloudGeometry/langbuilder

Length of output: 51


🏁 Script executed:

# Check pyproject.toml or requirements for SQLModel version
fd -e "pyproject.toml" -o -e "requirements*.txt" langbuilder/src/backend | head -5 | xargs rg "sqlmodel|pydantic" -i

Repository: CloudGeometry/langbuilder

Length of output: 50517


🏁 Script executed:

grep -A 5 -B 5 "sqlmodel" langbuilder/pyproject.toml

Repository: CloudGeometry/langbuilder

Length of output: 51


🏁 Script executed:

# Check if there's any service code that manually updates updated_at fields
rg "updated_at\s*=" langbuilder/src/backend/base/langflow/services/database/ -t py --context 3

Repository: CloudGeometry/langbuilder

Length of output: 822


🏁 Script executed:

# Look for any update mechanisms in the services layer
rg "update.*global_settings" langbuilder/src/backend/ -i -t py --context 2

Repository: CloudGeometry/langbuilder

Length of output: 51


🏁 Script executed:

# Look for the broader context of how updates are handled
rg "flag_modified" langbuilder/src/backend/base/langflow/services/database/ -t py --context 2

Repository: CloudGeometry/langbuilder

Length of output: 1145


🏁 Script executed:

# Check for any Column definitions with onupdate
rg "Column|onupdate" langbuilder/src/backend/base/langflow/services/database/models/ -t py --context 1

Repository: CloudGeometry/langbuilder

Length of output: 15478


🏁 Script executed:

# Search for how GlobalSettings is updated (if at all)
rg "GlobalSettings" langbuilder/src/backend/ -t py --context 3 | head -80

Repository: CloudGeometry/langbuilder

Length of output: 7887


🏁 Script executed:

# Look for pyproject.toml with broader search
find langbuilder -name "pyproject.toml" -exec grep -l "sqlmodel" {} \;

Repository: CloudGeometry/langbuilder

Length of output: 112


🏁 Script executed:

cat langbuilder/src/backend/base/pyproject.toml | grep -A 10 -B 10 "sqlmodel\|sqlalchemy"

Repository: CloudGeometry/langbuilder

Length of output: 2005


🏁 Script executed:

# Check the langwatch service to see how GlobalSettings updates are handled
grep -A 20 "updated_by = admin_user_id" langbuilder/src/backend/base/langflow/services/langwatch/service.py

Repository: CloudGeometry/langbuilder

Length of output: 817


🏁 Script executed:

# Look for any trigger or event listener patterns for updated_at
rg "event.listen|@event|onupdate" langbuilder/src/backend/base/langflow/services/database/ -t py

Repository: CloudGeometry/langbuilder

Length of output: 334


🏁 Script executed:

# Check if there's any CRUD operations or update service for GlobalSettings
find langbuilder/src/backend/base/langflow/services -name "*global_settings*" -o -name "*crud*" | head -20

Repository: CloudGeometry/langbuilder

Length of output: 705


🏁 Script executed:

# Look at the langwatch service more thoroughly to see full update pattern
sed -n '80,150p' langbuilder/src/backend/base/langflow/services/langwatch/service.py

Repository: CloudGeometry/langbuilder

Length of output: 2636


🏁 Script executed:

# Verify if sa_column_kwargs is documented in SQLModel
# and check if onupdate works with sa_column or sa_column_kwargs
web_search SQLModel sa_column_kwargs onupdate datetime auto-update

Repository: CloudGeometry/langbuilder

Length of output: 119


🌐 Web query:

SQLModel sa_column_kwargs onupdate datetime auto-update SQLAlchemy

💡 Result:

To implement an auto-updating datetime field (e.g., updated_at) in SQLModel using sa_column_kwargs with onupdate, use the Field constructor to pass SQLAlchemy Column keyword arguments directly. This allows access to SQLAlchemy's onupdate feature without creating a full Column object via sa_column, avoiding issues like column sharing in inheritance. Example for an updated_at field that auto-updates on record changes: from datetime import datetime, timezone from typing import Optional from sqlalchemy import Column, DateTime from sqlmodel import Field, SQLModel class Item(SQLModel, table=True): id: Optional[int] = Field(default=None, primary_key=True) updated_at: Optional[datetime] = Field( default_factory=lambda: datetime.now(timezone.utc), sa_type=DateTime(timezone=True), sa_column_kwargs={"onupdate": lambda: datetime.now(timezone.utc)} ) For created_at (set only on insert), use default_factory without onupdate: created_at: Optional[datetime] = Field( default_factory=lambda: datetime.now(timezone.utc), sa_type=DateTime(timezone=True) ) This approach works well in base classes for inheritance across multiple tables, as shown in GitHub discussions solving Column assignment errors. Prefer server_onupdate=text("CURRENT_TIMESTAMP") or func.now for database-side updates to avoid Python-evaluated timestamps differing from DB transaction time: from sqlalchemy import text, func sa_column_kwargs={"onupdate": func.now} Note: Official SQLModel docs do not explicitly document sa_column_kwargs (focus on sa_column/sa_type), but it is used in SQLModel source/tests and community examples for advanced SQLAlchemy integration. For full control, use sa_column=Column(DateTime(timezone=True), onupdate=func.now).

Citations:


🏁 Script executed:

# Check the user model's CRUD to see how they handle updated_at updates
cat langbuilder/src/backend/base/langflow/services/database/models/user/crud.py | grep -A 10 "updated_at"

Repository: CloudGeometry/langbuilder

Length of output: 387


🏁 Script executed:

# Look for any similar patterns in the codebase where onupdate or server-side updates are used
rg "server_default|func.now()" langbuilder/src/backend/base/langflow/services/database/models/ -B 2 -A 2

Repository: CloudGeometry/langbuilder

Length of output: 1867


updated_at won't auto-update when an existing GlobalSettings record is modified.

Both timestamps use default_factory only, so they're set once at insert time. When a setting is updated (e.g., during langwatch/service.py:save_key()), the updated_at field remains stale unless every caller explicitly sets it. Add an onupdate hook to the model instead of relying on service code to remember this.

Proposed fix
-    updated_at: datetime = Field(default_factory=lambda: datetime.now(tz=timezone.utc))
+    updated_at: datetime = Field(
+        default_factory=lambda: datetime.now(tz=timezone.utc),
+        sa_column_kwargs={"onupdate": lambda: datetime.now(tz=timezone.utc)},
+    )
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In
`@langbuilder/src/backend/base/langflow/services/database/models/global_settings.py`
around lines 21 - 22, The GlobalSettings model's updated_at uses only
default_factory so it never changes on updates; modify the model
(GlobalSettings) to add an onupdate hook for updated_at (e.g., set onupdate to a
callable that returns datetime.now(tz=timezone.utc)) so updated_at is refreshed
automatically on record updates, leaving created_at as-is; update the Field
definition for updated_at to include onupdate and ensure timezone-aware datetime
is used consistently.

updated_by: UUID | None = Field(
default=None,
foreign_key="user.id",
nullable=True,
)
Empty file.
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
"""LangWatch service exception hierarchy.

These exceptions are raised by LangWatchService and caught by the
usage router, which maps them to structured HTTP error responses.
"""


class LangWatchError(Exception):
"""Base class for all LangWatch service errors."""



class LangWatchKeyNotConfiguredError(LangWatchError):
"""No LangWatch API key is stored in GlobalSettings.

HTTP mapping: 503 KEY_NOT_CONFIGURED
"""



class LangWatchInvalidKeyError(LangWatchError):
"""LangWatch returned 401 with an 'invalid key' body.

HTTP mapping: 422 INVALID_KEY
"""



class LangWatchInsufficientCreditsError(LangWatchError):
"""LangWatch returned 401 with an 'insufficient credits' body.

HTTP mapping: 422 INSUFFICIENT_CREDITS
"""



class LangWatchConnectionError(LangWatchError):
"""LangWatch could not be reached due to a network error or timeout.

HTTP mapping: 503 LANGWATCH_UNAVAILABLE
"""



class LangWatchUnavailableError(LangWatchError):
"""LangWatch returned 5xx or a connection error occurred.

HTTP mapping: 503 LANGWATCH_UNAVAILABLE
"""



class LangWatchTimeoutError(LangWatchError):
"""LangWatch did not respond within the configured timeout.

HTTP mapping: 503 LANGWATCH_TIMEOUT
"""

Loading