Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
79 changes: 32 additions & 47 deletions devolutions-session/src/dvc/process.rs
Original file line number Diff line number Diff line change
Expand Up @@ -121,10 +121,7 @@ impl WinApiProcessCtx {
Ok(())
}

pub fn process_cancel(
&mut self,
io_notification_tx: &Sender<ServerChannelEvent>,
) -> Result<(), ExecError> {
pub fn process_cancel(&mut self, io_notification_tx: &Sender<ServerChannelEvent>) -> Result<(), ExecError> {
info!(
session_id = self.session_id,
"Cancelling process execution by user request"
Expand All @@ -136,10 +133,9 @@ impl WinApiProcessCtx {

// Acknowledge client that cancel request has been processed
// successfully.
io_notification_tx
.blocking_send(ServerChannelEvent::SessionCancelSuccess {
session_id: self.session_id,
})?;
io_notification_tx.blocking_send(ServerChannelEvent::SessionCancelSuccess {
session_id: self.session_id,
})?;

Ok(())
}
Expand All @@ -158,8 +154,7 @@ impl WinApiProcessCtx {
const WAIT_OBJECT_INPUT_MESSAGE: WAIT_EVENT = WAIT_OBJECT_0;
const WAIT_OBJECT_PROCESS_EXIT: WAIT_EVENT = WAIT_EVENT(WAIT_OBJECT_0.0 + 1);

io_notification_tx
.blocking_send(ServerChannelEvent::SessionStarted { session_id })?;
io_notification_tx.blocking_send(ServerChannelEvent::SessionStarted { session_id })?;

loop {
// SAFETY: No preconditions.
Expand Down Expand Up @@ -283,8 +278,7 @@ impl WinApiProcessCtx {

// Signal client side about started execution

io_notification_tx
.blocking_send(ServerChannelEvent::SessionStarted { session_id })?;
io_notification_tx.blocking_send(ServerChannelEvent::SessionStarted { session_id })?;

info!(session_id, "Process IO is ready for async loop execution");
loop {
Expand Down Expand Up @@ -375,26 +369,24 @@ impl WinApiProcessCtx {
// EOF on stdout pipe, close it and send EOF message to message_tx
self.stdout_read_pipe = None;

io_notification_tx
.blocking_send(ServerChannelEvent::SessionDataOut {
session_id,
stream: NowExecDataStreamKind::Stdout,
last: true,
data: Vec::new(),
})?;
io_notification_tx.blocking_send(ServerChannelEvent::SessionDataOut {
session_id,
stream: NowExecDataStreamKind::Stdout,
last: true,
data: Vec::new(),
})?;
}
_code => return Err(err.into()),
}
continue;
}

io_notification_tx
.blocking_send(ServerChannelEvent::SessionDataOut {
session_id,
stream: NowExecDataStreamKind::Stdout,
last: false,
data: stdout_buffer[..bytes_read as usize].to_vec(),
})?;
io_notification_tx.blocking_send(ServerChannelEvent::SessionDataOut {
session_id,
stream: NowExecDataStreamKind::Stdout,
last: false,
data: stdout_buffer[..bytes_read as usize].to_vec(),
})?;

// Schedule next overlapped read
// SAFETY: pipe is valid to read from, as long as it is not closed.
Expand Down Expand Up @@ -438,26 +430,24 @@ impl WinApiProcessCtx {
ERROR_HANDLE_EOF | ERROR_BROKEN_PIPE => {
// EOF on stderr pipe, close it and send EOF message to message_tx
self.stderr_read_pipe = None;
io_notification_tx
.blocking_send(ServerChannelEvent::SessionDataOut {
session_id,
stream: NowExecDataStreamKind::Stderr,
last: true,
data: Vec::new(),
})?;
io_notification_tx.blocking_send(ServerChannelEvent::SessionDataOut {
session_id,
stream: NowExecDataStreamKind::Stderr,
last: true,
data: Vec::new(),
})?;
}
_code => return Err(err.into()),
}
continue;
}

io_notification_tx
.blocking_send(ServerChannelEvent::SessionDataOut {
session_id,
stream: NowExecDataStreamKind::Stderr,
last: false,
data: stderr_buffer[..bytes_read as usize].to_vec(),
})?;
io_notification_tx.blocking_send(ServerChannelEvent::SessionDataOut {
session_id,
stream: NowExecDataStreamKind::Stderr,
last: false,
data: stderr_buffer[..bytes_read as usize].to_vec(),
})?;

// Schedule next overlapped read
// SAFETY: pipe is valid to read from, as long as it is not closed.
Expand Down Expand Up @@ -592,7 +582,8 @@ impl WinApiProcessBuilder {
// Create channel for `task` -> `Process IO thread` communication
let (input_event_tx, input_event_rx) = winapi_signaled_mpsc_channel()?;

let io_notification_tx = io_notification_tx.expect("BUG: io_notification_tx must be Some for non-detached mode");
let io_notification_tx =
io_notification_tx.expect("BUG: io_notification_tx must be Some for non-detached mode");

let join_handle = std::thread::spawn(move || {
let run_result = if io_redirection {
Expand Down Expand Up @@ -653,14 +644,8 @@ fn prepare_process(

let environment_block = (!env.is_empty()).then(|| make_environment_block(env)).transpose()?;

// Control console window visibility:
// - CREATE_NEW_CONSOLE creates a new console window
// - SW_HIDE hides the console window
let mut creation_flags = NORMAL_PRIORITY_CLASS | CREATE_NEW_PROCESS_GROUP | CREATE_NEW_CONSOLE;

startup_info.dwFlags |= STARTF_USESHOWWINDOW;
startup_info.wShowWindow = u16::try_from(SW_HIDE.0).expect("SHOW_WINDOW_CMD fits into u16");

if environment_block.is_some() {
creation_flags |= CREATE_UNICODE_ENVIRONMENT;
}
Expand Down
50 changes: 14 additions & 36 deletions devolutions-session/src/dvc/task.rs
Original file line number Diff line number Diff line change
Expand Up @@ -316,6 +316,16 @@ impl MessageProcessor {
Ok(())
}

async fn send_detached_process_success(&self, session_id: u32) -> Result<(), ExecError> {
self.io_notification_tx
.send(ServerChannelEvent::SessionExited {
session_id,
exit_code: 0,
})
.await?;
Ok(())
}

pub(crate) async fn process_message(
&mut self,
message: NowMessage<'static>,
Expand Down Expand Up @@ -570,15 +580,7 @@ impl MessageProcessor {
if exec_msg.is_detached() {
// Detached mode: fire-and-forget, no IO redirection
run_process.run_detached(exec_msg.session_id())?;

// Send success immediately for detached processes
self.io_notification_tx
.send(ServerChannelEvent::SessionExited {
session_id: exec_msg.session_id(),
exit_code: 0,
})
.await?;

self.send_detached_process_success(exec_msg.session_id()).await?;
return Ok(());
}

Expand Down Expand Up @@ -612,15 +614,7 @@ impl MessageProcessor {
if batch_msg.is_detached() {
// Detached mode: fire-and-forget, no IO redirection
run_batch.run_detached(batch_msg.session_id())?;

// Send success immediately for detached processes
self.io_notification_tx
.send(ServerChannelEvent::SessionExited {
session_id: batch_msg.session_id(),
exit_code: 0,
})
.await?;

self.send_detached_process_success(batch_msg.session_id()).await?;
return Ok(());
}

Expand Down Expand Up @@ -671,15 +665,7 @@ impl MessageProcessor {
if winps_msg.is_detached() {
// Detached mode: fire-and-forget, no IO redirection
run_process.run_detached(winps_msg.session_id())?;

// Send success immediately for detached processes
self.io_notification_tx
.send(ServerChannelEvent::SessionExited {
session_id: winps_msg.session_id(),
exit_code: 0,
})
.await?;

self.send_detached_process_success(winps_msg.session_id()).await?;
return Ok(());
}

Expand Down Expand Up @@ -732,15 +718,7 @@ impl MessageProcessor {
if winps_msg.is_detached() {
// Detached mode: fire-and-forget, no IO redirection
run_process.run_detached(winps_msg.session_id())?;

// Send success immediately for detached processes
self.io_notification_tx
.send(ServerChannelEvent::SessionExited {
session_id: winps_msg.session_id(),
exit_code: 0,
})
.await?;

self.send_detached_process_success(winps_msg.session_id()).await?;
return Ok(());
}

Expand Down
Loading