Skip to content

Commit e382187

Browse files
authored
Merge pull request #62 from kinode-dao/bp/timeouts
Bp/timeouts
2 parents 3232423 + e055bed commit e382187

File tree

5 files changed

+108
-55
lines changed

5 files changed

+108
-55
lines changed

src/kv.rs

Lines changed: 40 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
use crate::{get_blob, Message, PackageId, Request};
2-
use serde::{Deserialize, Serialize};
2+
use serde::{de::DeserializeOwned, Deserialize, Serialize};
3+
use std::marker::PhantomData;
34
use thiserror::Error;
45

56
/// Actions are sent to a specific key value database, "db" is the name,
@@ -54,22 +55,29 @@ pub enum KvError {
5455
/// Opening or creating a kv will give you a Result<Kv>.
5556
/// You can call it's impl functions to interact with it.
5657
#[derive(Debug, Serialize, Deserialize)]
57-
pub struct Kv {
58+
pub struct Kv<K, V> {
5859
pub package_id: PackageId,
5960
pub db: String,
61+
pub timeout: u64,
62+
_marker: PhantomData<(K, V)>,
6063
}
6164

62-
impl Kv {
65+
impl<K, V> Kv<K, V>
66+
where
67+
K: Serialize + DeserializeOwned,
68+
V: Serialize + DeserializeOwned,
69+
{
6370
/// Get a value.
64-
pub fn get(&self, key: Vec<u8>) -> anyhow::Result<Vec<u8>> {
71+
pub fn get(&self, key: &K) -> anyhow::Result<V> {
72+
let key = serde_json::to_vec(key)?;
6573
let res = Request::new()
6674
.target(("our", "kv", "distro", "sys"))
6775
.body(serde_json::to_vec(&KvRequest {
6876
package_id: self.package_id.clone(),
6977
db: self.db.clone(),
7078
action: KvAction::Get { key },
7179
})?)
72-
.send_and_await_response(5)?;
80+
.send_and_await_response(self.timeout)?;
7381

7482
match res {
7583
Ok(Message::Response { body, .. }) => {
@@ -81,7 +89,9 @@ impl Kv {
8189
Some(bytes) => bytes.bytes,
8290
None => return Err(anyhow::anyhow!("kv: no blob")),
8391
};
84-
Ok(bytes)
92+
let value = serde_json::from_slice::<V>(&bytes)
93+
.map_err(|e| anyhow::anyhow!("Failed to deserialize value: {}", e))?;
94+
Ok(value)
8595
}
8696
KvResponse::Err { error } => Err(error.into()),
8797
_ => Err(anyhow::anyhow!("kv: unexpected response {:?}", response)),
@@ -92,7 +102,10 @@ impl Kv {
92102
}
93103

94104
/// Set a value, optionally in a transaction.
95-
pub fn set(&self, key: Vec<u8>, value: Vec<u8>, tx_id: Option<u64>) -> anyhow::Result<()> {
105+
pub fn set(&self, key: &K, value: &V, tx_id: Option<u64>) -> anyhow::Result<()> {
106+
let key = serde_json::to_vec(key)?;
107+
let value = serde_json::to_vec(value)?;
108+
96109
let res = Request::new()
97110
.target(("our", "kv", "distro", "sys"))
98111
.body(serde_json::to_vec(&KvRequest {
@@ -101,7 +114,7 @@ impl Kv {
101114
action: KvAction::Set { key, tx_id },
102115
})?)
103116
.blob_bytes(value)
104-
.send_and_await_response(5)?;
117+
.send_and_await_response(self.timeout)?;
105118

106119
match res {
107120
Ok(Message::Response { body, .. }) => {
@@ -118,15 +131,16 @@ impl Kv {
118131
}
119132

120133
/// Delete a value, optionally in a transaction.
121-
pub fn delete(&self, key: Vec<u8>, tx_id: Option<u64>) -> anyhow::Result<()> {
134+
pub fn delete(&self, key: &K, tx_id: Option<u64>) -> anyhow::Result<()> {
135+
let key = serde_json::to_vec(key)?;
122136
let res = Request::new()
123137
.target(("our", "kv", "distro", "sys"))
124138
.body(serde_json::to_vec(&KvRequest {
125139
package_id: self.package_id.clone(),
126140
db: self.db.clone(),
127141
action: KvAction::Delete { key, tx_id },
128142
})?)
129-
.send_and_await_response(5)?;
143+
.send_and_await_response(self.timeout)?;
130144

131145
match res {
132146
Ok(Message::Response { body, .. }) => {
@@ -151,7 +165,7 @@ impl Kv {
151165
db: self.db.clone(),
152166
action: KvAction::BeginTx,
153167
})?)
154-
.send_and_await_response(5)?;
168+
.send_and_await_response(self.timeout)?;
155169

156170
match res {
157171
Ok(Message::Response { body, .. }) => {
@@ -176,7 +190,7 @@ impl Kv {
176190
db: self.db.clone(),
177191
action: KvAction::Commit { tx_id },
178192
})?)
179-
.send_and_await_response(5)?;
193+
.send_and_await_response(self.timeout)?;
180194

181195
match res {
182196
Ok(Message::Response { body, .. }) => {
@@ -194,15 +208,21 @@ impl Kv {
194208
}
195209

196210
/// Opens or creates a kv db.
197-
pub fn open(package_id: PackageId, db: &str) -> anyhow::Result<Kv> {
211+
pub fn open<K, V>(package_id: PackageId, db: &str, timeout: Option<u64>) -> anyhow::Result<Kv<K, V>>
212+
where
213+
K: Serialize + DeserializeOwned,
214+
V: Serialize + DeserializeOwned,
215+
{
216+
let timeout = timeout.unwrap_or(5);
217+
198218
let res = Request::new()
199219
.target(("our", "kv", "distro", "sys"))
200220
.body(serde_json::to_vec(&KvRequest {
201221
package_id: package_id.clone(),
202222
db: db.to_string(),
203223
action: KvAction::Open,
204224
})?)
205-
.send_and_await_response(5)?;
225+
.send_and_await_response(timeout)?;
206226

207227
match res {
208228
Ok(Message::Response { body, .. }) => {
@@ -212,6 +232,8 @@ pub fn open(package_id: PackageId, db: &str) -> anyhow::Result<Kv> {
212232
KvResponse::Ok => Ok(Kv {
213233
package_id,
214234
db: db.to_string(),
235+
timeout,
236+
_marker: PhantomData,
215237
}),
216238
KvResponse::Err { error } => Err(error.into()),
217239
_ => Err(anyhow::anyhow!("kv: unexpected response {:?}", response)),
@@ -222,15 +244,17 @@ pub fn open(package_id: PackageId, db: &str) -> anyhow::Result<Kv> {
222244
}
223245

224246
/// Removes and deletes a kv db.
225-
pub fn remove_db(package_id: PackageId, db: &str) -> anyhow::Result<()> {
247+
pub fn remove_db(package_id: PackageId, db: &str, timeout: Option<u64>) -> anyhow::Result<()> {
248+
let timeout = timeout.unwrap_or(5);
249+
226250
let res = Request::new()
227251
.target(("our", "kv", "distro", "sys"))
228252
.body(serde_json::to_vec(&KvRequest {
229253
package_id: package_id.clone(),
230254
db: db.to_string(),
231255
action: KvAction::RemoveDb,
232256
})?)
233-
.send_and_await_response(5)?;
257+
.send_and_await_response(timeout)?;
234258

235259
match res {
236260
Ok(Message::Response { body, .. }) => {

src/sqlite.rs

Lines changed: 14 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -79,6 +79,7 @@ pub enum SqliteError {
7979
pub struct Sqlite {
8080
pub package_id: PackageId,
8181
pub db: String,
82+
pub timeout: u64,
8283
}
8384

8485
impl Sqlite {
@@ -96,7 +97,7 @@ impl Sqlite {
9697
action: SqliteAction::Read { query },
9798
})?)
9899
.blob_bytes(serde_json::to_vec(&params)?)
99-
.send_and_await_response(5)?;
100+
.send_and_await_response(self.timeout)?;
100101

101102
match res {
102103
Ok(Message::Response { body, .. }) => {
@@ -141,7 +142,7 @@ impl Sqlite {
141142
action: SqliteAction::Write { statement, tx_id },
142143
})?)
143144
.blob_bytes(serde_json::to_vec(&params)?)
144-
.send_and_await_response(5)?;
145+
.send_and_await_response(self.timeout)?;
145146

146147
match res {
147148
Ok(Message::Response { body, .. }) => {
@@ -169,7 +170,7 @@ impl Sqlite {
169170
db: self.db.clone(),
170171
action: SqliteAction::BeginTx,
171172
})?)
172-
.send_and_await_response(5)?;
173+
.send_and_await_response(self.timeout)?;
173174

174175
match res {
175176
Ok(Message::Response { body, .. }) => {
@@ -197,7 +198,7 @@ impl Sqlite {
197198
db: self.db.clone(),
198199
action: SqliteAction::Commit { tx_id },
199200
})?)
200-
.send_and_await_response(5)?;
201+
.send_and_await_response(self.timeout)?;
201202

202203
match res {
203204
Ok(Message::Response { body, .. }) => {
@@ -218,15 +219,17 @@ impl Sqlite {
218219
}
219220

220221
/// Open or create sqlite database.
221-
pub fn open(package_id: PackageId, db: &str) -> anyhow::Result<Sqlite> {
222+
pub fn open(package_id: PackageId, db: &str, timeout: Option<u64>) -> anyhow::Result<Sqlite> {
223+
let timeout = timeout.unwrap_or(5);
224+
222225
let res = Request::new()
223226
.target(("our", "sqlite", "distro", "sys"))
224227
.body(serde_json::to_vec(&SqliteRequest {
225228
package_id: package_id.clone(),
226229
db: db.to_string(),
227230
action: SqliteAction::Open,
228231
})?)
229-
.send_and_await_response(5)?;
232+
.send_and_await_response(timeout)?;
230233

231234
match res {
232235
Ok(Message::Response { body, .. }) => {
@@ -236,6 +239,7 @@ pub fn open(package_id: PackageId, db: &str) -> anyhow::Result<Sqlite> {
236239
SqliteResponse::Ok => Ok(Sqlite {
237240
package_id,
238241
db: db.to_string(),
242+
timeout,
239243
}),
240244
SqliteResponse::Err { error } => Err(error.into()),
241245
_ => Err(anyhow::anyhow!(
@@ -249,15 +253,17 @@ pub fn open(package_id: PackageId, db: &str) -> anyhow::Result<Sqlite> {
249253
}
250254

251255
/// Remove and delete sqlite database.
252-
pub fn remove_db(package_id: PackageId, db: &str) -> anyhow::Result<()> {
256+
pub fn remove_db(package_id: PackageId, db: &str, timeout: Option<u64>) -> anyhow::Result<()> {
257+
let timeout = timeout.unwrap_or(5);
258+
253259
let res = Request::new()
254260
.target(("our", "sqlite", "distro", "sys"))
255261
.body(serde_json::to_vec(&SqliteRequest {
256262
package_id: package_id.clone(),
257263
db: db.to_string(),
258264
action: SqliteAction::RemoveDb,
259265
})?)
260-
.send_and_await_response(5)?;
266+
.send_and_await_response(timeout)?;
261267

262268
match res {
263269
Ok(Message::Response { body, .. }) => {

src/vfs/directory.rs

Lines changed: 11 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ use crate::{Message, Request};
66
/// You can call it's impl functions to interact with it.
77
pub struct Directory {
88
pub path: String,
9+
pub timeout: u64,
910
}
1011

1112
impl Directory {
@@ -19,7 +20,7 @@ impl Directory {
1920
let message = Request::new()
2021
.target(("our", "vfs", "distro", "sys"))
2122
.body(serde_json::to_vec(&request)?)
22-
.send_and_await_response(5)?;
23+
.send_and_await_response(self.timeout)?;
2324

2425
match message {
2526
Ok(Message::Response { body, .. }) => {
@@ -37,10 +38,12 @@ impl Directory {
3738

3839
/// Opens or creates a directory at path.
3940
/// If trying to create an existing directory, will just give you the path.
40-
pub fn open_dir(path: &str, create: bool) -> anyhow::Result<Directory> {
41+
pub fn open_dir(path: &str, create: bool, timeout: Option<u64>) -> anyhow::Result<Directory> {
42+
let timeout = timeout.unwrap_or(5);
4143
if !create {
4244
return Ok(Directory {
4345
path: path.to_string(),
46+
timeout,
4447
});
4548
}
4649
let request = VfsRequest {
@@ -51,14 +54,15 @@ pub fn open_dir(path: &str, create: bool) -> anyhow::Result<Directory> {
5154
let message = Request::new()
5255
.target(("our", "vfs", "distro", "sys"))
5356
.body(serde_json::to_vec(&request)?)
54-
.send_and_await_response(5)?;
57+
.send_and_await_response(timeout)?;
5558

5659
match message {
5760
Ok(Message::Response { body, .. }) => {
5861
let response = serde_json::from_slice::<VfsResponse>(&body)?;
5962
match response {
6063
VfsResponse::Ok => Ok(Directory {
6164
path: path.to_string(),
65+
timeout,
6266
}),
6367
VfsResponse::Err(e) => Err(e.into()),
6468
_ => Err(anyhow::anyhow!("vfs: unexpected response: {:?}", response)),
@@ -69,7 +73,9 @@ pub fn open_dir(path: &str, create: bool) -> anyhow::Result<Directory> {
6973
}
7074

7175
/// Removes a dir at path, errors if path not found or path is not a directory.
72-
pub fn remove_dir(path: &str) -> anyhow::Result<()> {
76+
pub fn remove_dir(path: &str, timeout: Option<u64>) -> anyhow::Result<()> {
77+
let timeout = timeout.unwrap_or(5);
78+
7379
let request = VfsRequest {
7480
path: path.to_string(),
7581
action: VfsAction::RemoveDir,
@@ -78,7 +84,7 @@ pub fn remove_dir(path: &str) -> anyhow::Result<()> {
7884
let message = Request::new()
7985
.target(("our", "vfs", "distro", "sys"))
8086
.body(serde_json::to_vec(&request)?)
81-
.send_and_await_response(5)?;
87+
.send_and_await_response(timeout)?;
8288

8389
match message {
8490
Ok(Message::Response { body, .. }) => {

0 commit comments

Comments
 (0)