Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
54 changes: 54 additions & 0 deletions backend/alembic/versions/012_add_asset_type.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
"""Add asset_type to gpts table for Project support

Revision ID: 012
Revises: 011
Create Date: 2026-03-17

asset_type: 'gpt' | 'project' — allows the gpts table to store both
Custom GPTs and OpenAI Projects as a unified asset registry.

conversation_count / last_conversation_at: placeholders for Phase 2
Conversation Intelligence (populated when the Conversations API is added).
"""

from typing import Sequence, Union

import sqlalchemy as sa
from alembic import op

revision: str = "012"
down_revision: Union[str, None] = "011"
branch_labels: Union[str, Sequence[str], None] = None
depends_on: Union[str, Sequence[str], None] = None


def upgrade() -> None:
    """Add the asset-registry columns to ``gpts`` and index the discriminator.

    - asset_type: 'gpt' (server default) or 'project'.
    - conversation_count / last_conversation_at: Phase 2 placeholders,
      populated once the Conversations API is added.
    """
    # Discriminator so one table can hold both Custom GPTs and Projects.
    op.add_column(
        "gpts",
        sa.Column("asset_type", sa.String(32), nullable=False, server_default="gpt"),
    )
    # Conversation Intelligence placeholders (not yet populated).
    op.add_column(
        "gpts",
        sa.Column(
            "conversation_count", sa.Integer, nullable=False, server_default="0"
        ),
    )
    op.add_column(
        "gpts",
        sa.Column("last_conversation_at", sa.DateTime(timezone=True), nullable=True),
    )
    # Listings filter on the discriminator, so give it an index.
    op.create_index("ix_gpts_asset_type", "gpts", ["asset_type"])


def downgrade() -> None:
    """Undo upgrade(): drop the index first, then the columns in reverse order."""
    op.drop_index("ix_gpts_asset_type", table_name="gpts")
    for column_name in ("last_conversation_at", "conversation_count", "asset_type"):
        op.drop_column("gpts", column_name)
6 changes: 6 additions & 0 deletions backend/app/models/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,12 @@ class GPT(Base):
DateTime(timezone=True)
)

asset_type: Mapped[str] = mapped_column(String(32), default="gpt", nullable=False)
conversation_count: Mapped[int] = mapped_column(Integer, default=0)
last_conversation_at: Mapped[datetime | None] = mapped_column(
DateTime(timezone=True)
)

