diff --git a/CLI-COMMANDS.md b/CLI-COMMANDS.md index fde4c621..c1a7b324 100644 --- a/CLI-COMMANDS.md +++ b/CLI-COMMANDS.md @@ -64,6 +64,64 @@ roboflow version list -p my-project roboflow model list -p my-project ``` +### Manage folders + +```bash +roboflow folder list +roboflow folder create "Training Data" --projects proj1,proj2 +roboflow folder get <folder-id> +roboflow folder update <folder-id> --name "New Name" +roboflow folder delete <folder-id> +``` + +### Annotation batches and jobs + +```bash +roboflow annotation batch list -p my-project +roboflow annotation batch get <batch-id> -p my-project +roboflow annotation job list -p my-project +roboflow annotation job create -p my-project --name "Label round 1" \ + --batch <batch-id> --num-images 100 --labeler a@co.com --reviewer b@co.com +``` + +### Workflows + +```bash +roboflow workflow list +roboflow workflow get my-workflow +roboflow workflow create --name "My Workflow" --definition workflow.json +roboflow workflow update my-workflow --definition updated.json +roboflow workflow version list my-workflow +roboflow workflow fork other-ws/their-workflow +``` + +### Create a dataset version + +```bash +roboflow version create -p my-project --settings settings.json +``` + +### Workspace stats and billing + +```bash +roboflow workspace usage +roboflow workspace plan +roboflow workspace stats --start-date 2026-01-01 --end-date 2026-03-31 +``` + +### Search Roboflow Universe + +```bash +roboflow universe search "hard hats" --type dataset --limit 5 +``` + +### Video inference + +```bash +roboflow video infer -p my-project -v 3 -f video.mp4 --fps 10 +roboflow video status <job-id> +``` + ## JSON output for agents Every command supports `--json` for structured output that's safe to pipe: @@ -103,12 +161,12 @@ Version numbers are always numeric — that's how `x/y` is disambiguated between | `infer` | Run inference on images | | `search` | Search workspace images (RoboQL), export results | | `deployment` | Manage dedicated deployments | -| `workflow` | Manage workflows *(coming soon)* | -| 
`folder` | Manage project folders *(coming soon)* | +| `workflow` | Manage workflows | +| `folder` | Manage workspace folders | +| `annotation` | Annotation batches and jobs | +| `universe` | Search Roboflow Universe | +| `video` | Video inference | | `batch` | Batch processing jobs *(coming soon)* | -| `universe` | Browse Roboflow Universe *(coming soon)* | -| `video` | Video inference *(coming soon)* | -| `annotation` | Annotation batches and jobs *(coming soon)* | | `completion` | Shell completion scripts *(coming soon)* | Run `roboflow --help` for details on any command. diff --git a/roboflow/adapters/rfapi.py b/roboflow/adapters/rfapi.py index 834b30d7..63abb75a 100644 --- a/roboflow/adapters/rfapi.py +++ b/roboflow/adapters/rfapi.py @@ -477,3 +477,328 @@ def _save_annotation_error(response): return AnnotationSaveError(err_msg, status_code=response.status_code) return AnnotationSaveError(str(responsejson), status_code=response.status_code) + + +# --------------------------------------------------------------------------- +# Phase 2: Annotation batch & job endpoints +# --------------------------------------------------------------------------- + + +def list_batches(api_key, workspace_url, project_url): + """GET /{ws}/{proj}/batches — list annotation batches.""" + response = requests.get(f"{API_URL}/{workspace_url}/{project_url}/batches", params={"api_key": api_key}) + if response.status_code != 200: + raise RoboflowError(response.text) + return response.json() + + +def get_batch(api_key, workspace_url, project_url, batch_id): + """GET /{ws}/{proj}/batches/{batch_id} — get batch details.""" + response = requests.get(f"{API_URL}/{workspace_url}/{project_url}/batches/{batch_id}", params={"api_key": api_key}) + if response.status_code != 200: + raise RoboflowError(response.text) + return response.json() + + +def list_annotation_jobs(api_key, workspace_url, project_url): + """GET /{ws}/{proj}/jobs — list annotation jobs.""" + response = 
requests.get(f"{API_URL}/{workspace_url}/{project_url}/jobs", params={"api_key": api_key}) + if response.status_code != 200: + raise RoboflowError(response.text) + return response.json() + + +def get_annotation_job(api_key, workspace_url, project_url, job_id): + """GET /{ws}/{proj}/jobs/{job_id} — get annotation job details.""" + response = requests.get(f"{API_URL}/{workspace_url}/{project_url}/jobs/{job_id}", params={"api_key": api_key}) + if response.status_code != 200: + raise RoboflowError(response.text) + return response.json() + + +def create_annotation_job(api_key, workspace_url, project_url, *, name, batch_id=None, assignees=None): + """POST /{ws}/{proj}/jobs — create an annotation job.""" + payload = {"name": name} + if batch_id: + payload["batchId"] = batch_id + if assignees: + payload["assignees"] = assignees + response = requests.post( + f"{API_URL}/{workspace_url}/{project_url}/jobs", + params={"api_key": api_key}, + json=payload, + ) + if response.status_code not in (200, 201): + raise RoboflowError(response.text) + return response.json() + + +# --------------------------------------------------------------------------- +# Phase 2: Folder (project group) endpoints +# --------------------------------------------------------------------------- + + +def list_folders(api_key, workspace_url): + """GET /{ws}/groups — list project folders.""" + response = requests.get(f"{API_URL}/{workspace_url}/groups", params={"api_key": api_key}) + if response.status_code != 200: + raise RoboflowError(response.text) + return response.json() + + +def get_folder(api_key, workspace_url, group_id): + """GET /{ws}/groups?groupId={id} — get folder details.""" + response = requests.get( + f"{API_URL}/{workspace_url}/groups", + params={"api_key": api_key, "groupId": group_id}, + ) + if response.status_code != 200: + raise RoboflowError(response.text) + return response.json() + + +def create_folder(api_key, workspace_url, name, *, parent_id=None, project_ids=None): + """POST 
/{ws}/groups — create a project folder.""" + payload: Dict[str, Union[str, List[str], None]] = {"name": name} + if parent_id: + payload["parent_id"] = parent_id + if project_ids: + payload["projects"] = project_ids + response = requests.post( + f"{API_URL}/{workspace_url}/groups", + params={"api_key": api_key}, + json=payload, + ) + if response.status_code not in (200, 201): + raise RoboflowError(response.text) + return response.json() + + +def update_folder(api_key, workspace_url, group_id, *, name=None): + """POST /{ws}/groups/{id} — update a project folder.""" + payload: Dict[str, Optional[str]] = {} + if name: + payload["name"] = name + response = requests.post( + f"{API_URL}/{workspace_url}/groups/{group_id}", + params={"api_key": api_key}, + json=payload, + ) + if response.status_code != 200: + raise RoboflowError(response.text) + return response.json() + + +def delete_folder(api_key, workspace_url, group_id): + """DELETE /{ws}/groups/{id} — delete a project folder.""" + response = requests.delete( + f"{API_URL}/{workspace_url}/groups/{group_id}", + params={"api_key": api_key}, + ) + if response.status_code not in (200, 204): + raise RoboflowError(response.text) + if response.status_code == 204 or not response.text.strip(): + return {} + return response.json() + + +# --------------------------------------------------------------------------- +# Phase 2: Workflow endpoints +# --------------------------------------------------------------------------- + + +def list_workflows(api_key, workspace_url): + """GET /{ws}/workflows — list workflows.""" + response = requests.get(f"{API_URL}/{workspace_url}/workflows", params={"api_key": api_key}) + if response.status_code != 200: + raise RoboflowError(response.text) + return response.json() + + +def get_workflow(api_key, workspace_url, workflow_url): + """GET /{ws}/workflows/{url} — get workflow details.""" + response = requests.get( + f"{API_URL}/{workspace_url}/workflows/{workflow_url}", + params={"api_key": api_key}, 
+ ) + if response.status_code != 200: + raise RoboflowError(response.text) + return response.json() + + +def create_workflow(api_key, workspace_url, *, name, url=None, config=None, template=None): + """POST /{ws}/createWorkflow — create a workflow. + + The API validates ``name``, ``url``, ``template``, and ``config`` as + query-string parameters (all required strings). + + Args: + name: Display name for the workflow. + url: URL slug. Auto-generated from *name* when ``None``. + config: JSON string of the workflow config. Defaults to ``"{}"``. + template: JSON string of the workflow template. Defaults to ``"{}"``. + """ + if url is None: + import re + + url = re.sub(r"[^a-z0-9]+", "-", name.lower()).strip("-") + if config is None: + config = "{}" + if template is None: + template = "{}" + # config/template must be strings (the API validates with Joi.string) + if not isinstance(config, str): + config = json.dumps(config) + if not isinstance(template, str): + template = json.dumps(template) + params: Dict[str, str] = { + "api_key": api_key, + "name": name, + "url": url, + "template": template, + "config": config, + } + response = requests.post( + f"{API_URL}/{workspace_url}/createWorkflow", + params=params, + ) + if response.status_code not in (200, 201): + raise RoboflowError(response.text) + return response.json() + + +def update_workflow(api_key, workspace_url, *, workflow_id, workflow_name, workflow_url, config): + """POST /{ws}/updateWorkflow — update a workflow definition. + + The API validates ``id``, ``name``, ``url``, and ``config`` in the + request body (all required strings). + + Args: + workflow_id: The workflow's internal ID. + workflow_name: The workflow's display name. + workflow_url: The workflow's URL slug. + config: JSON string (or dict) of the workflow config. 
+ """ + if not isinstance(config, str): + config = json.dumps(config) + payload: Dict[str, str] = { + "id": workflow_id, + "name": workflow_name, + "url": workflow_url, + "config": config, + } + response = requests.post( + f"{API_URL}/{workspace_url}/updateWorkflow", + params={"api_key": api_key}, + json=payload, + ) + if response.status_code != 200: + raise RoboflowError(response.text) + return response.json() + + +def list_workflow_versions(api_key, workspace_url, workflow_url): + """GET /{ws}/workflows/{url}/versions — list workflow versions.""" + response = requests.get( + f"{API_URL}/{workspace_url}/workflows/{workflow_url}/versions", + params={"api_key": api_key}, + ) + if response.status_code != 200: + raise RoboflowError(response.text) + return response.json() + + +def fork_workflow(api_key, workspace_url, *, source_workspace, source_workflow, name=None, url=None): + """POST /{ws}/forkWorkflow — fork a workflow into this workspace. + + Args: + workspace_url: Target workspace that will own the fork. + source_workspace: URL slug of the workspace that owns the source. + source_workflow: URL slug of the source workflow. + name: Optional display name for the fork. + url: Optional URL slug for the fork. 
+ """ + payload: Dict[str, str] = { + "source_workspace": source_workspace, + "source_workflow": source_workflow, + } + if name: + payload["name"] = name + if url: + payload["url"] = url + response = requests.post( + f"{API_URL}/{workspace_url}/forkWorkflow", + params={"api_key": api_key}, + json=payload, + ) + if response.status_code not in (200, 201): + raise RoboflowError(response.text) + return response.json() + + +# --------------------------------------------------------------------------- +# Phase 2: Workspace statistics endpoints +# --------------------------------------------------------------------------- + + +def get_billing_usage(api_key, workspace_url): + """POST /{ws}/billing-usage-report — get billing usage report.""" + response = requests.post( + f"{API_URL}/{workspace_url}/billing-usage-report", + params={"api_key": api_key}, + ) + if response.status_code != 200: + raise RoboflowError(response.text) + return response.json() + + +def get_plan_info(api_key): + """GET /usage/plan — get workspace plan info and limits.""" + response = requests.get(f"{API_URL}/usage/plan", params={"api_key": api_key}) + if response.status_code != 200: + raise RoboflowError(response.text) + return response.json() + + +def get_labeling_stats(api_key, workspace_url, *, start_date=None, end_date=None): + """GET /{ws}/stats — get annotation/labeling statistics.""" + params: Dict[str, str] = {"api_key": api_key} + if start_date: + params["startDate"] = start_date + if end_date: + params["endDate"] = end_date + response = requests.get(f"{API_URL}/{workspace_url}/stats", params=params) + if response.status_code != 200: + raise RoboflowError(response.text) + return response.json() + + +# --------------------------------------------------------------------------- +# Phase 2: Video inference status +# --------------------------------------------------------------------------- + + +def get_video_job_status(api_key, job_id): + """GET /videoinfer?job_id={id} — check video inference job 
status.""" + response = requests.get(f"{API_URL}/videoinfer", params={"api_key": api_key, "job_id": job_id}) + if response.status_code != 200: + raise RoboflowError(response.text) + return response.json() + + +# --------------------------------------------------------------------------- +# Phase 2: Universe search +# --------------------------------------------------------------------------- + + +def search_universe(query, *, api_key=None, project_type=None, limit=12, page=1): + """GET /universe/search — search Roboflow Universe.""" + params: Dict[str, Union[str, int]] = {"q": query, "limit": limit, "page": page} + if api_key: + params["api_key"] = api_key + if project_type: + params["type"] = project_type + response = requests.get(f"{API_URL}/universe/search", params=params) + if response.status_code != 200: + raise RoboflowError(response.text) + return response.json() diff --git a/roboflow/cli/_output.py b/roboflow/cli/_output.py index 48016a75..f33380da 100644 --- a/roboflow/cli/_output.py +++ b/roboflow/cli/_output.py @@ -36,6 +36,32 @@ def output(args: Any, data: Any, text: Optional[str] = None) -> None: print(json.dumps(data, indent=2, default=str)) +_PLAN_HINT_PATTERNS: list[tuple[str, str]] = [ + ("require", "This feature requires a higher plan. Visit https://roboflow.com/pricing to upgrade."), + ("Growth plan", "This feature requires a Growth plan or higher. Visit https://roboflow.com/pricing to upgrade."), + ("Enterprise", "This feature requires an Enterprise plan. Contact sales@roboflow.com to upgrade."), + ("folder billing", "This feature requires folder billing. Visit https://app.roboflow.com/settings to enable it."), + ("Unauthorized", "Check your API key and workspace permissions. Some features require specific plan tiers."), + ("over_quota", "Your workspace has exceeded its quota. 
Visit https://roboflow.com/pricing to upgrade."), +] + + +def _detect_plan_hint(message: str) -> Optional[str]: + """Detect plan/billing-related errors and return an appropriate upgrade hint.""" + lower = message.lower() + for pattern, hint in _PLAN_HINT_PATTERNS: + if pattern.lower() in lower: + return hint + return None + + +def _sanitize_credentials(text: str) -> str: + """Strip API keys from URLs and other sensitive patterns in error messages.""" + import re + + return re.sub(r"api_key=[A-Za-z0-9_]+", "api_key=***", text) + + def _parse_error_message(raw: str) -> tuple[Optional[dict[str, Any]], str]: """Try to parse a raw error string that may contain embedded JSON. @@ -44,7 +70,7 @@ def _parse_error_message(raw: str) -> tuple[Optional[dict[str, Any]], str]: otherwise ``None``. The *human_readable_message* drills into nested ``error.message`` structures so the text-mode output is clean. """ - text = raw.strip() + text = _sanitize_credentials(raw.strip()) # Strip status-code prefix like "404: {...}" colon_idx = text.find(": {") if 0 < colon_idx < 5: @@ -60,7 +86,7 @@ def _parse_error_message(raw: str) -> tuple[Optional[dict[str, Any]], str]: return parsed, human except (json.JSONDecodeError, TypeError, ValueError): pass - return None, raw + return None, text # Return sanitized text, not the original raw def output_error( @@ -84,6 +110,10 @@ def output_error( """ parsed, human_message = _parse_error_message(message) + # Auto-detect plan-gated errors and add upgrade hints when none provided + if not hint: + hint = _detect_plan_hint(human_message) + if getattr(args, "json", False): # Normalise error to always be {"error": {"message": "..."}} so # consumers see a consistent schema regardless of error source. 
diff --git a/roboflow/cli/_resolver.py b/roboflow/cli/_resolver.py index 93317751..11f5e3c5 100644 --- a/roboflow/cli/_resolver.py +++ b/roboflow/cli/_resolver.py @@ -113,3 +113,25 @@ def resolve_resource( f"Cannot resolve '{shorthand}': expected 1-3 path segments " "(project, workspace/project, or workspace/project/version)." ) + + +def resolve_ws_and_key(args) -> Optional[Tuple[str, str]]: + """Resolve workspace and API key from CLI args. + + Returns (workspace_url, api_key) or ``None`` after calling + ``output_error`` on failure. + """ + from roboflow.cli._output import output_error + from roboflow.config import load_roboflow_api_key + + ws = getattr(args, "workspace", None) or resolve_default_workspace(api_key=getattr(args, "api_key", None)) + if not ws: + output_error(args, "No workspace specified.", hint="Use --workspace or run 'roboflow auth login'.", exit_code=2) + return None + + api_key = getattr(args, "api_key", None) or load_roboflow_api_key(ws) + if not api_key: + output_error(args, "No API key found.", hint="Set ROBOFLOW_API_KEY or run 'roboflow auth login'.", exit_code=2) + return None + + return ws, api_key diff --git a/roboflow/cli/handlers/annotation.py b/roboflow/cli/handlers/annotation.py index 862b5995..231e6780 100644 --- a/roboflow/cli/handlers/annotation.py +++ b/roboflow/cli/handlers/annotation.py @@ -1,4 +1,4 @@ -"""Annotation management commands: batch and job operations (stubs).""" +"""Annotation management commands: batch and job operations.""" from __future__ import annotations @@ -7,8 +7,6 @@ if TYPE_CHECKING: import argparse -from roboflow.cli._output import stub - def register(subparsers: argparse._SubParsersAction) -> None: # type: ignore[type-arg] """Register the ``annotation`` command group.""" @@ -33,13 +31,13 @@ def _add_batch(sub: argparse._SubParsersAction) -> None: # type: ignore[type-ar # batch list p = batch_sub.add_parser("list", help="List annotation batches") p.add_argument("-p", "--project", required=True, 
help="Project ID") - p.set_defaults(func=stub) + p.set_defaults(func=_batch_list) # batch get p = batch_sub.add_parser("get", help="Get annotation batch details") p.add_argument("batch_id", help="Batch ID") p.add_argument("-p", "--project", required=True, help="Project ID") - p.set_defaults(func=stub) + p.set_defaults(func=_batch_get) batch_parser.set_defaults(func=lambda args: batch_parser.print_help()) @@ -56,20 +54,206 @@ def _add_job(sub: argparse._SubParsersAction) -> None: # type: ignore[type-arg] # job list p = job_sub.add_parser("list", help="List annotation jobs") p.add_argument("-p", "--project", required=True, help="Project ID") - p.set_defaults(func=stub) + p.set_defaults(func=_job_list) # job get p = job_sub.add_parser("get", help="Get annotation job details") p.add_argument("job_id", help="Job ID") p.add_argument("-p", "--project", required=True, help="Project ID") - p.set_defaults(func=stub) + p.set_defaults(func=_job_get) # job create p = job_sub.add_parser("create", help="Create an annotation job") p.add_argument("-p", "--project", required=True, help="Project ID") p.add_argument("--name", required=True, help="Job name") - p.add_argument("--batch", default=None, help="Batch ID to assign") - p.add_argument("--assignees", default=None, help="Comma-separated assignee emails") - p.set_defaults(func=stub) + p.add_argument("--batch", required=True, help="Batch ID") + p.add_argument("--num-images", required=True, type=int, help="Number of images") + p.add_argument("--labeler", required=True, help="Labeler email") + p.add_argument("--reviewer", required=True, help="Reviewer email") + p.set_defaults(func=_job_create) job_parser.set_defaults(func=lambda args: job_parser.print_help()) + + +# --------------------------------------------------------------------------- +# helpers +# --------------------------------------------------------------------------- + + +def _normalize_timestamps(obj): + """Recursively convert Firestore timestamp dicts ({"_seconds": N, 
"_nanoseconds": N}) to ISO 8601 strings.""" + from datetime import datetime, timezone + + if isinstance(obj, dict): + if "_seconds" in obj and "_nanoseconds" in obj and len(obj) == 2: + return datetime.fromtimestamp(obj["_seconds"], tz=timezone.utc).isoformat() + return {k: _normalize_timestamps(v) for k, v in obj.items()} + if isinstance(obj, list): + return [_normalize_timestamps(item) for item in obj] + return obj + + +# --------------------------------------------------------------------------- +# handlers +# --------------------------------------------------------------------------- + + +def _resolve_project_context(args: argparse.Namespace): # type: ignore[return] + """Resolve workspace/project from -p flag and return (api_key, ws, proj) or call output_error.""" + from roboflow.cli._output import output_error + from roboflow.cli._resolver import resolve_resource + from roboflow.config import load_roboflow_api_key + + try: + workspace_url, project_slug, _version = resolve_resource(args.project, workspace_override=args.workspace) + except ValueError as exc: + output_error(args, str(exc)) + return None + + api_key = args.api_key or load_roboflow_api_key(workspace_url) + if not api_key: + output_error(args, "No API key found.", hint="Set ROBOFLOW_API_KEY or run 'roboflow auth login'.", exit_code=2) + return None + + return api_key, workspace_url, project_slug + + +def _batch_list(args: argparse.Namespace) -> None: + from roboflow.adapters import rfapi + from roboflow.cli._output import output, output_error + from roboflow.cli._table import format_table + + ctx = _resolve_project_context(args) + if ctx is None: + return + api_key, workspace_url, project_slug = ctx + + try: + data = rfapi.list_batches(api_key, workspace_url, project_slug) + except rfapi.RoboflowError as exc: + output_error(args, str(exc), exit_code=3) + return + + batches = data if isinstance(data, list) else data.get("batches", data) + batches = _normalize_timestamps(batches) + + table = 
format_table( + batches if isinstance(batches, list) else [], + columns=["name", "id", "status", "images"], + headers=["NAME", "ID", "STATUS", "IMAGE_COUNT"], + ) + output(args, batches, text=table) + + +def _batch_get(args: argparse.Namespace) -> None: + from roboflow.adapters import rfapi + from roboflow.cli._output import output, output_error + + ctx = _resolve_project_context(args) + if ctx is None: + return + api_key, workspace_url, project_slug = ctx + + try: + data = rfapi.get_batch(api_key, workspace_url, project_slug, args.batch_id) + except rfapi.RoboflowError as exc: + output_error(args, str(exc), exit_code=3) + return + + data = _normalize_timestamps(data) + batch = data.get("batch", data) if isinstance(data, dict) else data + + lines = [] + if isinstance(batch, dict): + for key, val in batch.items(): + lines.append(f" {key:16s} {val}") + text = "\n".join(lines) if lines else "(no batch details)" + + output(args, data, text=text) + + +def _job_list(args: argparse.Namespace) -> None: + from roboflow.adapters import rfapi + from roboflow.cli._output import output, output_error + from roboflow.cli._table import format_table + + ctx = _resolve_project_context(args) + if ctx is None: + return + api_key, workspace_url, project_slug = ctx + + try: + data = rfapi.list_annotation_jobs(api_key, workspace_url, project_slug) + except rfapi.RoboflowError as exc: + output_error(args, str(exc), exit_code=3) + return + + jobs = data if isinstance(data, list) else data.get("jobs", data) + jobs = _normalize_timestamps(jobs) + + table = format_table( + jobs if isinstance(jobs, list) else [], + columns=["name", "id", "status", "assigned_to"], + headers=["NAME", "ID", "STATUS", "ASSIGNED_TO"], + ) + output(args, jobs, text=table) + + +def _job_get(args: argparse.Namespace) -> None: + from roboflow.adapters import rfapi + from roboflow.cli._output import output, output_error + + ctx = _resolve_project_context(args) + if ctx is None: + return + api_key, workspace_url, 
project_slug = ctx + + try: + data = rfapi.get_annotation_job(api_key, workspace_url, project_slug, args.job_id) + except rfapi.RoboflowError as exc: + output_error(args, str(exc), exit_code=3) + return + + data = _normalize_timestamps(data) + job = data.get("job", data) if isinstance(data, dict) else data + + lines = [] + if isinstance(job, dict): + for key, val in job.items(): + lines.append(f" {key:16s} {val}") + text = "\n".join(lines) if lines else "(no job details)" + + output(args, data, text=text) + + +def _job_create(args: argparse.Namespace) -> None: + import roboflow + from roboflow.cli._output import output, output_error, suppress_sdk_output + + ctx = _resolve_project_context(args) + if ctx is None: + return + _api_key, workspace_url, project_slug = ctx + + with suppress_sdk_output(args): + try: + rf = roboflow.Roboflow(api_key=_api_key) + workspace = rf.workspace(workspace_url) + project = workspace.project(project_slug) + except Exception as exc: + output_error(args, str(exc)) + return + + try: + result = project.create_annotation_job( + name=args.name, + batch_id=args.batch, + num_images=args.num_images, + labeler_email=args.labeler, + reviewer_email=args.reviewer, + ) + except Exception as exc: + output_error(args, str(exc)) + return + + output(args, result, text=f"Created annotation job: {args.name}") diff --git a/roboflow/cli/handlers/deployment.py b/roboflow/cli/handlers/deployment.py index f85bd4ba..0fd1033d 100644 --- a/roboflow/cli/handlers/deployment.py +++ b/roboflow/cli/handlers/deployment.py @@ -32,7 +32,6 @@ def _wrapped(args: argparse.Namespace) -> None: except SystemExit as exc: sys.stdout = orig_stdout code = exc.code if isinstance(exc.code, int) else 1 - # Map legacy exit codes to CLI conventions: 1=general, 2=auth, 3=not-found exit_code = {0: 1, 1: 1, 2: 2, 3: 3}.get(code, 1) if code else 1 text = captured.getvalue().strip() if text: @@ -40,6 +39,15 @@ def _wrapped(args: argparse.Namespace) -> None: else: output_error(args, 
"Deployment command failed.", exit_code=1) return + except Exception as exc: + sys.stdout = orig_stdout + output_error( + args, + f"Deployment service unavailable: {type(exc).__name__}", + hint="The dedicated deployment service may be down or unreachable. Try again later.", + exit_code=1, + ) + return finally: sys.stdout = orig_stdout diff --git a/roboflow/cli/handlers/folder.py b/roboflow/cli/handlers/folder.py index c2dc7f3a..4334c9af 100644 --- a/roboflow/cli/handlers/folder.py +++ b/roboflow/cli/handlers/folder.py @@ -10,35 +10,171 @@ def register(subparsers: argparse._SubParsersAction) -> None: # type: ignore[type-arg] """Register the ``folder`` command group.""" - from roboflow.cli._output import stub - folder_parser = subparsers.add_parser("folder", help="Manage workspace folders") folder_subs = folder_parser.add_subparsers(title="folder commands", dest="folder_command") # --- folder list --- list_p = folder_subs.add_parser("list", help="List folders") - list_p.set_defaults(func=stub) + list_p.set_defaults(func=_list_folders) # --- folder get --- get_p = folder_subs.add_parser("get", help="Show folder details") get_p.add_argument("folder_id", help="Folder ID") - get_p.set_defaults(func=stub) + get_p.set_defaults(func=_get_folder) # --- folder create --- create_p = folder_subs.add_parser("create", help="Create a folder") create_p.add_argument("name", help="Folder name") - create_p.set_defaults(func=stub) + create_p.add_argument("--parent", dest="parent", default=None, help="Parent folder ID") + create_p.add_argument("--projects", dest="projects", default=None, help="Comma-separated project IDs") + create_p.set_defaults(func=_create_folder) # --- folder update --- update_p = folder_subs.add_parser("update", help="Update a folder") update_p.add_argument("folder_id", help="Folder ID") update_p.add_argument("--name", help="New folder name") - update_p.set_defaults(func=stub) + update_p.set_defaults(func=_update_folder) # --- folder delete --- delete_p = 
folder_subs.add_parser("delete", help="Delete a folder") delete_p.add_argument("folder_id", help="Folder ID") - delete_p.set_defaults(func=stub) + delete_p.set_defaults(func=_delete_folder) # Default folder_parser.set_defaults(func=lambda args: folder_parser.print_help()) + + +def _resolve_ws_and_key(args: argparse.Namespace): + """Resolve workspace and API key, returning (ws, api_key) or None on error.""" + from roboflow.cli._resolver import resolve_ws_and_key + + return resolve_ws_and_key(args) + + +def _list_folders(args: argparse.Namespace) -> None: + from roboflow.adapters import rfapi + from roboflow.cli._output import output, output_error + from roboflow.cli._table import format_table + + resolved = _resolve_ws_and_key(args) + if not resolved: + return + ws, api_key = resolved + + try: + result = rfapi.list_folders(api_key, ws) + except rfapi.RoboflowError as exc: + # The API returns 404 when there are no folders — treat as empty, not error + if "Not Found" in str(exc): + result = {"data": []} + else: + output_error(args, str(exc), exit_code=3) + return + except Exception as exc: + output_error(args, str(exc), exit_code=3) + return + + folders = result.get("data", result.get("groups", result if isinstance(result, list) else [])) + rows = [] + for f in folders: + projects = f.get("projects", []) + project_count = len(projects) if isinstance(projects, list) else projects + rows.append({"name": f.get("name", ""), "id": f.get("id", ""), "projects": str(project_count)}) + + table = format_table(rows, columns=["name", "id", "projects"], headers=["NAME", "ID", "PROJECTS"]) + output(args, folders, text=table) + + +def _get_folder(args: argparse.Namespace) -> None: + from roboflow.adapters import rfapi + from roboflow.cli._output import output, output_error + + resolved = _resolve_ws_and_key(args) + if not resolved: + return + ws, api_key = resolved + + try: + result = rfapi.get_folder(api_key, ws, args.folder_id) + except Exception as exc: + output_error(args, 
str(exc), exit_code=3) + return + + # API returns {"data": [folder_obj]} — extract the first item + data_list = result.get("data", []) + folder = data_list[0] if isinstance(data_list, list) and data_list else result.get("group", result) + lines = [ + f"Folder: {folder.get('name', '')}", + f" ID: {folder.get('id', '')}", + ] + projects = folder.get("projects", []) + if isinstance(projects, list): + lines.append(f" Projects: {len(projects)}") + for p in projects: + if isinstance(p, dict): + lines.append(f" - {p.get('name', p.get('id', ''))}") + else: + lines.append(f" - {p}") + output(args, result, text="\n".join(lines)) + + +def _create_folder(args: argparse.Namespace) -> None: + from roboflow.adapters import rfapi + from roboflow.cli._output import output, output_error + + resolved = _resolve_ws_and_key(args) + if not resolved: + return + ws, api_key = resolved + + project_ids = None + if args.projects: + project_ids = [p.strip() for p in args.projects.split(",")] + + try: + result = rfapi.create_folder(api_key, ws, args.name, parent_id=args.parent, project_ids=project_ids) + except Exception as exc: + output_error(args, str(exc), exit_code=1) + return + + folder_id = result.get("id", result.get("group", {}).get("id", "")) + data = {"status": "created", "id": folder_id} + output(args, data, text=f"Created folder '{args.name}' (id: {folder_id})") + + +def _update_folder(args: argparse.Namespace) -> None: + from roboflow.adapters import rfapi + from roboflow.cli._output import output, output_error + + resolved = _resolve_ws_and_key(args) + if not resolved: + return + ws, api_key = resolved + + try: + rfapi.update_folder(api_key, ws, args.folder_id, name=args.name) + except Exception as exc: + output_error(args, str(exc), exit_code=1) + return + + data = {"status": "updated"} + output(args, data, text=f"Updated folder '{args.folder_id}'") + + +def _delete_folder(args: argparse.Namespace) -> None: + from roboflow.adapters import rfapi + from roboflow.cli._output import 
output, output_error + + resolved = _resolve_ws_and_key(args) + if not resolved: + return + ws, api_key = resolved + + try: + rfapi.delete_folder(api_key, ws, args.folder_id) + except Exception as exc: + output_error(args, str(exc), exit_code=3) + return + + data = {"status": "deleted"} + output(args, data, text=f"Deleted folder '{args.folder_id}'") diff --git a/roboflow/cli/handlers/train.py b/roboflow/cli/handlers/train.py index e126fd3e..bc6cc525 100644 --- a/roboflow/cli/handlers/train.py +++ b/roboflow/cli/handlers/train.py @@ -92,6 +92,10 @@ def _start(args: argparse.Namespace) -> None: output_error(args, "No API key found.", hint="Set ROBOFLOW_API_KEY or run 'roboflow auth login'.", exit_code=2) return + # Ensure the version has the required export format before training + if args.model_type: + _ensure_export(args, api_key, workspace_url, project_slug, str(args.version_number), args.model_type) + try: rfapi.start_version_training( api_key, @@ -104,7 +108,16 @@ def _start(args: argparse.Namespace) -> None: epochs=args.epochs, ) except rfapi.RoboflowError as exc: - output_error(args, str(exc)) + err_str = str(exc) + if "Unknown error" in err_str: + output_error( + args, + "Training failed. The server returned an unexpected error.", + hint="Ensure the version is fully generated and exported. 
" + "Run 'roboflow version export -p -f coco' first.", + ) + else: + output_error(args, err_str) return data = { @@ -113,3 +126,66 @@ def _start(args: argparse.Namespace) -> None: "version": args.version_number, } output(args, data, text=f"Training started for {project_slug} version {args.version_number}.") + + +def _ensure_export(args, api_key, workspace_url, project_slug, version_str, model_type): + """Check if the version has the required export format; trigger and poll if not.""" + import sys + import time + + from roboflow.adapters import rfapi + from roboflow.util.versions import get_model_format + + required_format = get_model_format(model_type) + + try: + version_data = rfapi.get_version(api_key, workspace_url, project_slug, version_str) + except rfapi.RoboflowError: + return # Can't check; let the train call handle errors + + version_info = version_data.get("version", {}) + + # Check if still generating + if version_info.get("generating"): + if not getattr(args, "quiet", False): + print(f"Version is still generating ({version_info.get('progress', 0):.0%})... waiting.", file=sys.stderr) + while True: + time.sleep(5) + try: + version_data = rfapi.get_version(api_key, workspace_url, project_slug, version_str, nocache=True) + version_info = version_data.get("version", {}) + if not version_info.get("generating"): + break + if not getattr(args, "quiet", False): + print( + f" Generating... 
{version_info.get('progress', 0):.0%}", + file=sys.stderr, + ) + except rfapi.RoboflowError: + break + + # Check if export exists + exports = version_info.get("exports", []) + if required_format not in exports: + if not getattr(args, "quiet", False): + print( + f"Exporting version in {required_format} format (required for {model_type})...", + file=sys.stderr, + ) + try: + rfapi.get_version_export(api_key, workspace_url, project_slug, version_str, required_format) + except rfapi.RoboflowError: + pass # Export may have been triggered; poll below + + # Poll until export is ready + for _ in range(120): # Up to 10 minutes + time.sleep(5) + try: + version_data = rfapi.get_version(api_key, workspace_url, project_slug, version_str, nocache=True) + current_exports = version_data.get("version", {}).get("exports", []) + if required_format in current_exports: + if not getattr(args, "quiet", False): + print(" Export complete.", file=sys.stderr) + return + except rfapi.RoboflowError: + pass diff --git a/roboflow/cli/handlers/universe.py b/roboflow/cli/handlers/universe.py index fea90c8d..a2a92ec5 100644 --- a/roboflow/cli/handlers/universe.py +++ b/roboflow/cli/handlers/universe.py @@ -10,8 +10,6 @@ def register(subparsers: argparse._SubParsersAction) -> None: # type: ignore[type-arg] """Register the ``universe`` command group.""" - from roboflow.cli._output import stub - uni_parser = subparsers.add_parser("universe", help="Browse Roboflow Universe") uni_subs = uni_parser.add_subparsers(title="universe commands", dest="universe_command") @@ -19,8 +17,41 @@ def register(subparsers: argparse._SubParsersAction) -> None: # type: ignore[ty search_p = uni_subs.add_parser("search", help="Search Roboflow Universe") search_p.add_argument("query", help="Search query") search_p.add_argument("--type", dest="type", choices=["dataset", "model"], default=None, help="Filter by type") - search_p.add_argument("--limit", type=int, default=20, help="Max results (default: 20)") - 
search_p.set_defaults(func=stub) + search_p.add_argument("--limit", type=int, default=12, help="Max results (default: 12)") + search_p.set_defaults(func=_search) # Default uni_parser.set_defaults(func=lambda args: uni_parser.print_help()) + + +def _search(args: argparse.Namespace) -> None: + from roboflow.adapters import rfapi + from roboflow.cli._output import output, output_error + from roboflow.cli._table import format_table + from roboflow.config import load_roboflow_api_key + + api_key = args.api_key or load_roboflow_api_key(None) + + try: + data = rfapi.search_universe(args.query, api_key=api_key, project_type=args.type, limit=args.limit) + except rfapi.RoboflowError as exc: + output_error(args, str(exc)) + return + + results = data.get("results", []) + # The API may ignore the limit param; enforce it client-side + if args.limit and len(results) > args.limit: + results = results[: args.limit] + rows = [] + for r in results: + rows.append( + { + "name": r.get("name", r.get("id", "")), + "type": r.get("type", ""), + "images": r.get("images", 0), + "url": r.get("url", ""), + } + ) + + table = format_table(rows, columns=["name", "type", "images", "url"], headers=["NAME", "TYPE", "IMAGES", "URL"]) + output(args, results, text=table) diff --git a/roboflow/cli/handlers/version.py b/roboflow/cli/handlers/version.py index 205abf82..86a18d0a 100644 --- a/roboflow/cli/handlers/version.py +++ b/roboflow/cli/handlers/version.py @@ -2,11 +2,15 @@ from __future__ import annotations +import argparse import re -from typing import TYPE_CHECKING -if TYPE_CHECKING: - import argparse + +class _RawEpilogFormatter(argparse.HelpFormatter): + """Formatter that preserves raw text in the epilog while wrapping everything else.""" + + def _fill_text(self, text: str, width: int, indent: str) -> str: + return text def register(subparsers: argparse._SubParsersAction) -> None: # type: ignore[type-arg] @@ -39,10 +43,24 @@ def register(subparsers: argparse._SubParsersAction) -> None: # type: 
ignore[ty export_parser.add_argument("-f", "--format", dest="format", default="voc", help="Export format (default: voc)") export_parser.set_defaults(func=_export) - # --- version create (stub) --- - create_parser = version_subs.add_parser("create", help="Create a new version (coming soon)") + # --- version create --- + create_parser = version_subs.add_parser( + "create", + help="Create a new dataset version", + epilog=( + "Settings JSON example:\n" + ' {"augmentation": {"flip": {"horizontal": true, "vertical": false},\n' + ' "rotate": {"degrees": 15}, "brightness": {"percent": 25}},\n' + ' "preprocessing": {"auto-orient": true, "resize": {"width": 640,\n' + ' "height": 640, "format": "Stretch to"}}}\n\n' + "See https://docs.roboflow.com/datasets/create-a-dataset-version for all options." + ), + formatter_class=_RawEpilogFormatter, + ) create_parser.add_argument("-p", "--project", dest="project", required=True, help="Project ID") - create_parser.add_argument("--settings", dest="settings", default=None, help="Version settings as JSON string") + create_parser.add_argument( + "--settings", dest="settings", required=True, help="Path to JSON file with augmentation/preprocessing config" + ) create_parser.set_defaults(func=_create) # Default when no verb is given @@ -143,19 +161,28 @@ def _get_version(args: argparse.Namespace) -> None: def _parse_url(url: str) -> tuple: - """Parse a Roboflow URL or shorthand into (workspace, project, version).""" - regex = ( - r"(?:https?://)?(?:universe|app)\.roboflow\.(?:com|one)/([^/]+)/([^/]+)" - r"(?:/dataset)?(?:/(\d+))?" - r"|([^/]+)/([^/]+)(?:/(\d+))?" - ) - match = re.match(regex, url) + """Parse a Roboflow URL or shorthand into (workspace, project, version). 
+ + Supports: + - Full URLs: https://universe.roboflow.com/ws/proj/3 + - Three segments: ws/proj/3 + - Two segments: ws/proj OR proj/3 (numeric = version, uses default ws) + - One segment: proj (uses default ws, no version) + """ + # Try full URL first + url_regex = r"(?:https?://)?(?:universe|app)\.roboflow\.(?:com|one)/([^/]+)/([^/]+)(?:/dataset)?(?:/(\d+))?" + match = re.match(url_regex, url) if match: - organization = match.group(1) or match.group(4) - dataset = match.group(2) or match.group(5) - version = match.group(3) or match.group(6) - return organization, dataset, version - return None, None, None + return match.group(1), match.group(2), match.group(3) + + # Non-URL shorthand: use resolve_resource for proper disambiguation + from roboflow.cli._resolver import resolve_resource + + try: + ws, proj, ver = resolve_resource(url, workspace_override=None) + return ws, proj, str(ver) if ver is not None else None + except ValueError: + return None, None, None def _download(args: argparse.Namespace) -> None: @@ -240,6 +267,45 @@ def _export(args: argparse.Namespace) -> None: def _create(args: argparse.Namespace) -> None: - from roboflow.cli._output import output_error + import json + + import roboflow + from roboflow.cli._output import output, output_error, suppress_sdk_output + from roboflow.cli._resolver import resolve_resource + from roboflow.config import load_roboflow_api_key + + try: + workspace_url, project_slug, _ver = resolve_resource(args.project, workspace_override=args.workspace) + except ValueError as exc: + output_error(args, str(exc)) + return + + api_key = args.api_key or load_roboflow_api_key(workspace_url) + if not api_key: + output_error(args, "No API key found.", hint="Set ROBOFLOW_API_KEY or run 'roboflow auth login'.", exit_code=2) + return + + try: + with open(args.settings) as f: + settings = json.load(f) + except FileNotFoundError: + output_error(args, f"Settings file not found: {args.settings}") + return + except json.JSONDecodeError as 
exc: + output_error(args, f"Invalid JSON in settings file: {exc}") + return + + with suppress_sdk_output(): + try: + rf = roboflow.Roboflow(api_key) + project = rf.workspace(workspace_url).project(project_slug) + version_id = project.generate_version(settings) + except Exception as exc: + output_error(args, str(exc)) + return + + # generate_version returns the version number/ID directly + version_num = version_id if version_id else "unknown" - output_error(args, "This command is not yet implemented.", hint="Coming soon.", exit_code=1) + data = {"status": "created", "project": project_slug, "version": version_num} + output(args, data, text=f"Created version {version_num} for project {project_slug}") diff --git a/roboflow/cli/handlers/video.py b/roboflow/cli/handlers/video.py index dcb41918..906918e2 100644 --- a/roboflow/cli/handlers/video.py +++ b/roboflow/cli/handlers/video.py @@ -24,9 +24,7 @@ def register(subparsers: argparse._SubParsersAction) -> None: # type: ignore[ty # --- video status --- status_p = video_subs.add_parser("status", help="Check video inference job status") status_p.add_argument("job_id", help="Job ID to check") - from roboflow.cli._output import stub - - status_p.set_defaults(func=stub) + status_p.set_defaults(func=_video_status) # Default video_parser.set_defaults(func=lambda args: video_parser.print_help()) @@ -43,19 +41,58 @@ def _video_infer(args: argparse.Namespace) -> None: return try: - rf = roboflow.Roboflow(api_key) - project = rf.workspace().project(args.project) - version = project.version(args.version_number) - model = version.model - - job_id, _signed_url, _expire_time = model.predict_video( - args.video_file, - args.fps, - prediction_type="batch-video", - ) + from roboflow.cli._output import suppress_sdk_output + + with suppress_sdk_output(): + rf = roboflow.Roboflow(api_key) + project = rf.workspace().project(args.project) + version = project.version(args.version_number) + model = version.model + + job_id, _signed_url, 
_expire_time = model.predict_video( + args.video_file, + args.fps, + prediction_type="batch-video", + ) except Exception as exc: output_error(args, str(exc)) return data = {"job_id": job_id, "status": "submitted"} output(args, data, text=f"Video inference submitted. Job ID: {job_id}") + + +def _video_status(args: argparse.Namespace) -> None: + from roboflow.adapters import rfapi + from roboflow.cli._output import output, output_error + from roboflow.config import load_roboflow_api_key + + api_key = args.api_key or load_roboflow_api_key(None) + if not api_key: + output_error(args, "No API key found.", hint="Set ROBOFLOW_API_KEY or run 'roboflow auth login'.", exit_code=2) + return + + try: + data = rfapi.get_video_job_status(api_key, args.job_id) + except rfapi.RoboflowError as exc: + msg = str(exc) + if "NOT FOUND" in msg.upper(): + output_error( + args, + f"Video job '{args.job_id}' not found.", + hint="Check the job ID. You can get job IDs from 'roboflow video infer'.", + exit_code=3, + ) + else: + output_error(args, msg, exit_code=3) + return + + status = data.get("status", "unknown") + progress = data.get("progress", "") + text_lines = [ + f"Job ID: {args.job_id}", + f"Status: {status}", + ] + if progress: + text_lines.append(f"Progress: {progress}") + output(args, data, text="\n".join(text_lines)) diff --git a/roboflow/cli/handlers/workflow.py b/roboflow/cli/handlers/workflow.py index 76db80df..7b437cac 100644 --- a/roboflow/cli/handlers/workflow.py +++ b/roboflow/cli/handlers/workflow.py @@ -10,61 +10,355 @@ def register(subparsers: argparse._SubParsersAction) -> None: # type: ignore[type-arg] """Register the ``workflow`` command group.""" - from roboflow.cli._output import stub - wf_parser = subparsers.add_parser("workflow", help="Manage workflows") wf_subs = wf_parser.add_subparsers(title="workflow commands", dest="workflow_command") # --- workflow list --- list_p = wf_subs.add_parser("list", help="List workflows in a workspace") - 
list_p.set_defaults(func=stub) + list_p.set_defaults(func=_list_workflows) # --- workflow get --- get_p = wf_subs.add_parser("get", help="Show details for a workflow") get_p.add_argument("workflow_url", help="Workflow URL or ID") - get_p.set_defaults(func=stub) + get_p.set_defaults(func=_get_workflow) # --- workflow create --- create_p = wf_subs.add_parser("create", help="Create a new workflow") create_p.add_argument("--name", required=True, help="Workflow name") create_p.add_argument("--definition", help="Path to JSON definition file") create_p.add_argument("--description", default=None, help="Workflow description") - create_p.set_defaults(func=stub) + create_p.set_defaults(func=_create_workflow) # --- workflow update --- update_p = wf_subs.add_parser("update", help="Update an existing workflow") update_p.add_argument("workflow_url", help="Workflow URL or ID") update_p.add_argument("--definition", help="Path to JSON definition file") - update_p.set_defaults(func=stub) + update_p.set_defaults(func=_update_workflow) # --- workflow version --- version_p = wf_subs.add_parser("version", help="Manage workflow versions") version_subs = version_p.add_subparsers(title="workflow version commands", dest="workflow_version_command") version_list_p = version_subs.add_parser("list", help="List versions of a workflow") version_list_p.add_argument("workflow_url", help="Workflow URL or ID") - version_list_p.set_defaults(func=stub) + version_list_p.set_defaults(func=_list_workflow_versions) version_p.set_defaults(func=lambda args: version_p.print_help()) # --- workflow fork --- fork_p = wf_subs.add_parser("fork", help="Fork a workflow") fork_p.add_argument("workflow_url", help="Workflow URL or ID") - fork_p.set_defaults(func=stub) + fork_p.set_defaults(func=_fork_workflow) - # --- workflow build --- + # --- workflow build (stub) --- build_p = wf_subs.add_parser("build", help="Build a workflow from a prompt") build_p.add_argument("prompt", help="Natural language prompt describing the 
workflow") - build_p.set_defaults(func=stub) + build_p.set_defaults(func=_stub_build) - # --- workflow run --- + # --- workflow run (stub) --- run_p = wf_subs.add_parser("run", help="Run a workflow") run_p.add_argument("workflow_url", help="Workflow URL or ID") run_p.add_argument("--input", dest="input", help="Input file or URL") - run_p.set_defaults(func=stub) + run_p.set_defaults(func=_stub_run) - # --- workflow deploy --- + # --- workflow deploy (stub) --- deploy_p = wf_subs.add_parser("deploy", help="Deploy a workflow") deploy_p.add_argument("workflow_url", help="Workflow URL or ID") - deploy_p.set_defaults(func=stub) + deploy_p.set_defaults(func=_stub_deploy) # Default wf_parser.set_defaults(func=lambda args: wf_parser.print_help()) + + +# --------------------------------------------------------------------------- +# Helpers +# --------------------------------------------------------------------------- + + +def _resolve_workspace_and_key(args: argparse.Namespace): + """Return (workspace_url, api_key) or call output_error and return None.""" + from roboflow.cli._resolver import resolve_ws_and_key + + return resolve_ws_and_key(args) + + +def _read_definition_file(args: argparse.Namespace): + """Read and parse a JSON definition file. Returns the parsed dict, or None if no file given. + + Calls output_error and returns False on failure. 
+ """ + import json + import os + + from roboflow.cli._output import output_error + + if not args.definition: + return None + + if not os.path.isfile(args.definition): + output_error(args, f"File not found: {args.definition}", hint="Provide a valid JSON file path.") + return False + + with open(args.definition) as f: + try: + return json.load(f) + except json.JSONDecodeError as exc: + output_error(args, f"Invalid JSON in {args.definition}: {exc}") + return False + + +# --------------------------------------------------------------------------- +# Implemented commands +# --------------------------------------------------------------------------- + + +def _list_workflows(args: argparse.Namespace) -> None: + from roboflow.adapters import rfapi + from roboflow.cli._output import output, output_error + from roboflow.cli._table import format_table + + resolved = _resolve_workspace_and_key(args) + if resolved is None: + return + workspace_url, api_key = resolved + + try: + data = rfapi.list_workflows(api_key, workspace_url) + except rfapi.RoboflowError as exc: + output_error(args, str(exc), exit_code=3) + return + + workflows = data if isinstance(data, list) else data.get("workflows", []) + + table = format_table( + workflows, + columns=["name", "url", "status"], + headers=["NAME", "URL", "STATUS"], + ) + output(args, workflows, text=table) + + +def _get_workflow(args: argparse.Namespace) -> None: + from roboflow.adapters import rfapi + from roboflow.cli._output import output, output_error + + resolved = _resolve_workspace_and_key(args) + if resolved is None: + return + workspace_url, api_key = resolved + + try: + data = rfapi.get_workflow(api_key, workspace_url, args.workflow_url) + except rfapi.RoboflowError as exc: + output_error(args, str(exc), exit_code=3) + return + + workflow = data.get("workflow", data) if isinstance(data, dict) else data + + lines = [] + if isinstance(workflow, dict): + field_map = [ + ("Name", "name"), + ("URL", "url"), + ("Description", 
"description"), + ("Blocks", "blockCount"), + ] + for label, key in field_map: + if key in workflow: + lines.append(f" {label:14s} {workflow[key]}") + text = "\n".join(lines) if lines else "(no workflow details)" + + output(args, data, text=text) + + +def _create_workflow(args: argparse.Namespace) -> None: + import json as _json + + from roboflow.adapters import rfapi + from roboflow.cli._output import output, output_error + + resolved = _resolve_workspace_and_key(args) + if resolved is None: + return + workspace_url, api_key = resolved + + definition = _read_definition_file(args) + if definition is False: + return + + # The API expects config/template as JSON strings. + config = _json.dumps(definition) if definition is not None else "{}" + template = "{}" + + try: + data = rfapi.create_workflow( + api_key, + workspace_url, + name=args.name, + config=config, + template=template, + ) + except rfapi.RoboflowError as exc: + output_error(args, str(exc)) + return + + text = f"Created workflow: {args.name}" + output(args, data, text=text) + + +def _update_workflow(args: argparse.Namespace) -> None: + import json as _json + + from roboflow.adapters import rfapi + from roboflow.cli._output import output, output_error + + resolved = _resolve_workspace_and_key(args) + if resolved is None: + return + workspace_url, api_key = resolved + + definition = _read_definition_file(args) + if definition is False: + return + + # Fetch the existing workflow to get required id/name/url fields. 
+ try: + existing = rfapi.get_workflow(api_key, workspace_url, args.workflow_url) + except rfapi.RoboflowError as exc: + output_error(args, str(exc), exit_code=3) + return + + wf = existing.get("workflow", existing) if isinstance(existing, dict) else existing + if not isinstance(wf, dict): + output_error(args, "Unexpected response from API when fetching workflow.") + return + + workflow_id = wf.get("id", "") + workflow_name = wf.get("name", "") + workflow_url_slug = wf.get("url", args.workflow_url) + + # Merge: use new definition as config if provided, otherwise keep existing. + if definition is not None: + config = _json.dumps(definition) if not isinstance(definition, str) else definition + else: + config = wf.get("config", "{}") + if not isinstance(config, str): + config = _json.dumps(config) + + try: + data = rfapi.update_workflow( + api_key, + workspace_url, + workflow_id=workflow_id, + workflow_name=workflow_name, + workflow_url=workflow_url_slug, + config=config, + ) + except rfapi.RoboflowError as exc: + output_error(args, str(exc)) + return + + text = f"Updated workflow: {args.workflow_url}" + output(args, data, text=text) + + +def _list_workflow_versions(args: argparse.Namespace) -> None: + from roboflow.adapters import rfapi + from roboflow.cli._output import output, output_error + from roboflow.cli._table import format_table + + resolved = _resolve_workspace_and_key(args) + if resolved is None: + return + workspace_url, api_key = resolved + + try: + data = rfapi.list_workflow_versions(api_key, workspace_url, args.workflow_url) + except rfapi.RoboflowError as exc: + output_error(args, str(exc), exit_code=3) + return + + versions = data if isinstance(data, list) else data.get("versions", []) + + table = format_table( + versions, + columns=["version", "created"], + headers=["VERSION", "CREATED"], + ) + output(args, versions, text=table) + + +def _fork_workflow(args: argparse.Namespace) -> None: + from roboflow.adapters import rfapi + from 
roboflow.cli._output import output, output_error + + resolved = _resolve_workspace_and_key(args) + if resolved is None: + return + workspace_url, api_key = resolved + + # Parse workflow_url: could be "workflow-slug" or "source-ws/workflow-slug". + parts = args.workflow_url.strip("/").split("/") + if len(parts) == 2: + source_workspace = parts[0] + source_workflow = parts[1] + else: + # Default: source workspace is the current workspace. + source_workspace = workspace_url + source_workflow = parts[0] + + try: + data = rfapi.fork_workflow( + api_key, + workspace_url, + source_workspace=source_workspace, + source_workflow=source_workflow, + ) + except rfapi.RoboflowError as exc: + output_error(args, str(exc)) + return + + # Extract the forked workflow URL from potentially nested response + new_url = "" + if isinstance(data, dict): + wf = data.get("workflow", data) + if isinstance(wf, dict): + new_url = str(wf.get("url", wf.get("workflow_url", ""))) + else: + new_url = str(wf) if wf else "" + result = {"status": "forked", "source": args.workflow_url, "new_url": new_url} + text = f"Forked workflow: {args.workflow_url} -> {new_url}" + output(args, result, text=text) + + +# --------------------------------------------------------------------------- +# Stubs +# --------------------------------------------------------------------------- + + +def _stub_build(args: argparse.Namespace) -> None: + from roboflow.cli._output import output_error + + output_error( + args, + "This command is not yet implemented.", + hint="Requires Roboflow Agent API. Coming in a future release.", + ) + + +def _stub_run(args: argparse.Namespace) -> None: + from roboflow.cli._output import output_error + + output_error( + args, + "This command is not yet implemented.", + hint="Requires inference_sdk integration. 
Coming in a future release.", + ) + + +def _stub_deploy(args: argparse.Namespace) -> None: + from roboflow.cli._output import output_error + + output_error( + args, + "This command is not yet implemented.", + hint="Coming in a future release.", + ) diff --git a/roboflow/cli/handlers/workspace.py b/roboflow/cli/handlers/workspace.py index bff3758c..94737d39 100644 --- a/roboflow/cli/handlers/workspace.py +++ b/roboflow/cli/handlers/workspace.py @@ -1,4 +1,4 @@ -"""Workspace commands: list, get.""" +"""Workspace commands: list, get, usage, plan, stats.""" from __future__ import annotations @@ -22,6 +22,20 @@ def register(subparsers: argparse._SubParsersAction) -> None: # type: ignore[ty get_p.add_argument("workspace_id", help="Workspace URL or ID") get_p.set_defaults(func=_get_workspace) + # --- workspace usage --- + usage_p = ws_sub.add_parser("usage", help="Show billing usage report") + usage_p.set_defaults(func=_workspace_usage) + + # --- workspace plan --- + plan_p = ws_sub.add_parser("plan", help="Show workspace plan info and limits") + plan_p.set_defaults(func=_workspace_plan) + + # --- workspace stats --- + stats_p = ws_sub.add_parser("stats", help="Show annotation/labeling statistics") + stats_p.add_argument("--start-date", dest="start_date", required=True, help="Start date (YYYY-MM-DD)") + stats_p.add_argument("--end-date", dest="end_date", required=True, help="End date (YYYY-MM-DD)") + stats_p.set_defaults(func=_workspace_stats) + # Default: show help ws_parser.set_defaults(func=lambda args: ws_parser.print_help()) @@ -115,3 +129,85 @@ def _get_workspace(args: argparse.Namespace) -> None: f" Projects: {project_count}", ] output(args, workspace_json, text="\n".join(lines)) + + +def _resolve_ws_and_key(args: argparse.Namespace): + """Resolve workspace and API key for workspace subcommands.""" + from roboflow.cli._resolver import resolve_ws_and_key + + return resolve_ws_and_key(args) + + +def _workspace_usage(args: argparse.Namespace) -> None: + from 
roboflow.adapters import rfapi + from roboflow.cli._output import output, output_error + + resolved = _resolve_ws_and_key(args) + if not resolved: + return + ws, api_key = resolved + + try: + result = rfapi.get_billing_usage(api_key, ws) + except Exception as exc: + output_error(args, str(exc), exit_code=3) + return + + usage = result.get("usage", result) + lines = ["Billing Usage:"] + if isinstance(usage, dict): + for key, val in usage.items(): + lines.append(f" {key}: {val}") + else: + lines.append(f" {usage}") + output(args, result, text="\n".join(lines)) + + +def _workspace_plan(args: argparse.Namespace) -> None: + from roboflow.adapters import rfapi + from roboflow.cli._output import output, output_error + + resolved = _resolve_ws_and_key(args) + if not resolved: + return + _ws, api_key = resolved + + try: + result = rfapi.get_plan_info(api_key) + except Exception as exc: + output_error(args, str(exc), exit_code=3) + return + + plan = result.get("plan", result) + lines = ["Plan Info:"] + if isinstance(plan, dict): + for key, val in plan.items(): + lines.append(f" {key}: {val}") + else: + lines.append(f" {plan}") + output(args, result, text="\n".join(lines)) + + +def _workspace_stats(args: argparse.Namespace) -> None: + from roboflow.adapters import rfapi + from roboflow.cli._output import output, output_error + + resolved = _resolve_ws_and_key(args) + if not resolved: + return + ws, api_key = resolved + + try: + result = rfapi.get_labeling_stats(api_key, ws, start_date=args.start_date, end_date=args.end_date) + except Exception as exc: + output_error(args, str(exc), exit_code=3) + return + + stats = result.get("stats", result) + lines = ["Labeling Stats:"] + if isinstance(stats, dict): + for key, val in stats.items(): + lines.append(f" {key}: {val}") + else: + lines.append(f" {stats}") + output(args, result, text="\n".join(lines)) diff --git a/roboflow/core/project.py b/roboflow/core/project.py index e34c4ade..8a287fc4 100644 --- a/roboflow/core/project.py +++ 
b/roboflow/core/project.py @@ -872,6 +872,29 @@ def image(self, image_id: str) -> Dict: return image_details + def get_annotation_jobs(self) -> Dict: + """Get a list of all annotation jobs in the project. + + Returns: + Dict: A dictionary containing the list of annotation jobs. + """ + from roboflow.adapters import rfapi + + return rfapi.list_annotation_jobs(self.__api_key, self.__workspace, self.__project_name) + + def get_annotation_job(self, job_id: str) -> Dict: + """Get information for a specific annotation job. + + Args: + job_id: The ID of the annotation job to retrieve. + + Returns: + Dict: A dictionary containing the job details. + """ + from roboflow.adapters import rfapi + + return rfapi.get_annotation_job(self.__api_key, self.__workspace, self.__project_name, job_id) + def create_annotation_job( self, name: str, batch_id: str, num_images: int, labeler_email: str, reviewer_email: str ) -> Dict: diff --git a/roboflow/core/workspace.py b/roboflow/core/workspace.py index f248cf40..698aa59d 100644 --- a/roboflow/core/workspace.py +++ b/roboflow/core/workspace.py @@ -881,6 +881,63 @@ def search_export( print(f"Search export saved to {zip_path}") return zip_path + # ----------------------------------------------------------------- + # Phase 2: Folder management + # ----------------------------------------------------------------- + + def list_folders(self): + """List project folders in this workspace.""" + from roboflow.adapters import rfapi + + return rfapi.list_folders(self.__api_key, self.url) + + def create_folder(self, name, parent_id=None, project_ids=None): + """Create a project folder in this workspace.""" + from roboflow.adapters import rfapi + + return rfapi.create_folder(self.__api_key, self.url, name, parent_id=parent_id, project_ids=project_ids) + + # ----------------------------------------------------------------- + # Phase 2: Workflow management + # ----------------------------------------------------------------- + + def list_workflows(self): 
+ """List workflows in this workspace.""" + from roboflow.adapters import rfapi + + return rfapi.list_workflows(self.__api_key, self.url) + + def get_workflow(self, workflow_url): + """Get workflow details.""" + from roboflow.adapters import rfapi + + return rfapi.get_workflow(self.__api_key, self.url, workflow_url) + + def create_workflow(self, name, definition=None): + """Create a new workflow.""" + import json + + from roboflow.adapters import rfapi + + config = json.dumps(definition) if definition else None + return rfapi.create_workflow(self.__api_key, self.url, name=name, config=config) + + # ----------------------------------------------------------------- + # Phase 2: Workspace statistics + # ----------------------------------------------------------------- + + def get_usage(self): + """Get billing usage report for this workspace.""" + from roboflow.adapters import rfapi + + return rfapi.get_billing_usage(self.__api_key, self.url) + + def get_plan(self): + """Get workspace plan info and limits.""" + from roboflow.adapters import rfapi + + return rfapi.get_plan_info(self.__api_key) + def __str__(self): projects = self.projects() json_value = {"name": self.name, "url": self.url, "projects": projects} diff --git a/tests/adapters/__init__.py b/tests/adapters/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/tests/adapters/test_rfapi_phase2.py b/tests/adapters/test_rfapi_phase2.py new file mode 100644 index 00000000..3537e7fe --- /dev/null +++ b/tests/adapters/test_rfapi_phase2.py @@ -0,0 +1,544 @@ +"""Unit tests for Phase 2 rfapi functions.""" + +import unittest +from unittest.mock import MagicMock, patch + + +class TestListBatches(unittest.TestCase): + @patch("roboflow.adapters.rfapi.requests.get") + def test_success(self, mock_get): + from roboflow.adapters.rfapi import list_batches + + mock_get.return_value = MagicMock(status_code=200, json=lambda: {"batches": [{"id": "b1"}]}) + result = list_batches("key", "ws", "proj") + 
self.assertEqual(result, {"batches": [{"id": "b1"}]}) + mock_get.assert_called_once() + self.assertIn("/ws/proj/batches", mock_get.call_args[0][0]) + + @patch("roboflow.adapters.rfapi.requests.get") + def test_error(self, mock_get): + from roboflow.adapters.rfapi import RoboflowError, list_batches + + mock_get.return_value = MagicMock(status_code=404, text="Not found") + with self.assertRaises(RoboflowError): + list_batches("key", "ws", "proj") + + +class TestGetBatch(unittest.TestCase): + @patch("roboflow.adapters.rfapi.requests.get") + def test_success(self, mock_get): + from roboflow.adapters.rfapi import get_batch + + mock_get.return_value = MagicMock(status_code=200, json=lambda: {"batch": {"id": "b1"}}) + result = get_batch("key", "ws", "proj", "b1") + self.assertEqual(result, {"batch": {"id": "b1"}}) + self.assertIn("/ws/proj/batches/b1", mock_get.call_args[0][0]) + + @patch("roboflow.adapters.rfapi.requests.get") + def test_error(self, mock_get): + from roboflow.adapters.rfapi import RoboflowError, get_batch + + mock_get.return_value = MagicMock(status_code=500, text="Server error") + with self.assertRaises(RoboflowError): + get_batch("key", "ws", "proj", "b1") + + +class TestListAnnotationJobs(unittest.TestCase): + @patch("roboflow.adapters.rfapi.requests.get") + def test_success(self, mock_get): + from roboflow.adapters.rfapi import list_annotation_jobs + + mock_get.return_value = MagicMock(status_code=200, json=lambda: {"jobs": []}) + result = list_annotation_jobs("key", "ws", "proj") + self.assertEqual(result, {"jobs": []}) + self.assertIn("/ws/proj/jobs", mock_get.call_args[0][0]) + + @patch("roboflow.adapters.rfapi.requests.get") + def test_error(self, mock_get): + from roboflow.adapters.rfapi import RoboflowError, list_annotation_jobs + + mock_get.return_value = MagicMock(status_code=403, text="Forbidden") + with self.assertRaises(RoboflowError): + list_annotation_jobs("key", "ws", "proj") + + +class TestGetAnnotationJob(unittest.TestCase): + 
@patch("roboflow.adapters.rfapi.requests.get") + def test_success(self, mock_get): + from roboflow.adapters.rfapi import get_annotation_job + + mock_get.return_value = MagicMock(status_code=200, json=lambda: {"job": {"id": "j1", "name": "job1"}}) + result = get_annotation_job("key", "ws", "proj", "j1") + self.assertEqual(result["job"]["id"], "j1") + self.assertIn("/ws/proj/jobs/j1", mock_get.call_args[0][0]) + + @patch("roboflow.adapters.rfapi.requests.get") + def test_error(self, mock_get): + from roboflow.adapters.rfapi import RoboflowError, get_annotation_job + + mock_get.return_value = MagicMock(status_code=404, text="Not found") + with self.assertRaises(RoboflowError): + get_annotation_job("key", "ws", "proj", "j1") + + +class TestCreateAnnotationJob(unittest.TestCase): + @patch("roboflow.adapters.rfapi.requests.post") + def test_success(self, mock_post): + from roboflow.adapters.rfapi import create_annotation_job + + mock_post.return_value = MagicMock(status_code=201, json=lambda: {"job": {"id": "j2"}}) + result = create_annotation_job("key", "ws", "proj", name="my-job", batch_id="b1") + self.assertEqual(result["job"]["id"], "j2") + # Verify URL and payload + call_args = mock_post.call_args + self.assertIn("/ws/proj/jobs", call_args[0][0]) + payload = call_args[1]["json"] + self.assertEqual(payload["name"], "my-job") + self.assertEqual(payload["batchId"], "b1") + + @patch("roboflow.adapters.rfapi.requests.post") + def test_success_200(self, mock_post): + from roboflow.adapters.rfapi import create_annotation_job + + mock_post.return_value = MagicMock(status_code=200, json=lambda: {"job": {"id": "j3"}}) + result = create_annotation_job("key", "ws", "proj", name="my-job") + self.assertEqual(result["job"]["id"], "j3") + + @patch("roboflow.adapters.rfapi.requests.post") + def test_with_assignees(self, mock_post): + from roboflow.adapters.rfapi import create_annotation_job + + mock_post.return_value = MagicMock(status_code=201, json=lambda: {"job": {"id": "j4"}}) + 
create_annotation_job("key", "ws", "proj", name="j", assignees=["a@b.com"]) + payload = mock_post.call_args[1]["json"] + self.assertEqual(payload["assignees"], ["a@b.com"]) + + @patch("roboflow.adapters.rfapi.requests.post") + def test_error(self, mock_post): + from roboflow.adapters.rfapi import RoboflowError, create_annotation_job + + mock_post.return_value = MagicMock(status_code=400, text="Bad request") + with self.assertRaises(RoboflowError): + create_annotation_job("key", "ws", "proj", name="j") + + +class TestListFolders(unittest.TestCase): + @patch("roboflow.adapters.rfapi.requests.get") + def test_success(self, mock_get): + from roboflow.adapters.rfapi import list_folders + + mock_get.return_value = MagicMock(status_code=200, json=lambda: {"groups": []}) + result = list_folders("key", "ws") + self.assertEqual(result, {"groups": []}) + mock_get.assert_called_once() + self.assertIn("/ws/groups", mock_get.call_args[0][0]) + + @patch("roboflow.adapters.rfapi.requests.get") + def test_error(self, mock_get): + from roboflow.adapters.rfapi import RoboflowError, list_folders + + mock_get.return_value = MagicMock(status_code=404, text="Not found") + with self.assertRaises(RoboflowError): + list_folders("key", "ws") + + +class TestGetFolder(unittest.TestCase): + @patch("roboflow.adapters.rfapi.requests.get") + def test_success(self, mock_get): + from roboflow.adapters.rfapi import get_folder + + mock_get.return_value = MagicMock(status_code=200, json=lambda: {"group": {"id": "g1", "name": "Folder1"}}) + result = get_folder("key", "ws", "g1") + self.assertEqual(result["group"]["id"], "g1") + call_kwargs = mock_get.call_args[1] + self.assertEqual(call_kwargs["params"]["groupId"], "g1") + + @patch("roboflow.adapters.rfapi.requests.get") + def test_error(self, mock_get): + from roboflow.adapters.rfapi import RoboflowError, get_folder + + mock_get.return_value = MagicMock(status_code=404, text="Not found") + with self.assertRaises(RoboflowError): + get_folder("key", 
"ws", "g1") + + +class TestCreateFolder(unittest.TestCase): + @patch("roboflow.adapters.rfapi.requests.post") + def test_success(self, mock_post): + from roboflow.adapters.rfapi import create_folder + + mock_post.return_value = MagicMock(status_code=201, json=lambda: {"group": {"id": "g2"}}) + result = create_folder("key", "ws", "NewFolder") + self.assertEqual(result["group"]["id"], "g2") + payload = mock_post.call_args[1]["json"] + self.assertEqual(payload["name"], "NewFolder") + + @patch("roboflow.adapters.rfapi.requests.post") + def test_with_parent_and_projects(self, mock_post): + from roboflow.adapters.rfapi import create_folder + + mock_post.return_value = MagicMock(status_code=200, json=lambda: {"group": {"id": "g3"}}) + create_folder("key", "ws", "Sub", parent_id="g1", project_ids=["p1", "p2"]) + payload = mock_post.call_args[1]["json"] + self.assertEqual(payload["parent_id"], "g1") + self.assertEqual(payload["projects"], ["p1", "p2"]) + + @patch("roboflow.adapters.rfapi.requests.post") + def test_error(self, mock_post): + from roboflow.adapters.rfapi import RoboflowError, create_folder + + mock_post.return_value = MagicMock(status_code=400, text="Bad request") + with self.assertRaises(RoboflowError): + create_folder("key", "ws", "BadFolder") + + +class TestUpdateFolder(unittest.TestCase): + @patch("roboflow.adapters.rfapi.requests.post") + def test_success(self, mock_post): + from roboflow.adapters.rfapi import update_folder + + mock_post.return_value = MagicMock(status_code=200, json=lambda: {"status": "ok"}) + result = update_folder("key", "ws", "g1", name="Renamed") + self.assertEqual(result["status"], "ok") + self.assertIn("/ws/groups/g1", mock_post.call_args[0][0]) + payload = mock_post.call_args[1]["json"] + self.assertEqual(payload["name"], "Renamed") + + @patch("roboflow.adapters.rfapi.requests.post") + def test_error(self, mock_post): + from roboflow.adapters.rfapi import RoboflowError, update_folder + + mock_post.return_value = 
MagicMock(status_code=500, text="Server error") + with self.assertRaises(RoboflowError): + update_folder("key", "ws", "g1", name="X") + + +class TestDeleteFolder(unittest.TestCase): + @patch("roboflow.adapters.rfapi.requests.delete") + def test_success(self, mock_delete): + from roboflow.adapters.rfapi import delete_folder + + mock_delete.return_value = MagicMock(status_code=200, json=lambda: {"status": "deleted"}) + result = delete_folder("key", "ws", "g1") + self.assertEqual(result["status"], "deleted") + self.assertIn("/ws/groups/g1", mock_delete.call_args[0][0]) + + @patch("roboflow.adapters.rfapi.requests.delete") + def test_error(self, mock_delete): + from roboflow.adapters.rfapi import RoboflowError, delete_folder + + mock_delete.return_value = MagicMock(status_code=403, text="Forbidden") + with self.assertRaises(RoboflowError): + delete_folder("key", "ws", "g1") + + +class TestListWorkflows(unittest.TestCase): + @patch("roboflow.adapters.rfapi.requests.get") + def test_success(self, mock_get): + from roboflow.adapters.rfapi import list_workflows + + mock_get.return_value = MagicMock(status_code=200, json=lambda: {"workflows": [{"name": "wf1"}]}) + result = list_workflows("key", "ws") + self.assertEqual(len(result["workflows"]), 1) + self.assertIn("/ws/workflows", mock_get.call_args[0][0]) + + @patch("roboflow.adapters.rfapi.requests.get") + def test_error(self, mock_get): + from roboflow.adapters.rfapi import RoboflowError, list_workflows + + mock_get.return_value = MagicMock(status_code=500, text="Error") + with self.assertRaises(RoboflowError): + list_workflows("key", "ws") + + +class TestGetWorkflow(unittest.TestCase): + @patch("roboflow.adapters.rfapi.requests.get") + def test_success(self, mock_get): + from roboflow.adapters.rfapi import get_workflow + + mock_get.return_value = MagicMock(status_code=200, json=lambda: {"workflow": {"url": "wf1"}}) + result = get_workflow("key", "ws", "wf1") + self.assertEqual(result["workflow"]["url"], "wf1") + 
self.assertIn("/ws/workflows/wf1", mock_get.call_args[0][0]) + + @patch("roboflow.adapters.rfapi.requests.get") + def test_error(self, mock_get): + from roboflow.adapters.rfapi import RoboflowError, get_workflow + + mock_get.return_value = MagicMock(status_code=404, text="Not found") + with self.assertRaises(RoboflowError): + get_workflow("key", "ws", "wf1") + + +class TestCreateWorkflow(unittest.TestCase): + @patch("roboflow.adapters.rfapi.requests.post") + def test_success(self, mock_post): + from roboflow.adapters.rfapi import create_workflow + + mock_post.return_value = MagicMock(status_code=201, json=lambda: {"workflow": {"url": "new-wf"}}) + result = create_workflow("key", "ws", name="New Workflow") + self.assertEqual(result["workflow"]["url"], "new-wf") + self.assertIn("/ws/createWorkflow", mock_post.call_args[0][0]) + # Params are passed as query-string params, not JSON body + params = mock_post.call_args[1]["params"] + self.assertEqual(params["name"], "New Workflow") + + @patch("roboflow.adapters.rfapi.requests.post") + def test_auto_generates_url_slug(self, mock_post): + from roboflow.adapters.rfapi import create_workflow + + mock_post.return_value = MagicMock(status_code=201, json=lambda: {"workflow": {"url": "my-workflow"}}) + create_workflow("key", "ws", name="My Workflow") + params = mock_post.call_args[1]["params"] + self.assertEqual(params["url"], "my-workflow") + + @patch("roboflow.adapters.rfapi.requests.post") + def test_with_config_and_template(self, mock_post): + from roboflow.adapters.rfapi import create_workflow + + mock_post.return_value = MagicMock(status_code=200, json=lambda: {"workflow": {"url": "wf2"}}) + create_workflow("key", "ws", name="WF2", url="wf2", config='{"a":1}', template='{"b":2}') + params = mock_post.call_args[1]["params"] + self.assertEqual(params["url"], "wf2") + self.assertEqual(params["config"], '{"a":1}') + self.assertEqual(params["template"], '{"b":2}') + + @patch("roboflow.adapters.rfapi.requests.post") + def 
test_config_dict_serialized_to_string(self, mock_post): + from roboflow.adapters.rfapi import create_workflow + + mock_post.return_value = MagicMock(status_code=200, json=lambda: {"workflow": {"url": "wf3"}}) + create_workflow("key", "ws", name="WF3", config={"a": 1}, template={"b": 2}) + params = mock_post.call_args[1]["params"] + # config and template must be strings per the API + self.assertIsInstance(params["config"], str) + self.assertIsInstance(params["template"], str) + + @patch("roboflow.adapters.rfapi.requests.post") + def test_defaults_config_and_template(self, mock_post): + from roboflow.adapters.rfapi import create_workflow + + mock_post.return_value = MagicMock(status_code=201, json=lambda: {"workflow": {"url": "wf4"}}) + create_workflow("key", "ws", name="WF4") + params = mock_post.call_args[1]["params"] + self.assertEqual(params["config"], "{}") + self.assertEqual(params["template"], "{}") + + @patch("roboflow.adapters.rfapi.requests.post") + def test_error(self, mock_post): + from roboflow.adapters.rfapi import RoboflowError, create_workflow + + mock_post.return_value = MagicMock(status_code=400, text="Bad request") + with self.assertRaises(RoboflowError): + create_workflow("key", "ws", name="Bad") + + +class TestUpdateWorkflow(unittest.TestCase): + @patch("roboflow.adapters.rfapi.requests.post") + def test_success(self, mock_post): + from roboflow.adapters.rfapi import update_workflow + + mock_post.return_value = MagicMock(status_code=200, json=lambda: {"status": "ok"}) + result = update_workflow( + "key", "ws", workflow_id="id-1", workflow_name="WF1", workflow_url="wf1", config={"steps": [1]} + ) + self.assertEqual(result["status"], "ok") + self.assertIn("/ws/updateWorkflow", mock_post.call_args[0][0]) + payload = mock_post.call_args[1]["json"] + self.assertEqual(payload["id"], "id-1") + self.assertEqual(payload["name"], "WF1") + self.assertEqual(payload["url"], "wf1") + # config dict should be serialized to string + 
self.assertIsInstance(payload["config"], str) + + @patch("roboflow.adapters.rfapi.requests.post") + def test_config_string_passthrough(self, mock_post): + from roboflow.adapters.rfapi import update_workflow + + mock_post.return_value = MagicMock(status_code=200, json=lambda: {"status": "ok"}) + update_workflow("key", "ws", workflow_id="id-1", workflow_name="WF1", workflow_url="wf1", config='{"a":1}') + payload = mock_post.call_args[1]["json"] + self.assertEqual(payload["config"], '{"a":1}') + + @patch("roboflow.adapters.rfapi.requests.post") + def test_error(self, mock_post): + from roboflow.adapters.rfapi import RoboflowError, update_workflow + + mock_post.return_value = MagicMock(status_code=500, text="Server error") + with self.assertRaises(RoboflowError): + update_workflow("key", "ws", workflow_id="id-1", workflow_name="WF1", workflow_url="wf1", config="{}") + + +class TestListWorkflowVersions(unittest.TestCase): + @patch("roboflow.adapters.rfapi.requests.get") + def test_success(self, mock_get): + from roboflow.adapters.rfapi import list_workflow_versions + + mock_get.return_value = MagicMock(status_code=200, json=lambda: {"versions": [{"id": "v1"}]}) + result = list_workflow_versions("key", "ws", "wf1") + self.assertEqual(len(result["versions"]), 1) + self.assertIn("/ws/workflows/wf1/versions", mock_get.call_args[0][0]) + + @patch("roboflow.adapters.rfapi.requests.get") + def test_error(self, mock_get): + from roboflow.adapters.rfapi import RoboflowError, list_workflow_versions + + mock_get.return_value = MagicMock(status_code=500, text="Error") + with self.assertRaises(RoboflowError): + list_workflow_versions("key", "ws", "wf1") + + +class TestForkWorkflow(unittest.TestCase): + @patch("roboflow.adapters.rfapi.requests.post") + def test_success(self, mock_post): + from roboflow.adapters.rfapi import fork_workflow + + mock_post.return_value = MagicMock(status_code=201, json=lambda: {"workflow": {"url": "forked"}}) + result = fork_workflow("key", "target-ws", 
source_workspace="src-ws", source_workflow="wf1") + self.assertEqual(result["workflow"]["url"], "forked") + self.assertIn("/target-ws/forkWorkflow", mock_post.call_args[0][0]) + payload = mock_post.call_args[1]["json"] + self.assertEqual(payload["source_workspace"], "src-ws") + self.assertEqual(payload["source_workflow"], "wf1") + + @patch("roboflow.adapters.rfapi.requests.post") + def test_success_200(self, mock_post): + from roboflow.adapters.rfapi import fork_workflow + + mock_post.return_value = MagicMock(status_code=200, json=lambda: {"workflow": {"url": "forked2"}}) + result = fork_workflow("key", "ws", source_workspace="src-ws", source_workflow="wf2") + self.assertEqual(result["workflow"]["url"], "forked2") + + @patch("roboflow.adapters.rfapi.requests.post") + def test_with_name_and_url(self, mock_post): + from roboflow.adapters.rfapi import fork_workflow + + mock_post.return_value = MagicMock(status_code=201, json=lambda: {"workflow": {"url": "custom-fork"}}) + fork_workflow( + "key", "ws", source_workspace="src-ws", source_workflow="wf1", name="Custom Fork", url="custom-fork" + ) + payload = mock_post.call_args[1]["json"] + self.assertEqual(payload["name"], "Custom Fork") + self.assertEqual(payload["url"], "custom-fork") + + @patch("roboflow.adapters.rfapi.requests.post") + def test_error(self, mock_post): + from roboflow.adapters.rfapi import RoboflowError, fork_workflow + + mock_post.return_value = MagicMock(status_code=403, text="Forbidden") + with self.assertRaises(RoboflowError): + fork_workflow("key", "ws", source_workspace="src-ws", source_workflow="wf1") + + +class TestGetBillingUsage(unittest.TestCase): + @patch("roboflow.adapters.rfapi.requests.post") + def test_success(self, mock_post): + from roboflow.adapters.rfapi import get_billing_usage + + mock_post.return_value = MagicMock(status_code=200, json=lambda: {"usage": {"credits": 100}}) + result = get_billing_usage("key", "ws") + self.assertEqual(result["usage"]["credits"], 100) + 
self.assertIn("/ws/billing-usage-report", mock_post.call_args[0][0]) + + @patch("roboflow.adapters.rfapi.requests.post") + def test_error(self, mock_post): + from roboflow.adapters.rfapi import RoboflowError, get_billing_usage + + mock_post.return_value = MagicMock(status_code=403, text="Forbidden") + with self.assertRaises(RoboflowError): + get_billing_usage("key", "ws") + + +class TestGetPlanInfo(unittest.TestCase): + @patch("roboflow.adapters.rfapi.requests.get") + def test_success(self, mock_get): + from roboflow.adapters.rfapi import get_plan_info + + mock_get.return_value = MagicMock(status_code=200, json=lambda: {"plan": "starter", "limit": 1000}) + result = get_plan_info("key") + self.assertEqual(result["plan"], "starter") + self.assertIn("/usage/plan", mock_get.call_args[0][0]) + + @patch("roboflow.adapters.rfapi.requests.get") + def test_error(self, mock_get): + from roboflow.adapters.rfapi import RoboflowError, get_plan_info + + mock_get.return_value = MagicMock(status_code=401, text="Unauthorized") + with self.assertRaises(RoboflowError): + get_plan_info("key") + + +class TestGetLabelingStats(unittest.TestCase): + @patch("roboflow.adapters.rfapi.requests.get") + def test_success(self, mock_get): + from roboflow.adapters.rfapi import get_labeling_stats + + mock_get.return_value = MagicMock(status_code=200, json=lambda: {"stats": {"labeled": 50}}) + result = get_labeling_stats("key", "ws") + self.assertEqual(result["stats"]["labeled"], 50) + self.assertIn("/ws/stats", mock_get.call_args[0][0]) + + @patch("roboflow.adapters.rfapi.requests.get") + def test_error(self, mock_get): + from roboflow.adapters.rfapi import RoboflowError, get_labeling_stats + + mock_get.return_value = MagicMock(status_code=500, text="Error") + with self.assertRaises(RoboflowError): + get_labeling_stats("key", "ws") + + +class TestGetVideoJobStatus(unittest.TestCase): + @patch("roboflow.adapters.rfapi.requests.get") + def test_success(self, mock_get): + from roboflow.adapters.rfapi 
import get_video_job_status + + mock_get.return_value = MagicMock(status_code=200, json=lambda: {"status": "completed", "progress": 1.0}) + result = get_video_job_status("key", "job-123") + self.assertEqual(result["status"], "completed") + call_kwargs = mock_get.call_args[1] + self.assertEqual(call_kwargs["params"]["job_id"], "job-123") + + @patch("roboflow.adapters.rfapi.requests.get") + def test_error(self, mock_get): + from roboflow.adapters.rfapi import RoboflowError, get_video_job_status + + mock_get.return_value = MagicMock(status_code=404, text="Not found") + with self.assertRaises(RoboflowError): + get_video_job_status("key", "job-123") + + +class TestSearchUniverse(unittest.TestCase): + @patch("roboflow.adapters.rfapi.requests.get") + def test_success(self, mock_get): + from roboflow.adapters.rfapi import search_universe + + mock_get.return_value = MagicMock( + status_code=200, json=lambda: {"results": [{"name": "cats-dataset"}], "total": 1} + ) + result = search_universe("cats") + self.assertEqual(result["total"], 1) + call_kwargs = mock_get.call_args[1] + self.assertEqual(call_kwargs["params"]["q"], "cats") + + @patch("roboflow.adapters.rfapi.requests.get") + def test_with_type_and_limit(self, mock_get): + from roboflow.adapters.rfapi import search_universe + + mock_get.return_value = MagicMock(status_code=200, json=lambda: {"results": [], "total": 0}) + search_universe("dogs", project_type="model", limit=5, page=2) + call_kwargs = mock_get.call_args[1] + self.assertEqual(call_kwargs["params"]["type"], "model") + self.assertEqual(call_kwargs["params"]["limit"], 5) + self.assertEqual(call_kwargs["params"]["page"], 2) + + @patch("roboflow.adapters.rfapi.requests.get") + def test_error(self, mock_get): + from roboflow.adapters.rfapi import RoboflowError, search_universe + + mock_get.return_value = MagicMock(status_code=500, text="Server error") + with self.assertRaises(RoboflowError): + search_universe("query") + + +if __name__ == "__main__": + 
unittest.main() diff --git a/tests/cli/test_annotation_handler.py b/tests/cli/test_annotation_handler.py index bcd9aa4b..5561cd92 100644 --- a/tests/cli/test_annotation_handler.py +++ b/tests/cli/test_annotation_handler.py @@ -2,9 +2,11 @@ import argparse import io +import json import sys import types import unittest +from unittest.mock import MagicMock, patch def _build_annotation_parser(): @@ -60,13 +62,19 @@ def test_annotation_job_create(self): "my-job", "--batch", "batch-1", - "--assignees", - "a@b.com,c@d.com", + "--num-images", + "10", + "--labeler", + "a@b.com", + "--reviewer", + "c@d.com", ] ) self.assertEqual(args.name, "my-job") self.assertEqual(args.batch, "batch-1") - self.assertEqual(args.assignees, "a@b.com,c@d.com") + self.assertEqual(args.num_images, 10) + self.assertEqual(args.labeler, "a@b.com") + self.assertEqual(args.reviewer, "c@d.com") class TestAnnotationStub(unittest.TestCase): @@ -90,8 +98,6 @@ def test_stub_prints_message(self): self.assertIn("not yet implemented", buf.getvalue()) def test_stub_json_mode(self): - import json - from roboflow.cli._output import stub as _stub args = types.SimpleNamespace(json=True) @@ -110,5 +116,224 @@ def test_stub_json_mode(self): self.assertIn("not yet implemented", result["error"]["message"]) +# --------------------------------------------------------------------------- +# Behavior tests (mocked API) +# --------------------------------------------------------------------------- + +_RESOLVE = "roboflow.cli.handlers.annotation._resolve_project_context" + + +class TestBatchList(unittest.TestCase): + """annotation batch list""" + + @patch("roboflow.adapters.rfapi.list_batches") + @patch(_RESOLVE, return_value=("key", "ws", "proj")) + def test_text_output(self, _resolve, mock_api): + mock_api.return_value = {"batches": [{"name": "b1", "id": "1", "status": "annotating", "images": 5}]} + parser = _build_annotation_parser() + args = parser.parse_args(["annotation", "batch", "list", "-p", "ws/proj"]) + + buf = 
io.StringIO() + with patch("sys.stdout", buf): + args.func(args) + self.assertIn("b1", buf.getvalue()) + + @patch("roboflow.adapters.rfapi.list_batches") + @patch(_RESOLVE, return_value=("key", "ws", "proj")) + def test_json_output(self, _resolve, mock_api): + mock_api.return_value = {"batches": [{"name": "b1", "id": "1"}]} + parser = _build_annotation_parser() + args = parser.parse_args(["--json", "annotation", "batch", "list", "-p", "ws/proj"]) + + buf = io.StringIO() + with patch("sys.stdout", buf): + args.func(args) + data = json.loads(buf.getvalue()) + self.assertIsInstance(data, list) + self.assertEqual(data[0]["name"], "b1") + + @patch(_RESOLVE, return_value=None) + def test_resolve_failure(self, _resolve): + parser = _build_annotation_parser() + args = parser.parse_args(["annotation", "batch", "list", "-p", "bad"]) + # Should return without crashing when resolve returns None + args.func(args) + + +class TestBatchGet(unittest.TestCase): + """annotation batch get""" + + @patch("roboflow.adapters.rfapi.get_batch") + @patch(_RESOLVE, return_value=("key", "ws", "proj")) + def test_text_output(self, _resolve, mock_api): + mock_api.return_value = {"batch": {"name": "b1", "id": "1", "status": "annotating"}} + parser = _build_annotation_parser() + args = parser.parse_args(["annotation", "batch", "get", "1", "-p", "ws/proj"]) + + buf = io.StringIO() + with patch("sys.stdout", buf): + args.func(args) + self.assertIn("b1", buf.getvalue()) + + @patch("roboflow.adapters.rfapi.get_batch") + @patch(_RESOLVE, return_value=("key", "ws", "proj")) + def test_json_output(self, _resolve, mock_api): + mock_api.return_value = {"batch": {"name": "b1", "id": "1"}} + parser = _build_annotation_parser() + args = parser.parse_args(["--json", "annotation", "batch", "get", "1", "-p", "ws/proj"]) + + buf = io.StringIO() + with patch("sys.stdout", buf): + args.func(args) + data = json.loads(buf.getvalue()) + self.assertIn("batch", data) + + +class TestJobList(unittest.TestCase): + 
"""annotation job list""" + + @patch("roboflow.adapters.rfapi.list_annotation_jobs") + @patch(_RESOLVE, return_value=("key", "ws", "proj")) + def test_text_output(self, _resolve, mock_api): + mock_api.return_value = {"jobs": [{"name": "j1", "id": "10", "status": "active", "assigned_to": "a@b.com"}]} + parser = _build_annotation_parser() + args = parser.parse_args(["annotation", "job", "list", "-p", "ws/proj"]) + + buf = io.StringIO() + with patch("sys.stdout", buf): + args.func(args) + self.assertIn("j1", buf.getvalue()) + + @patch("roboflow.adapters.rfapi.list_annotation_jobs") + @patch(_RESOLVE, return_value=("key", "ws", "proj")) + def test_json_output(self, _resolve, mock_api): + mock_api.return_value = {"jobs": [{"name": "j1", "id": "10"}]} + parser = _build_annotation_parser() + args = parser.parse_args(["--json", "annotation", "job", "list", "-p", "ws/proj"]) + + buf = io.StringIO() + with patch("sys.stdout", buf): + args.func(args) + data = json.loads(buf.getvalue()) + self.assertIsInstance(data, list) + + +class TestJobGet(unittest.TestCase): + """annotation job get""" + + @patch("roboflow.adapters.rfapi.get_annotation_job") + @patch(_RESOLVE, return_value=("key", "ws", "proj")) + def test_text_output(self, _resolve, mock_api): + mock_api.return_value = {"job": {"name": "j1", "id": "10", "status": "active"}} + parser = _build_annotation_parser() + args = parser.parse_args(["annotation", "job", "get", "10", "-p", "ws/proj"]) + + buf = io.StringIO() + with patch("sys.stdout", buf): + args.func(args) + self.assertIn("j1", buf.getvalue()) + + +class TestJobCreate(unittest.TestCase): + """annotation job create""" + + @patch("roboflow.Roboflow") + @patch(_RESOLVE, return_value=("key", "ws", "proj")) + def test_text_output(self, _resolve, mock_rf_cls): + mock_project = MagicMock() + mock_project.create_annotation_job.return_value = {"id": "42", "name": "new-job"} + mock_rf_cls.return_value.workspace.return_value.project.return_value = mock_project + + parser = 
_build_annotation_parser() + args = parser.parse_args( + [ + "annotation", + "job", + "create", + "-p", + "ws/proj", + "--name", + "new-job", + "--batch", + "b1", + "--num-images", + "5", + "--labeler", + "a@b.com", + "--reviewer", + "c@d.com", + ] + ) + + buf = io.StringIO() + with patch("sys.stdout", buf): + args.func(args) + self.assertIn("new-job", buf.getvalue()) + mock_project.create_annotation_job.assert_called_once_with( + name="new-job", + batch_id="b1", + num_images=5, + labeler_email="a@b.com", + reviewer_email="c@d.com", + ) + + @patch("roboflow.Roboflow") + @patch(_RESOLVE, return_value=("key", "ws", "proj")) + def test_json_output(self, _resolve, mock_rf_cls): + mock_project = MagicMock() + mock_project.create_annotation_job.return_value = {"id": "42", "name": "new-job"} + mock_rf_cls.return_value.workspace.return_value.project.return_value = mock_project + + parser = _build_annotation_parser() + args = parser.parse_args( + [ + "--json", + "annotation", + "job", + "create", + "-p", + "ws/proj", + "--name", + "new-job", + "--batch", + "b1", + "--num-images", + "5", + "--labeler", + "a@b.com", + "--reviewer", + "c@d.com", + ] + ) + + buf = io.StringIO() + with patch("sys.stdout", buf): + args.func(args) + data = json.loads(buf.getvalue()) + self.assertEqual(data["id"], "42") + + def test_create_requires_all_flags(self): + parser = _build_annotation_parser() + # Missing --reviewer should fail + with self.assertRaises(SystemExit): + parser.parse_args( + [ + "annotation", + "job", + "create", + "-p", + "proj", + "--name", + "j", + "--batch", + "b", + "--num-images", + "1", + "--labeler", + "a@b.com", + ] + ) + + if __name__ == "__main__": unittest.main() diff --git a/tests/cli/test_folder_handler.py b/tests/cli/test_folder_handler.py index c75ba939..47f443e4 100644 --- a/tests/cli/test_folder_handler.py +++ b/tests/cli/test_folder_handler.py @@ -1,6 +1,9 @@ """Tests for the folder CLI handler.""" +import json import unittest +from argparse import Namespace 
+from unittest.mock import patch class TestFolderRegistration(unittest.TestCase): @@ -34,6 +37,14 @@ def test_folder_create_exists(self) -> None: self.assertIsNotNone(args.func) self.assertEqual(args.name, "My Folder") + def test_folder_create_with_flags(self) -> None: + from roboflow.cli import build_parser + + parser = build_parser() + args = parser.parse_args(["folder", "create", "My Folder", "--parent", "p1", "--projects", "a,b"]) + self.assertEqual(args.parent, "p1") + self.assertEqual(args.projects, "a,b") + def test_folder_update_exists(self) -> None: from roboflow.cli import build_parser @@ -49,5 +60,153 @@ def test_folder_delete_exists(self) -> None: self.assertIsNotNone(args.func) +class TestFolderListHandler(unittest.TestCase): + """Test folder list command behavior.""" + + @patch("roboflow.adapters.rfapi.list_folders") + @patch("roboflow.cli._resolver.resolve_default_workspace", return_value="test-ws") + @patch("roboflow.config.load_roboflow_api_key", return_value="fake-key") + def test_list_folders_text(self, _mock_key, _mock_ws, mock_list): + mock_list.return_value = {"data": [{"name": "Folder1", "id": "f1", "projects": ["p1", "p2"]}]} + args = Namespace(json=False, workspace=None, api_key=None, quiet=False) + + from roboflow.cli.handlers.folder import _list_folders + + with patch("builtins.print") as mock_print: + _list_folders(args) + mock_print.assert_called_once() + printed = mock_print.call_args[0][0] + self.assertIn("Folder1", printed) + + @patch("roboflow.adapters.rfapi.list_folders") + @patch("roboflow.cli._resolver.resolve_default_workspace", return_value="test-ws") + @patch("roboflow.config.load_roboflow_api_key", return_value="fake-key") + def test_list_folders_json(self, _mock_key, _mock_ws, mock_list): + mock_list.return_value = {"data": [{"name": "Folder1", "id": "f1", "projects": []}]} + args = Namespace(json=True, workspace=None, api_key=None, quiet=False) + + from roboflow.cli.handlers.folder import _list_folders + + with 
patch("builtins.print") as mock_print: + _list_folders(args) + printed = mock_print.call_args[0][0] + data = json.loads(printed) + self.assertIsInstance(data, list) + self.assertEqual(data[0]["name"], "Folder1") + + @patch("roboflow.cli._resolver.resolve_default_workspace", return_value=None) + def test_list_folders_no_workspace(self, _mock_ws): + args = Namespace(json=True, workspace=None, api_key=None, quiet=False) + + from roboflow.cli.handlers.folder import _list_folders + + with self.assertRaises(SystemExit) as ctx: + _list_folders(args) + self.assertEqual(ctx.exception.code, 2) + + +class TestFolderGetHandler(unittest.TestCase): + """Test folder get command behavior.""" + + @patch("roboflow.adapters.rfapi.get_folder") + @patch("roboflow.cli._resolver.resolve_default_workspace", return_value="test-ws") + @patch("roboflow.config.load_roboflow_api_key", return_value="fake-key") + def test_get_folder_text(self, _mock_key, _mock_ws, mock_get): + mock_get.return_value = {"data": [{"name": "MyFolder", "id": "f1", "projects": []}]} + args = Namespace(json=False, workspace=None, api_key=None, quiet=False, folder_id="f1") + + from roboflow.cli.handlers.folder import _get_folder + + with patch("builtins.print") as mock_print: + _get_folder(args) + printed = mock_print.call_args[0][0] + self.assertIn("MyFolder", printed) + + +class TestFolderCreateHandler(unittest.TestCase): + """Test folder create command behavior.""" + + @patch("roboflow.adapters.rfapi.create_folder") + @patch("roboflow.cli._resolver.resolve_default_workspace", return_value="test-ws") + @patch("roboflow.config.load_roboflow_api_key", return_value="fake-key") + def test_create_folder_json(self, _mock_key, _mock_ws, mock_create): + mock_create.return_value = {"id": "new-folder-id"} + args = Namespace( + json=True, workspace=None, api_key=None, quiet=False, name="NewFolder", parent=None, projects=None + ) + + from roboflow.cli.handlers.folder import _create_folder + + with patch("builtins.print") as 
mock_print: + _create_folder(args) + printed = mock_print.call_args[0][0] + data = json.loads(printed) + self.assertEqual(data["status"], "created") + self.assertEqual(data["id"], "new-folder-id") + + @patch("roboflow.adapters.rfapi.create_folder") + @patch("roboflow.cli._resolver.resolve_default_workspace", return_value="test-ws") + @patch("roboflow.config.load_roboflow_api_key", return_value="fake-key") + def test_create_folder_with_projects(self, _mock_key, _mock_ws, mock_create): + mock_create.return_value = {"id": "f2"} + args = Namespace(json=False, workspace=None, api_key=None, quiet=False, name="F", parent="p1", projects="a,b,c") + + from roboflow.cli.handlers.folder import _create_folder + + with patch("builtins.print"): + _create_folder(args) + mock_create.assert_called_once_with("fake-key", "test-ws", "F", parent_id="p1", project_ids=["a", "b", "c"]) + + +class TestFolderUpdateHandler(unittest.TestCase): + """Test folder update command behavior.""" + + @patch("roboflow.adapters.rfapi.update_folder") + @patch("roboflow.cli._resolver.resolve_default_workspace", return_value="test-ws") + @patch("roboflow.config.load_roboflow_api_key", return_value="fake-key") + def test_update_folder_json(self, _mock_key, _mock_ws, mock_update): + mock_update.return_value = {} + args = Namespace(json=True, workspace=None, api_key=None, quiet=False, folder_id="f1", name="Renamed") + + from roboflow.cli.handlers.folder import _update_folder + + with patch("builtins.print") as mock_print: + _update_folder(args) + printed = mock_print.call_args[0][0] + data = json.loads(printed) + self.assertEqual(data["status"], "updated") + + +class TestFolderDeleteHandler(unittest.TestCase): + """Test folder delete command behavior.""" + + @patch("roboflow.adapters.rfapi.delete_folder") + @patch("roboflow.cli._resolver.resolve_default_workspace", return_value="test-ws") + @patch("roboflow.config.load_roboflow_api_key", return_value="fake-key") + def test_delete_folder_json(self, _mock_key, 
_mock_ws, mock_delete): + mock_delete.return_value = {} + args = Namespace(json=True, workspace=None, api_key=None, quiet=False, folder_id="f1") + + from roboflow.cli.handlers.folder import _delete_folder + + with patch("builtins.print") as mock_print: + _delete_folder(args) + printed = mock_print.call_args[0][0] + data = json.loads(printed) + self.assertEqual(data["status"], "deleted") + + @patch("roboflow.adapters.rfapi.delete_folder", side_effect=Exception("Not found")) + @patch("roboflow.cli._resolver.resolve_default_workspace", return_value="test-ws") + @patch("roboflow.config.load_roboflow_api_key", return_value="fake-key") + def test_delete_folder_error_json(self, _mock_key, _mock_ws, _mock_delete): + args = Namespace(json=True, workspace=None, api_key=None, quiet=False, folder_id="bad-id") + + from roboflow.cli.handlers.folder import _delete_folder + + with self.assertRaises(SystemExit) as ctx: + _delete_folder(args) + self.assertEqual(ctx.exception.code, 3) + + if __name__ == "__main__": unittest.main() diff --git a/tests/cli/test_universe_handler.py b/tests/cli/test_universe_handler.py index 16962b73..b01170a5 100644 --- a/tests/cli/test_universe_handler.py +++ b/tests/cli/test_universe_handler.py @@ -27,6 +27,143 @@ def test_universe_search_with_flags(self) -> None: self.assertEqual(args.type, "model") self.assertEqual(args.limit, 5) + def test_universe_search_default_limit(self) -> None: + from roboflow.cli import build_parser + + parser = build_parser() + args = parser.parse_args(["universe", "search", "cats"]) + self.assertEqual(args.limit, 12) + + +class TestUniverseSearch(unittest.TestCase): + """Test universe search handler.""" + + def test_search_success(self) -> None: + import io + import sys + + from roboflow.cli import build_parser + + parser = build_parser() + args = parser.parse_args(["universe", "search", "cats"]) + from unittest.mock import patch + + mock_data = { + "results": [ + {"name": "cats-dataset", "type": "dataset", "images": 1000, 
"url": "https://example.com/cats"}, + ] + } + captured = io.StringIO() + old_stdout = sys.stdout + sys.stdout = captured + try: + with patch("roboflow.adapters.rfapi.search_universe", return_value=mock_data): + with patch("roboflow.config.load_roboflow_api_key", return_value="test-key"): + args.func(args) + finally: + sys.stdout = old_stdout + out = captured.getvalue() + self.assertIn("cats-dataset", out) + + def test_search_passes_api_key(self) -> None: + import io + import sys + + from roboflow.cli import build_parser + + parser = build_parser() + args = parser.parse_args(["universe", "search", "cats"]) + from unittest.mock import patch + + mock_data = {"results": []} + captured = io.StringIO() + old_stdout = sys.stdout + sys.stdout = captured + try: + with patch("roboflow.adapters.rfapi.search_universe", return_value=mock_data) as mock_search: + with patch("roboflow.config.load_roboflow_api_key", return_value="my-key"): + args.func(args) + finally: + sys.stdout = old_stdout + mock_search.assert_called_once_with("cats", api_key="my-key", project_type=None, limit=12) + + def test_search_passes_custom_limit(self) -> None: + import io + import sys + + from roboflow.cli import build_parser + + parser = build_parser() + args = parser.parse_args(["universe", "search", "dogs", "--limit", "5"]) + from unittest.mock import patch + + mock_data = {"results": []} + captured = io.StringIO() + old_stdout = sys.stdout + sys.stdout = captured + try: + with patch("roboflow.adapters.rfapi.search_universe", return_value=mock_data) as mock_search: + with patch("roboflow.config.load_roboflow_api_key", return_value="k"): + args.func(args) + finally: + sys.stdout = old_stdout + mock_search.assert_called_once_with("dogs", api_key="k", project_type=None, limit=5) + + def test_search_json_output(self) -> None: + import io + import json + import sys + + from roboflow.cli import build_parser + + parser = build_parser() + args = parser.parse_args(["--json", "universe", "search", "dogs"]) + 
from unittest.mock import patch + + mock_data = { + "results": [ + {"name": "dogs-dataset", "type": "dataset", "images": 500, "url": "https://example.com/dogs"}, + ] + } + captured = io.StringIO() + old_stdout = sys.stdout + sys.stdout = captured + try: + with patch("roboflow.adapters.rfapi.search_universe", return_value=mock_data): + args.func(args) + finally: + sys.stdout = old_stdout + result = json.loads(captured.getvalue()) + self.assertIsInstance(result, list) + self.assertEqual(result[0]["name"], "dogs-dataset") + + def test_search_api_error_json(self) -> None: + import io + import sys + + from roboflow.cli import build_parser + + parser = build_parser() + args = parser.parse_args(["--json", "universe", "search", "fail"]) + from unittest.mock import patch + + from roboflow.adapters.rfapi import RoboflowError + + captured = io.StringIO() + old_stderr = sys.stderr + sys.stderr = captured + try: + with patch("roboflow.adapters.rfapi.search_universe", side_effect=RoboflowError("API down")): + with self.assertRaises(SystemExit) as ctx: + args.func(args) + self.assertEqual(ctx.exception.code, 1) + finally: + sys.stderr = old_stderr + import json + + err = json.loads(captured.getvalue()) + self.assertIn("error", err) + if __name__ == "__main__": unittest.main() diff --git a/tests/cli/test_version_handler.py b/tests/cli/test_version_handler.py index ceefb869..573bf52a 100644 --- a/tests/cli/test_version_handler.py +++ b/tests/cli/test_version_handler.py @@ -70,10 +70,17 @@ def test_version_export_parses_args(self) -> None: self.assertEqual(args.project, "my-project") self.assertEqual(args.format, "yolov8") - def test_version_create_is_stub(self) -> None: + def test_version_create_parses_args(self) -> None: parser = _make_parser() - args = parser.parse_args(["version", "create", "-p", "my-project"]) + args = parser.parse_args(["version", "create", "-p", "my-project", "--settings", "config.json"]) self.assertIsNotNone(args.func) + self.assertEqual(args.project, 
"my-project") + self.assertEqual(args.settings, "config.json") + + def test_version_create_requires_settings(self) -> None: + parser = _make_parser() + with self.assertRaises(SystemExit): + parser.parse_args(["version", "create", "-p", "my-project"]) def test_subcommands_have_func(self) -> None: parser = _make_parser() @@ -82,13 +89,85 @@ def test_subcommands_have_func(self) -> None: "get 3 -p proj", "download ws/proj/1", "export 1 -p proj", - "create -p proj", + "create -p proj --settings s.json", ] for subcmd in subcmds: args = parser.parse_args(["version"] + subcmd.split()) self.assertIsNotNone(args.func, f"version {subcmd} has no func") +class TestVersionCreate(unittest.TestCase): + """Test version create handler.""" + + def test_create_missing_settings_file(self) -> None: + from unittest.mock import patch + + parser = _make_parser() + args = parser.parse_args( + ["--json", "version", "create", "-p", "my-ws/my-project", "--settings", "/nonexistent/file.json"] + ) + args.api_key = "fake-key" + with patch("roboflow.config.load_roboflow_api_key", return_value="fake-key"): + with self.assertRaises(SystemExit) as ctx: + args.func(args) + self.assertEqual(ctx.exception.code, 1) + + def test_create_invalid_json_file(self) -> None: + import tempfile + from unittest.mock import patch + + with tempfile.NamedTemporaryFile(mode="w", suffix=".json", delete=False) as f: + f.write("not valid json") + f.flush() + parser = _make_parser() + args = parser.parse_args(["--json", "version", "create", "-p", "my-ws/my-project", "--settings", f.name]) + args.api_key = "fake-key" + with patch("roboflow.config.load_roboflow_api_key", return_value="fake-key"): + with self.assertRaises(SystemExit) as ctx: + args.func(args) + self.assertEqual(ctx.exception.code, 1) + + def test_create_no_api_key(self) -> None: + import json + import tempfile + + settings = {"augmentation": {}, "preprocessing": {}} + with tempfile.NamedTemporaryFile(mode="w", suffix=".json", delete=False) as f: + 
json.dump(settings, f) + f.flush() + parser = _make_parser() + args = parser.parse_args(["--json", "version", "create", "-p", "my-ws/my-project", "--settings", f.name]) + # Patch load_roboflow_api_key to return None + from unittest.mock import patch + + with patch("roboflow.config.load_roboflow_api_key", return_value=None): + with self.assertRaises(SystemExit) as ctx: + args.func(args) + self.assertEqual(ctx.exception.code, 2) + + def test_create_json_error_output(self) -> None: + import io + import sys + + parser = _make_parser() + args = parser.parse_args( + ["--json", "version", "create", "-p", "my-ws/my-project", "--settings", "/nonexistent/file.json"] + ) + captured = io.StringIO() + old_stderr = sys.stderr + sys.stderr = captured + try: + with self.assertRaises(SystemExit): + args.func(args) + finally: + sys.stderr = old_stderr + import json + + err = json.loads(captured.getvalue()) + self.assertIn("error", err) + self.assertIn("message", err["error"]) + + class TestParseUrl(unittest.TestCase): """Test the _parse_url helper.""" diff --git a/tests/cli/test_video_handler.py b/tests/cli/test_video_handler.py index 4d80d696..224ff111 100644 --- a/tests/cli/test_video_handler.py +++ b/tests/cli/test_video_handler.py @@ -38,5 +38,104 @@ def test_video_status_exists(self) -> None: self.assertEqual(args.job_id, "job-123") +class TestVideoStatus(unittest.TestCase): + """Test video status handler.""" + + def test_status_no_api_key(self) -> None: + import io + import sys + + from roboflow.cli import build_parser + + parser = build_parser() + args = parser.parse_args(["--json", "video", "status", "job-123"]) + from unittest.mock import patch + + captured = io.StringIO() + old_stderr = sys.stderr + sys.stderr = captured + try: + with patch("roboflow.config.load_roboflow_api_key", return_value=None): + with self.assertRaises(SystemExit) as ctx: + args.func(args) + self.assertEqual(ctx.exception.code, 2) + finally: + sys.stderr = old_stderr + import json + + err = 
json.loads(captured.getvalue()) + self.assertIn("error", err) + + def test_status_success(self) -> None: + import io + import sys + + from roboflow.cli import build_parser + + parser = build_parser() + args = parser.parse_args(["video", "status", "job-abc"]) + from unittest.mock import patch + + mock_data = {"status": "completed", "progress": "100%"} + captured = io.StringIO() + old_stdout = sys.stdout + sys.stdout = captured + try: + with patch("roboflow.config.load_roboflow_api_key", return_value="fake-key"): + with patch("roboflow.adapters.rfapi.get_video_job_status", return_value=mock_data): + args.func(args) + finally: + sys.stdout = old_stdout + out = captured.getvalue() + self.assertIn("job-abc", out) + self.assertIn("completed", out) + + def test_status_json_output(self) -> None: + import io + import json + import sys + + from roboflow.cli import build_parser + + parser = build_parser() + args = parser.parse_args(["--json", "video", "status", "job-abc"]) + from unittest.mock import patch + + mock_data = {"status": "processing", "progress": "50%"} + captured = io.StringIO() + old_stdout = sys.stdout + sys.stdout = captured + try: + with patch("roboflow.config.load_roboflow_api_key", return_value="fake-key"): + with patch("roboflow.adapters.rfapi.get_video_job_status", return_value=mock_data): + args.func(args) + finally: + sys.stdout = old_stdout + result = json.loads(captured.getvalue()) + self.assertEqual(result["status"], "processing") + + def test_status_passes_job_id_to_api(self) -> None: + import io + import sys + + from roboflow.cli import build_parser + + parser = build_parser() + args = parser.parse_args(["video", "status", "my-unique-job-777"]) + from unittest.mock import patch + + mock_data = {"status": "completed"} + captured = io.StringIO() + old_stdout = sys.stdout + sys.stdout = captured + try: + with patch("roboflow.config.load_roboflow_api_key", return_value="fake-key"): + with patch("roboflow.adapters.rfapi.get_video_job_status", 
return_value=mock_data) as mock_api: + args.func(args) + finally: + sys.stdout = old_stdout + mock_api.assert_called_once_with("fake-key", "my-unique-job-777") + + if __name__ == "__main__": unittest.main() diff --git a/tests/cli/test_workflow_handler.py b/tests/cli/test_workflow_handler.py index 070776d2..355d4d21 100644 --- a/tests/cli/test_workflow_handler.py +++ b/tests/cli/test_workflow_handler.py @@ -1,6 +1,18 @@ """Tests for the workflow CLI handler.""" +import json +import os +import tempfile import unittest +from argparse import Namespace +from unittest.mock import patch + + +def _make_args(**kwargs): + """Create a Namespace with CLI defaults.""" + defaults = {"json": False, "workspace": "test-ws", "api_key": "test-key", "quiet": False} + defaults.update(kwargs) + return Namespace(**defaults) class TestWorkflowRegistration(unittest.TestCase): @@ -79,5 +91,325 @@ def test_workflow_deploy_exists(self) -> None: self.assertIsNotNone(args.func) +class TestWorkflowList(unittest.TestCase): + @patch("roboflow.adapters.rfapi.list_workflows") + @patch("roboflow.config.load_roboflow_api_key", return_value="test-key") + def test_list_workflows_text(self, _mock_key, mock_list): + from roboflow.cli.handlers.workflow import _list_workflows + + mock_list.return_value = { + "workflows": [ + {"name": "My Workflow", "url": "my-workflow", "status": "active"}, + ] + } + args = _make_args() + with patch("builtins.print") as mock_print: + _list_workflows(args) + mock_list.assert_called_once_with("test-key", "test-ws") + printed = mock_print.call_args[0][0] + self.assertIn("My Workflow", printed) + + @patch("roboflow.adapters.rfapi.list_workflows") + @patch("roboflow.config.load_roboflow_api_key", return_value="test-key") + def test_list_workflows_json(self, _mock_key, mock_list): + from roboflow.cli.handlers.workflow import _list_workflows + + mock_list.return_value = { + "workflows": [ + {"name": "WF1", "url": "wf-1", "status": "active"}, + ] + } + args = _make_args(json=True) 
+ with patch("builtins.print") as mock_print: + _list_workflows(args) + out = json.loads(mock_print.call_args[0][0]) + self.assertIsInstance(out, list) + self.assertEqual(out[0]["name"], "WF1") + + @patch("roboflow.adapters.rfapi.list_workflows") + @patch("roboflow.config.load_roboflow_api_key", return_value="test-key") + def test_list_workflows_error(self, _mock_key, mock_list): + from roboflow.adapters.rfapi import RoboflowError + from roboflow.cli.handlers.workflow import _list_workflows + + mock_list.side_effect = RoboflowError("Not found") + args = _make_args() + with self.assertRaises(SystemExit) as ctx: + _list_workflows(args) + self.assertEqual(ctx.exception.code, 3) + + +class TestWorkflowGet(unittest.TestCase): + @patch("roboflow.adapters.rfapi.get_workflow") + @patch("roboflow.config.load_roboflow_api_key", return_value="test-key") + def test_get_workflow_text(self, _mock_key, mock_get): + from roboflow.cli.handlers.workflow import _get_workflow + + mock_get.return_value = { + "workflow": { + "name": "My WF", + "url": "my-wf", + "description": "A test workflow", + "blockCount": 5, + } + } + args = _make_args(workflow_url="my-wf") + with patch("builtins.print") as mock_print: + _get_workflow(args) + mock_get.assert_called_once_with("test-key", "test-ws", "my-wf") + printed = mock_print.call_args[0][0] + self.assertIn("My WF", printed) + self.assertIn("5", printed) + + @patch("roboflow.adapters.rfapi.get_workflow") + @patch("roboflow.config.load_roboflow_api_key", return_value="test-key") + def test_get_workflow_json(self, _mock_key, mock_get): + from roboflow.cli.handlers.workflow import _get_workflow + + mock_get.return_value = {"workflow": {"name": "My WF", "url": "my-wf"}} + args = _make_args(json=True, workflow_url="my-wf") + with patch("builtins.print") as mock_print: + _get_workflow(args) + out = json.loads(mock_print.call_args[0][0]) + self.assertIn("workflow", out) + + +class TestWorkflowCreate(unittest.TestCase): + 
@patch("roboflow.adapters.rfapi.create_workflow") + @patch("roboflow.config.load_roboflow_api_key", return_value="test-key") + def test_create_workflow_basic(self, _mock_key, mock_create): + from roboflow.cli.handlers.workflow import _create_workflow + + mock_create.return_value = {"name": "New WF", "url": "new-wf"} + args = _make_args(name="New WF", definition=None, description=None) + with patch("builtins.print") as mock_print: + _create_workflow(args) + mock_create.assert_called_once_with("test-key", "test-ws", name="New WF", config="{}", template="{}") + printed = mock_print.call_args[0][0] + self.assertIn("Created workflow", printed) + + @patch("roboflow.adapters.rfapi.create_workflow") + @patch("roboflow.config.load_roboflow_api_key", return_value="test-key") + def test_create_workflow_with_definition(self, _mock_key, mock_create): + from roboflow.cli.handlers.workflow import _create_workflow + + mock_create.return_value = {"name": "New WF", "url": "new-wf"} + defn = {"blocks": [{"type": "input"}]} + with tempfile.NamedTemporaryFile(mode="w", suffix=".json", delete=False) as f: + json.dump(defn, f) + f.flush() + tmp_path = f.name + + try: + args = _make_args(name="New WF", definition=tmp_path, description="A desc") + with patch("builtins.print"): + _create_workflow(args) + mock_create.assert_called_once_with( + "test-key", "test-ws", name="New WF", config=json.dumps(defn), template="{}" + ) + finally: + os.unlink(tmp_path) + + def test_create_workflow_missing_file(self): + from roboflow.cli.handlers.workflow import _create_workflow + + args = _make_args(name="New WF", definition="/nonexistent/file.json", description=None) + with self.assertRaises(SystemExit) as ctx: + _create_workflow(args) + self.assertEqual(ctx.exception.code, 1) + + def test_create_workflow_invalid_json(self): + from roboflow.cli.handlers.workflow import _create_workflow + + with tempfile.NamedTemporaryFile(mode="w", suffix=".json", delete=False) as f: + f.write("{bad json") + f.flush() + 
tmp_path = f.name + + try: + args = _make_args(name="New WF", definition=tmp_path, description=None) + with self.assertRaises(SystemExit) as ctx: + _create_workflow(args) + self.assertEqual(ctx.exception.code, 1) + finally: + os.unlink(tmp_path) + + +class TestWorkflowUpdate(unittest.TestCase): + @patch("roboflow.adapters.rfapi.update_workflow") + @patch("roboflow.adapters.rfapi.get_workflow") + @patch("roboflow.config.load_roboflow_api_key", return_value="test-key") + def test_update_workflow(self, _mock_key, mock_get, mock_update): + from roboflow.cli.handlers.workflow import _update_workflow + + mock_get.return_value = {"workflow": {"id": "wf-123", "name": "My WF", "url": "my-wf", "config": "{}"}} + mock_update.return_value = {"url": "my-wf", "status": "updated"} + defn = {"blocks": []} + with tempfile.NamedTemporaryFile(mode="w", suffix=".json", delete=False) as f: + json.dump(defn, f) + f.flush() + tmp_path = f.name + + try: + args = _make_args(workflow_url="my-wf", definition=tmp_path) + with patch("builtins.print") as mock_print: + _update_workflow(args) + mock_get.assert_called_once_with("test-key", "test-ws", "my-wf") + mock_update.assert_called_once_with( + "test-key", + "test-ws", + workflow_id="wf-123", + workflow_name="My WF", + workflow_url="my-wf", + config=json.dumps(defn), + ) + printed = mock_print.call_args[0][0] + self.assertIn("Updated workflow", printed) + finally: + os.unlink(tmp_path) + + @patch("roboflow.adapters.rfapi.update_workflow") + @patch("roboflow.adapters.rfapi.get_workflow") + @patch("roboflow.config.load_roboflow_api_key", return_value="test-key") + def test_update_workflow_no_definition(self, _mock_key, mock_get, mock_update): + """When no --definition is given, existing config is preserved.""" + from roboflow.cli.handlers.workflow import _update_workflow + + mock_get.return_value = { + "workflow": {"id": "wf-123", "name": "My WF", "url": "my-wf", "config": '{"existing": true}'} + } + mock_update.return_value = {"url": "my-wf", 
"status": "updated"} + args = _make_args(workflow_url="my-wf", definition=None) + with patch("builtins.print"): + _update_workflow(args) + mock_update.assert_called_once_with( + "test-key", + "test-ws", + workflow_id="wf-123", + workflow_name="My WF", + workflow_url="my-wf", + config='{"existing": true}', + ) + + def test_update_workflow_missing_file(self): + from roboflow.cli.handlers.workflow import _update_workflow + + args = _make_args(workflow_url="my-wf", definition="/nonexistent/file.json") + with self.assertRaises(SystemExit) as ctx: + _update_workflow(args) + self.assertEqual(ctx.exception.code, 1) + + +class TestWorkflowVersionList(unittest.TestCase): + @patch("roboflow.adapters.rfapi.list_workflow_versions") + @patch("roboflow.config.load_roboflow_api_key", return_value="test-key") + def test_list_versions(self, _mock_key, mock_versions): + from roboflow.cli.handlers.workflow import _list_workflow_versions + + mock_versions.return_value = { + "versions": [ + {"version": "1", "created": "2026-01-01"}, + {"version": "2", "created": "2026-02-01"}, + ] + } + args = _make_args(workflow_url="my-wf") + with patch("builtins.print") as mock_print: + _list_workflow_versions(args) + mock_versions.assert_called_once_with("test-key", "test-ws", "my-wf") + printed = mock_print.call_args[0][0] + self.assertIn("1", printed) + self.assertIn("2", printed) + + @patch("roboflow.adapters.rfapi.list_workflow_versions") + @patch("roboflow.config.load_roboflow_api_key", return_value="test-key") + def test_list_versions_json(self, _mock_key, mock_versions): + from roboflow.cli.handlers.workflow import _list_workflow_versions + + mock_versions.return_value = {"versions": [{"version": "1", "created": "2026-01-01"}]} + args = _make_args(json=True, workflow_url="my-wf") + with patch("builtins.print") as mock_print: + _list_workflow_versions(args) + out = json.loads(mock_print.call_args[0][0]) + self.assertIsInstance(out, list) + + +class TestWorkflowFork(unittest.TestCase): + 
@patch("roboflow.adapters.rfapi.fork_workflow") + @patch("roboflow.config.load_roboflow_api_key", return_value="test-key") + def test_fork_workflow_same_workspace(self, _mock_key, mock_fork): + """When workflow_url is just a slug, source_workspace defaults to current ws.""" + from roboflow.cli.handlers.workflow import _fork_workflow + + mock_fork.return_value = {"url": "my-wf-fork", "workflow_url": "my-wf-fork"} + args = _make_args(workflow_url="my-wf") + with patch("builtins.print") as mock_print: + _fork_workflow(args) + mock_fork.assert_called_once_with("test-key", "test-ws", source_workspace="test-ws", source_workflow="my-wf") + printed = mock_print.call_args[0][0] + self.assertIn("Forked workflow", printed) + + @patch("roboflow.adapters.rfapi.fork_workflow") + @patch("roboflow.config.load_roboflow_api_key", return_value="test-key") + def test_fork_workflow_cross_workspace(self, _mock_key, mock_fork): + """When workflow_url is 'other-ws/my-wf', source_workspace is parsed.""" + from roboflow.cli.handlers.workflow import _fork_workflow + + mock_fork.return_value = {"url": "my-wf-fork"} + args = _make_args(workflow_url="other-ws/my-wf") + with patch("builtins.print"): + _fork_workflow(args) + mock_fork.assert_called_once_with("test-key", "test-ws", source_workspace="other-ws", source_workflow="my-wf") + + @patch("roboflow.adapters.rfapi.fork_workflow") + @patch("roboflow.config.load_roboflow_api_key", return_value="test-key") + def test_fork_workflow_json(self, _mock_key, mock_fork): + from roboflow.cli.handlers.workflow import _fork_workflow + + mock_fork.return_value = {"url": "my-wf-fork"} + args = _make_args(json=True, workflow_url="my-wf") + with patch("builtins.print") as mock_print: + _fork_workflow(args) + out = json.loads(mock_print.call_args[0][0]) + self.assertEqual(out["status"], "forked") + self.assertEqual(out["source"], "my-wf") + self.assertEqual(out["new_url"], "my-wf-fork") + + +class TestWorkflowStubs(unittest.TestCase): + def 
test_build_stub(self): + from roboflow.cli.handlers.workflow import _stub_build + + args = _make_args() + with self.assertRaises(SystemExit): + _stub_build(args) + + def test_run_stub(self): + from roboflow.cli.handlers.workflow import _stub_run + + args = _make_args() + with self.assertRaises(SystemExit): + _stub_run(args) + + def test_deploy_stub(self): + from roboflow.cli.handlers.workflow import _stub_deploy + + args = _make_args() + with self.assertRaises(SystemExit): + _stub_deploy(args) + + +class TestWorkflowNoWorkspace(unittest.TestCase): + """Verify proper error when no workspace is available.""" + + @patch("roboflow.cli._resolver.resolve_default_workspace", return_value=None) + def test_list_no_workspace(self, _mock_resolve): + from roboflow.cli.handlers.workflow import _list_workflows + + args = _make_args(workspace=None, api_key=None) + with self.assertRaises(SystemExit) as ctx: + _list_workflows(args) + self.assertEqual(ctx.exception.code, 2) + + if __name__ == "__main__": unittest.main() diff --git a/tests/cli/test_workspace.py b/tests/cli/test_workspace.py index 0b7a2591..a0623354 100644 --- a/tests/cli/test_workspace.py +++ b/tests/cli/test_workspace.py @@ -1,6 +1,9 @@ """Tests for the workspace CLI handler.""" +import json import unittest +from argparse import Namespace +from unittest.mock import patch class TestWorkspaceRegistration(unittest.TestCase): @@ -26,11 +29,179 @@ def test_workspace_get_positional(self) -> None: self.assertEqual(args.workspace_id, "my-ws") self.assertIsNotNone(args.func) + def test_workspace_usage_exists(self) -> None: + from roboflow.cli import build_parser + + parser = build_parser() + args = parser.parse_args(["workspace", "usage"]) + self.assertIsNotNone(args.func) + + def test_workspace_plan_exists(self) -> None: + from roboflow.cli import build_parser + + parser = build_parser() + args = parser.parse_args(["workspace", "plan"]) + self.assertIsNotNone(args.func) + + def test_workspace_stats_exists(self) -> None: + 
from roboflow.cli import build_parser + + parser = build_parser() + args = parser.parse_args(["workspace", "stats", "--start-date", "2026-01-01", "--end-date", "2026-04-01"]) + self.assertIsNotNone(args.func) + self.assertEqual(args.start_date, "2026-01-01") + self.assertEqual(args.end_date, "2026-04-01") + def test_handler_functions_exist(self) -> None: from roboflow.cli.handlers import workspace self.assertTrue(callable(workspace._list_workspaces)) self.assertTrue(callable(workspace._get_workspace)) + self.assertTrue(callable(workspace._workspace_usage)) + self.assertTrue(callable(workspace._workspace_plan)) + self.assertTrue(callable(workspace._workspace_stats)) + + +class TestWorkspaceUsageHandler(unittest.TestCase): + """Test workspace usage command behavior.""" + + @patch("roboflow.adapters.rfapi.get_billing_usage") + @patch("roboflow.cli._resolver.resolve_default_workspace", return_value="test-ws") + @patch("roboflow.config.load_roboflow_api_key", return_value="fake-key") + def test_usage_json(self, _mock_key, _mock_ws, mock_usage): + mock_usage.return_value = {"usage": {"inference_calls": 100, "images_uploaded": 50}} + args = Namespace(json=True, workspace=None, api_key=None, quiet=False) + + from roboflow.cli.handlers.workspace import _workspace_usage + + with patch("builtins.print") as mock_print: + _workspace_usage(args) + printed = mock_print.call_args[0][0] + data = json.loads(printed) + self.assertIn("usage", data) + + @patch("roboflow.adapters.rfapi.get_billing_usage") + @patch("roboflow.cli._resolver.resolve_default_workspace", return_value="test-ws") + @patch("roboflow.config.load_roboflow_api_key", return_value="fake-key") + def test_usage_text(self, _mock_key, _mock_ws, mock_usage): + mock_usage.return_value = {"usage": {"inference_calls": 100}} + args = Namespace(json=False, workspace=None, api_key=None, quiet=False) + + from roboflow.cli.handlers.workspace import _workspace_usage + + with patch("builtins.print") as mock_print: + 
_workspace_usage(args) + printed = mock_print.call_args[0][0] + self.assertIn("Billing Usage", printed) + + @patch("roboflow.cli._resolver.resolve_default_workspace", return_value=None) + def test_usage_no_workspace(self, _mock_ws): + args = Namespace(json=True, workspace=None, api_key=None, quiet=False) + + from roboflow.cli.handlers.workspace import _workspace_usage + + with self.assertRaises(SystemExit) as ctx: + _workspace_usage(args) + self.assertEqual(ctx.exception.code, 2) + + +class TestWorkspacePlanHandler(unittest.TestCase): + """Test workspace plan command behavior.""" + + @patch("roboflow.adapters.rfapi.get_plan_info") + @patch("roboflow.cli._resolver.resolve_default_workspace", return_value="test-ws") + @patch("roboflow.config.load_roboflow_api_key", return_value="fake-key") + def test_plan_json(self, _mock_key, _mock_ws, mock_plan): + mock_plan.return_value = {"plan": {"name": "Pro", "limit": 10000}} + args = Namespace(json=True, workspace=None, api_key=None, quiet=False) + + from roboflow.cli.handlers.workspace import _workspace_plan + + with patch("builtins.print") as mock_print: + _workspace_plan(args) + printed = mock_print.call_args[0][0] + data = json.loads(printed) + self.assertIn("plan", data) + + @patch("roboflow.adapters.rfapi.get_plan_info") + @patch("roboflow.cli._resolver.resolve_default_workspace", return_value="test-ws") + @patch("roboflow.config.load_roboflow_api_key", return_value="fake-key") + def test_plan_text(self, _mock_key, _mock_ws, mock_plan): + mock_plan.return_value = {"plan": {"name": "Pro"}} + args = Namespace(json=False, workspace=None, api_key=None, quiet=False) + + from roboflow.cli.handlers.workspace import _workspace_plan + + with patch("builtins.print") as mock_print: + _workspace_plan(args) + printed = mock_print.call_args[0][0] + self.assertIn("Plan Info", printed) + + +class TestWorkspaceStatsHandler(unittest.TestCase): + """Test workspace stats command behavior.""" + + 
@patch("roboflow.adapters.rfapi.get_labeling_stats") + @patch("roboflow.cli._resolver.resolve_default_workspace", return_value="test-ws") + @patch("roboflow.config.load_roboflow_api_key", return_value="fake-key") + def test_stats_json(self, _mock_key, _mock_ws, mock_stats): + mock_stats.return_value = {"stats": {"total_annotations": 500}} + args = Namespace( + json=True, workspace=None, api_key=None, quiet=False, start_date="2026-01-01", end_date="2026-04-01" + ) + + from roboflow.cli.handlers.workspace import _workspace_stats + + with patch("builtins.print") as mock_print: + _workspace_stats(args) + printed = mock_print.call_args[0][0] + data = json.loads(printed) + self.assertIn("stats", data) + + @patch("roboflow.adapters.rfapi.get_labeling_stats") + @patch("roboflow.cli._resolver.resolve_default_workspace", return_value="test-ws") + @patch("roboflow.config.load_roboflow_api_key", return_value="fake-key") + def test_stats_passes_dates(self, _mock_key, _mock_ws, mock_stats): + mock_stats.return_value = {"stats": {"total_annotations": 500}} + args = Namespace( + json=True, workspace=None, api_key=None, quiet=False, start_date="2026-01-01", end_date="2026-04-01" + ) + + from roboflow.cli.handlers.workspace import _workspace_stats + + with patch("builtins.print"): + _workspace_stats(args) + mock_stats.assert_called_once_with("fake-key", "test-ws", start_date="2026-01-01", end_date="2026-04-01") + + @patch("roboflow.adapters.rfapi.get_labeling_stats") + @patch("roboflow.cli._resolver.resolve_default_workspace", return_value="test-ws") + @patch("roboflow.config.load_roboflow_api_key", return_value="fake-key") + def test_stats_text(self, _mock_key, _mock_ws, mock_stats): + mock_stats.return_value = {"stats": {"total_annotations": 500}} + args = Namespace( + json=False, workspace=None, api_key=None, quiet=False, start_date="2026-01-01", end_date="2026-04-01" + ) + + from roboflow.cli.handlers.workspace import _workspace_stats + + with patch("builtins.print") as 
mock_print: + _workspace_stats(args) + printed = mock_print.call_args[0][0] + self.assertIn("Labeling Stats", printed) + + @patch("roboflow.adapters.rfapi.get_labeling_stats", side_effect=Exception("server error")) + @patch("roboflow.cli._resolver.resolve_default_workspace", return_value="test-ws") + @patch("roboflow.config.load_roboflow_api_key", return_value="fake-key") + def test_stats_error_json(self, _mock_key, _mock_ws, _mock_stats): + args = Namespace( + json=True, workspace=None, api_key=None, quiet=False, start_date="2026-01-01", end_date="2026-04-01" + ) + + from roboflow.cli.handlers.workspace import _workspace_stats + + with self.assertRaises(SystemExit) as ctx: + _workspace_stats(args) + self.assertEqual(ctx.exception.code, 3) if __name__ == "__main__":