diff --git a/README.md b/README.md index 3e543856e..9380e2d29 100644 --- a/README.md +++ b/README.md @@ -125,6 +125,9 @@ agent-browser pdf # Save as PDF agent-browser snapshot # Accessibility tree with refs (best for AI) agent-browser eval # Run JavaScript (-b for base64, --stdin for piped input) agent-browser connect # Connect to browser via CDP +agent-browser stream enable [--port ] # Start runtime WebSocket streaming +agent-browser stream status # Show runtime streaming state and bound port +agent-browser stream disable # Stop runtime WebSocket streaming agent-browser close # Close browser (aliases: quit, exit) ``` @@ -925,13 +928,26 @@ Stream the browser viewport via WebSocket for live preview or "pair browsing" wh ### Enable Streaming -Set the `AGENT_BROWSER_STREAM_PORT` environment variable: +For an already-running session, enable streaming at runtime: + +```bash +agent-browser stream enable +agent-browser stream status +agent-browser stream disable +``` + +`stream enable` binds an available localhost port automatically unless you pass `--port `. +Use `stream status` to inspect whether streaming is enabled, which port is active, whether a browser is attached, and whether screencasting is active. + +If you want streaming to be available immediately when the daemon starts, set `AGENT_BROWSER_STREAM_PORT` before the first command in that session: ```bash AGENT_BROWSER_STREAM_PORT=9223 agent-browser open example.com ``` -This starts a WebSocket server on the specified port that streams the browser viewport and accepts input events. +The environment variable only affects daemon startup. For sessions that are already running, use `agent-browser stream enable` instead. + +Once enabled, the WebSocket server streams the browser viewport and accepts input events. ### WebSocket Protocol diff --git a/cli/src/commands.rs b/cli/src/commands.rs index cf95c1fd3..a802c0647 100644 --- a/cli/src/commands.rs +++ b/cli/src/commands.rs @@ -796,6 +796,62 @@ pub fn parse_command(args: &[String], flags: &Flags) -> Result match rest.first().copied() { + Some("enable") => { + let mut cmd = json!({ "id": id, "action": "stream_enable" }); + let mut i = 1; + while i < rest.len() { + match rest[i] { + "--port" => { + let value = + rest.get(i + 1) + .ok_or_else(|| ParseError::MissingArguments { + context: "stream enable --port".to_string(), + usage: "stream enable [--port ]", + })?; + let port = + value.parse::().map_err(|_| ParseError::InvalidValue { + message: format!( + "Invalid port: '{}' is not a valid integer", + value + ), + usage: "stream enable [--port ]", + })?; + if port > u16::MAX as u32 { + return Err(ParseError::InvalidValue { + message: format!( + "Invalid port: {} is out of range (valid range: 0-65535)", + port + ), + usage: "stream enable [--port ]", + }); + } + cmd["port"] = json!(port); + i += 2; + } + flag => { + return Err(ParseError::InvalidValue { + message: format!("Unknown flag for stream enable: {}", flag), + usage: "stream enable [--port ]", + }); + } + } + } + Ok(cmd) + } + Some("disable") => Ok(json!({ "id": id, "action": "stream_disable" })), + Some("status") => Ok(json!({ "id": id, "action": "stream_status" })), + Some(sub) => Err(ParseError::UnknownSubcommand { + subcommand: sub.to_string(), + valid_options: &["enable", "disable", "status"], + }), + None => Err(ParseError::MissingArguments { + context: "stream".to_string(), + usage: "stream ", + }), + }, + // === Get === "get" => parse_get(&rest, &id), @@ -3617,6 +3673,46 @@ mod tests { assert_eq!(cmd["cdpPort"], 1); } + // === Runtime stream control tests === + + #[test] + fn test_stream_enable_auto_port() { + let cmd = parse_command(&args("stream enable"), &default_flags()).unwrap(); + assert_eq!(cmd["action"], "stream_enable"); + assert!(cmd.get("port").is_none()); + } + + #[test] + fn test_stream_enable_with_port() { + let cmd = parse_command(&args("stream enable --port 9223"), &default_flags()).unwrap(); + assert_eq!(cmd["action"], "stream_enable"); + assert_eq!(cmd["port"], 9223); + } + + #[test] + fn test_stream_status() { + let cmd = parse_command(&args("stream status"), &default_flags()).unwrap(); + assert_eq!(cmd["action"], "stream_status"); + } + + #[test] + fn test_stream_disable() { + let cmd = parse_command(&args("stream disable"), &default_flags()).unwrap(); + assert_eq!(cmd["action"], "stream_disable"); + } + + #[test] + fn test_stream_enable_invalid_port() { + let result = parse_command(&args("stream enable --port abc"), &default_flags()); + assert!(matches!(result, Err(ParseError::InvalidValue { .. }))); + } + + #[test] + fn test_stream_missing_subcommand() { + let result = parse_command(&args("stream"), &default_flags()); + assert!(matches!(result, Err(ParseError::MissingArguments { .. }))); + } + // === Trace Tests === #[test] diff --git a/cli/src/connection.rs b/cli/src/connection.rs index bfcf2fd83..867fd44ea 100644 --- a/cli/src/connection.rs +++ b/cli/src/connection.rs @@ -122,6 +122,8 @@ fn get_pid_path(session: &str) -> PathBuf { fn cleanup_stale_files(session: &str) { let pid_path = get_pid_path(session); let _ = fs::remove_file(&pid_path); + let stream_path = get_socket_dir().join(format!("{}.stream", session)); + let _ = fs::remove_file(&stream_path); #[cfg(unix)] { diff --git a/cli/src/native/actions.rs b/cli/src/native/actions.rs index 44543ff7e..5b4c1ded5 100644 --- a/cli/src/native/actions.rs +++ b/cli/src/native/actions.rs @@ -1,6 +1,7 @@ use serde_json::{json, Value}; use std::collections::HashMap; use std::env; +use std::fs; use std::io::Write; use std::path::PathBuf; use std::sync::atomic::AtomicU64; @@ -8,6 +9,8 @@ use std::sync::Arc; use time::{format_description::well_known::Rfc3339, OffsetDateTime}; use tokio::sync::{broadcast, oneshot, RwLock}; +use crate::connection::get_socket_dir; + use super::auth; use super::browser::{should_track_target, BrowserManager, WaitUntil}; use super::cdp::chrome::LaunchOptions; @@ -991,6 +994,9 @@ pub async fn execute_command(cmd: &Value, state: &mut DaemonState) -> Value { | "state_clean" | "state_rename" | "device_list" + | "stream_enable" + | "stream_disable" + | "stream_status" ); if !skip_launch { // Check if existing connection is stale and needs re-launch. @@ -1008,6 +1014,7 @@ pub async fn execute_command(cmd: &Value, state: &mut DaemonState) -> Value { let _ = mgr.close().await; } state.browser = None; + state.screencasting = false; state.reset_input_state(); state.update_stream_client().await; } @@ -1137,6 +1144,9 @@ pub async fn execute_command(cmd: &Value, state: &mut DaemonState) -> Value { "device" => handle_device(cmd, state).await, "screencast_start" => handle_screencast_start(cmd, state).await, "screencast_stop" => handle_screencast_stop(state).await, + "stream_enable" => handle_stream_enable(cmd, state).await, + "stream_disable" => handle_stream_disable(state).await, + "stream_status" => handle_stream_status(state).await, "waitforurl" => handle_waitforurl(cmd, state).await, "waitforloadstate" => handle_waitforloadstate(cmd, state).await, "waitforfunction" => handle_waitforfunction(cmd, state).await, @@ -1372,6 +1382,7 @@ async fn handle_launch(cmd: &Value, state: &mut DaemonState) -> Result Result { mgr.close().await?; } state.browser = None; + state.screencasting = false; state.reset_input_state(); state.update_stream_client().await; @@ -4272,6 +4284,114 @@ async fn handle_device(cmd: &Value, state: &DaemonState) -> Result PathBuf { + get_socket_dir().join(format!("{}.stream", session_id)) +} + +fn write_stream_file(session_id: &str, port: u16) -> Result<(), String> { + let path = stream_file_path(session_id); + fs::write(&path, port.to_string()).map_err(|e| { + format!( + "Failed to write stream metadata '{}': {}", + path.display(), + e + ) + }) +} + +fn remove_stream_file(session_id: &str) -> Result<(), String> { + let path = stream_file_path(session_id); + match fs::remove_file(&path) { + Ok(()) => Ok(()), + Err(err) if err.kind() == std::io::ErrorKind::NotFound => Ok(()), + Err(err) => Err(format!( + "Failed to remove stream metadata '{}': {}", + path.display(), + err + )), + } +} + +async fn current_stream_status(state: &DaemonState) -> Value { + debug_assert_eq!( + state.stream_server.is_some(), + state.stream_client.is_some(), + "stream server and stream client slot should be set together" + ); + + let connected = match state.browser.as_ref() { + Some(mgr) => mgr.is_connection_alive().await, + None => false, + }; + let runtime_screencasting = match state.stream_server.as_ref() { + Some(server) => server.is_screencasting().await, + None => false, + }; + + json!({ + "enabled": state.stream_server.is_some(), + "port": state + .stream_server + .as_ref() + .map(|server| Value::from(server.port())) + .unwrap_or(Value::Null), + "connected": connected, + "screencasting": connected && (state.screencasting || runtime_screencasting), + }) +} + +async fn handle_stream_enable(cmd: &Value, state: &mut DaemonState) -> Result { + if state.stream_server.is_some() { + return Err("Streaming is already enabled for this session".to_string()); + } + + let requested_port = match cmd.get("port").and_then(|value| value.as_u64()) { + Some(raw) => u16::try_from(raw) + .map_err(|_| format!("Invalid stream port '{}': expected 0-65535", raw))?, + None => 0, + }; + + let (server, client_slot) = + StreamServer::start_without_client(requested_port, state.session_id.clone()).await?; + let port = server.port(); + if let Err(err) = write_stream_file(&state.session_id, port) { + server.shutdown().await; + return Err(err); + } + + state.stream_client = Some(client_slot); + state.stream_server = Some(Arc::new(server)); + if state.screencasting { + if let Some(ref server) = state.stream_server { + server.set_screencasting(true).await; + } + } + state.update_stream_client().await; + + Ok(current_stream_status(state).await) +} + +async fn handle_stream_disable(state: &mut DaemonState) -> Result { + let Some(server) = state.stream_server.clone() else { + return Err("Streaming is not enabled for this session".to_string()); + }; + + server.shutdown().await; + state.stream_server = None; + state.stream_client = None; + remove_stream_file(&state.session_id)?; + + Ok(json!({ "disabled": true })) +} + +async fn handle_stream_status(state: &DaemonState) -> Result { + Ok(current_stream_status(state).await) +} + // --------------------------------------------------------------------------- // Screencast handlers // --------------------------------------------------------------------------- @@ -4313,6 +4433,7 @@ async fn handle_screencast_start(cmd: &Value, state: &mut DaemonState) -> Result state.screencasting = true; if let Some(ref server) = state.stream_server { + server.set_screencasting(true).await; server.broadcast_status(true, true, max_width as u32, max_height as u32); } @@ -4331,6 +4452,7 @@ async fn handle_screencast_stop(state: &mut DaemonState) -> Result PathBuf { + let nanos = std::time::SystemTime::now() + .duration_since(std::time::UNIX_EPOCH) + .expect("system clock should be after unix epoch") + .as_nanos(); + std::env::temp_dir().join(format!( + "agent-browser-{label}-{}-{nanos}", + std::process::id() + )) + } + + #[tokio::test] + async fn test_stream_enable_disable_and_status_without_browser() { + let guard = EnvGuard::new(&["AGENT_BROWSER_SOCKET_DIR", "AGENT_BROWSER_SESSION"]); + let socket_dir = unique_socket_dir("stream-runtime"); + fs::create_dir_all(&socket_dir).expect("socket dir should be created"); + guard.set( + "AGENT_BROWSER_SOCKET_DIR", + socket_dir.to_str().expect("socket dir should be utf-8"), + ); + guard.set("AGENT_BROWSER_SESSION", "stream-runtime-session"); + + let mut state = DaemonState::new(); + + let disabled_status = handle_stream_status(&state) + .await + .expect("status should work before enable"); + assert_eq!(disabled_status["enabled"], false); + assert_eq!(disabled_status["port"], Value::Null); + assert_eq!(disabled_status["connected"], false); + assert_eq!(disabled_status["screencasting"], false); + + let enabled_status = handle_stream_enable(&json!({ "port": 0 }), &mut state) + .await + .expect("stream enable should succeed"); + let port = enabled_status["port"] + .as_u64() + .expect("runtime stream should report a bound port"); + assert!(port > 0, "runtime stream should bind a non-zero port"); + assert_eq!(enabled_status["enabled"], true); + assert_eq!(enabled_status["connected"], false); + assert_eq!(enabled_status["screencasting"], false); + + let stream_path = socket_dir.join("stream-runtime-session.stream"); + let port_file = + fs::read_to_string(&stream_path).expect("stream metadata file should exist"); + assert_eq!(port_file.trim(), port.to_string()); + + let duplicate_err = handle_stream_enable(&json!({}), &mut state) + .await + .expect_err("duplicate enable should fail"); + assert!(duplicate_err.contains("already enabled")); + + let status = handle_stream_status(&state) + .await + .expect("status should work after enable"); + assert_eq!(status["enabled"], true); + assert_eq!(status["port"], port); + + let disabled = handle_stream_disable(&mut state) + .await + .expect("stream disable should succeed"); + assert_eq!(disabled["disabled"], true); + assert!( + !stream_path.exists(), + "disabling runtime stream should remove the metadata file" + ); + assert!(state.stream_server.is_none()); + assert!(state.stream_client.is_none()); + + let final_status = handle_stream_status(&state) + .await + .expect("status should work after disable"); + assert_eq!(final_status["enabled"], false); + assert_eq!(final_status["port"], Value::Null); + + let disable_err = handle_stream_disable(&mut state) + .await + .expect_err("duplicate disable should fail"); + assert!(disable_err.contains("not enabled")); + + let _ = fs::remove_dir_all(&socket_dir); + } + + #[tokio::test] + async fn test_stream_disable_preserves_existing_screencast_state() { + let guard = EnvGuard::new(&["AGENT_BROWSER_SOCKET_DIR", "AGENT_BROWSER_SESSION"]); + let socket_dir = unique_socket_dir("stream-preserve-screencast"); + fs::create_dir_all(&socket_dir).expect("socket dir should be created"); + guard.set( + "AGENT_BROWSER_SOCKET_DIR", + socket_dir.to_str().expect("socket dir should be utf-8"), + ); + guard.set( + "AGENT_BROWSER_SESSION", + "stream-preserve-screencast-session", + ); + + let mut state = DaemonState::new(); + handle_stream_enable(&json!({ "port": 0 }), &mut state) + .await + .expect("stream enable should succeed"); + state.screencasting = true; + + let disabled = handle_stream_disable(&mut state) + .await + .expect("stream disable should succeed"); + assert_eq!(disabled["disabled"], true); + assert!( + state.screencasting, + "stream disable should not clear an independently managed screencast state" + ); + + let _ = fs::remove_dir_all(&socket_dir); + } + + #[tokio::test] + async fn test_stream_disable_clears_state_when_stream_file_removal_fails() { + let guard = EnvGuard::new(&["AGENT_BROWSER_SOCKET_DIR", "AGENT_BROWSER_SESSION"]); + let socket_dir = unique_socket_dir("stream-disable-cleanup"); + fs::create_dir_all(&socket_dir).expect("socket dir should be created"); + guard.set( + "AGENT_BROWSER_SOCKET_DIR", + socket_dir.to_str().expect("socket dir should be utf-8"), + ); + guard.set("AGENT_BROWSER_SESSION", "stream-disable-cleanup-session"); + + let mut state = DaemonState::new(); + handle_stream_enable(&json!({ "port": 0 }), &mut state) + .await + .expect("stream enable should succeed"); + + let stream_path = socket_dir.join("stream-disable-cleanup-session.stream"); + fs::remove_file(&stream_path).expect("stream metadata file should exist"); + fs::create_dir(&stream_path).expect("directory should force remove_stream_file failure"); + + let err = handle_stream_disable(&mut state) + .await + .expect_err("stream disable should surface file removal failure"); + assert!(err.contains("Failed to remove stream metadata")); + assert!( + state.stream_server.is_none(), + "stream disable should clear stream_server even when metadata cleanup fails" + ); + assert!( + state.stream_client.is_none(), + "stream disable should clear stream_client even when metadata cleanup fails" + ); + + let _ = fs::remove_dir_all(&socket_dir); + } + + #[tokio::test] + async fn test_stream_enable_port_conflict_returns_error() { + let guard = EnvGuard::new(&["AGENT_BROWSER_SOCKET_DIR", "AGENT_BROWSER_SESSION"]); + let socket_dir = unique_socket_dir("stream-port-conflict"); + fs::create_dir_all(&socket_dir).expect("socket dir should be created"); + guard.set( + "AGENT_BROWSER_SOCKET_DIR", + socket_dir.to_str().expect("socket dir should be utf-8"), + ); + guard.set("AGENT_BROWSER_SESSION", "stream-port-conflict-session"); + + let listener = std::net::TcpListener::bind("127.0.0.1:0") + .expect("test should reserve an ephemeral port"); + let port = listener + .local_addr() + .expect("listener should have local addr") + .port(); + + let mut state = DaemonState::new(); + let err = handle_stream_enable(&json!({ "port": port }), &mut state) + .await + .expect_err("conflicting port should fail"); + assert!(err.contains("Failed to bind stream server")); + assert!(state.stream_server.is_none()); + assert!(state.stream_client.is_none()); + assert!( + !socket_dir + .join("stream-port-conflict-session.stream") + .exists(), + "failed enable should not leave stale metadata behind" + ); + + drop(listener); + let _ = fs::remove_dir_all(&socket_dir); + } + #[test] fn test_success_response_structure() { let resp = success_response("cmd-1", json!({"url": "https://example.com"})); @@ -7032,6 +7342,15 @@ mod tests { #[tokio::test] async fn test_daemon_state_new() { + let guard = EnvGuard::new(&[ + "AGENT_BROWSER_ALLOWED_DOMAINS", + "AGENT_BROWSER_SESSION_NAME", + "AGENT_BROWSER_SESSION", + ]); + guard.remove("AGENT_BROWSER_ALLOWED_DOMAINS"); + guard.remove("AGENT_BROWSER_SESSION_NAME"); + guard.remove("AGENT_BROWSER_SESSION"); + let state = DaemonState::new(); assert!(state.browser.is_none()); assert!(state.domain_filter.read().await.is_none()); @@ -7448,6 +7767,7 @@ mod tests { } #[tokio::test] + #[allow(clippy::await_holding_lock)] async fn test_credentials_roundtrip_via_actions() { let _lock = crate::native::auth::AUTH_TEST_MUTEX.lock().unwrap(); let key_var = "AGENT_BROWSER_ENCRYPTION_KEY"; diff --git a/cli/src/native/cdp/chrome.rs b/cli/src/native/cdp/chrome.rs index a7d5db186..d454e18b9 100644 --- a/cli/src/native/cdp/chrome.rs +++ b/cli/src/native/cdp/chrome.rs @@ -850,8 +850,22 @@ mod tests { #[test] fn test_find_playwright_chromium_nonexistent() { - let _guard = EnvGuard::new(&["PLAYWRIGHT_BROWSERS_PATH"]); - _guard.set("PLAYWRIGHT_BROWSERS_PATH", "/nonexistent/path"); + let guard = EnvGuard::new(&["PLAYWRIGHT_BROWSERS_PATH", "HOME", "USERPROFILE"]); + guard.set("PLAYWRIGHT_BROWSERS_PATH", "/nonexistent/path"); + + let temp_home = std::env::temp_dir().join(format!( + "agent-browser-test-home-{}-{}", + std::process::id(), + std::time::SystemTime::now() + .duration_since(std::time::UNIX_EPOCH) + .expect("system clock should be after unix epoch") + .as_nanos() + )); + std::fs::create_dir_all(&temp_home).expect("temp home should be created"); + let temp_home = temp_home.to_string_lossy().to_string(); + guard.set("HOME", &temp_home); + guard.set("USERPROFILE", &temp_home); + let result = find_playwright_chromium(); assert!(result.is_none()); } diff --git a/cli/src/native/daemon.rs b/cli/src/native/daemon.rs index 3d2b2d8c8..ab4f65784 100644 --- a/cli/src/native/daemon.rs +++ b/cli/src/native/daemon.rs @@ -31,6 +31,9 @@ pub async fn run_daemon(session: &str) { let _ = fs::remove_file(&socket_path); } + let stream_path = socket_dir.join(format!("{}.stream", session)); + let _ = fs::remove_file(&stream_path); + if let Ok(days_str) = env::var("AGENT_BROWSER_STATE_EXPIRE_DAYS") { if let Ok(days) = days_str.parse::() { if days > 0 { @@ -47,7 +50,6 @@ pub async fn run_daemon(session: &str) { match StreamServer::start_without_client(port, session.to_string()).await { Ok((stream_server, client_slot)) => { stream_client = Some(client_slot.clone()); - let stream_path = socket_dir.join(format!("{}.stream", session)); if let Err(e) = fs::write(&stream_path, stream_server.port().to_string()) { let _ = writeln!(std::io::stderr(), "Failed to write .stream file: {}", e); @@ -80,7 +82,6 @@ pub async fn run_daemon(session: &str) { let _ = fs::remove_file(&socket_path); let _ = fs::remove_file(&pid_path); - let stream_path = socket_dir.join(format!("{}.stream", session)); let _ = fs::remove_file(&stream_path); if let Err(e) = result { diff --git a/cli/src/native/e2e_tests.rs b/cli/src/native/e2e_tests.rs index e4192a9bb..9ef464da9 100644 --- a/cli/src/native/e2e_tests.rs +++ b/cli/src/native/e2e_tests.rs @@ -8,9 +8,12 @@ //! cargo test e2e -- --ignored --test-threads=1 use base64::{engine::general_purpose::STANDARD, Engine}; +use futures_util::StreamExt; use serde_json::{json, Value}; use tokio::io::{AsyncReadExt, AsyncWriteExt}; +use crate::test_utils::EnvGuard; + use super::actions::{execute_command, DaemonState}; fn assert_success(resp: &Value) { @@ -190,6 +193,127 @@ async fn e2e_lightpanda_auto_launch_can_open_page() { assert_eq!(get_data(&resp)["closed"], true); } +// --------------------------------------------------------------------------- +// Runtime stream lifecycle +// --------------------------------------------------------------------------- + +#[tokio::test] +#[ignore] +async fn e2e_runtime_stream_enable_before_launch_attaches_and_disables() { + let guard = EnvGuard::new(&["AGENT_BROWSER_SOCKET_DIR", "AGENT_BROWSER_SESSION"]); + let socket_dir = std::env::temp_dir().join(format!( + "agent-browser-e2e-stream-{}-{}", + std::process::id(), + std::time::SystemTime::now() + .duration_since(std::time::UNIX_EPOCH) + .expect("system clock should be after unix epoch") + .as_nanos() + )); + std::fs::create_dir_all(&socket_dir).expect("socket dir should be created"); + guard.set( + "AGENT_BROWSER_SOCKET_DIR", + socket_dir.to_str().expect("socket dir should be utf-8"), + ); + guard.set("AGENT_BROWSER_SESSION", "e2e-runtime-stream"); + + let mut state = DaemonState::new(); + + let resp = execute_command(&json!({ "id": "1", "action": "stream_status" }), &mut state).await; + assert_success(&resp); + assert_eq!(get_data(&resp)["enabled"], false); + + let resp = execute_command( + &json!({ "id": "2", "action": "stream_enable", "port": 0 }), + &mut state, + ) + .await; + assert_success(&resp); + let port = get_data(&resp)["port"] + .as_u64() + .expect("stream enable should report the bound port"); + assert_eq!(get_data(&resp)["connected"], false); + + let stream_path = socket_dir.join("e2e-runtime-stream.stream"); + assert!( + stream_path.exists(), + "runtime enable should create .stream metadata" + ); + + let (mut ws, _) = tokio_tungstenite::connect_async(format!("ws://127.0.0.1:{port}")) + .await + .expect("websocket client should connect to runtime stream"); + + let initial = tokio::time::timeout(tokio::time::Duration::from_secs(5), ws.next()) + .await + .expect("websocket should emit initial status") + .expect("websocket should stay open") + .expect("websocket message should be valid"); + let initial_text = initial.into_text().expect("initial message should be text"); + let initial_status: Value = + serde_json::from_str(&initial_text).expect("status JSON should parse"); + assert_eq!(initial_status["type"], "status"); + assert_eq!(initial_status["connected"], false); + + let resp = execute_command( + &json!({ "id": "3", "action": "navigate", "url": "data:text/html,

