Skip to content

Commit 62c88b9

Browse files
authored
agent: wrappers for sending control messages (#79)
1 parent 6324b1a commit 62c88b9

1 file changed

Lines changed: 113 additions & 209 deletions

File tree

agent/src/control/mod.rs

Lines changed: 113 additions & 209 deletions
Original file line number | Diff line number | Diff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2024 Oxide Computer Company
2+
* Copyright 2026 Oxide Computer Company
33
*/
44

55
use std::{io::Read, ops::Range, time::Duration};
@@ -33,6 +33,42 @@ impl Stuff {
3333
Ok(())
3434
}
3535

36+
async fn req(&mut self, payload: Payload) -> Result<Payload> {
37+
let message = Message { id: self.ids.next().unwrap(), payload };
38+
match self.send_and_recv(&message).await {
39+
Ok(response) => Ok(response.payload),
40+
Err(e) => {
41+
/*
42+
* Requests to the agent are relatively simple and over a UNIX
43+
* socket; they should not fail. This implies something has
44+
* gone seriously wrong and it is unlikely that it will be fixed
45+
* without intervention. Don't retry.
46+
*/
47+
bail!("could not talk to the agent: {e}");
48+
}
49+
}
50+
}
51+
52+
/*
53+
* Requests to the buildomat core are allowed to fail intermittently. We
54+
* need to retry until we are able to get a successful response of some
55+
* kind back from the server.
56+
*/
57+
async fn req_retry(&mut self, payload: Payload) -> Result<Payload> {
58+
loop {
59+
match self.req(payload.clone()).await? {
60+
Payload::Error(e) => {
61+
eprintln!(
62+
"WARNING: control request failure (retrying): {e}"
63+
);
64+
tokio::time::sleep(Duration::from_secs(2)).await;
65+
continue;
66+
}
67+
payload => return Ok(payload),
68+
}
69+
}
70+
}
71+
3672
async fn send_and_recv(&mut self, mout: &Message) -> Result<Message> {
3773
let us = self.us.as_mut().unwrap();
3874
let dec = self.dec.as_mut().unwrap();
@@ -111,30 +147,12 @@ async fn cmd_address_list(mut l: Level<Stuff>) -> Result<()> {
111147

112148
let filter = a.opts().opt_str("f");
113149

114-
let addrs = {
115-
let mout = Message {
116-
id: l.context_mut().ids.next().unwrap(),
117-
payload: Payload::MetadataAddresses,
118-
};
119-
120-
match l.context_mut().send_and_recv(&mout).await {
121-
Ok(min) => match min.payload {
122-
Payload::Error(e) => {
123-
bail!("WARNING: control request failure: {e}");
124-
}
125-
Payload::MetadataAddressesResult(addrs) => addrs,
126-
other => bail!("unexpected response: {other:?}"),
127-
},
128-
Err(e) => {
129-
/*
130-
* Requests to the agent are relatively simple and over a UNIX
131-
* socket; they should not fail. This implies something has
132-
* gone seriously wrong and it is unlikely that it will be fixed
133-
* without intervention. Don't retry.
134-
*/
135-
bail!("could not talk to the agent: {e}");
136-
}
150+
let addrs = match l.context_mut().req(Payload::MetadataAddresses).await? {
151+
Payload::Error(e) => {
152+
bail!("WARNING: control request failure: {e}");
137153
}
154+
Payload::MetadataAddressesResult(addrs) => addrs,
155+
other => bail!("unexpected response: {other:?}"),
138156
};
139157

140158
let mut t = a.table();
@@ -231,31 +249,15 @@ async fn cmd_eng(mut l: Level<Stuff>) -> Result<()> {
231249
async fn cmd_eng_metadata(mut l: Level<Stuff>) -> Result<()> {
232250
let _ = no_args!(l);
233251

234-
let mout = Message {
235-
id: l.context_mut().ids.next().unwrap(),
236-
payload: Payload::MetadataAddresses,
237-
};
238-
239-
match l.context_mut().send_and_recv(&mout).await {
240-
Ok(min) => match min.payload {
241-
Payload::Error(e) => {
242-
bail!("WARNING: control request failure: {e}");
243-
}
244-
Payload::MetadataAddressesResult(addrs) => {
245-
println!("addrs = {addrs:#?}");
246-
Ok(())
247-
}
248-
other => bail!("unexpected response: {other:?}"),
249-
},
250-
Err(e) => {
251-
/*
252-
* Requests to the agent are relatively simple and over a UNIX
253-
* socket; they should not fail. This implies something has
254-
* gone seriously wrong and it is unlikely that it will be fixed
255-
* without intervention. Don't retry.
256-
*/
257-
bail!("could not talk to the agent: {e}");
252+
match l.context_mut().req(Payload::MetadataAddresses).await? {
253+
Payload::Error(e) => {
254+
bail!("WARNING: control request failure: {e}");
255+
}
256+
Payload::MetadataAddressesResult(addrs) => {
257+
println!("addrs = {addrs:#?}");
258+
Ok(())
258259
}
260+
other => bail!("unexpected response: {other:?}"),
259261
}
260262
}
261263

@@ -282,69 +284,39 @@ async fn cmd_store_get(mut l: Level<Stuff>) -> Result<()> {
282284
let no_wait = a.opts().opt_present("W");
283285
let mut printed_wait = false;
284286

287+
let req = Payload::StoreGet(name.clone());
285288
loop {
286-
let mout = Message {
287-
id: l.context_mut().ids.next().unwrap(),
288-
payload: Payload::StoreGet(name.clone()),
289-
};
289+
match l.context_mut().req_retry(req.clone()).await? {
290+
Payload::StoreGetResult(Some(ent)) => {
291+
/*
292+
* Output formatting here should be kept consistent with
293+
* what "buildomat job store get" does outside a job;
294+
* see the "buildomat" crate.
295+
*/
296+
if ent.value.ends_with('\n') {
297+
print!("{}", ent.value);
298+
} else {
299+
println!("{}", ent.value);
300+
}
301+
break Ok(());
302+
}
303+
Payload::StoreGetResult(None) => {
304+
if no_wait {
305+
bail!("the store has no value for {name:?}");
306+
}
290307

291-
match l.context_mut().send_and_recv(&mout).await {
292-
Ok(min) => {
293-
match min.payload {
294-
Payload::Error(e) => {
295-
/*
296-
* Requests to the buildomat core are allowed to fail
297-
* intermittently. We need to retry until we are able
298-
* to get a successful response of some kind back from
299-
* the server.
300-
*/
301-
eprintln!(
302-
"WARNING: control request failure (retrying): {e}"
303-
);
304-
tokio::time::sleep(Duration::from_secs(2)).await;
305-
continue;
306-
}
307-
Payload::StoreGetResult(Some(ent)) => {
308-
/*
309-
* Output formatting here should be kept consistent with
310-
* what "buildomat job store get" does outside a job;
311-
* see the "buildomat" crate.
312-
*/
313-
if ent.value.ends_with('\n') {
314-
print!("{}", ent.value);
315-
} else {
316-
println!("{}", ent.value);
317-
}
318-
break Ok(());
319-
}
320-
Payload::StoreGetResult(None) => {
321-
if no_wait {
322-
bail!("the store has no value for {name:?}");
323-
}
324-
325-
if !printed_wait {
326-
eprintln!(
327-
"WARNING: job store has no value \
308+
if !printed_wait {
309+
eprintln!(
310+
"WARNING: job store has no value \
328311
for {name:?}; waiting for a value..."
329-
);
330-
printed_wait = true;
331-
}
332-
333-
tokio::time::sleep(Duration::from_secs(2)).await;
334-
continue;
335-
}
336-
other => bail!("unexpected response: {other:?}"),
312+
);
313+
printed_wait = true;
337314
}
315+
316+
tokio::time::sleep(Duration::from_secs(2)).await;
317+
continue;
338318
}
339-
Err(e) => {
340-
/*
341-
* Requests to the agent are relatively simple and over a UNIX
342-
* socket; they should not fail. This implies something has
343-
* gone seriously wrong and it is unlikely that it will be fixed
344-
* without intervention. Don't retry.
345-
*/
346-
bail!("could not talk to the agent: {e}");
347-
}
319+
other => bail!("unexpected response: {other:?}"),
348320
}
349321
}
350322
}
@@ -384,47 +356,11 @@ async fn cmd_store_put(mut l: Level<Stuff>) -> Result<()> {
384356
};
385357

386358
let secret = a.opts().opt_present("s");
359+
let req = Payload::StorePut(a.args()[0].to_string(), value.clone(), secret);
387360

388-
loop {
389-
let mout = Message {
390-
id: l.context_mut().ids.next().unwrap(),
391-
payload: Payload::StorePut(
392-
a.args()[0].to_string(),
393-
value.clone(),
394-
secret,
395-
),
396-
};
397-
398-
match l.context_mut().send_and_recv(&mout).await {
399-
Ok(min) => {
400-
match min.payload {
401-
Payload::Error(e) => {
402-
/*
403-
* Requests to the buildomat core are allowed to fail
404-
* intermittently. We need to retry until we are able
405-
* to get a successful response of some kind back from
406-
* the server.
407-
*/
408-
eprintln!(
409-
"WARNING: control request failure (retrying): {e}"
410-
);
411-
tokio::time::sleep(Duration::from_secs(2)).await;
412-
continue;
413-
}
414-
Payload::Ack => break Ok(()),
415-
other => bail!("unexpected response: {other:?}"),
416-
}
417-
}
418-
Err(e) => {
419-
/*
420-
* Requests to the agent are relatively simple and over a UNIX
421-
* socket; they should not fail. This implies something has
422-
* gone seriously wrong and it is unlikely that it will be fixed
423-
* without intervention. Don't retry.
424-
*/
425-
bail!("could not talk to the agent: {e}");
426-
}
427-
}
361+
match l.context_mut().req_retry(req).await? {
362+
Payload::Ack => Ok(()),
363+
other => bail!("unexpected response: {other:?}"),
428364
}
429365
}
430366

@@ -445,81 +381,49 @@ async fn cmd_process_start(mut l: Level<Stuff>) -> Result<()> {
445381
bad_args!(l, "specify at least a process name and a command to run");
446382
}
447383

448-
let mout = Message {
449-
id: l.context_mut().ids.next().unwrap(),
450-
payload: Payload::ProcessStart {
451-
name: a.args()[0].to_string(),
452-
cmd: a.args()[1].to_string(),
453-
args: a.args().iter().skip(2).cloned().collect::<Vec<_>>(),
454-
455-
/*
456-
* The process will actually be spawned by the agent, which is
457-
* running under service management. To aid the user, we want
458-
* to forward the environment and current directory so that the
459-
* process can be started as if it were run from the job program
460-
* itself.
461-
*/
462-
env: std::env::vars_os().collect::<Vec<_>>(),
463-
pwd: std::env::current_dir()?.to_str().unwrap().to_string(),
464-
465-
uid: unsafe { libc::geteuid() },
466-
gid: unsafe { libc::getegid() },
467-
},
384+
let payload = Payload::ProcessStart {
385+
name: a.args()[0].to_string(),
386+
cmd: a.args()[1].to_string(),
387+
args: a.args().iter().skip(2).cloned().collect::<Vec<_>>(),
388+
389+
/*
390+
* The process will actually be spawned by the agent, which is
391+
* running under service management. To aid the user, we want
392+
* to forward the environment and current directory so that the
393+
* process can be started as if it were run from the job program
394+
* itself.
395+
*/
396+
env: std::env::vars_os().collect::<Vec<_>>(),
397+
pwd: std::env::current_dir()?.to_str().unwrap().to_string(),
398+
399+
uid: unsafe { libc::geteuid() },
400+
gid: unsafe { libc::getegid() },
468401
};
469402

470-
match l.context_mut().send_and_recv(&mout).await {
471-
Ok(min) => {
472-
match min.payload {
473-
Payload::Error(e) => {
474-
/*
475-
* This request is purely local to the agent, so an
476-
* error is not something we should retry indefinitely.
477-
*/
478-
bail!("could not start process: {e}");
479-
}
480-
Payload::Ack => Ok(()),
481-
other => bail!("unexpected response: {other:?}"),
482-
}
483-
}
484-
Err(e) => {
403+
match l.context_mut().req(payload).await? {
404+
Payload::Error(e) => {
485405
/*
486-
* Requests to the agent are relatively simple and over a UNIX
487-
* socket; they should not fail. This implies something has
488-
* gone seriously wrong and it is unlikely that it will be fixed
489-
* without intervention. Don't retry.
406+
* This request is purely local to the agent, so an
407+
* error is not something we should retry indefinitely.
490408
*/
491-
bail!("could not talk to the agent: {e}");
409+
bail!("could not start process: {e}");
492410
}
411+
Payload::Ack => Ok(()),
412+
other => bail!("unexpected response: {other:?}"),
493413
}
494414
}
495415

496416
async fn factory_info(s: &mut Stuff) -> Result<FactoryInfo> {
497-
let mout =
498-
Message { id: s.ids.next().unwrap(), payload: Payload::FactoryInfo };
499-
500-
match s.send_and_recv(&mout).await {
501-
Ok(min) => {
502-
match min.payload {
503-
Payload::Error(e) => {
504-
/*
505-
* This request is purely local to the agent, so an
506-
* error is not something we should retry indefinitely.
507-
*/
508-
bail!("could not get factory info: {e}");
509-
}
510-
Payload::FactoryInfoResult(fi) => Ok(fi),
511-
other => bail!("unexpected response: {other:?}"),
512-
}
513-
}
514-
Err(e) => {
417+
match s.req(Payload::FactoryInfo).await? {
418+
Payload::Error(e) => {
515419
/*
516-
* Requests to the agent are relatively simple and over a UNIX
517-
* socket; they should not fail. This implies something has
518-
* gone seriously wrong and it is unlikely that it will be fixed
519-
* without intervention. Don't retry.
420+
* This request is purely local to the agent, so an
421+
* error is not something we should retry indefinitely.
520422
*/
521-
bail!("could not talk to the agent: {e}");
423+
bail!("could not get factory info: {e}");
522424
}
425+
Payload::FactoryInfoResult(fi) => Ok(fi),
426+
other => bail!("unexpected response: {other:?}"),
523427
}
524428
}
525429

0 commit comments

Comments
 (0)