Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion devolutions-session/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ win-api-wrappers = { path = "../crates/win-api-wrappers", optional = true }

[dependencies.now-proto-pdu]
optional = true
version = "0.3.2"
version = "0.4.1"
features = ["std"]

[target.'cfg(windows)'.build-dependencies]
Expand Down
160 changes: 92 additions & 68 deletions devolutions-session/src/dvc/process.rs
Original file line number Diff line number Diff line change
Expand Up @@ -98,8 +98,6 @@ pub enum ServerChannelEvent {
pub struct WinApiProcessCtx {
session_id: u32,

io_notification_tx: Sender<ServerChannelEvent>,

stdout_read_pipe: Option<Pipe>,
stderr_read_pipe: Option<Pipe>,
stdin_write_pipe: Option<Pipe>,
Expand All @@ -123,7 +121,7 @@ impl WinApiProcessCtx {
Ok(())
}

pub fn process_cancel(&mut self) -> Result<(), ExecError> {
pub fn process_cancel(&mut self, io_notification_tx: &Sender<ServerChannelEvent>) -> Result<(), ExecError> {
info!(
session_id = self.session_id,
"Cancelling process execution by user request"
Expand All @@ -135,15 +133,18 @@ impl WinApiProcessCtx {

// Acknowledge client that cancel request has been processed
// successfully.
self.io_notification_tx
.blocking_send(ServerChannelEvent::SessionCancelSuccess {
session_id: self.session_id,
})?;
io_notification_tx.blocking_send(ServerChannelEvent::SessionCancelSuccess {
session_id: self.session_id,
})?;

Ok(())
}

pub fn wait(mut self, mut input_event_rx: WinapiSignaledReceiver<ProcessIoInputEvent>) -> Result<u32, ExecError> {
pub fn wait(
mut self,
mut input_event_rx: WinapiSignaledReceiver<ProcessIoInputEvent>,
io_notification_tx: Sender<ServerChannelEvent>,
) -> Result<u32, ExecError> {
let session_id = self.session_id;

info!(session_id, "Waiting for process to exit");
Expand All @@ -153,8 +154,7 @@ impl WinApiProcessCtx {
const WAIT_OBJECT_INPUT_MESSAGE: WAIT_EVENT = WAIT_OBJECT_0;
const WAIT_OBJECT_PROCESS_EXIT: WAIT_EVENT = WAIT_EVENT(WAIT_OBJECT_0.0 + 1);

self.io_notification_tx
.blocking_send(ServerChannelEvent::SessionStarted { session_id })?;
io_notification_tx.blocking_send(ServerChannelEvent::SessionStarted { session_id })?;

loop {
// SAFETY: No preconditions.
Expand All @@ -179,7 +179,7 @@ impl WinApiProcessCtx {
return Err(ExecError::Aborted);
}
ProcessIoInputEvent::CancelExecution => {
self.process_cancel()?;
self.process_cancel(&io_notification_tx)?;

// wait for process to exit
continue;
Expand Down Expand Up @@ -209,6 +209,7 @@ impl WinApiProcessCtx {
pub fn wait_with_io_redirection(
mut self,
mut input_event_rx: WinapiSignaledReceiver<ProcessIoInputEvent>,
io_notification_tx: Sender<ServerChannelEvent>,
) -> Result<u32, ExecError> {
let session_id = self.session_id;

Expand Down Expand Up @@ -277,8 +278,7 @@ impl WinApiProcessCtx {

// Signal client side about started execution

self.io_notification_tx
.blocking_send(ServerChannelEvent::SessionStarted { session_id })?;
io_notification_tx.blocking_send(ServerChannelEvent::SessionStarted { session_id })?;

info!(session_id, "Process IO is ready for async loop execution");
loop {
Expand All @@ -304,7 +304,7 @@ impl WinApiProcessCtx {
return Err(ExecError::Aborted);
}
ProcessIoInputEvent::CancelExecution => {
self.process_cancel()?;
self.process_cancel(&io_notification_tx)?;

// wait for process to exit
continue;
Expand Down Expand Up @@ -369,26 +369,24 @@ impl WinApiProcessCtx {
// EOF on stdout pipe, close it and send EOF message to message_tx
self.stdout_read_pipe = None;

self.io_notification_tx
.blocking_send(ServerChannelEvent::SessionDataOut {
session_id,
stream: NowExecDataStreamKind::Stdout,
last: true,
data: Vec::new(),
})?;
io_notification_tx.blocking_send(ServerChannelEvent::SessionDataOut {
session_id,
stream: NowExecDataStreamKind::Stdout,
last: true,
data: Vec::new(),
})?;
}
_code => return Err(err.into()),
}
continue;
}

self.io_notification_tx
.blocking_send(ServerChannelEvent::SessionDataOut {
session_id,
stream: NowExecDataStreamKind::Stdout,
last: false,
data: stdout_buffer[..bytes_read as usize].to_vec(),
})?;
io_notification_tx.blocking_send(ServerChannelEvent::SessionDataOut {
session_id,
stream: NowExecDataStreamKind::Stdout,
last: false,
data: stdout_buffer[..bytes_read as usize].to_vec(),
})?;

// Schedule next overlapped read
// SAFETY: pipe is valid to read from, as long as it is not closed.
Expand Down Expand Up @@ -432,26 +430,24 @@ impl WinApiProcessCtx {
ERROR_HANDLE_EOF | ERROR_BROKEN_PIPE => {
// EOF on stderr pipe, close it and send EOF message to message_tx
self.stderr_read_pipe = None;
self.io_notification_tx
.blocking_send(ServerChannelEvent::SessionDataOut {
session_id,
stream: NowExecDataStreamKind::Stderr,
last: true,
data: Vec::new(),
})?;
io_notification_tx.blocking_send(ServerChannelEvent::SessionDataOut {
session_id,
stream: NowExecDataStreamKind::Stderr,
last: true,
data: Vec::new(),
})?;
}
_code => return Err(err.into()),
}
continue;
}

self.io_notification_tx
.blocking_send(ServerChannelEvent::SessionDataOut {
session_id,
stream: NowExecDataStreamKind::Stderr,
last: false,
data: stderr_buffer[..bytes_read as usize].to_vec(),
})?;
io_notification_tx.blocking_send(ServerChannelEvent::SessionDataOut {
session_id,
stream: NowExecDataStreamKind::Stderr,
last: false,
data: stderr_buffer[..bytes_read as usize].to_vec(),
})?;

// Schedule next overlapped read
// SAFETY: pipe is valid to read from, as long as it is not closed.
Expand Down Expand Up @@ -527,12 +523,12 @@ impl WinApiProcessBuilder {
self
}

/// Starts process execution and spawns IO thread to redirect stdio to/from dvc.
pub fn run(
fn run_impl(
mut self,
session_id: u32,
io_notification_tx: Sender<ServerChannelEvent>,
) -> Result<WinApiProcess, ExecError> {
io_notification_tx: Option<Sender<ServerChannelEvent>>,
detached: bool,
) -> Result<Option<WinApiProcess>, ExecError> {
let command_line = format!("\"{}\" {}", self.executable, self.command_line)
.trim_end()
.to_owned();
Expand All @@ -557,31 +553,42 @@ impl WinApiProcessBuilder {
let io_redirection = self.enable_io_redirection;

let process_ctx = if io_redirection {
prepare_process_with_io_redirection(
session_id,
command_line,
current_directory,
self.env,
io_notification_tx.clone(),
)?
prepare_process_with_io_redirection(session_id, command_line, current_directory, self.env)?
} else {
prepare_process(
session_id,
command_line,
current_directory,
self.env,
io_notification_tx.clone(),
)?
prepare_process(session_id, command_line, current_directory, self.env)?
};

// For detached mode, spawn a thread that waits for process exit and keeps temp files alive
if detached && !temp_files.is_empty() {
std::thread::spawn(move || {
let _temp_files = temp_files;

// Wait for process to exit (indefinitely)
if let Err(error) = process_ctx.process.wait(None) {
error!(%error, session_id, "Failed to wait for detached process");
return;
}

info!(session_id, "Detached process exited");

// Temp files will be cleaned up when this thread exits
});

info!(session_id, "Detached process started successfully");
return Ok(None);
}

// Create channel for `task` -> `Process IO thread` communication
let (input_event_tx, input_event_rx) = winapi_signaled_mpsc_channel()?;

let io_notification_tx =
io_notification_tx.expect("BUG: io_notification_tx must be Some for non-detached mode");

let join_handle = std::thread::spawn(move || {
let run_result = if io_redirection {
process_ctx.wait_with_io_redirection(input_event_rx)
process_ctx.wait_with_io_redirection(input_event_rx, io_notification_tx.clone())
} else {
process_ctx.wait(input_event_rx)
process_ctx.wait(input_event_rx, io_notification_tx.clone())
};

let notification = match run_result {
Expand All @@ -594,11 +601,31 @@ impl WinApiProcessBuilder {
}
});

Ok(WinApiProcess {
Ok(Some(WinApiProcess {
input_event_tx,
join_handle,
_temp_files: temp_files,
})
}))
}

/// Starts process execution and spawns IO thread to redirect stdio to/from dvc.
pub fn run(
self,
session_id: u32,
io_notification_tx: Sender<ServerChannelEvent>,
) -> Result<WinApiProcess, ExecError> {
Ok(self
.run_impl(session_id, Some(io_notification_tx), false)?
.expect("result should be non-optional when running in non-detached mode"))
}

/// Starts process in detached mode (fire-and-forget).
/// No IO redirection. Process exit is monitored in a background thread to manage temp file cleanup.
/// Returns immediately after spawning.
pub fn run_detached(self, session_id: u32) -> Result<(), ExecError> {
// Result always empty and therefore ignored in detached mode.
self.run_impl(session_id, None, true)?;
Ok(())
}
}

Expand All @@ -607,7 +634,6 @@ fn prepare_process(
mut command_line: WideString,
current_directory: WideString,
env: HashMap<String, String>,
io_notification_tx: Sender<ServerChannelEvent>,
) -> Result<WinApiProcessCtx, ExecError> {
let mut process_information = PROCESS_INFORMATION::default();

Expand All @@ -620,6 +646,7 @@ fn prepare_process(
let environment_block = (!env.is_empty()).then(|| make_environment_block(env)).transpose()?;

let mut creation_flags = NORMAL_PRIORITY_CLASS | CREATE_NEW_PROCESS_GROUP | CREATE_NEW_CONSOLE;

if environment_block.is_some() {
creation_flags |= CREATE_UNICODE_ENVIRONMENT;
}
Expand Down Expand Up @@ -657,7 +684,6 @@ fn prepare_process(

Ok(WinApiProcessCtx {
session_id,
io_notification_tx,
stdout_read_pipe: None,
stderr_read_pipe: None,
stdin_write_pipe: None,
Expand All @@ -671,7 +697,6 @@ fn prepare_process_with_io_redirection(
mut command_line: WideString,
current_directory: WideString,
env: HashMap<String, String>,
io_notification_tx: Sender<ServerChannelEvent>,
) -> Result<WinApiProcessCtx, ExecError> {
let mut process_information = PROCESS_INFORMATION::default();

Expand Down Expand Up @@ -741,7 +766,6 @@ fn prepare_process_with_io_redirection(

let process_ctx = WinApiProcessCtx {
session_id,
io_notification_tx,
stdout_read_pipe: Some(stdout_read_pipe),
stderr_read_pipe: Some(stderr_read_pipe),
stdin_write_pipe: Some(stdin_write_pipe),
Expand Down
Loading