content_hash: Mapped[str | None] = mapped_column(String(64))
sync_log_id: Mapped[int | None] = mapped_column(Integer, ForeignKey("sync_logs.id"))
indexed_at: Mapped[datetime | None] = mapped_column(DateTime(timezone=True))
Expand Down
1 change: 1 addition & 0 deletions backend/app/schemas/schemas.py
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,7 @@ class GPTRead(BaseModel):
llm_summary: str | None
use_case_description: str | None = None
instructions: str | None = None
asset_type: str = "gpt"
# Semantic enrichment fields
business_process: str | None = None
risk_flags: list | None = None
Expand Down
116 changes: 96 additions & 20 deletions backend/app/services/compliance_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,12 +44,19 @@ def __init__(
async def close(self):
await self._client.aclose()

async def fetch_all_gpts(
async def _fetch_paginated(
self,
workspace_id: str,
endpoint: str,
normalize_fn: Callable[[dict], dict],
on_page: Callable[[list[dict], int], Coroutine[Any, Any, None]] | None = None,
) -> list[dict]:
all_gpts: list[dict] = []
"""Fetch all pages from a cursor-paginated endpoint and normalize each item.

endpoint: full URL, e.g. .../workspaces/{id}/gpts
normalize_fn: called on each raw item to produce a uniform dict
on_page: optional progress callback(batch, page_number)
"""
all_items: list[dict] = []
after: str | None = None
page = 0

Expand All @@ -60,38 +67,47 @@ async def fetch_all_gpts(
if after:
params["after"] = after

url = f"{self._base_url}/compliance/workspaces/{workspace_id}/gpts"
logger.info(f"Requesting: GET {url} params={params}")

response = await self._request_with_retries(
"GET",
url,
params=params,
)

logger.info(f"Requesting: GET {endpoint} params={params}")
response = await self._request_with_retries("GET", endpoint, params=params)
logger.info(
f"Response: status={response.status_code} length={len(response.text)}"
)

data = response.json()
gpts = data.get("data", [])
all_gpts.extend(gpts)
items = data.get("data", [])
all_items.extend(items)
page += 1

logger.info(
f"Page {page}: got {len(gpts)} GPTs, has_more={data.get('has_more')}"
f"Page {page}: got {len(items)} items, has_more={data.get('has_more')}"
)

if on_page:
await on_page(gpts, page)
await on_page(items, page)

if not data.get("has_more", False) or not gpts:
if not data.get("has_more", False) or not items:
break

after = data.get("last_id") or gpts[-1].get("id")
after = data.get("last_id") or items[-1].get("id")

logger.info(f"Fetch complete: {len(all_gpts)} total raw GPTs")
return [self._normalize_gpt(g) for g in all_gpts]
logger.info(f"Fetch complete: {len(all_items)} total raw items from {endpoint}")
return [normalize_fn(item) for item in all_items]

async def fetch_all_gpts(
    self,
    workspace_id: str,
    on_page: Callable[[list[dict], int], Coroutine[Any, Any, None]] | None = None,
) -> list[dict]:
    """Return every Custom GPT in *workspace_id* as normalized uniform dicts.

    on_page: optional async progress callback invoked as (batch, page_number).
    """
    endpoint = f"{self._base_url}/compliance/workspaces/{workspace_id}/gpts"
    return await self._fetch_paginated(
        endpoint,
        normalize_fn=self._normalize_gpt,
        on_page=on_page,
    )

async def fetch_all_projects(
    self,
    workspace_id: str,
    on_page: Callable[[list[dict], int], Coroutine[Any, Any, None]] | None = None,
) -> list[dict]:
    """Return every OpenAI Project in *workspace_id* as normalized uniform dicts.

    on_page: optional async progress callback invoked as (batch, page_number).
    """
    endpoint = f"{self._base_url}/compliance/workspaces/{workspace_id}/projects"
    return await self._fetch_paginated(
        endpoint,
        normalize_fn=self._normalize_project,
        on_page=on_page,
    )

@staticmethod
def _normalize_gpt(raw: dict) -> dict:
Expand Down Expand Up @@ -135,6 +151,66 @@ def _normalize_gpt(raw: dict) -> dict:
"conversation_starters": config.get("conversation_starters"),
}

@staticmethod
def _normalize_project(raw: dict) -> dict:
    """Flatten a Projects API record into the same uniform dict as _normalize_gpt.

    Projects share the latest_config / sharing envelope with GPTs; the main
    structural difference is the 'g-p-...' id prefix and a tool set that may
    include project-only types (deep_research, web_browsing, canvas).
    """
    from datetime import datetime, timezone

    def _listify(box) -> list:
        # Envelope dicts keep their payload under "data"; bare lists pass through.
        if isinstance(box, dict):
            return box.get("data", [])
        return box if isinstance(box, list) else []

    sharing = raw.get("sharing") or {}
    config = raw.get("latest_config") or {}
    # Projects normally have a flat latest_config, but unwrap a GPT-style
    # nested data list if one shows up.
    if isinstance(config.get("data"), list):
        nested = config.get("data") or []
        config = nested[0] if nested else {}

    # Recipients only count when delivered as an envelope dict; anything else -> [].
    recipients_box = sharing.get("recipients") or {}
    recipients = (
        recipients_box.get("data", []) if isinstance(recipients_box, dict) else []
    )

    tools = _listify(config.get("tools") or {})
    files = _listify(config.get("files") or {})

    # Numeric epoch timestamps become aware datetimes; ISO strings pass through.
    stamp = raw.get("created_at")
    if isinstance(stamp, (int, float)):
        created_at = datetime.fromtimestamp(stamp, tz=timezone.utc)
    elif isinstance(stamp, str):
        created_at = stamp
    else:
        created_at = None

    return {
        "id": raw.get("id"),
        "name": config.get("name") or raw.get("name"),
        "description": config.get("description"),
        "instructions": config.get("instructions") or "",
        "owner_email": raw.get("owner_email"),
        "builder_name": raw.get("builder_name"),
        "created_at": created_at,
        "visibility": sharing.get("visibility"),
        "recipients": recipients,
        "shared_user_count": len(recipients),
        "tools": tools,
        "files": files,
        "builder_categories": config.get("categories"),
        "conversation_starters": config.get("conversation_starters"),
        "asset_type": "project",
    }

async def fetch_all_users(self, workspace_id: str) -> list[dict]:
all_users: list[dict] = []
after: str | None = None
Expand Down
Loading
Loading