
Commit 1d3ff92

refactor(server): use ComputeDriver RPC surface in-process
Signed-off-by: Drew Newberry <anewberry@nvidia.com>
1 parent 355d845 commit 1d3ff92

File tree

3 files changed: +546 additions, −199 deletions


architecture/gateway.md

Lines changed: 7 additions & 5 deletions
@@ -96,10 +96,10 @@ The gateway boots in `main()` (`crates/openshell-server/src/main.rs`) and procee
 4. **Build `Config`** -- Assembles an `openshell_core::Config` from the parsed arguments.
 5. **Call `run_server()`** (`crates/openshell-server/src/lib.rs`):
    1. Connect to the persistence store (`Store::connect`), which auto-detects SQLite vs Postgres from the URL prefix and runs migrations.
-   2. Create `ComputeRuntime` with the in-process Kubernetes compute backend (`KubernetesComputeDriver`).
+   2. Create `ComputeRuntime` with an in-process `ComputeDriverService` backed by `KubernetesComputeDriver`, so the gateway calls the `openshell.compute.v1.ComputeDriver` RPC surface even without transport.
    3. Build `ServerState` (shared via `Arc<ServerState>` across all handlers).
    4. **Spawn background tasks**:
-      - `ComputeRuntime::spawn_watchers` -- consumes the compute-driver watch stream, updates persisted sandbox records, and republishes platform events.
+      - `ComputeRuntime::spawn_watchers` -- consumes the compute-driver watch stream, republishes platform events, and runs a periodic `ListSandboxes` snapshot reconcile so the store-backed public sandbox reads stay aligned with the compute driver.
    5. Create `MultiplexService`.
    6. Bind `TcpListener` on `config.bind_address`.
    7. Optionally create `TlsAcceptor` from cert/key files.
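The in-process binding that step 2 describes can be sketched without any gRPC machinery: the runtime depends on the RPC-surface trait, and the service type wraps the concrete driver behind it. Everything below (the trait shape, method names, and the `snapshot` helper) is a simplified std-only stand-in for the real tonic-generated `ComputeDriver` surface, not the actual API:

```rust
// Stand-in for the generated RPC-surface trait. The gateway codes
// against this, never against the concrete Kubernetes driver.
trait ComputeDriver {
    fn list_sandboxes(&self) -> Vec<String>;
}

// Stand-in for the concrete driver with its own native API.
struct KubernetesComputeDriver;

impl KubernetesComputeDriver {
    fn list(&self) -> Vec<String> {
        vec!["sandbox-a".to_string(), "sandbox-b".to_string()]
    }
}

// Wraps the concrete driver behind the RPC surface, with no transport.
struct ComputeDriverService {
    driver: KubernetesComputeDriver,
}

impl ComputeDriver for ComputeDriverService {
    fn list_sandboxes(&self) -> Vec<String> {
        self.driver.list()
    }
}

// The runtime is generic over the RPC surface, so the same call sites
// work whether the driver is in-process or served over gRPC.
struct ComputeRuntime<D: ComputeDriver> {
    driver: D,
}

impl<D: ComputeDriver> ComputeRuntime<D> {
    fn snapshot(&self) -> Vec<String> {
        self.driver.list_sandboxes()
    }
}

fn main() {
    let runtime = ComputeRuntime {
        driver: ComputeDriverService { driver: KubernetesComputeDriver },
    };
    println!("{}", runtime.snapshot().len()); // prints "2"
}
```

The payoff of this shape is that swapping the in-process service for a remote gRPC client changes only the `D` the runtime is constructed with.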
@@ -149,7 +149,7 @@ pub struct ServerState {
 ```
 
 - **`store`** -- persistence backend (SQLite or Postgres) for all object types.
-- **`compute`** -- gateway-owned compute orchestration. Persists sandbox lifecycle transitions, validates create requests through the compute backend, resolves exec/SSH endpoints, consumes the backend watch stream, and periodically reconciles orphaned `Provisioning` records that no longer have a backing compute resource.
+- **`compute`** -- gateway-owned compute orchestration. Persists sandbox lifecycle transitions, validates create requests through the compute backend, resolves exec/SSH endpoints, consumes the backend watch stream, and periodically reconciles the store against `ComputeDriver/ListSandboxes` snapshots.
 - **`sandbox_index`** -- in-memory bidirectional index mapping sandbox names and agent pod names to sandbox IDs. Updated from compute-driver sandbox snapshots.
 - **`sandbox_watch_bus`** -- `broadcast`-based notification bus keyed by sandbox ID. Producers call `notify(&id)` when the persisted sandbox record changes; consumers in `WatchSandbox` streams receive `()` signals and re-read the record.
 - **`tracing_log_bus`** -- captures `tracing` events that include a `sandbox_id` field and republishes them as `SandboxLogLine` messages. Maintains a per-sandbox tail buffer (default 200 entries). Also contains a nested `PlatformEventBus` for compute-driver platform events.
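The `sandbox_watch_bus` pattern -- per-sandbox fan-out of `()` signals, with a `remove` that closes active receivers -- can be sketched with std channels. The real bus uses `tokio::sync::broadcast`; the `SandboxWatchBus` type and its methods below are illustrative stand-ins:

```rust
use std::collections::HashMap;
use std::sync::mpsc::{channel, Receiver, Sender};

// Sketch of the watch-bus shape: consumers get a per-sandbox stream of
// () "record changed" signals and re-read the persisted record on each.
struct SandboxWatchBus {
    subscribers: HashMap<String, Vec<Sender<()>>>,
}

impl SandboxWatchBus {
    fn new() -> Self {
        Self { subscribers: HashMap::new() }
    }

    // Register a consumer for one sandbox ID.
    fn subscribe(&mut self, sandbox_id: &str) -> Receiver<()> {
        let (tx, rx) = channel();
        self.subscribers.entry(sandbox_id.to_string()).or_default().push(tx);
        rx
    }

    // Called by producers when the persisted sandbox record changes.
    fn notify(&self, sandbox_id: &str) {
        if let Some(txs) = self.subscribers.get(sandbox_id) {
            for tx in txs {
                let _ = tx.send(());
            }
        }
    }

    // Cleanup: dropping the senders closes active receivers, mirroring
    // how dropping a broadcast sender yields RecvError::Closed.
    fn remove(&mut self, sandbox_id: &str) {
        self.subscribers.remove(sandbox_id);
    }
}

fn main() {
    let mut bus = SandboxWatchBus::new();
    let rx = bus.subscribe("sb-1");
    bus.notify("sb-1");
    assert!(rx.try_recv().is_ok()); // signal delivered
    bus.remove("sb-1");
    assert!(rx.try_recv().is_err()); // senders dropped -> channel closed
}
```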
@@ -381,7 +381,7 @@ All buses use `tokio::sync::broadcast` channels keyed by sandbox ID. Buffer size
 
 Broadcast lag is translated to `Status::resource_exhausted` via `broadcast_to_status()`.
 
-**Cleanup:** Each bus exposes a `remove(sandbox_id)` method that drops the broadcast sender (closing active receivers with `RecvError::Closed`) and frees internal map entries. Cleanup is wired into the compute watch reconciler, the periodic orphan sweep for stale `Provisioning` records, and the `delete_sandbox` gRPC handler to prevent unbounded memory growth from accumulated entries for deleted sandboxes.
+**Cleanup:** Each bus exposes a `remove(sandbox_id)` method that drops the broadcast sender (closing active receivers with `RecvError::Closed`) and frees internal map entries. Cleanup is wired into the compute watch reconciler, the periodic snapshot reconcile for sandboxes missing from the driver, and the `delete_sandbox` gRPC handler to prevent unbounded memory growth from accumulated entries for deleted sandboxes.
 
 **Validation:** `WatchSandbox` validates that the sandbox exists before subscribing to any bus, preventing entries from being created for non-existent IDs. `PushSandboxLogs` validates sandbox existence once on the first batch of the stream.
 
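A sketch of the lag-to-status mapping mentioned above. The `RecvError`, `Code`, and `Status` types are simplified stand-ins for the tokio/tonic originals, and the `Closed` arm plus both message strings are assumptions for illustration; the section only specifies that lag maps to `resource_exhausted`:

```rust
// Stand-in for tokio::sync::broadcast::error::RecvError.
#[derive(Debug, PartialEq)]
enum RecvError {
    Lagged(u64), // receiver fell behind by this many messages
    Closed,      // sender dropped, e.g. after bus.remove(sandbox_id)
}

// Stand-ins for tonic::Code / tonic::Status.
#[derive(Debug, PartialEq)]
enum Code {
    ResourceExhausted,
    Unavailable,
}

struct Status {
    code: Code,
    message: String,
}

// Sketch of broadcast_to_status(): a lagged receiver means the consumer
// could not keep up with the bus, which surfaces as resource exhaustion.
fn broadcast_to_status(err: RecvError) -> Status {
    match err {
        RecvError::Lagged(n) => Status {
            code: Code::ResourceExhausted,
            message: format!("watch stream lagged by {n} events"),
        },
        RecvError::Closed => Status {
            code: Code::Unavailable,
            message: "watch stream closed".to_string(),
        },
    }
}

fn main() {
    let status = broadcast_to_status(RecvError::Lagged(7));
    assert_eq!(status.code, Code::ResourceExhausted);
}
```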
@@ -500,7 +500,7 @@ The Helm chart template is at `deploy/helm/openshell/templates/statefulset.yaml`
 
 ### Sandbox CRD Management
 
-`KubernetesComputeDriver` (`crates/openshell-driver-kubernetes/src/driver.rs`) manages `agents.x-k8s.io/v1alpha1/Sandbox` CRDs behind the gateway's compute interface.
+`KubernetesComputeDriver` (`crates/openshell-driver-kubernetes/src/driver.rs`) manages `agents.x-k8s.io/v1alpha1/Sandbox` CRDs behind the gateway's compute interface. The gateway binds to that driver through `ComputeDriverService` (`crates/openshell-driver-kubernetes/src/grpc.rs`) in-process, so the same `openshell.compute.v1.ComputeDriver` request and response types are exercised whether the driver is invoked locally or served over gRPC.
 
 - **Get**: `GetSandbox` looks up a sandbox CRD by name and returns a driver-native platform observation (`openshell.compute.v1.DriverSandbox`) with raw status and condition data from the object.
 - **List**: `ListSandboxes` enumerates sandbox CRDs and returns driver-native platform observations for each, sorted by name for stable results.
@@ -517,6 +517,8 @@ The Kubernetes driver emits `WatchSandboxes` events through `proto/compute_drive
 - **Deleted**: Removes the sandbox record from the store and the index. Notifies the watch bus.
 - **Restarted**: Re-processes all objects (full resync).
 
+In addition to the watch stream, `ComputeRuntime` periodically calls `ComputeDriver/ListSandboxes` through the in-process `ComputeDriverService` and reconciles the store to that full driver snapshot. Public `GetSandbox` and `ListSandboxes` handlers remain store-backed, but the store is refreshed from the driver on a timer so the gateway still exercises the compute-driver RPC surface for reconciliation.
+
 ### Gateway Phase Derivation
 
 `ComputeRuntime::derive_phase()` (`crates/openshell-server/src/compute/mod.rs`) maps driver-native compute status to the public `SandboxPhase` exposed by `proto/openshell.proto`:
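The periodic snapshot reconcile added in this hunk reduces to a diff between the driver's `ListSandboxes` result and the store: upsert everything the driver reports, then drop store records the driver no longer knows about. A minimal sketch, assuming a `String`-keyed store and a hypothetical `reconcile` helper (the real code works on persisted sandbox records and also cleans up bus and index entries for the stale IDs):

```rust
use std::collections::{HashMap, HashSet};

// Sketch of one reconcile pass against a full driver snapshot.
// Returns the IDs that were present in the store but missing from the
// driver, which is where bus/index cleanup would hook in.
fn reconcile(
    store: &mut HashMap<String, String>, // sandbox id -> persisted phase
    snapshot: &[(String, String)],       // driver snapshot: (id, phase)
) -> Vec<String> {
    let seen: HashSet<&String> = snapshot.iter().map(|(id, _)| id).collect();

    // Upsert every sandbox the driver reports.
    for (id, phase) in snapshot {
        store.insert(id.clone(), phase.clone());
    }

    // Remove store entries the driver no longer backs.
    let stale: Vec<String> = store
        .keys()
        .filter(|id| !seen.contains(*id))
        .cloned()
        .collect();
    for id in &stale {
        store.remove(id);
    }
    stale
}

fn main() {
    let mut store = HashMap::new();
    store.insert("sb-old".to_string(), "Running".to_string());
    let snapshot = vec![("sb-new".to_string(), "Running".to_string())];
    let stale = reconcile(&mut store, &snapshot);
    assert_eq!(stale, vec!["sb-old".to_string()]);
    assert!(store.contains_key("sb-new"));
}
```

Running this on a timer keeps store-backed reads convergent even if individual watch events were missed between full resyncs.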

crates/openshell-driver-kubernetes/src/grpc.rs

Lines changed: 9 additions & 1 deletion
@@ -102,7 +102,7 @@ impl ComputeDriver for ComputeDriverService {
         self.driver
             .create_sandbox(&sandbox)
             .await
-            .map_err(|err| Status::internal(err.to_string()))?;
+            .map_err(status_from_driver_error)?;
         Ok(Response::new(CreateSandboxResponse {}))
     }
 
@@ -181,4 +181,12 @@ mod tests {
         assert_eq!(status.code(), tonic::Code::FailedPrecondition);
         assert_eq!(status.message(), "sandbox agent pod IP is not available");
     }
+
+    #[test]
+    fn already_exists_driver_errors_map_to_already_exists_status() {
+        let status = status_from_driver_error(KubernetesDriverError::AlreadyExists);
+
+        assert_eq!(status.code(), tonic::Code::AlreadyExists);
+        assert_eq!(status.message(), "sandbox already exists");
+    }
 }
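The `status_from_driver_error` change above replaces the blanket `Status::internal(err.to_string())` mapping with typed error-to-code translation. A sketch under stand-in types: the `Code`/`Status` pair mimics tonic, the variant name `AgentPodIpUnavailable` and the `Other` fallback are assumptions, while the two code/message pairs come from the tests in this diff:

```rust
// Stand-in for the driver's error type.
#[derive(Debug)]
enum KubernetesDriverError {
    AlreadyExists,
    AgentPodIpUnavailable,
    Other(String),
}

// Stand-ins for tonic::Code / tonic::Status.
#[derive(Debug, PartialEq)]
enum Code {
    AlreadyExists,
    FailedPrecondition,
    Internal,
}

struct Status {
    code: Code,
    message: String,
}

// Sketch of status_from_driver_error: each typed driver error gets a
// meaningful gRPC code instead of collapsing everything into Internal.
fn status_from_driver_error(err: KubernetesDriverError) -> Status {
    match err {
        KubernetesDriverError::AlreadyExists => Status {
            code: Code::AlreadyExists,
            message: "sandbox already exists".to_string(),
        },
        KubernetesDriverError::AgentPodIpUnavailable => Status {
            code: Code::FailedPrecondition,
            message: "sandbox agent pod IP is not available".to_string(),
        },
        KubernetesDriverError::Other(msg) => Status {
            code: Code::Internal,
            message: msg,
        },
    }
}

fn main() {
    let status = status_from_driver_error(KubernetesDriverError::AlreadyExists);
    assert_eq!(status.code, Code::AlreadyExists);
    assert_eq!(status.message, "sandbox already exists");
}
```

Because `map_err(status_from_driver_error)` takes the function by name, the handler stays a one-liner while every new driver error variant forces an explicit mapping decision at compile time.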
