From 6c3bc8df10d61c6f9e6505f5efb7ad20819f90f5 Mon Sep 17 00:00:00 2001 From: Aaron Train Date: Fri, 30 Jan 2026 11:23:07 -0500 Subject: [PATCH 1/2] llm-cloud-run: Vector RAG Skeleton --- llm-cloud-run/.dockerignore | 16 +- llm-cloud-run/Dockerfile | 11 +- llm-cloud-run/common/__init__.py | 0 llm-cloud-run/common/config.py | 53 ++++ llm-cloud-run/common/logging_utils.py | 10 + llm-cloud-run/llm/__init__.py | 0 llm-cloud-run/llm/embeddings.py | 18 ++ llm-cloud-run/llm/gemini_client.py | 1 + llm-cloud-run/llm/vertex_init.py | 21 ++ llm-cloud-run/main.py | 279 +++++++--------------- llm-cloud-run/requirements.txt | 2 + llm-cloud-run/retrieval/__init__.py | 0 llm-cloud-run/retrieval/python_cosine.py | 48 ++++ llm-cloud-run/retrieval/retriever.py | 12 + llm-cloud-run/retrieval/types.py | 12 + llm-cloud-run/retrieval/utils.py | 17 ++ llm-cloud-run/services/__init__.py | 0 llm-cloud-run/services/analyze_service.py | 53 ++++ llm-cloud-run/storage/__init__.py | 0 llm-cloud-run/storage/bigquery_repo.py | 86 +++++++ 20 files changed, 431 insertions(+), 208 deletions(-) create mode 100644 llm-cloud-run/common/__init__.py create mode 100644 llm-cloud-run/common/config.py create mode 100644 llm-cloud-run/common/logging_utils.py create mode 100644 llm-cloud-run/llm/__init__.py create mode 100644 llm-cloud-run/llm/embeddings.py create mode 100644 llm-cloud-run/llm/gemini_client.py create mode 100644 llm-cloud-run/llm/vertex_init.py create mode 100644 llm-cloud-run/retrieval/__init__.py create mode 100644 llm-cloud-run/retrieval/python_cosine.py create mode 100644 llm-cloud-run/retrieval/retriever.py create mode 100644 llm-cloud-run/retrieval/types.py create mode 100644 llm-cloud-run/retrieval/utils.py create mode 100644 llm-cloud-run/services/__init__.py create mode 100644 llm-cloud-run/services/analyze_service.py create mode 100644 llm-cloud-run/storage/__init__.py create mode 100644 llm-cloud-run/storage/bigquery_repo.py diff --git a/llm-cloud-run/.dockerignore b/llm-cloud-run/.dockerignore index 2d4091e0..c39166d9 100644 --- a/llm-cloud-run/.dockerignore +++ b/llm-cloud-run/.dockerignore @@ -1,7 +1,9 @@ -# Vim artifacts -*.sw? -.*.sw? -*~  -4913 - -.DS_Store +__pycache__ +*.pyc +*.pyo +*.pyd +.Python +.venv +venv +.git +.pytest_cache \ No newline at end of file diff --git a/llm-cloud-run/Dockerfile b/llm-cloud-run/Dockerfile index 29a9bc93..057b1394 100644 --- a/llm-cloud-run/Dockerfile +++ b/llm-cloud-run/Dockerfile @@ -1,13 +1,14 @@ -FROM python:3.13-slim +FROM python:3.11-slim + +ENV PYTHONDONTWRITEBYTECODE=1 +ENV PYTHONUNBUFFERED=1 WORKDIR /app COPY requirements.txt . - RUN pip install --no-cache-dir -r requirements.txt COPY . . 
-EXPOSE 8080 - -CMD ["uvicorn", "main:app", "--host", "0.0.0.0", "--port", "8080"] +ENV PORT=8080 +CMD ["uvicorn", "main:app", "--host", "0.0.0.0", "--port", "8080"] \ No newline at end of file diff --git a/llm-cloud-run/common/__init__.py b/llm-cloud-run/common/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/llm-cloud-run/common/config.py b/llm-cloud-run/common/config.py new file mode 100644 index 00000000..a58538a1 --- /dev/null +++ b/llm-cloud-run/common/config.py @@ -0,0 +1,53 @@ +import os +from dataclasses import dataclass + + +def _env(name: str, default: str | None = None) -> str | None: + v = os.getenv(name) + if v is None or v == "": + return default + return v + + +@dataclass(frozen=True) +class Settings: + # GCP / Vertex + gcp_project: str | None = _env("GCP_PROJECT") or _env("GOOGLE_CLOUD_PROJECT") + gcp_location: str = _env("GCP_LOCATION", "us-central1") or "us-central1" + + # BigQuery + bq_project: str | None = _env("BQ_PROJECT") # optional; defaults to gcp_project + bq_dataset: str = _env("BQ_DATASET", "vertex_rag_demo") or "vertex_rag_demo" + bq_notes_table: str = _env("BQ_NOTES_TABLE", "demo_Notes") or "demo_Notes" + bq_embeddings_table: str = _env("BQ_EMBEDDINGS_TABLE", "demo_NoteEmbeddings") or "demo_NoteEmbeddings" + + # Column names (override if your schema differs) + notes_id_col: str = _env("NOTES_ID_COL", "note_id") or "note_id" + notes_content_col: str = _env("NOTES_CONTENT_COL", "content") or "content" + notes_source_col: str = _env("NOTES_SOURCE_COL", "source") or "source" + notes_created_col: str = _env("NOTES_CREATED_COL", "created_at") or "created_at" + + emb_id_col: str = _env("EMB_ID_COL", "note_id") or "note_id" + emb_vector_col: str = _env("EMB_VECTOR_COL", "embedding") or "embedding" # ARRAY + emb_model_col: str = _env("EMB_MODEL_COL", "model") or "model" + emb_updated_col: str = _env("EMB_UPDATED_COL", "updated_at") or "updated_at" + + # RAG settings + rag_top_k: int = int(_env("RAG_TOP_K", "3") or "3") + snippet_chars: int = int(_env("RAG_SNIPPET_CHARS", "800") or "800") + + # LLM + Embeddings + gemini_model: str = _env("GEMINI_MODEL", "gemini-2.5-flash-lite") or "gemini-2.5-flash-lite" + embedding_model: str = _env("EMBEDDING_MODEL", "text-embedding-004") or "text-embedding-004" + + # Optional: cap how many notes to load into memory cache + bq_max_notes: int = int(_env("BQ_MAX_NOTES", "5000") or "5000") + + @property + def effective_bq_project(self) -> str | None: + return self.bq_project or self.gcp_project + + def require_project(self) -> str: + if not self.gcp_project: + raise RuntimeError("GCP_PROJECT / GOOGLE_CLOUD_PROJECT not set.") + return self.gcp_project diff --git a/llm-cloud-run/common/logging_utils.py b/llm-cloud-run/common/logging_utils.py new file mode 100644 index 00000000..b1a6bbe5 --- /dev/null +++ b/llm-cloud-run/common/logging_utils.py @@ -0,0 +1,10 @@ +import logging +import os + + +def setup_logging() -> None: + level = os.getenv("LOG_LEVEL", "INFO").upper() + logging.basicConfig( + level=getattr(logging, level, logging.INFO), + format="%(asctime)s %(levelname)s %(name)s - %(message)s", + ) \ No newline at end of file diff --git a/llm-cloud-run/llm/__init__.py b/llm-cloud-run/llm/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/llm-cloud-run/llm/embeddings.py b/llm-cloud-run/llm/embeddings.py new file mode 100644 index 00000000..a9c12f63 --- /dev/null +++ b/llm-cloud-run/llm/embeddings.py @@ -0,0 +1,18 @@ +from __future__ import annotations + +from typing import List +from 
vertexai.language_models import TextEmbeddingModel + +from common.config import Settings +from llm.vertex_init import init_vertex + +_MODEL: TextEmbeddingModel | None = None + + +def embed_text(settings: Settings, text: str) -> List[float]: + """Return an embedding vector for given text using Vertex Embeddings.""" + global _MODEL + init_vertex(settings) + if _MODEL is None: + _MODEL = TextEmbeddingModel.from_pretrained(settings.embedding_model) + return _MODEL.get_embeddings([text])[0].values diff --git a/llm-cloud-run/llm/gemini_client.py b/llm-cloud-run/llm/gemini_client.py new file mode 100644 index 00000000..17948737 --- /dev/null +++ b/llm-cloud-run/llm/gemini_client.py @@ -0,0 +1 @@ +# TODO: implement \ No newline at end of file diff --git a/llm-cloud-run/llm/vertex_init.py b/llm-cloud-run/llm/vertex_init.py new file mode 100644 index 00000000..2e36901b --- /dev/null +++ b/llm-cloud-run/llm/vertex_init.py @@ -0,0 +1,21 @@ +import logging +import vertexai +from common.config import Settings + +_INITIALIZED = False + + +def init_vertex(settings: Settings) -> None: + """Initialize Vertex AI once per process.""" + global _INITIALIZED + if _INITIALIZED: + return + + project = settings.require_project() + vertexai.init(project=project, location=settings.gcp_location) + logging.getLogger(__name__).info( + "Vertex AI initialized (project=%s, location=%s).", + project, + settings.gcp_location, + ) + _INITIALIZED = True diff --git a/llm-cloud-run/main.py b/llm-cloud-run/main.py index 69e87f8e..40ee7ccd 100644 --- a/llm-cloud-run/main.py +++ b/llm-cloud-run/main.py @@ -1,208 +1,95 @@ +from __future__ import annotations + import logging -import math -import os -import json +from typing import Optional from fastapi import FastAPI, Form, File, UploadFile, HTTPException from fastapi.responses import JSONResponse -from typing import Optional - -import vertexai -from vertexai.preview.generative_models import GenerativeModel, Part -from vertexai.language_models import TextEmbeddingModel - -logging.basicConfig(level=logging.INFO) - -app = FastAPI() - -# ---- Vertex AI init ---- -GCP_PROJECT = os.getenv("GCP_PROJECT") or os.getenv("GOOGLE_CLOUD_PROJECT") -if not GCP_PROJECT: - logging.warning("GCP_PROJECT / GOOGLE_CLOUD_PROJECT not set; vertexai.init may fail.") -vertexai.init(project=GCP_PROJECT, location="us-central1") - -# ---- RAG: notes.json + embeddings ---- - -NOTES = [] # list of note dicts -NOTE_VECTORS = [] # list of embedding vectors (lists of floats) -EMBED_MODEL: Optional[TextEmbeddingModel] = None - - -def embed_text(text: str): - """Return embedding vector for given text using Vertex embedding model.""" - global EMBED_MODEL - if EMBED_MODEL is None: - EMBED_MODEL = TextEmbeddingModel.from_pretrained("text-embedding-004") - return EMBED_MODEL.get_embeddings([text])[0].values - - -def cosine(a, b): - """Cosine similarity between two vectors.""" - dot = sum(x * y for x, y in zip(a, b)) - na = math.sqrt(sum(x * x for x in a)) - nb = math.sqrt(sum(y * y for y in b)) - return dot / (na * nb) if na and nb else 0.0 - - -def load_notes(): - """Load notes.json and precompute embeddings for each note.content.""" - global NOTES, NOTE_VECTORS - notes_path = os.path.join(os.path.dirname(__file__), "notes.json") - if not os.path.exists(notes_path): - logging.warning("notes.json not found; RAG will be disabled.") - NOTES = [] - NOTE_VECTORS = [] - return - - try: - with open(notes_path, "r", encoding="utf-8") as f: - NOTES = json.load(f) - if not isinstance(NOTES, list): - raise ValueError("notes.json 
must be a list of note objects") - except Exception as e: - logging.error(f"Failed to load notes.json: {e}") - NOTES = [] - NOTE_VECTORS = [] - return - - NOTE_VECTORS = [] - for note in NOTES: - content = note.get("content", "") - if not content: - NOTE_VECTORS.append([]) - continue - vec = embed_text(content) - NOTE_VECTORS.append(vec) - logging.info(f"Loaded {len(NOTES)} notes for RAG.") - - -def retrieve_top_notes(snippet: str, top_k: int = 3): - """Return top_k most similar notes for given log snippet.""" - if not NOTES or not NOTE_VECTORS: - return [] - - qvec = embed_text(snippet) - scored = [] - for note, vec in zip(NOTES, NOTE_VECTORS): - if not vec: - continue - score = cosine(qvec, vec) - scored.append((score, note)) - - scored.sort(key=lambda x: x[0], reverse=True) - top = [note for score, note in scored[:top_k]] - return top - - -# load notes when the module is imported -load_notes() - -# ---- FastAPI endpoint ---- - - -@app.post("/analyze") -async def analyze( - prompt: str = Form(...), - log_file: Optional[UploadFile] = File(None), - image: Optional[UploadFile] = File(None), -): - """ - Main endpoint: - - prompt: user instruction (required) - - log_file: optional text log file (e.g., crash stack) - - image: optional image (e.g., screenshot) - """ - - parts = [] - - # --- Handle log file (if provided) --- - log_text = "" - if log_file is not None: - # You can adjust allowed types as needed - allowed_logs = {"text/plain", "text/x-log", "application/octet-stream"} - if log_file.content_type not in allowed_logs: - raise HTTPException( - status_code=400, - detail=f"Unsupported log file type: {log_file.content_type}. " - f"Allowed: {', '.join(sorted(allowed_logs))}", - ) - log_bytes = await log_file.read() - try: - log_text = log_bytes.decode(errors="ignore") - except Exception: - log_text = "" - logging.info( - f"Received log file: {log_file.filename}, " - f"type: {log_file.content_type}, size: {len(log_bytes)} bytes" - ) - - # --- RAG: retrieve notes based on a snippet of the log (or prompt fallback) --- - snippet_source = log_text or prompt - snippet = snippet_source[:800] # small snippet for similarity search - - top_notes = retrieve_top_notes(snippet, top_k=3) - if top_notes: - context_lines = [] - for i, n in enumerate(top_notes, start=1): - context_lines.append( - f"[{i}] {n.get('content', '')} " - f"(Source: {n.get('source', 'unknown')})" +from common.config import Settings +from common.logging_utils import setup_logging +from storage.bigquery_repo import BigQueryNotesRepository +from retrieval.python_cosine import PythonCosineRetriever +from services.analyze_service import analyze as analyze_service + + +def create_app() -> FastAPI: + setup_logging() + settings = Settings() + + app = FastAPI() + + # Dependencies (constructed once per process) + repo = BigQueryNotesRepository(settings) + retriever = PythonCosineRetriever(settings, repo) + + @app.get("/healthz") + async def healthz(): + return {"ok": True} + + @app.post("/analyze") + async def analyze( + prompt: str = Form(...), + log_file: Optional[UploadFile] = File(None), + image: Optional[UploadFile] = File(None), + ): + log_text = "" + if log_file is not None: + allowed_logs = {"text/plain", "text/x-log", "application/octet-stream"} + if log_file.content_type not in allowed_logs: + raise HTTPException( + status_code=400, + detail=( + f"Unsupported log file type: {log_file.content_type}. 
" + f"Allowed: {', '.join(sorted(allowed_logs))}" + ), + ) + log_bytes = await log_file.read() + try: + log_text = log_bytes.decode(errors="ignore") + except Exception: + log_text = "" + logging.getLogger(__name__).info( + "Received log file: %s type=%s size=%d", + log_file.filename, + log_file.content_type, + len(log_bytes), ) - context_block = "\n".join(context_lines) - augmented_prompt = ( - "You are a crash analysis assistant. Use the context below to help:\n\n" - f"Context:\n{context_block}\n\n" - f"User task:\n{prompt}" - ) - else: - augmented_prompt = prompt - # --- Build text part(s) for Gemini --- - # main text (augmented prompt) - parts.append(Part.from_text(augmented_prompt)) - - # optionally include a raw snippet of the log as extra text - if log_text: - parts.append( - Part.from_text( - "\n\nRaw crash snippet (truncated):\n" + log_text[:4000] + image_bytes = None + image_mime = None + if image is not None: + allowed_images = {"image/png", "image/jpeg", "image/jpg", "image/webp"} + if image.content_type not in allowed_images: + raise HTTPException( + status_code=400, + detail=( + f"Unsupported image type: {image.content_type}. " + f"Allowed: {', '.join(sorted(allowed_images))}" + ), + ) + image_bytes = await image.read() + image_mime = image.content_type + logging.getLogger(__name__).info( + "Including image: %s type=%s size=%d", + image.filename, + image.content_type, + len(image_bytes), ) - ) - # --- Handle image (if provided) --- - if image is not None: - allowed_images = {"image/png", "image/jpeg", "image/jpg", "image/webp"} - if image.content_type not in allowed_images: - raise HTTPException( - status_code=400, - detail=( - f"Unsupported image type: {image.content_type}. " - f"Allowed: {', '.join(sorted(allowed_images))}" - ), + try: + output = analyze_service( + settings, + retriever=retriever, + prompt=prompt, + log_text=log_text, + image_bytes=image_bytes, + image_mime_type=image_mime, ) - img_bytes = await image.read() - parts.append( - Part.from_data(mime_type=image.content_type, data=img_bytes) - ) - logging.info( - f"Including image in prompt: {image.filename}, " - f"type: {image.content_type}, size: {len(img_bytes)} bytes" - ) + except Exception as e: + logging.getLogger(__name__).exception("Analyze failed") + raise HTTPException(status_code=500, detail=str(e)) - # --- Call Gemini --- - model = GenerativeModel("gemini-2.5-flash-lite") - try: - response = model.generate_content( - parts if len(parts) > 1 else parts[0], - generation_config={ - "temperature": 0.3, - "max_output_tokens": 1024, - }, - ) - output_text = response.text - except Exception as e: - logging.exception("Error calling Gemini model") - raise HTTPException(status_code=500, detail=str(e)) + return JSONResponse({"output": output}) - return JSONResponse({"output": output_text}) + return app diff --git a/llm-cloud-run/requirements.txt b/llm-cloud-run/requirements.txt index 461f5d9e..0b93da99 100644 --- a/llm-cloud-run/requirements.txt +++ b/llm-cloud-run/requirements.txt @@ -4,3 +4,5 @@ google-cloud-aiplatform google-cloud-storage pydantic python-multipart +google-cloud-bigquery +google-cloud-core diff --git a/llm-cloud-run/retrieval/__init__.py b/llm-cloud-run/retrieval/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/llm-cloud-run/retrieval/python_cosine.py b/llm-cloud-run/retrieval/python_cosine.py new file mode 100644 index 00000000..1e8b89a5 --- /dev/null +++ b/llm-cloud-run/retrieval/python_cosine.py @@ -0,0 +1,48 @@ +from __future__ import annotations + +import logging 
+from typing import List + +from common.config import Settings +from llm.embeddings import embed_text +from retrieval.retriever import Retriever +from retrieval.types import Note +from retrieval.utils import cosine +from storage.bigquery_repo import BigQueryNotesRepository + + +class PythonCosineRetriever(Retriever): + """ + Bridge retriever: + - Notes + embeddings are stored in BigQuery + - Similarity is computed in Python (swap later for vector-native search) + """ + + def __init__(self, settings: Settings, repo: BigQueryNotesRepository): + self.settings = settings + self.repo = repo + self._cache: list[Note] | None = None + + def _load_notes_once(self) -> list[Note]: + if self._cache is not None: + return self._cache + notes = self.repo.fetch_notes_with_embeddings(limit=self.settings.bq_max_notes) + self._cache = notes + logging.getLogger(__name__).info("Loaded %d notes (with embeddings) from BigQuery.", len(notes)) + return notes + + def retrieve(self, snippet: str, *, top_k: int) -> List[Note]: + notes = self._load_notes_once() + if not notes: + return [] + + qvec = embed_text(self.settings, snippet) + + scored: list[tuple[float, Note]] = [] + for n in notes: + if not n.embedding: + continue + scored.append((cosine(qvec, n.embedding), n)) + + scored.sort(key=lambda t: t[0], reverse=True) + return [n for _, n in scored[:top_k]] \ No newline at end of file diff --git a/llm-cloud-run/retrieval/retriever.py b/llm-cloud-run/retrieval/retriever.py new file mode 100644 index 00000000..077344b1 --- /dev/null +++ b/llm-cloud-run/retrieval/retriever.py @@ -0,0 +1,12 @@ +from __future__ import annotations + +from abc import ABC, abstractmethod +from typing import List + +from retrieval.types import Note + + +class Retriever(ABC): + @abstractmethod + def retrieve(self, snippet: str, *, top_k: int) -> List[Note]: + raise NotImplementedError diff --git a/llm-cloud-run/retrieval/types.py b/llm-cloud-run/retrieval/types.py new file mode 100644 index 00000000..b03df7e4 --- /dev/null +++ b/llm-cloud-run/retrieval/types.py @@ -0,0 +1,12 @@ +from __future__ import annotations +from dataclasses import dataclass +from typing import List + + +@dataclass(frozen=True) +class Note: + note_id: str + content: str + source: str | None = None + created_at: str | None = None + embedding: List[float] | None = None diff --git a/llm-cloud-run/retrieval/utils.py b/llm-cloud-run/retrieval/utils.py new file mode 100644 index 00000000..7b5f83c3 --- /dev/null +++ b/llm-cloud-run/retrieval/utils.py @@ -0,0 +1,17 @@ +from __future__ import annotations + +import math +from typing import Sequence + + +def cosine(a: Sequence[float], b: Sequence[float]) -> float: + dot = 0.0 + na = 0.0 + nb = 0.0 + for x, y in zip(a, b): + dot += x * y + na += x * x + nb += y * y + if na <= 0.0 or nb <= 0.0: + return 0.0 + return dot / (math.sqrt(na) * math.sqrt(nb)) diff --git a/llm-cloud-run/services/__init__.py b/llm-cloud-run/services/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/llm-cloud-run/services/analyze_service.py b/llm-cloud-run/services/analyze_service.py new file mode 100644 index 00000000..1e409a6a --- /dev/null +++ b/llm-cloud-run/services/analyze_service.py @@ -0,0 +1,53 @@ +from __future__ import annotations + +from vertexai.preview.generative_models import Part + +from common.config import Settings +from llm.gemini_client import generate +from retrieval.retriever import Retriever + + +def build_augmented_prompt(prompt: str, *, top_notes: list[dict]) -> str: + if not top_notes: + return prompt + + 
context_lines = [] + for i, n in enumerate(top_notes, start=1): + content = n.get("content", "") + source = n.get("source") or "unknown" + context_lines.append(f"[{i}] {content} (Source: {source})") + + context_block = "\n".join(context_lines) + return ( + "You are a crash analysis assistant. Use the context below to help:\n\n" + f"Context:\n{context_block}\n\n" + f"User task:\n{prompt}" + ) + + +def analyze( + settings: Settings, + *, + retriever: Retriever, + prompt: str, + log_text: str = "", + image_bytes: bytes | None = None, + image_mime_type: str | None = None, +) -> str: + snippet_source = log_text or prompt + snippet = snippet_source[: settings.snippet_chars] + + notes = retriever.retrieve(snippet, top_k=settings.rag_top_k) + top_notes = [{"content": n.content, "source": n.source} for n in notes] + + augmented_prompt = build_augmented_prompt(prompt, top_notes=top_notes) + + parts: list[Part] = [Part.from_text(augmented_prompt)] + + if log_text: + parts.append(Part.from_text("\n\nRaw crash snippet (truncated):\n" + log_text[:4000])) + + if image_bytes and image_mime_type: + parts.append(Part.from_data(mime_type=image_mime_type, data=image_bytes)) + + return generate(settings, parts if len(parts) > 1 else parts[0], temperature=0.3, max_output_tokens=1024) diff --git a/llm-cloud-run/storage/__init__.py b/llm-cloud-run/storage/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/llm-cloud-run/storage/bigquery_repo.py b/llm-cloud-run/storage/bigquery_repo.py new file mode 100644 index 00000000..2adaef97 --- /dev/null +++ b/llm-cloud-run/storage/bigquery_repo.py @@ -0,0 +1,86 @@ +from __future__ import annotations + +from typing import List +from google.cloud import bigquery + +from common.config import Settings +from retrieval.types import Note + + +class BigQueryNotesRepository: + def __init__(self, settings: Settings): + self.settings = settings + project = settings.effective_bq_project + self.client = bigquery.Client(project=project) # ADC / service account on Cloud Run + + def _table(self, table_name: str) -> str: + project = self.settings.effective_bq_project + return f"{project}.{self.settings.bq_dataset}.{table_name}" + + def fetch_notes_with_embeddings(self, *, limit: int = 5000) -> List[Note]: + s = self.settings + notes_t = self._table(s.bq_notes_table) + emb_t = self._table(s.bq_embeddings_table) + + query = f""" + SELECT + n.{s.notes_id_col} AS note_id, + n.{s.notes_content_col} AS content, + n.{s.notes_source_col} AS source, + CAST(n.{s.notes_created_col} AS STRING) AS created_at, + e.{s.emb_vector_col} AS embedding + FROM `{notes_t}` n + LEFT JOIN `{emb_t}` e + ON n.{s.notes_id_col} = e.{s.emb_id_col} + WHERE n.{s.notes_content_col} IS NOT NULL + ORDER BY n.{s.notes_created_col} DESC + LIMIT @limit + """ + + job = self.client.query( + query, + job_config=bigquery.QueryJobConfig( + query_parameters=[bigquery.ScalarQueryParameter("limit", "INT64", int(limit))] + ), + ) + rows = list(job.result()) + + notes: List[Note] = [] + for r in rows: + emb = r.get("embedding") + notes.append( + Note( + note_id=str(r.get("note_id", "")), + content=r.get("content") or "", + source=r.get("source"), + created_at=r.get("created_at"), + embedding=list(emb) if emb is not None else None, + ) + ) + return notes + + def insert_note(self, *, note_id: str, content: str, source: str | None = None) -> None: + s = self.settings + table_id = self._table(s.bq_notes_table) + + row = { + s.notes_id_col: note_id, + s.notes_content_col: content, + s.notes_source_col: source, + } + errors 
= self.client.insert_rows_json(table_id, [row]) + if errors: + raise RuntimeError(f"BigQuery insert_note errors: {errors}") + + def insert_embedding(self, *, note_id: str, embedding: list[float], model: str | None = None) -> None: + s = self.settings + table_id = self._table(s.bq_embeddings_table) + + row = { + s.emb_id_col: note_id, + s.emb_vector_col: embedding, # must be ARRAY + s.emb_model_col: model or s.embedding_model, + } + errors = self.client.insert_rows_json(table_id, [row]) + if errors: + raise RuntimeError(f"BigQuery insert_embedding errors: {errors}") From 1af8104d7fc9273e2541647e24957313d213fd78 Mon Sep 17 00:00:00 2001 From: Aaron Train Date: Mon, 2 Feb 2026 10:39:15 -0500 Subject: [PATCH 2/2] Add gemini_client.py --- llm-cloud-run/llm/gemini_client.py | 37 +++++++++++++++++++++++++++++- 1 file changed, 36 insertions(+), 1 deletion(-) diff --git a/llm-cloud-run/llm/gemini_client.py b/llm-cloud-run/llm/gemini_client.py index 17948737..939e89c0 100644 --- a/llm-cloud-run/llm/gemini_client.py +++ b/llm-cloud-run/llm/gemini_client.py @@ -1 +1,36 @@ -# TODO: implement \ No newline at end of file +from __future__ import annotations + +from vertexai.preview.generative_models import GenerativeModel, Part + +from common.config import Settings +from llm.vertex_init import init_vertex + + +def generate( + settings: Settings, + parts: list[Part] | Part, + *, + temperature: float = 0.3, + max_output_tokens: int = 1024, +) -> str: + """ + Generate text using a Gemini model on Vertex AI. + + `parts` can be: + - a single Part (text or image) + - a list of Parts (multimodal prompt) + """ + init_vertex(settings) + + model = GenerativeModel(settings.gemini_model) + + response = model.generate_content( + parts, + generation_config={ + "temperature": temperature, + "max_output_tokens": max_output_tokens, + }, + ) + + # Vertex responses expose `.text` for convenience + return getattr(response, "text", "") or ""
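
Supplementary note (not part of the diffs above): the skeleton reads from demo_Notes and demo_NoteEmbeddings but ships no DDL or seed data. The sketch below is one way to provision the tables and insert a single note plus its embedding, using only the modules added in this series. The schema is inferred from the column defaults in common/config.py, the ARRAY<FLOAT64> embedding type is an assumption, and the file name seed_notes.py and the sample note text are hypothetical; adjust to your actual dataset.

    # seed_notes.py - hypothetical helper, not part of this patch series.
    # Creates the tables the skeleton expects (schema inferred from common/config.py
    # defaults) and seeds one note plus its embedding via the new repository helpers.
    from google.cloud import bigquery

    from common.config import Settings
    from llm.embeddings import embed_text
    from storage.bigquery_repo import BigQueryNotesRepository

    settings = Settings()
    settings.require_project()  # fail fast if GCP_PROJECT / GOOGLE_CLOUD_PROJECT is unset
    dataset = f"{settings.effective_bq_project}.{settings.bq_dataset}"

    client = bigquery.Client(project=settings.effective_bq_project)
    client.create_dataset(dataset, exists_ok=True)

    # Minimal assumed schema; embedding is stored as ARRAY<FLOAT64>.
    ddl = f"""
    CREATE TABLE IF NOT EXISTS `{dataset}.{settings.bq_notes_table}` (
      note_id STRING NOT NULL,
      content STRING,
      source STRING,
      created_at TIMESTAMP
    );
    CREATE TABLE IF NOT EXISTS `{dataset}.{settings.bq_embeddings_table}` (
      note_id STRING NOT NULL,
      embedding ARRAY<FLOAT64>,
      model STRING,
      updated_at TIMESTAMP
    );
    """
    client.query(ddl).result()

    repo = BigQueryNotesRepository(settings)
    note_id = "note-0001"
    content = (
        "Example note: SIGSEGV in libfoo during startup usually means a missing "
        "native dependency in the container image."
    )
    repo.insert_note(note_id=note_id, content=content, source="runbook")
    repo.insert_embedding(note_id=note_id, embedding=embed_text(settings, content))

Two caveats when seeding this way: insert_rows_json goes through the streaming API, so rows written to a freshly created table can take a short while to become queryable, and insert_note leaves created_at NULL, so populate that column separately if you depend on the ORDER BY created_at DESC in fetch_notes_with_embeddings.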
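
For a local smoke test: main.py now exposes create_app() rather than a module-level app, while the Dockerfile CMD still launches "uvicorn main:app", so either add "app = create_app()" at the bottom of main.py or start the server in uvicorn's factory mode, e.g. "uvicorn main:create_app --factory --host 0.0.0.0 --port 8080". The snippet below is a minimal client-side check under the assumptions that the service is reachable on localhost:8080, the requests package is installed, and crash.log is a placeholder log file.

    # smoke_test.py - hypothetical local check, not part of this patch series.
    import requests

    # Multipart POST matching the /analyze signature (prompt form field + optional log_file upload).
    with open("crash.log", "rb") as fh:
        resp = requests.post(
            "http://localhost:8080/analyze",
            data={"prompt": "Summarize the likely root cause of this crash."},
            files={"log_file": ("crash.log", fh, "text/plain")},
            timeout=120,
        )
    resp.raise_for_status()
    print(resp.json()["output"])

A GET on /healthz should return {"ok": true} once the app is up, which is a quick way to confirm the factory wiring before exercising /analyze.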