Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions .github/workflows/release.yml
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,7 @@ jobs:
cp $(find ./clients/java/paper/build -name "*-all.jar") ac-core.jar
cp $(find ./clients/java/paper/send/build -name "*-all.jar") ac-send.jar
cp $(find ./clients/java/paper/notify/build -name "*-all.jar") ac-notify.jar
cp $(find ./clients/java/paper/fake-proxy/build -name "*-all.jar") ac-fake-proxy.jar

- name: Create egg tar.gz and template tar.gz
run: |
Expand Down Expand Up @@ -158,3 +159,4 @@ jobs:
ac-core.jar
ac-send.jar
ac-notify.jar
ac-fake-proxy.jar
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ public class BytesImpl implements StreamObserver<Channel.Msg>, Bytes {

@Override
public void close() {
this.handle.cancel();
this.handle.cancel("Closed by user");
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,8 @@ public class CallHandle<ReqT, RespT> implements ClientResponseObserver<ReqT, Res
@Getter
private ClientCallStreamObserver<ReqT> stream;

public void cancel() {
stream.cancel(null, null);
public void cancel(String message) {
stream.cancel(message, new InterruptedException(message));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,12 @@ public CallHandle<?, Notify.PowerEvent> subscribeToPowerEvents(StreamObserver<No
return handle;
}

public CallHandle<?, Notify.ReadyEvent> subscribeToReadyEvents(StreamObserver<Notify.ReadyEvent> observer) {
var handle = new CallHandle<>(observer);
this.client.subscribeToReadyEvents(Empty.getDefaultInstance(), handle);
return handle;
}

@Contract(" -> new")
public static @NotNull ClientConnection createFromEnv() {
var address = System.getenv("CONTROLLER_ADDRESS");
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
package io.atomic.cloud.paper.notify;

import io.atomic.cloud.paper.CloudPlugin;
import io.atomic.cloud.paper.notify.notify.PowerHandler;
import io.atomic.cloud.paper.notify.handler.PowerHandler;
import io.atomic.cloud.paper.notify.handler.ReadyHandler;
import io.atomic.cloud.paper.notify.setting.message.Messages;
import lombok.Getter;
import org.bukkit.plugin.java.JavaPlugin;
Expand All @@ -17,6 +18,7 @@ public class NotifyPlugin extends JavaPlugin {
private Messages messages;

private PowerHandler powerHandler;
private ReadyHandler readyHandler;

@Override
public void onLoad() {
Expand All @@ -25,17 +27,20 @@ public void onLoad() {
this.messages = new Messages(this.getConfig());

this.powerHandler = new PowerHandler(CloudPlugin.INSTANCE.clientConnection());
this.readyHandler = new ReadyHandler(CloudPlugin.INSTANCE.clientConnection());
}

@Override
public void onEnable() {
// Enable the notification system
this.powerHandler.enable();
this.readyHandler.enable();
}

@Override
public void onDisable() {
// Cleanup
this.powerHandler.cleanup();
this.readyHandler.cleanup();
}
}
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package io.atomic.cloud.paper.notify.notify;
package io.atomic.cloud.paper.notify.handler;

import io.atomic.cloud.common.connection.call.CallHandle;
import io.atomic.cloud.common.connection.client.ClientConnection;
Expand All @@ -23,35 +23,35 @@ public void enable() {
}

public void cleanup() {
this.handle.cancel();
this.handle.cancel("Closed by cleanup");
}

@Override
public void onNext(Notify.PowerEvent powerEvent) {
public void onNext(Notify.PowerEvent event) {
try {
Bukkit.getOnlinePlayers().stream()
.filter(Permissions.POWER_NOTIFY::check)
.forEach(player -> {
if (powerEvent.getState() == Notify.PowerEvent.State.START) {
if (event.getState() == Notify.PowerEvent.State.START) {
NotifyPlugin.INSTANCE
.messages()
.serverStarting()
.send(
player,
Placeholder.unparsed("name", powerEvent.getName()),
Placeholder.unparsed("node", powerEvent.getNode()));
Placeholder.unparsed("name", event.getName()),
Placeholder.unparsed("node", event.getNode()));
} else {
NotifyPlugin.INSTANCE
.messages()
.serverStopping()
.send(
player,
Placeholder.unparsed("name", powerEvent.getName()),
Placeholder.unparsed("node", powerEvent.getNode()));
Placeholder.unparsed("name", event.getName()),
Placeholder.unparsed("node", event.getNode()));
}
});
} catch (Throwable throwable) {
NotifyPlugin.LOGGER.info("Failed to process power event for server {}:", powerEvent.getName());
NotifyPlugin.LOGGER.info("Failed to process power event for server {}:", event.getName());
NotifyPlugin.LOGGER.error("-> ", throwable);
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
package io.atomic.cloud.paper.notify.handler;

import io.atomic.cloud.common.connection.call.CallHandle;
import io.atomic.cloud.common.connection.client.ClientConnection;
import io.atomic.cloud.grpc.common.Notify;
import io.atomic.cloud.paper.CloudPlugin;
import io.atomic.cloud.paper.notify.NotifyPlugin;
import io.atomic.cloud.paper.notify.permission.Permissions;
import io.grpc.stub.StreamObserver;
import lombok.RequiredArgsConstructor;
import net.kyori.adventure.text.minimessage.tag.resolver.Placeholder;
import org.bukkit.Bukkit;

@RequiredArgsConstructor
public class ReadyHandler implements StreamObserver<Notify.ReadyEvent> {

private final ClientConnection connection;
private CallHandle<?, ?> handle;

public void enable() {
NotifyPlugin.LOGGER.info("Enabling ready notifications...");
this.handle = this.connection.subscribeToReadyEvents(this);
}

public void cleanup() {
this.handle.cancel("Closed by cleanup");
}

@Override
public void onNext(Notify.ReadyEvent event) {
try {
Bukkit.getOnlinePlayers().stream()
.filter(Permissions.READY_NOTIFY::check)
.forEach(player -> {
if (event.getReady()) {
NotifyPlugin.INSTANCE
.messages()
.serverReady()
.send(player, Placeholder.unparsed("name", event.getName()));
} else {
NotifyPlugin.INSTANCE
.messages()
.serverNotReady()
.send(player, Placeholder.unparsed("name", event.getName()));
}
});
} catch (Throwable throwable) {
NotifyPlugin.LOGGER.info("Failed to process ready event for server {}:", event.getName());
NotifyPlugin.LOGGER.error("-> ", throwable);
}
}

@Override
public void onError(Throwable throwable) {
CloudPlugin.LOGGER.error("Failed to handle ready event", throwable);
}

@Override
public void onCompleted() {}
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,8 @@
@Getter
@SuppressWarnings("UnstableApiUsage")
public enum Permissions {
POWER_NOTIFY("atomic.cloud.power.notify");
POWER_NOTIFY("atomic.cloud.power.notify"),
READY_NOTIFY("atomic.cloud.ready.notify");

private final String permission;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,14 @@ public class Messages {
private final Message serverStarting;
private final Message serverStopping;

private final Message serverReady;
private final Message serverNotReady;

public Messages(FileConfiguration configuration) {
this.serverStarting = new SingleLine("messages.server-starting", configuration);
this.serverStopping = new SingleLine("messages.server-stopping", configuration);

this.serverReady = new SingleLine("messages.server-ready", configuration);
this.serverNotReady = new SingleLine("messages.server-not-ready", configuration);
}
}
4 changes: 3 additions & 1 deletion clients/java/paper/notify/src/main/resources/config.yml
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
messages:
server-starting: "<green>⬆ <color:#4AE77A>Spinning up server <color:#2DA953><name> <color:#4AE77A>on node <color:#2DA953><node>!"
server-stopping: "<red>⬇ <color:#C85252>Shutting down server <color:#FA2E2E><name>!"
server-stopping: "<red>⬇ <color:#C85252>Shutting down server <color:#FA2E2E><name>!"
server-ready: "<green>✓ <color:#4AE77A>Server <color:#2DA953><name> <color:#4AE77A>is now ready!"
server-not-ready: "<red>⚠ <color:#C85252>Server <color:#FA2E2E><name> <color:#C85252>is no longer ready!"
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@ public void onDisable() {
this.transferHandler.cleanup();

try {
this.self.ready(false).join();
if (this.settings.suicideOnDisable()) {
this.self.shutdown().thenRun(heart::stop).join();
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ public void enable() {
}

public void cleanup() {
this.handle.cancel();
this.handle.cancel("Closed by cleanup");
}

@Override
Expand Down
32 changes: 31 additions & 1 deletion controller/src/application/plugin/runtime/wasm/listener.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,10 @@ use tokio::sync::{mpsc::Receiver, MutexGuard};
use wasmtime::{component::ResourceAny, AsContextMut, Store};

use crate::application::{
subscriber::{manager::event::ServerEvent, Subscriber},
subscriber::{
manager::event::server::{ServerEvent, ServerReadyEvent},
Subscriber,
},
Shared,
};

Expand All @@ -27,6 +30,7 @@ pub struct PluginListener {
/* Events */
server_start: Option<Receiver<Result<ServerEvent>>>,
server_stop: Option<Receiver<Result<ServerEvent>>>,
server_change_ready: Option<Receiver<Result<ServerReadyEvent>>>,
}

impl PluginListener {
Expand All @@ -38,6 +42,7 @@ impl PluginListener {

server_start: None,
server_stop: None,
server_change_ready: None,
}
}

Expand All @@ -62,6 +67,16 @@ impl PluginListener {
.await;
self.server_stop = Some(receiver);
}
if self.events.contains(Events::SERVER_CHANGE_READY) {
let (subscriber, receiver) = Subscriber::create_plugin();
shared
.subscribers
.plugin()
.server_change_ready()
.subscribe(subscriber)
.await;
self.server_change_ready = Some(receiver);
}
}

fn collect_events<T>(event: &mut Option<Receiver<Result<T>>>) -> Vec<T> {
Expand Down Expand Up @@ -119,6 +134,21 @@ impl PluginListener {
.await,
);
}
for event in Self::collect_events(&mut self.server_change_ready) {
let server = event.0.into();
Self::handle_result(
bindings
.plugin_system_event()
.listener()
.call_server_change_ready(
store.as_context_mut(),
self.instance,
&server,
event.1,
)
.await,
);
}
}

pub async fn cleanup(&mut self, store: impl AsContextMut<Data = PluginState>) -> Result<()> {
Expand Down
2 changes: 1 addition & 1 deletion controller/src/application/plugin/runtime/wasm/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use crate::application::{
server::{
guard::Guard, manager::StartRequest, DiskRetention, Resources, Server, Specification,
},
subscriber::manager::event::ServerEvent,
subscriber::manager::event::server::ServerEvent,
};

use super::{
Expand Down
24 changes: 22 additions & 2 deletions controller/src/application/server.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use std::{
collections::HashMap,
fmt::{self, Display, Formatter},
sync::Arc,
time::Duration,
};

Expand All @@ -11,7 +12,7 @@ use uuid::Uuid;

use crate::network::client::TransferMsg;

use super::node::Allocation;
use super::{node::Allocation, Shared};

pub mod guard;
pub mod manager;
Expand Down Expand Up @@ -42,7 +43,7 @@ pub struct Server {
flags: Flags,
#[getset(get = "pub", get_mut = "pub", set = "pub")]
state: State,
#[getset(get = "pub", set = "pub")]
#[getset(get = "pub")]
ready: bool,
}

Expand Down Expand Up @@ -126,6 +127,25 @@ impl Server {
port: u32::from(port.port),
})
}

pub async fn set_ready(&mut self, ready: bool, shared: &Arc<Shared>) {
if self.ready != ready {
self.ready = ready;
// Fire the server change ready event
shared
.subscribers
.plugin()
.server_change_ready()
.publish(((&*self).into(), ready))
.await;
shared
.subscribers
.network()
.ready()
.publish((&*self).into())
.await;
}
}
}

#[derive(Default)]
Expand Down
Loading