Runtime Stream

" }), + &mut state, + ) + .await; + assert_success(&resp); + + let mut observed_connected = false; + let deadline = tokio::time::Instant::now() + tokio::time::Duration::from_secs(10); + while tokio::time::Instant::now() < deadline { + let Some(message) = tokio::time::timeout(tokio::time::Duration::from_secs(2), ws.next()) + .await + .expect("websocket should emit status after browser launch") + else { + continue; + }; + let message = message.expect("websocket message should be valid"); + if !message.is_text() { + continue; + } + let parsed: Value = + serde_json::from_str(message.to_text().expect("text message should be readable")) + .expect("runtime stream payload should be valid JSON"); + if parsed.get("type") == Some(&json!("status")) + && parsed.get("connected") == Some(&json!(true)) + { + observed_connected = true; + break; + } + } + assert!( + observed_connected, + "runtime stream should report connected=true after browser launch" + ); + + let resp = execute_command( + &json!({ "id": "4", "action": "stream_disable" }), + &mut state, + ) + .await; + assert_success(&resp); + assert_eq!(get_data(&resp)["disabled"], true); + assert!( + !stream_path.exists(), + "stream disable should remove .stream metadata" + ); + + let close_message = tokio::time::timeout(tokio::time::Duration::from_secs(5), ws.next()) + .await + .expect("websocket should close after disable"); + assert!( + close_message.is_none() || close_message.expect("ws result should exist").is_ok(), + "websocket should shut down cleanly when the runtime stream is disabled" + ); + + let resp = execute_command(&json!({ "id": "99", "action": "close" }), &mut state).await; + assert_success(&resp); + let _ = std::fs::remove_dir_all(&socket_dir); +} + // --------------------------------------------------------------------------- // Snapshot with refs and ref-based click // --------------------------------------------------------------------------- diff --git a/cli/src/native/stream.rs b/cli/src/native/stream.rs index 9c84e6708..7e50a7d8d 100644 --- a/cli/src/native/stream.rs +++ b/cli/src/native/stream.rs @@ -4,7 +4,7 @@ use std::sync::Arc; use futures_util::{SinkExt, StreamExt}; use tokio::net::TcpListener; -use tokio::sync::{broadcast, Mutex, Notify, RwLock}; +use tokio::sync::{broadcast, watch, Mutex, Notify, RwLock}; use tokio_tungstenite::tungstenite::Message; use super::cdp::client::CdpClient; @@ -46,6 +46,9 @@ pub struct StreamServer { screencasting: Arc>, viewport_width: Arc>, viewport_height: Arc>, + shutdown_tx: watch::Sender, + accept_task: Mutex>>, + cdp_task: Mutex>>, } impl StreamServer { @@ -99,6 +102,24 @@ impl StreamServer { (w, h) } + /// Override the cached screencast state for explicit CLI start/stop commands. + pub async fn set_screencasting(&self, active: bool) { + let mut guard = self.screencasting.lock().await; + *guard = active; + } + + /// Shut down the accept loop and background CDP listener, releasing the bound port. + pub async fn shutdown(&self) { + let _ = self.shutdown_tx.send(true); + + if let Some(task) = self.accept_task.lock().await.take() { + let _ = task.await; + } + if let Some(task) = self.cdp_task.lock().await.take() { + let _ = task.await; + } + } + async fn start_inner( preferred_port: u16, client_slot: Arc>>>, @@ -121,6 +142,7 @@ impl StreamServer { let cdp_session_id = Arc::new(RwLock::new(None::)); let viewport_width = Arc::new(Mutex::new(1280u32)); let viewport_height = Arc::new(Mutex::new(720u32)); + let (shutdown_tx, shutdown_rx) = watch::channel(false); let frame_tx_clone = frame_tx.clone(); let client_count_clone = client_count.clone(); @@ -132,7 +154,8 @@ impl StreamServer { // WebSocket accept loop let vw_clone = viewport_width.clone(); let vh_clone = viewport_height.clone(); - tokio::spawn(async move { + let accept_shutdown_rx = shutdown_rx.clone(); + let accept_task = tokio::spawn(async move { accept_loop( listener, frame_tx_clone, @@ -143,6 +166,7 @@ impl StreamServer { cdp_session_clone, vw_clone, vh_clone, + accept_shutdown_rx, ) .await; }); @@ -156,7 +180,7 @@ impl StreamServer { let cdp_session_bg = cdp_session_id.clone(); let vw_bg = viewport_width.clone(); let vh_bg = viewport_height.clone(); - tokio::spawn(async move { + let cdp_task = tokio::spawn(async move { cdp_event_loop( frame_tx_bg, client_slot_bg, @@ -166,6 +190,7 @@ impl StreamServer { cdp_session_bg, vw_bg, vh_bg, + shutdown_rx, ) .await; }); @@ -181,6 +206,9 @@ impl StreamServer { screencasting, viewport_width, viewport_height, + shutdown_tx, + accept_task: Mutex::new(Some(accept_task)), + cdp_task: Mutex::new(Some(cdp_task)), }, client_slot, )) @@ -252,32 +280,47 @@ async fn accept_loop( cdp_session_id: Arc>>, viewport_width: Arc>, viewport_height: Arc>, + mut shutdown_rx: watch::Receiver, ) { - while let Ok((stream, addr)) = listener.accept().await { - let frame_rx = frame_tx.subscribe(); - let client_count = client_count.clone(); - let client_slot = client_slot.clone(); - let client_notify = client_notify.clone(); - let screencasting = screencasting.clone(); - let cdp_session_id = cdp_session_id.clone(); - let vw = viewport_width.clone(); - let vh = viewport_height.clone(); - - tokio::spawn(async move { - handle_ws_client( - stream, - addr, - frame_rx, - client_count, - client_slot, - client_notify, - screencasting, - cdp_session_id, - vw, - vh, - ) - .await; - }); + loop { + tokio::select! { + changed = shutdown_rx.changed() => { + if changed.is_err() || *shutdown_rx.borrow() { + break; + } + } + accept_result = listener.accept() => { + let Ok((stream, addr)) = accept_result else { + break; + }; + let frame_rx = frame_tx.subscribe(); + let client_count = client_count.clone(); + let client_slot = client_slot.clone(); + let client_notify = client_notify.clone(); + let screencasting = screencasting.clone(); + let cdp_session_id = cdp_session_id.clone(); + let vw = viewport_width.clone(); + let vh = viewport_height.clone(); + let shutdown_rx = shutdown_rx.clone(); + + tokio::spawn(async move { + handle_ws_client( + stream, + addr, + frame_rx, + client_count, + client_slot, + client_notify, + screencasting, + cdp_session_id, + vw, + vh, + shutdown_rx, + ) + .await; + }); + } + } } } @@ -293,6 +336,7 @@ async fn handle_ws_client( cdp_session_id: Arc>>, viewport_width: Arc>, viewport_height: Arc>, + mut shutdown_rx: watch::Receiver, ) { let callback = |req: &tokio_tungstenite::tungstenite::handshake::server::Request, @@ -347,6 +391,12 @@ async fn handle_ws_client( loop { tokio::select! { + changed = shutdown_rx.changed() => { + if changed.is_err() || *shutdown_rx.borrow() { + let _ = ws_tx.send(Message::Close(None)).await; + break; + } + } frame = frame_rx.recv() => { match frame { Ok(data) => { @@ -398,10 +448,28 @@ async fn cdp_event_loop( cdp_session_id: Arc>>, viewport_width: Arc>, viewport_height: Arc>, + mut shutdown_rx: watch::Receiver, ) { loop { // Wait until we're notified of a client/connection change - client_notify.notified().await; + tokio::select! { + changed = shutdown_rx.changed() => { + if changed.is_err() || *shutdown_rx.borrow() { + let session_id = cdp_session_id.read().await.clone(); + if *screencasting.lock().await { + if let Some(ref client) = *client_slot.read().await { + let _ = client + .send_command_no_params("Page.stopScreencast", session_id.as_deref()) + .await; + } + let mut sc = screencasting.lock().await; + *sc = false; + } + return; + } + } + _ = client_notify.notified() => {} + } // Check if we have WS clients and a CDP client let count = *client_count.lock().await; @@ -453,6 +521,17 @@ async fn cdp_event_loop( // Process CDP events in real-time until client disconnects or CDP closes loop { tokio::select! { + changed = shutdown_rx.changed() => { + if changed.is_err() || *shutdown_rx.borrow() { + let session_id = cdp_session_id.read().await.clone(); + let _ = client_arc + .send_command_no_params("Page.stopScreencast", session_id.as_deref()) + .await; + let mut sc = screencasting.lock().await; + *sc = false; + return; + } + } event = event_rx.recv() => { match event { Ok(evt) => { diff --git a/cli/src/output.rs b/cli/src/output.rs index 5c275e643..f0cf9ab2b 100644 --- a/cli/src/output.rs +++ b/cli/src/output.rs @@ -99,6 +99,37 @@ fn format_storage_text(data: &serde_json::Value) -> Option { Some(format!("{}: {}", key, format_storage_value(value))) } +fn format_stream_status_text(action: Option<&str>, data: &serde_json::Value) -> Option { + match action { + Some("stream_disable") => data + .get("disabled") + .and_then(|v| v.as_bool()) + .filter(|disabled| *disabled) + .map(|_| "Streaming disabled".to_string()), + Some("stream_enable") | Some("stream_status") => { + let enabled = data.get("enabled").and_then(|v| v.as_bool())?; + if !enabled { + return Some("Streaming disabled".to_string()); + } + + let port = data.get("port").and_then(|v| v.as_u64())?; + let connected = data + .get("connected") + .and_then(|v| v.as_bool()) + .unwrap_or(false); + let screencasting = data + .get("screencasting") + .and_then(|v| v.as_bool()) + .unwrap_or(false); + + Some(format!( + "Streaming enabled on ws://127.0.0.1:{port}\nConnected: {connected}\nScreencasting: {screencasting}" + )) + } + _ => None, + } +} + pub fn print_response_with_opts(resp: &Response, action: Option<&str>, opts: &OutputOptions) { if opts.json { if opts.content_boundaries { @@ -168,6 +199,10 @@ pub fn print_response_with_opts(resp: &Response, action: Option<&str>, opts: &Ou return; } } + if let Some(output) = format_stream_status_text(action, data) { + println!("{}", output); + return; + } if action == Some("storage_get") { if let Some(output) = format_storage_text(data) { println!("{}", output); @@ -2393,6 +2428,39 @@ Examples: "## } + // === Runtime streaming === + "stream" => { + r##" +agent-browser stream - Manage live WebSocket browser streaming + +Usage: + agent-browser stream enable [--port ] + agent-browser stream disable + agent-browser stream status + +Enables or disables the session-scoped WebSocket stream server without restarting +an already-running daemon. If --port is omitted, agent-browser binds an +available localhost port automatically and reports it back. + +Notes: + - 'stream enable' creates the WebSocket server. + - WebSocket clients trigger frame streaming automatically. + - 'screencast_start' and 'screencast_stop' still control explicit CDP screencasts. + - AGENT_BROWSER_STREAM_PORT only affects daemon startup; use 'stream enable' + for sessions that are already running. + +Global Options: + --json Output as JSON + --session Use specific session + +Examples: + agent-browser stream status + agent-browser stream enable + agent-browser stream enable --port 9223 + agent-browser stream disable +"## + } + // === iOS Commands === "tap" => { r##" @@ -2637,6 +2705,11 @@ Debug: inspect Open Chrome DevTools for the active page clipboard [text] Read/write clipboard (read, write, copy, paste) +Streaming: + stream enable [--port ] Start runtime WebSocket streaming for this session + stream disable Stop runtime WebSocket streaming + stream status Show streaming status and active port + Batch: batch [--bail] Execute commands from stdin (JSON array of string arrays) --bail stops on first error (default: continue all) @@ -2790,6 +2863,8 @@ Examples: agent-browser wait --load networkidle # Wait for slow pages to load agent-browser --cdp 9222 snapshot # Connect via CDP port agent-browser --auto-connect snapshot # Auto-discover running Chrome + agent-browser stream enable # Start runtime streaming on an auto-selected port + agent-browser stream status # Inspect runtime streaming state agent-browser --color-scheme dark open example.com # Dark mode agent-browser --profile ~/.myapp open example.com # Persistent profile agent-browser --session-name myapp open example.com # Auto-save/restore state @@ -2896,6 +2971,33 @@ mod tests { use super::format_storage_text; use serde_json::json; + #[test] + fn test_format_stream_status_text_for_enabled_stream() { + let data = json!({ + "enabled": true, + "port": 9223, + "connected": true, + "screencasting": false + }); + + let rendered = super::format_stream_status_text(Some("stream_status"), &data).unwrap(); + + assert_eq!( + rendered, + "Streaming enabled on ws://127.0.0.1:9223\nConnected: true\nScreencasting: false" + ); + } + + #[test] + fn test_format_stream_status_text_for_disabled_stream() { + let data = + json!({ "enabled": false, "port": null, "connected": false, "screencasting": false }); + + let rendered = super::format_stream_status_text(Some("stream_status"), &data).unwrap(); + + assert_eq!(rendered, "Streaming disabled"); + } + #[test] fn test_format_storage_text_for_all_entries() { let data = json!({ diff --git a/docs/src/app/commands/page.mdx b/docs/src/app/commands/page.mdx index cbc6e34f6..dab13adcd 100644 --- a/docs/src/app/commands/page.mdx +++ b/docs/src/app/commands/page.mdx @@ -30,6 +30,9 @@ agent-browser pdf # Save page as PDF agent-browser snapshot # Accessibility tree with refs agent-browser eval # Run JavaScript agent-browser connect # Connect to browser via CDP +agent-browser stream enable [--port ] # Start runtime WebSocket streaming +agent-browser stream status # Show runtime streaming state and bound port +agent-browser stream disable # Stop runtime WebSocket streaming agent-browser close # Close browser (aliases: quit, exit) ``` @@ -228,6 +231,17 @@ agent-browser dialog status # Check if a dialog is currently open When a JavaScript dialog (`alert`, `confirm`, `prompt`) is pending, all command responses include a `warning` field with the dialog type and message. +## Streaming + +```bash +agent-browser stream enable # Start runtime WebSocket streaming on an auto-selected port +agent-browser stream enable --port 9223 # Bind a specific localhost port +agent-browser stream status # Show enabled state, port, browser connection, screencasting +agent-browser stream disable # Stop runtime streaming and remove the .stream metadata file +``` + +Use `stream enable` for sessions that are already running. If you need streaming from daemon startup, set `AGENT_BROWSER_STREAM_PORT` before the first command in that session. + ## Debug ```bash diff --git a/docs/src/app/configuration/page.mdx b/docs/src/app/configuration/page.mdx index e023ea76f..fd612717b 100644 --- a/docs/src/app/configuration/page.mdx +++ b/docs/src/app/configuration/page.mdx @@ -173,7 +173,7 @@ These environment variables configure additional daemon and runtime behavior: AGENT_BROWSER_ENCRYPTION_KEY64-char hex key for AES-256-GCM session encryption.(none) AGENT_BROWSER_EXTENSIONSComma-separated browser extension paths. Extensions work in both headed and headless mode.(none) AGENT_BROWSER_HEADEDShow browser window instead of running headless (1 to enable).(disabled) - AGENT_BROWSER_STREAM_PORTEnable WebSocket streaming on the specified port (e.g., 9223).(disabled) + AGENT_BROWSER_STREAM_PORTEnable WebSocket streaming at daemon startup on the specified port (e.g., 9223). For an already-running session, use agent-browser stream enable.(disabled) AGENT_BROWSER_IDLE_TIMEOUT_MSAuto-shutdown the daemon after N ms of inactivity (no commands received). Useful for ephemeral environments.(disabled) AGENT_BROWSER_IOS_DEVICEDefault iOS device name for the ios provider.(none) AGENT_BROWSER_IOS_UDIDDefault iOS device UDID for the ios provider.(none) diff --git a/docs/src/app/streaming/page.mdx b/docs/src/app/streaming/page.mdx index 63830aaac..221148dac 100644 --- a/docs/src/app/streaming/page.mdx +++ b/docs/src/app/streaming/page.mdx @@ -5,14 +5,46 @@ where a human can watch and interact alongside an AI agent. ## Enable streaming -Set the `AGENT_BROWSER_STREAM_PORT` environment variable to start -a WebSocket server: +For an already-running session, enable streaming at runtime: + +```bash +agent-browser stream enable +agent-browser stream status +agent-browser stream disable +``` + +`stream enable` binds an available localhost port automatically unless you pass `--port `. `stream status` returns the enabled state, active port, browser connection state, and whether screencasting is active. `stream disable` tears the server down and removes the session's `.stream` metadata file. + +If you want the WebSocket server to exist from daemon startup, set `AGENT_BROWSER_STREAM_PORT` before the first command in that session: ```bash AGENT_BROWSER_STREAM_PORT=9223 agent-browser open example.com ``` -The server streams viewport frames and accepts input events (mouse, keyboard, touch). +The environment variable only affects daemon startup. For sessions that are already running, use `agent-browser stream enable` instead. + +Once enabled, the server streams viewport frames and accepts input events (mouse, keyboard, touch). + +## Runtime status response + +`agent-browser stream status --json` returns data like: + +```json +{ + "enabled": true, + "port": 9223, + "connected": true, + "screencasting": true +} +``` + +`connected` reports whether the daemon currently has a browser attached. `screencasting` reports whether frames are actively being produced for the stream server. + +## Relationship to screencast commands + +`stream enable` creates the WebSocket server and keeps it available for the session. WebSocket clients then trigger live frame delivery automatically. + +The lower-level `screencast_start` and `screencast_stop` commands still control explicit CDP screencasts directly. Use them when you want a screencast without the WebSocket runtime server. ## WebSocket protocol diff --git a/skills/agent-browser/SKILL.md b/skills/agent-browser/SKILL.md index ba9cb2067..0b86ca298 100644 --- a/skills/agent-browser/SKILL.md +++ b/skills/agent-browser/SKILL.md @@ -171,6 +171,12 @@ agent-browser screenshot --screenshot-dir ./shots # Save to custom directory agent-browser screenshot --screenshot-format jpeg --screenshot-quality 80 agent-browser pdf output.pdf # Save as PDF +# Live preview / streaming +agent-browser stream enable # Start runtime WebSocket streaming on an auto-selected port +agent-browser stream enable --port 9223 # Bind a specific localhost port +agent-browser stream status # Inspect enabled state, port, connection, and screencasting +agent-browser stream disable # Stop runtime streaming and remove the .stream metadata file + # Clipboard agent-browser clipboard read # Read text from clipboard agent-browser clipboard write "Hello, World!" # Write text to clipboard @@ -192,6 +198,12 @@ agent-browser diff url --wait-until networkidle # Custom wait str agent-browser diff url --selector "#main" # Scope to element ``` +## Runtime Streaming + +Use `agent-browser stream enable` when you need a live WebSocket preview for an already-running session. This is the preferred runtime path because it does not require restarting the daemon. `stream enable` creates the server, `stream status` reports the bound port and connection state, and `stream disable` tears it down cleanly. + +If streaming must be present from the first daemon command, `AGENT_BROWSER_STREAM_PORT` still works at daemon startup, but that environment variable is not retroactive for sessions that are already running. + ## Batch Execution Execute multiple commands in a single invocation by piping a JSON array of string arrays to `batch`. This avoids per-command process startup overhead when running multi-step workflows.