diff --git a/.github/workflows/release.yml b/.github/workflows/release.yml index 6026ca7c..6de2b299 100644 --- a/.github/workflows/release.yml +++ b/.github/workflows/release.yml @@ -129,6 +129,7 @@ jobs: cp $(find ./clients/java/paper/build -name "*-all.jar") ac-core.jar cp $(find ./clients/java/paper/send/build -name "*-all.jar") ac-send.jar cp $(find ./clients/java/paper/notify/build -name "*-all.jar") ac-notify.jar + cp $(find ./clients/java/paper/fake-proxy/build -name "*-all.jar") ac-fake-proxy.jar - name: Create egg tar.gz and template tar.gz run: | @@ -158,3 +159,4 @@ jobs: ac-core.jar ac-send.jar ac-notify.jar + ac-fake-proxy.jar diff --git a/clients/java/common/src/main/java/io/atomic/cloud/common/channel/subscription/BytesImpl.java b/clients/java/common/src/main/java/io/atomic/cloud/common/channel/subscription/BytesImpl.java index ba03e027..b5d86f1e 100644 --- a/clients/java/common/src/main/java/io/atomic/cloud/common/channel/subscription/BytesImpl.java +++ b/clients/java/common/src/main/java/io/atomic/cloud/common/channel/subscription/BytesImpl.java @@ -25,7 +25,7 @@ public class BytesImpl implements StreamObserver, Bytes { @Override public void close() { - this.handle.cancel(); + this.handle.cancel("Closed by user"); } @Override diff --git a/clients/java/common/src/main/java/io/atomic/cloud/common/connection/call/CallHandle.java b/clients/java/common/src/main/java/io/atomic/cloud/common/connection/call/CallHandle.java index 697c3be5..993a33b1 100644 --- a/clients/java/common/src/main/java/io/atomic/cloud/common/connection/call/CallHandle.java +++ b/clients/java/common/src/main/java/io/atomic/cloud/common/connection/call/CallHandle.java @@ -14,8 +14,8 @@ public class CallHandle implements ClientResponseObserver stream; - public void cancel() { - stream.cancel(null, null); + public void cancel(String message) { + stream.cancel(message, new InterruptedException(message)); } @Override diff --git a/clients/java/common/src/main/java/io/atomic/cloud/common/connection/client/ClientConnection.java b/clients/java/common/src/main/java/io/atomic/cloud/common/connection/client/ClientConnection.java index 5e7b2ebe..b90db13f 100644 --- a/clients/java/common/src/main/java/io/atomic/cloud/common/connection/client/ClientConnection.java +++ b/clients/java/common/src/main/java/io/atomic/cloud/common/connection/client/ClientConnection.java @@ -174,6 +174,12 @@ public CallHandle subscribeToPowerEvents(StreamObserver subscribeToReadyEvents(StreamObserver observer) { + var handle = new CallHandle<>(observer); + this.client.subscribeToReadyEvents(Empty.getDefaultInstance(), handle); + return handle; + } + @Contract(" -> new") public static @NotNull ClientConnection createFromEnv() { var address = System.getenv("CONTROLLER_ADDRESS"); diff --git a/clients/java/paper/notify/src/main/java/io/atomic/cloud/paper/notify/NotifyPlugin.java b/clients/java/paper/notify/src/main/java/io/atomic/cloud/paper/notify/NotifyPlugin.java index 144a9121..8a15e83c 100644 --- a/clients/java/paper/notify/src/main/java/io/atomic/cloud/paper/notify/NotifyPlugin.java +++ b/clients/java/paper/notify/src/main/java/io/atomic/cloud/paper/notify/NotifyPlugin.java @@ -1,7 +1,8 @@ package io.atomic.cloud.paper.notify; import io.atomic.cloud.paper.CloudPlugin; -import io.atomic.cloud.paper.notify.notify.PowerHandler; +import io.atomic.cloud.paper.notify.handler.PowerHandler; +import io.atomic.cloud.paper.notify.handler.ReadyHandler; import io.atomic.cloud.paper.notify.setting.message.Messages; import lombok.Getter; import org.bukkit.plugin.java.JavaPlugin; @@ -17,6 +18,7 @@ public class NotifyPlugin extends JavaPlugin { private Messages messages; private PowerHandler powerHandler; + private ReadyHandler readyHandler; @Override public void onLoad() { @@ -25,17 +27,20 @@ public void onLoad() { this.messages = new Messages(this.getConfig()); this.powerHandler = new PowerHandler(CloudPlugin.INSTANCE.clientConnection()); + this.readyHandler = new ReadyHandler(CloudPlugin.INSTANCE.clientConnection()); } @Override public void onEnable() { // Enable the notification system this.powerHandler.enable(); + this.readyHandler.enable(); } @Override public void onDisable() { // Cleanup this.powerHandler.cleanup(); + this.readyHandler.cleanup(); } } diff --git a/clients/java/paper/notify/src/main/java/io/atomic/cloud/paper/notify/notify/PowerHandler.java b/clients/java/paper/notify/src/main/java/io/atomic/cloud/paper/notify/handler/PowerHandler.java similarity index 85% rename from clients/java/paper/notify/src/main/java/io/atomic/cloud/paper/notify/notify/PowerHandler.java rename to clients/java/paper/notify/src/main/java/io/atomic/cloud/paper/notify/handler/PowerHandler.java index 90e4c824..f09ba8c6 100644 --- a/clients/java/paper/notify/src/main/java/io/atomic/cloud/paper/notify/notify/PowerHandler.java +++ b/clients/java/paper/notify/src/main/java/io/atomic/cloud/paper/notify/handler/PowerHandler.java @@ -1,4 +1,4 @@ -package io.atomic.cloud.paper.notify.notify; +package io.atomic.cloud.paper.notify.handler; import io.atomic.cloud.common.connection.call.CallHandle; import io.atomic.cloud.common.connection.client.ClientConnection; @@ -23,35 +23,35 @@ public void enable() { } public void cleanup() { - this.handle.cancel(); + this.handle.cancel("Closed by cleanup"); } @Override - public void onNext(Notify.PowerEvent powerEvent) { + public void onNext(Notify.PowerEvent event) { try { Bukkit.getOnlinePlayers().stream() .filter(Permissions.POWER_NOTIFY::check) .forEach(player -> { - if (powerEvent.getState() == Notify.PowerEvent.State.START) { + if (event.getState() == Notify.PowerEvent.State.START) { NotifyPlugin.INSTANCE .messages() .serverStarting() .send( player, - Placeholder.unparsed("name", powerEvent.getName()), - Placeholder.unparsed("node", powerEvent.getNode())); + Placeholder.unparsed("name", event.getName()), + Placeholder.unparsed("node", event.getNode())); } else { NotifyPlugin.INSTANCE .messages() .serverStopping() .send( player, - Placeholder.unparsed("name", powerEvent.getName()), - Placeholder.unparsed("node", powerEvent.getNode())); + Placeholder.unparsed("name", event.getName()), + Placeholder.unparsed("node", event.getNode())); } }); } catch (Throwable throwable) { - NotifyPlugin.LOGGER.info("Failed to process power event for server {}:", powerEvent.getName()); + NotifyPlugin.LOGGER.info("Failed to process power event for server {}:", event.getName()); NotifyPlugin.LOGGER.error("-> ", throwable); } } diff --git a/clients/java/paper/notify/src/main/java/io/atomic/cloud/paper/notify/handler/ReadyHandler.java b/clients/java/paper/notify/src/main/java/io/atomic/cloud/paper/notify/handler/ReadyHandler.java new file mode 100644 index 00000000..53582e87 --- /dev/null +++ b/clients/java/paper/notify/src/main/java/io/atomic/cloud/paper/notify/handler/ReadyHandler.java @@ -0,0 +1,60 @@ +package io.atomic.cloud.paper.notify.handler; + +import io.atomic.cloud.common.connection.call.CallHandle; +import io.atomic.cloud.common.connection.client.ClientConnection; +import io.atomic.cloud.grpc.common.Notify; +import io.atomic.cloud.paper.CloudPlugin; +import io.atomic.cloud.paper.notify.NotifyPlugin; +import io.atomic.cloud.paper.notify.permission.Permissions; +import io.grpc.stub.StreamObserver; +import lombok.RequiredArgsConstructor; +import net.kyori.adventure.text.minimessage.tag.resolver.Placeholder; +import org.bukkit.Bukkit; + +@RequiredArgsConstructor +public class ReadyHandler implements StreamObserver { + + private final ClientConnection connection; + private CallHandle handle; + + public void enable() { + NotifyPlugin.LOGGER.info("Enabling ready notifications..."); + this.handle = this.connection.subscribeToReadyEvents(this); + } + + public void cleanup() { + this.handle.cancel("Closed by cleanup"); + } + + @Override + public void onNext(Notify.ReadyEvent event) { + try { + Bukkit.getOnlinePlayers().stream() + .filter(Permissions.READY_NOTIFY::check) + .forEach(player -> { + if (event.getReady()) { + NotifyPlugin.INSTANCE + .messages() + .serverReady() + .send(player, Placeholder.unparsed("name", event.getName())); + } else { + NotifyPlugin.INSTANCE + .messages() + .serverNotReady() + .send(player, Placeholder.unparsed("name", event.getName())); + } + }); + } catch (Throwable throwable) { + NotifyPlugin.LOGGER.info("Failed to process ready event for server {}:", event.getName()); + NotifyPlugin.LOGGER.error("-> ", throwable); + } + } + + @Override + public void onError(Throwable throwable) { + CloudPlugin.LOGGER.error("Failed to handle ready event", throwable); + } + + @Override + public void onCompleted() {} +} diff --git a/clients/java/paper/notify/src/main/java/io/atomic/cloud/paper/notify/permission/Permissions.java b/clients/java/paper/notify/src/main/java/io/atomic/cloud/paper/notify/permission/Permissions.java index f9857cbe..4b0b17b5 100644 --- a/clients/java/paper/notify/src/main/java/io/atomic/cloud/paper/notify/permission/Permissions.java +++ b/clients/java/paper/notify/src/main/java/io/atomic/cloud/paper/notify/permission/Permissions.java @@ -10,7 +10,8 @@ @Getter @SuppressWarnings("UnstableApiUsage") public enum Permissions { - POWER_NOTIFY("atomic.cloud.power.notify"); + POWER_NOTIFY("atomic.cloud.power.notify"), + READY_NOTIFY("atomic.cloud.ready.notify"); private final String permission; diff --git a/clients/java/paper/notify/src/main/java/io/atomic/cloud/paper/notify/setting/message/Messages.java b/clients/java/paper/notify/src/main/java/io/atomic/cloud/paper/notify/setting/message/Messages.java index 13163391..c8896e4c 100644 --- a/clients/java/paper/notify/src/main/java/io/atomic/cloud/paper/notify/setting/message/Messages.java +++ b/clients/java/paper/notify/src/main/java/io/atomic/cloud/paper/notify/setting/message/Messages.java @@ -11,8 +11,14 @@ public class Messages { private final Message serverStarting; private final Message serverStopping; + private final Message serverReady; + private final Message serverNotReady; + public Messages(FileConfiguration configuration) { this.serverStarting = new SingleLine("messages.server-starting", configuration); this.serverStopping = new SingleLine("messages.server-stopping", configuration); + + this.serverReady = new SingleLine("messages.server-ready", configuration); + this.serverNotReady = new SingleLine("messages.server-not-ready", configuration); } } diff --git a/clients/java/paper/notify/src/main/resources/config.yml b/clients/java/paper/notify/src/main/resources/config.yml index 60f202eb..2017b3cc 100644 --- a/clients/java/paper/notify/src/main/resources/config.yml +++ b/clients/java/paper/notify/src/main/resources/config.yml @@ -1,3 +1,5 @@ messages: server-starting: "Spinning up server on node !" - server-stopping: "Shutting down server !" \ No newline at end of file + server-stopping: "Shutting down server !" + server-ready: "Server is now ready!" + server-not-ready: "Server is no longer ready!" \ No newline at end of file diff --git a/clients/java/paper/src/main/java/io/atomic/cloud/paper/CloudPlugin.java b/clients/java/paper/src/main/java/io/atomic/cloud/paper/CloudPlugin.java index 8e8af675..e6dec51a 100644 --- a/clients/java/paper/src/main/java/io/atomic/cloud/paper/CloudPlugin.java +++ b/clients/java/paper/src/main/java/io/atomic/cloud/paper/CloudPlugin.java @@ -88,6 +88,7 @@ public void onDisable() { this.transferHandler.cleanup(); try { + this.self.ready(false).join(); if (this.settings.suicideOnDisable()) { this.self.shutdown().thenRun(heart::stop).join(); } else { diff --git a/clients/java/paper/src/main/java/io/atomic/cloud/paper/transfer/TransferHandler.java b/clients/java/paper/src/main/java/io/atomic/cloud/paper/transfer/TransferHandler.java index 592479db..29d791a3 100644 --- a/clients/java/paper/src/main/java/io/atomic/cloud/paper/transfer/TransferHandler.java +++ b/clients/java/paper/src/main/java/io/atomic/cloud/paper/transfer/TransferHandler.java @@ -20,7 +20,7 @@ public void enable() { } public void cleanup() { - this.handle.cancel(); + this.handle.cancel("Closed by cleanup"); } @Override diff --git a/controller/src/application/plugin/runtime/wasm/listener.rs b/controller/src/application/plugin/runtime/wasm/listener.rs index e3bc2112..8e79ed87 100644 --- a/controller/src/application/plugin/runtime/wasm/listener.rs +++ b/controller/src/application/plugin/runtime/wasm/listener.rs @@ -7,7 +7,10 @@ use tokio::sync::{mpsc::Receiver, MutexGuard}; use wasmtime::{component::ResourceAny, AsContextMut, Store}; use crate::application::{ - subscriber::{manager::event::ServerEvent, Subscriber}, + subscriber::{ + manager::event::server::{ServerEvent, ServerReadyEvent}, + Subscriber, + }, Shared, }; @@ -27,6 +30,7 @@ pub struct PluginListener { /* Events */ server_start: Option>>, server_stop: Option>>, + server_change_ready: Option>>, } impl PluginListener { @@ -38,6 +42,7 @@ impl PluginListener { server_start: None, server_stop: None, + server_change_ready: None, } } @@ -62,6 +67,16 @@ impl PluginListener { .await; self.server_stop = Some(receiver); } + if self.events.contains(Events::SERVER_CHANGE_READY) { + let (subscriber, receiver) = Subscriber::create_plugin(); + shared + .subscribers + .plugin() + .server_change_ready() + .subscribe(subscriber) + .await; + self.server_change_ready = Some(receiver); + } } fn collect_events(event: &mut Option>>) -> Vec { @@ -119,6 +134,21 @@ impl PluginListener { .await, ); } + for event in Self::collect_events(&mut self.server_change_ready) { + let server = event.0.into(); + Self::handle_result( + bindings + .plugin_system_event() + .listener() + .call_server_change_ready( + store.as_context_mut(), + self.instance, + &server, + event.1, + ) + .await, + ); + } } pub async fn cleanup(&mut self, store: impl AsContextMut) -> Result<()> { diff --git a/controller/src/application/plugin/runtime/wasm/node.rs b/controller/src/application/plugin/runtime/wasm/node.rs index a53ccd3f..0213de53 100644 --- a/controller/src/application/plugin/runtime/wasm/node.rs +++ b/controller/src/application/plugin/runtime/wasm/node.rs @@ -12,7 +12,7 @@ use crate::application::{ server::{ guard::Guard, manager::StartRequest, DiskRetention, Resources, Server, Specification, }, - subscriber::manager::event::ServerEvent, + subscriber::manager::event::server::ServerEvent, }; use super::{ diff --git a/controller/src/application/server.rs b/controller/src/application/server.rs index de709e86..2104c37d 100644 --- a/controller/src/application/server.rs +++ b/controller/src/application/server.rs @@ -1,6 +1,7 @@ use std::{ collections::HashMap, fmt::{self, Display, Formatter}, + sync::Arc, time::Duration, }; @@ -11,7 +12,7 @@ use uuid::Uuid; use crate::network::client::TransferMsg; -use super::node::Allocation; +use super::{node::Allocation, Shared}; pub mod guard; pub mod manager; @@ -42,7 +43,7 @@ pub struct Server { flags: Flags, #[getset(get = "pub", get_mut = "pub", set = "pub")] state: State, - #[getset(get = "pub", set = "pub")] + #[getset(get = "pub")] ready: bool, } @@ -126,6 +127,25 @@ impl Server { port: u32::from(port.port), }) } + + pub async fn set_ready(&mut self, ready: bool, shared: &Arc) { + if self.ready != ready { + self.ready = ready; + // Fire the server change ready event + shared + .subscribers + .plugin() + .server_change_ready() + .publish(((&*self).into(), ready)) + .await; + shared + .subscribers + .network() + .ready() + .publish((&*self).into()) + .await; + } + } } #[derive(Default)] diff --git a/controller/src/application/subscriber/manager.rs b/controller/src/application/subscriber/manager.rs index 24c3366a..f5630913 100644 --- a/controller/src/application/subscriber/manager.rs +++ b/controller/src/application/subscriber/manager.rs @@ -1,20 +1,26 @@ use anyhow::Result; -use event::ServerEvent; +use event::server::{ServerEvent, ServerReadyEvent}; use getset::Getters; use uuid::Uuid; -use crate::network::client::{ChannelMsg, PowerMsg, TransferMsg}; +use crate::network::client::{ChannelMsg, PowerMsg, ReadyMsg, TransferMsg}; use super::watcher::Watcher; pub mod event; +#[allow(clippy::struct_field_names)] #[derive(Getters)] pub struct PluginEvents { + /* Power */ #[getset(get = "pub")] server_start: Watcher<(), ServerEvent>, #[getset(get = "pub")] server_stop: Watcher<(), ServerEvent>, + + /* Ready */ + #[getset(get = "pub")] + server_change_ready: Watcher<(), ServerReadyEvent>, } #[derive(Getters)] @@ -25,8 +31,11 @@ pub struct NetworkEvents { #[getset(get = "pub")] channel: Watcher, + /* Server */ #[getset(get = "pub")] power: Watcher<(), PowerMsg>, + #[getset(get = "pub")] + ready: Watcher<(), ReadyMsg>, } #[derive(Getters)] @@ -44,11 +53,13 @@ impl SubscriberManager { plugin: PluginEvents { server_start: Watcher::new(), server_stop: Watcher::new(), + server_change_ready: Watcher::new(), }, network: NetworkEvents { transfer: Watcher::new(), channel: Watcher::new(), power: Watcher::new(), + ready: Watcher::new(), }, } } @@ -61,9 +72,11 @@ impl SubscriberManager { self.network.channel.cleanup().await; self.network.transfer.cleanup().await; self.network.power.cleanup().await; + self.network.ready.cleanup().await; self.plugin.server_start.cleanup().await; self.plugin.server_stop.cleanup().await; + self.plugin.server_change_ready.cleanup().await; Ok(()) } } diff --git a/controller/src/application/subscriber/manager/event.rs b/controller/src/application/subscriber/manager/event.rs index cf4a18c1..74f47ad3 100644 --- a/controller/src/application/subscriber/manager/event.rs +++ b/controller/src/application/subscriber/manager/event.rs @@ -1,29 +1 @@ -use getset::Getters; - -use crate::application::{ - node::Allocation, - server::{NameAndUuid, Server}, -}; - -#[derive(Getters, Clone)] -pub struct ServerEvent { - #[getset(get = "pub")] - pub id: NameAndUuid, - #[getset(get = "pub")] - pub group: Option, - #[getset(get = "pub")] - pub allocation: Allocation, - #[getset(get = "pub")] - pub token: String, -} - -impl From<&Server> for ServerEvent { - fn from(value: &Server) -> Self { - Self { - id: value.id().clone(), - group: value.group().clone(), - allocation: value.allocation().clone(), - token: value.token().clone(), - } - } -} +pub mod server; diff --git a/controller/src/application/subscriber/manager/event/server.rs b/controller/src/application/subscriber/manager/event/server.rs new file mode 100644 index 00000000..1efa4dd9 --- /dev/null +++ b/controller/src/application/subscriber/manager/event/server.rs @@ -0,0 +1,31 @@ +use getset::Getters; + +use crate::application::{ + node::Allocation, + server::{NameAndUuid, Server}, +}; + +pub type ServerReadyEvent = (ServerEvent, bool); + +#[derive(Getters, Clone)] +pub struct ServerEvent { + #[getset(get = "pub")] + pub id: NameAndUuid, + #[getset(get = "pub")] + pub group: Option, + #[getset(get = "pub")] + pub allocation: Allocation, + #[getset(get = "pub")] + pub token: String, +} + +impl From<&Server> for ServerEvent { + fn from(value: &Server) -> Self { + Self { + id: value.id().clone(), + group: value.group().clone(), + allocation: value.allocation().clone(), + token: value.token().clone(), + } + } +} diff --git a/controller/src/network/client.rs b/controller/src/network/client.rs index 78d08138..46665ee7 100644 --- a/controller/src/network/client.rs +++ b/controller/src/network/client.rs @@ -30,7 +30,7 @@ use super::{ transfer::{target::Type, TransferReq, TransferRes}, user::{ConnectedReq, DisconnectedReq}, }, - common::notify::PowerEvent, + common::notify::{PowerEvent, ReadyEvent}, }, }; @@ -45,6 +45,7 @@ mod user; pub type TransferMsg = TransferRes; pub type ChannelMsg = Msg; pub type PowerMsg = PowerEvent; +pub type ReadyMsg = ReadyEvent; pub struct ClientServiceImpl(pub TaskSender, pub Arc); @@ -53,6 +54,7 @@ impl ClientService for ClientServiceImpl { type SubscribeToTransfersStream = ReceiverStream>; type SubscribeToChannelStream = ReceiverStream>; type SubscribeToPowerEventsStream = ReceiverStream>; + type SubscribeToReadyEventsStream = ReceiverStream>; // Heartbeat async fn beat(&self, request: Request<()>) -> Result, Status> { @@ -283,6 +285,15 @@ impl ClientService for ClientServiceImpl { let (sender, receiver) = Subscriber::create_network(); self.1.subscribers.network().power().subscribe(sender).await; + Ok(Response::new(receiver)) + } + async fn subscribe_to_ready_events( + &self, + _request: Request<()>, + ) -> Result, Status> { + let (sender, receiver) = Subscriber::create_network(); + self.1.subscribers.network().ready().subscribe(sender).await; + Ok(Response::new(receiver)) } } diff --git a/controller/src/network/client/notify.rs b/controller/src/network/client/notify.rs index 27ed821e..a07b2f17 100644 --- a/controller/src/network/client/notify.rs +++ b/controller/src/network/client/notify.rs @@ -1,6 +1,6 @@ use crate::{ application::server::{Server, State}, - network::proto::common::notify::{power_event, PowerEvent}, + network::proto::common::notify::{power_event, PowerEvent, ReadyEvent}, }; impl From<&Server> for PowerEvent { @@ -15,3 +15,12 @@ impl From<&Server> for PowerEvent { } } } + +impl From<&Server> for ReadyEvent { + fn from(server: &Server) -> Self { + Self { + ready: *server.ready(), + name: server.id().name().clone(), + } + } +} diff --git a/controller/src/network/client/ready.rs b/controller/src/network/client/ready.rs index 601fb67a..45ef9d32 100644 --- a/controller/src/network/client/ready.rs +++ b/controller/src/network/client/ready.rs @@ -18,7 +18,7 @@ impl GenericTask for SetReadyTask { else { return Task::new_link_error(); }; - server.set_ready(self.1); + server.set_ready(self.1, &controller.shared).await; Task::new_empty() } } diff --git a/controller/src/network/manage.rs b/controller/src/network/manage.rs index 872c39fc..e36c33e9 100644 --- a/controller/src/network/manage.rs +++ b/controller/src/network/manage.rs @@ -19,6 +19,7 @@ use crate::{ group::{ScalingPolicy, StartConstraints}, node::Capabilities, server::{DiskRetention, FallbackPolicy, Resources, Specification}, + subscriber::Subscriber, user::transfer::TransferTarget, Shared, TaskSender, }, @@ -27,7 +28,7 @@ use crate::{ }; use super::proto::{ - common::notify::PowerEvent, + common::notify::{PowerEvent, ReadyEvent}, manage::{ self, manage_service_server::ManageService, @@ -54,6 +55,7 @@ pub struct ManageServiceImpl(pub TaskSender, pub Arc); impl ManageService for ManageServiceImpl { type SubscribeToScreenStream = ReceiverStream>; type SubscribeToPowerEventsStream = ReceiverStream>; + type SubscribeToReadyEventsStream = ReceiverStream>; // Power async fn request_stop(&self, request: Request<()>) -> Result, Status> { @@ -669,6 +671,18 @@ impl ManageService for ManageServiceImpl { &self, _request: Request<()>, ) -> Result, Status> { - Err(Status::unimplemented("Not implemented yet")) + let (sender, receiver) = Subscriber::create_network(); + self.1.subscribers.network().power().subscribe(sender).await; + + Ok(Response::new(receiver)) + } + async fn subscribe_to_ready_events( + &self, + _request: Request<()>, + ) -> Result, Status> { + let (sender, receiver) = Subscriber::create_network(); + self.1.subscribers.network().ready().subscribe(sender).await; + + Ok(Response::new(receiver)) } } diff --git a/plugins/cloudflare/src/listener.rs b/plugins/cloudflare/src/listener.rs index e08dc6aa..58ffb879 100644 --- a/plugins/cloudflare/src/listener.rs +++ b/plugins/cloudflare/src/listener.rs @@ -13,4 +13,8 @@ impl GuestListener for Listener { fn server_stop(&self, _: Server) -> Result<(), ErrorMessage> { Ok(()) } + + fn server_change_ready(&self, _: Server, _: bool) -> Result<(), ErrorMessage> { + Ok(()) + } } diff --git a/plugins/local/src/listener.rs b/plugins/local/src/listener.rs index 7666e1a9..c2a26dec 100644 --- a/plugins/local/src/listener.rs +++ b/plugins/local/src/listener.rs @@ -13,4 +13,8 @@ impl GuestListener for Listener { fn server_stop(&self, _: Server) -> Result<(), ErrorMessage> { unimplemented!() } + + fn server_change_ready(&self, _: Server, _: bool) -> Result<(), ErrorMessage> { + unimplemented!() + } } diff --git a/plugins/pelican/src/listener.rs b/plugins/pelican/src/listener.rs index 7666e1a9..c2a26dec 100644 --- a/plugins/pelican/src/listener.rs +++ b/plugins/pelican/src/listener.rs @@ -13,4 +13,8 @@ impl GuestListener for Listener { fn server_stop(&self, _: Server) -> Result<(), ErrorMessage> { unimplemented!() } + + fn server_change_ready(&self, _: Server, _: bool) -> Result<(), ErrorMessage> { + unimplemented!() + } } diff --git a/protocol/grpc/client/service.proto b/protocol/grpc/client/service.proto index fc309a0f..8353618b 100644 --- a/protocol/grpc/client/service.proto +++ b/protocol/grpc/client/service.proto @@ -50,4 +50,5 @@ service ClientService { // Notify operations rpc SubscribeToPowerEvents(google.protobuf.Empty) returns (stream common.Notify.PowerEvent); + rpc SubscribeToReadyEvents(google.protobuf.Empty) returns (stream common.Notify.ReadyEvent); } \ No newline at end of file diff --git a/protocol/grpc/common/notify.proto b/protocol/grpc/common/notify.proto index df3d810f..380b8bf9 100644 --- a/protocol/grpc/common/notify.proto +++ b/protocol/grpc/common/notify.proto @@ -15,4 +15,8 @@ message Notify { string name = 2; string node = 3; } + message ReadyEvent { + bool ready = 1; + string name = 2; + } } \ No newline at end of file diff --git a/protocol/grpc/manage/service.proto b/protocol/grpc/manage/service.proto index 0d6799af..21eedb6c 100644 --- a/protocol/grpc/manage/service.proto +++ b/protocol/grpc/manage/service.proto @@ -62,4 +62,5 @@ service ManageService { // Notify operations rpc SubscribeToPowerEvents(google.protobuf.Empty) returns (stream common.Notify.PowerEvent); + rpc SubscribeToReadyEvents(google.protobuf.Empty) returns (stream common.Notify.ReadyEvent); } \ No newline at end of file diff --git a/protocol/wit/plugin.wit b/protocol/wit/plugin.wit index 48d57969..46c1d767 100644 --- a/protocol/wit/plugin.wit +++ b/protocol/wit/plugin.wit @@ -199,11 +199,13 @@ interface event { flags events { server-start, server-stop, + server-change-ready, } resource listener { server-start: func(server: server) -> result<_, error-message>; server-stop: func(server: server) -> result<_, error-message>; + server-change-ready: func(server: server, ready: bool) -> result<_, error-message>; } }