Skip to content

Commit 8a14f9e

Browse files
authored
chore: Remove DistributedRuntime::etcd_client (#4489)
Signed-off-by: Graham King <grahamk@nvidia.com>
1 parent d1ce423 commit 8a14f9e

File tree

4 files changed

+45
-39
lines changed

4 files changed

+45
-39
lines changed

lib/bindings/python/rust/planner.rs

Lines changed: 32 additions & 17 deletions
Original file line number | Diff line number | Diff line change
@@ -14,8 +14,8 @@ use parking_lot::Mutex;
1414
use pyo3::{exceptions::PyException, prelude::*};
1515

1616
use super::to_pyerr;
17-
use dynamo_runtime::CancellationToken;
18-
use dynamo_runtime::transports::etcd::{Client, KvCache};
17+
use dynamo_runtime::transports::etcd::{self, Client, KvCache};
18+
use tokio_util::sync::CancellationToken;
1919

2020
// All three AIs I asked agreed: this is the way
2121
const NONE_SENTINEL: usize = usize::MAX;
@@ -45,32 +45,39 @@ pub struct VirtualConnectorCoordinator(Arc<InnerConnector>);
4545
impl VirtualConnectorCoordinator {
4646
#[new]
4747
pub fn new(
48-
runtime: super::DistributedRuntime,
48+
drt: super::DistributedRuntime,
4949
dynamo_namespace: &str,
5050
check_interval_secs: usize,
5151
max_wait_time_secs: usize,
5252
max_retries: usize,
53-
) -> Self {
53+
) -> PyResult<Self> {
5454
let check_interval = Duration::from_secs(check_interval_secs as u64);
5555
let max_wait_time = Duration::from_secs(max_wait_time_secs as u64);
56+
// `ClientOptions::default()` reads its settings from environment variables
57+
let etcd_config = etcd::ClientOptions::default();
58+
// etcd client construction is async, but Python constructors cannot be async, so block on it here
59+
let etcd_client = drt
60+
.inner
61+
.runtime()
62+
.secondary()
63+
.block_on(
64+
async move { etcd::Client::new(etcd_config, drt.inner.runtime().clone()).await },
65+
)
66+
.map_err(to_pyerr)?;
5667

5768
let c = InnerConnector {
5869
check_interval,
5970
max_wait_time,
6071
max_retries,
6172
namespace: dynamo_namespace.to_string(),
62-
etcd_client: runtime
63-
.inner()
64-
.etcd_client()
65-
.expect("Planner cannot run without etcd / in static mode"),
66-
73+
etcd_client,
6774
kv_cache: Mutex::new(None),
6875
num_prefill_workers: AtomicUsize::new(NONE_SENTINEL),
6976
num_decode_workers: AtomicUsize::new(NONE_SENTINEL),
7077
decision_id: AtomicUsize::new(NONE_SENTINEL),
7178
first_skip_timestamp: AtomicUsize::new(NONE_SENTINEL),
7279
};
73-
Self(Arc::new(c))
80+
Ok(Self(Arc::new(c)))
7481
}
7582

7683
#[pyo3(signature = ())]
@@ -365,16 +372,24 @@ pub struct VirtualConnectorClient(Arc<InnerClient>);
365372
#[pymethods]
366373
impl VirtualConnectorClient {
367374
#[new]
368-
pub fn new(runtime: super::DistributedRuntime, dynamo_namespace: &str) -> Self {
375+
pub fn new(drt: super::DistributedRuntime, dynamo_namespace: &str) -> PyResult<Self> {
376+
let runtime = drt.inner.runtime();
377+
let cancellation_token = runtime.child_token();
378+
// `ClientOptions::default()` reads its settings from environment variables
379+
let etcd_config = etcd::ClientOptions::default();
380+
// etcd client construction is async, but Python constructors cannot be async, so block on it here
381+
let etcd_client = runtime
382+
.secondary()
383+
.block_on(
384+
async move { etcd::Client::new(etcd_config, drt.inner.runtime().clone()).await },
385+
)
386+
.map_err(to_pyerr)?;
369387
let c = InnerClient {
370-
etcd_client: runtime
371-
.inner
372-
.etcd_client()
373-
.expect("Planner cannot run without etcd / in static mode"),
388+
etcd_client,
374389
key: root_key(dynamo_namespace),
375-
cancellation_token: runtime.inner().child_token(),
390+
cancellation_token,
376391
};
377-
Self(Arc::new(c))
392+
Ok(Self(Arc::new(c)))
378393
}
379394

380395
/// Get the current values as a PlannerDecision

lib/runtime/src/distributed.rs

Lines changed: 5 additions & 17 deletions
Original file line number | Diff line number | Diff line change
@@ -43,8 +43,6 @@ pub struct DistributedRuntime {
4343
// local runtime
4444
runtime: Runtime,
4545

46-
// Unified transport manager
47-
etcd_client: Option<transports::etcd::Client>,
4846
nats_client: Option<transports::nats::Client>,
4947
store: KeyValueStoreManager,
5048
tcp_server: Arc<OnceCell<Arc<transports::tcp::server::TcpStreamServer>>>,
@@ -101,17 +99,16 @@ impl DistributedRuntime {
10199

102100
let runtime_clone = runtime.clone();
103101

104-
let (etcd_client, store) = match selected_kv_store {
102+
let store = match selected_kv_store {
105103
KeyValueStoreSelect::Etcd(etcd_config) => {
106104
let etcd_client = etcd::Client::new(*etcd_config, runtime_clone).await.inspect_err(|err|
107105
// The returned error doesn't show because of a dropped runtime error, so
108106
// log it first.
109107
tracing::error!(%err, "Could not connect to etcd. Pass `--store-kv ..` to use a different backend or start etcd."))?;
110-
let store = KeyValueStoreManager::etcd(etcd_client.clone());
111-
(Some(etcd_client), store)
108+
KeyValueStoreManager::etcd(etcd_client)
112109
}
113-
KeyValueStoreSelect::File(root) => (None, KeyValueStoreManager::file(root)),
114-
KeyValueStoreSelect::Memory => (None, KeyValueStoreManager::memory()),
110+
KeyValueStoreSelect::File(root) => KeyValueStoreManager::file(root),
111+
KeyValueStoreSelect::Memory => KeyValueStoreManager::memory(),
115112
};
116113

117114
let nats_client = match nats_config {
@@ -176,7 +173,6 @@ impl DistributedRuntime {
176173

177174
let distributed_runtime = Self {
178175
runtime,
179-
etcd_client,
180176
store,
181177
nats_client,
182178
tcp_server: Arc::new(OnceCell::new()),
@@ -423,15 +419,7 @@ impl DistributedRuntime {
423419
self.system_status_server.get().cloned()
424420
}
425421

426-
// todo(ryan): deprecate this as we move to Discovery traits and Component Identifiers
427-
//
428-
// Try to use `store()` instead of this. Only use this if you have not been able to migrate
429-
// yet, or if you require etcd-specific features like distributed locking (rare).
430-
pub fn etcd_client(&self) -> Option<etcd::Client> {
431-
self.etcd_client.clone()
432-
}
433-
434-
/// An interface to store things. Will eventually replace `etcd_client`.
422+
/// An interface to store things outside of the process. Usually backed by something like etcd.
435423
/// Currently does key-value, but will grow to include whatever we need to store.
436424
pub fn store(&self) -> &KeyValueStoreManager {
437425
&self.store

lib/runtime/src/storage/key_value_store/etcd.rs

Lines changed: 1 addition & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -273,8 +273,7 @@ mod concurrent_create_tests {
273273
}
274274

275275
async fn test_concurrent_create(drt: DistributedRuntime) -> Result<(), StoreError> {
276-
let etcd_client = drt.etcd_client().expect("etcd client should be available");
277-
let storage = EtcdStore::new(etcd_client);
276+
let storage = drt.store();
278277

279278
// Create a bucket for testing
280279
let bucket = Arc::new(tokio::sync::Mutex::new(

lib/runtime/src/transports/etcd.rs

Lines changed: 7 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -764,7 +764,9 @@ mod tests {
764764
let key = "__integration_test_key";
765765
let value = b"test_value";
766766

767-
let client = drt.etcd_client().expect("etcd client should be available");
767+
let client = Client::new(ClientOptions::default(), drt.runtime().clone())
768+
.await
769+
.expect("etcd client should be available");
768770
let lease_id = drt.connection_id();
769771

770772
// Create the key
@@ -804,8 +806,10 @@ mod tests {
804806
}
805807

806808
async fn test_kv_cache_operations(drt: DistributedRuntime) -> Result<()> {
807-
// Get the client and unwrap it
808-
let client = drt.etcd_client().expect("etcd client should be available");
809+
// Make the client and unwrap it
810+
let client = Client::new(ClientOptions::default(), drt.runtime().clone())
811+
.await
812+
.expect("etcd client should be available");
809813

810814
// Create a unique test prefix to avoid conflicts with other tests
811815
let test_id = uuid::Uuid::new_v4().to_string();

0 commit comments

Comments
 (0)