Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
69 changes: 42 additions & 27 deletions controller/src/application/group.rs
Original file line number Diff line number Diff line change
Expand Up @@ -77,40 +77,53 @@ impl Group {
}
true
}),
Stage::Queueing => true,
Stage::Queueing | Stage::Stopping => true,
});

if self.scaling.stop_empty_servers && self.servers.len() as u32 > target_count {
let mut to_stop = self.servers.len() as u32 - target_count;
// Count all servers that are active
let current_count = self
.servers
.iter()
.filter(|server| matches!(server.1 .1, Stage::Active))
.count();

if self.scaling.stop_empty_servers && current_count as u32 > target_count {
let mut to_stop = current_count as u32 - target_count;
let mut requests = vec![];
self.servers.retain(|id, server| match &server.1 {
Stage::Active => servers.get_server_mut(id.uuid()).is_some_and(|server| {
if server.connected_users() == &0 {
if server.flags().should_stop() && to_stop > 0 {
debug!(
self.servers
.retain(|id, group_server| match &group_server.1 {
Stage::Active => servers.get_server_mut(id.uuid()).is_some_and(|server| {
if server.connected_users() == &0 {
if server.flags().should_stop() && to_stop > 0 {
debug!(
"Server {} is empty and reached the timeout, stopping it...",
server.id()
);
requests.push(StopRequest::new(None, server.id().clone()));
to_stop -= 1;
requests.push(StopRequest::new(None, server.id().clone()));
to_stop -= 1;
server.flags_mut().clear_stop();
// Mark server as stopping
group_server.1 = Stage::Stopping;
} else if !server.flags().is_stop_set() {
debug!(
"Server {} is empty, starting stop timer...",
server.id()
);
server
.flags_mut()
.replace_stop(*config.empty_server_timeout());
}
} else if server.flags().is_stop_set() {
debug!(
"Server {} is no longer empty, clearing stop timer...",
server.id()
);
server.flags_mut().clear_stop();
} else if !server.flags().is_stop_set() {
debug!("Server {} is empty, starting stop timer...", server.id());
server
.flags_mut()
.replace_stop(*config.empty_server_timeout());
}
} else if server.flags().is_stop_set() {
debug!(
"Server {} is no longer empty, clearing stop timer...",
server.id()
);
server.flags_mut().clear_stop();
}
true
}),
Stage::Queueing => true,
});
true
}),
Stage::Queueing | Stage::Stopping => true,
});
servers.schedule_stops(requests);
}
}
Expand Down Expand Up @@ -181,6 +194,7 @@ impl Group {
servers.cancel_start(id.uuid());
false
}
Stage::Stopping => false,
});

self.status = LifecycleStatus::Inactive;
Expand All @@ -196,7 +210,7 @@ impl Group {
.iter()
.find_map(|(id, server)| match &server.1 {
Stage::Active => servers.get_server(id.uuid()),
Stage::Queueing => None,
Stage::Queueing | Stage::Stopping => None,
})
}

Expand Down Expand Up @@ -244,6 +258,7 @@ struct GroupServer(usize, Stage);
enum Stage {
Queueing,
Active,
Stopping,
}

impl GroupServer {
Expand Down
13 changes: 6 additions & 7 deletions controller/src/application/subscriber.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
use anyhow::Result;
use simplelog::error;
use tokio::sync::mpsc::{channel, Receiver, Sender};
use tokio_stream::wrappers::ReceiverStream;
use tonic::Status;
Expand Down Expand Up @@ -33,8 +32,8 @@ impl<T> Subscriber<T> {
pub async fn send_network(&self, data: Result<T, Status>) -> bool {
match &self.0 {
Dispatch::Network(sender) => {
if let Err(error) = sender.send(data).await {
error!("Failed to send network message: {}", error);
if sender.send(data).await.is_err() {
// Channel closed
false
} else {
true
Expand All @@ -47,16 +46,16 @@ impl<T> Subscriber<T> {
pub async fn send_message(&self, message: T) -> bool {
match &self.0 {
Dispatch::Network(sender) => {
if let Err(error) = sender.send(Ok(message)).await {
error!("Failed to send network message: {}", error);
if sender.send(Ok(message)).await.is_err() {
// Channel closed
false
} else {
true
}
}
Dispatch::Plugin(sender) => {
if let Err(error) = sender.send(Ok(message)).await {
error!("Failed to send plugin message: {}", error);
if sender.send(Ok(message)).await.is_err() {
// Channel closed
false
} else {
true
Expand Down
5 changes: 5 additions & 0 deletions plugins/cloudflare/src/plugin/dns/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,11 @@ impl Records {
}
}

if batch.deletes.is_empty() {
// No request to cloudflare required
continue;
}

count += batch.deletes.len();
backend.send_batch(zone_id, &batch);
}
Expand Down