diff --git a/ControlCenter/AgentProxy.cs b/ControlCenter/AgentProxy.cs new file mode 100644 index 0000000..19eb1a9 --- /dev/null +++ b/ControlCenter/AgentProxy.cs @@ -0,0 +1,26 @@ + + +// Wrapper to workaround bug in signalr +using Contracts; +using Microsoft.AspNetCore.SignalR; + +public class AgentProxy : IAgent +{ + private readonly ISingleClientProxy _clientProxy; + private AgentProxy(ISingleClientProxy clientProxy) + { + _clientProxy = clientProxy; + } + + public Task GetTemperature() + { + return _clientProxy.InvokeAsync(nameof(GetTemperature)); + } + + public Task Shutdown() + { + return _clientProxy.SendAsync(nameof(Shutdown)); + } + + public static IAgent Create(ISingleClientProxy clientProxy) => new AgentProxy(clientProxy); +} \ No newline at end of file diff --git a/ControlCenter/ControlCenter.csproj b/ControlCenter/ControlCenter.csproj index f7d14ae..93744a9 100644 --- a/ControlCenter/ControlCenter.csproj +++ b/ControlCenter/ControlCenter.csproj @@ -8,6 +8,11 @@ + + + all + runtime; build; native; contentfiles; analyzers + diff --git a/ControlCenter/Data/AgentManager.cs b/ControlCenter/Data/AgentManager.cs index 494edd1..92c9794 100644 --- a/ControlCenter/Data/AgentManager.cs +++ b/ControlCenter/Data/AgentManager.cs @@ -1,7 +1,11 @@ -using Contracts; -using System.Collections.ObjectModel; +using Orleans.Collections; public class AgentManager { - public ObservableCollection<(string, IAgent)> Agents { get; } = new(); -} + public AgentManager(IDistributedCollectionFactory factory) + { + Collection = factory.CreateObservableCollection(0); + } + + public DistributedObservableCollection Collection { get; } +} \ No newline at end of file diff --git a/ControlCenter/Data/CollectionGrain.cs b/ControlCenter/Data/CollectionGrain.cs new file mode 100644 index 0000000..f1ffb65 --- /dev/null +++ b/ControlCenter/Data/CollectionGrain.cs @@ -0,0 +1,44 @@ +using Orleans.Concurrency; + +namespace Orleans.Collections; + +[Reentrant] +public class CollectionGrain : Grain, ICollectionGrain +{ + private readonly List _items = new(); + private readonly List _subs = new(); + + public async Task AddItem(T item) + { + _items.Add(item); + + foreach (var s in _subs) + { + await s.OnCollectionChanged(); + } + } + + public async Task RemoveItem(T item) + { + _items.Remove(item); + + foreach (var s in _subs) + { + await s.OnCollectionChanged(); + } + } + + public Task GetItems() => Task.FromResult(_items.ToArray()); + + public Task Subscribe(ICollectionObserver observer) + { + _subs.Add(observer); + return Task.CompletedTask; + } + + public Task Unsubscribe(ICollectionObserver observer) + { + _subs.Remove(observer); + return Task.CompletedTask; + } +} diff --git a/ControlCenter/Data/DistributedCollectionFactory.cs b/ControlCenter/Data/DistributedCollectionFactory.cs new file mode 100644 index 0000000..54ad1c0 --- /dev/null +++ b/ControlCenter/Data/DistributedCollectionFactory.cs @@ -0,0 +1,16 @@ + +namespace Orleans.Collections; + +public class DistributedCollectionFactory : IDistributedCollectionFactory +{ + private readonly IGrainFactory _grainFactory; + public DistributedCollectionFactory(IGrainFactory grainFactory) + { + _grainFactory = grainFactory; + } + + public DistributedObservableCollection CreateObservableCollection(long key) + { + return new DistributedObservableCollection(_grainFactory, key); + } +} \ No newline at end of file diff --git a/ControlCenter/Data/DistributedObservableCollection.cs b/ControlCenter/Data/DistributedObservableCollection.cs new file mode 100644 index 0000000..a58a179 --- /dev/null +++ b/ControlCenter/Data/DistributedObservableCollection.cs @@ -0,0 +1,76 @@ +namespace Orleans.Collections; + +public class DistributedObservableCollection +{ + private readonly ICollectionGrain _collectionGrain; + private readonly IGrainFactory _grainFactory; + private ICollectionObserver? _collectionObserverReference; + private Func? _changed; + private readonly Observer _observer; + + public DistributedObservableCollection(IGrainFactory grainFactory, long key) + { + _grainFactory = grainFactory; + _collectionGrain = grainFactory.GetGrain>(key); + _observer = new Observer(this); + } + + public Task AddItem(T item) => _collectionGrain.AddItem(item); + public Task RemoveItem(T item) => _collectionGrain.RemoveItem(item); + public Task GetItems() => _collectionGrain.GetItems(); + + private Task OnCollectionChanged() + { + return _changed?.Invoke() ?? Task.CompletedTask; + } + + public async Task SubscribeAsync(Func onChanged) + { + _changed += onChanged; + + if (_collectionObserverReference is null) + { + _collectionObserverReference = await _grainFactory.CreateObjectReference(_observer); + await _collectionGrain.Subscribe(_collectionObserverReference); + } + + return new Disposable(() => + { + _changed -= onChanged; + }); + } + + public async ValueTask DisposeAsync() + { + if (_collectionObserverReference is not null) + { + await _collectionGrain.Unsubscribe(_collectionObserverReference); + } + } + + private class Observer : ICollectionObserver + { + private readonly DistributedObservableCollection _parent; + public Observer(DistributedObservableCollection parent) + { + _parent = parent; + } + + public Task OnCollectionChanged() => _parent.OnCollectionChanged(); + } + + private class Disposable : IDisposable + { + private readonly Action _callback; + public Disposable(Action callback) + { + _callback = callback; + } + + public void Dispose() + { + _callback(); + } + } +} + diff --git a/ControlCenter/Data/ICollectionGrain.cs b/ControlCenter/Data/ICollectionGrain.cs new file mode 100644 index 0000000..dd329fd --- /dev/null +++ b/ControlCenter/Data/ICollectionGrain.cs @@ -0,0 +1,12 @@ +using Orleans; + +namespace Orleans.Collections; + +public interface ICollectionGrain : IGrainWithIntegerKey +{ + Task AddItem(T item); + Task RemoveItem(T item); + Task Subscribe(ICollectionObserver observer); + Task Unsubscribe(ICollectionObserver observer); + Task GetItems(); +} diff --git a/ControlCenter/Data/ICollectionObserver.cs b/ControlCenter/Data/ICollectionObserver.cs new file mode 100644 index 0000000..7b42a94 --- /dev/null +++ b/ControlCenter/Data/ICollectionObserver.cs @@ -0,0 +1,6 @@ +namespace Orleans.Collections; + +public interface ICollectionObserver : IGrainObserver +{ + Task OnCollectionChanged(); +} diff --git a/ControlCenter/Data/IDistributedCollectionFactory.cs b/ControlCenter/Data/IDistributedCollectionFactory.cs new file mode 100644 index 0000000..c178656 --- /dev/null +++ b/ControlCenter/Data/IDistributedCollectionFactory.cs @@ -0,0 +1,7 @@ + +namespace Orleans.Collections; + +public interface IDistributedCollectionFactory +{ + DistributedObservableCollection CreateObservableCollection(long key); +} diff --git a/ControlCenter/Hubs/AgentHub.cs b/ControlCenter/Hubs/AgentHub.cs index 6817720..4b08fd9 100644 --- a/ControlCenter/Hubs/AgentHub.cs +++ b/ControlCenter/Hubs/AgentHub.cs @@ -15,21 +15,14 @@ public AgentHub(AgentManager agentManager) public override Task OnConnectedAsync() { - lock (_agentManager.Agents) - { - _agentManager.Agents.Add((Context.ConnectionId, Clients.Single(Context.ConnectionId))); - } + _agentManager.Collection.AddItem(Context.ConnectionId); return base.OnConnectedAsync(); } public override Task OnDisconnectedAsync(Exception? exception) { - lock (_agentManager.Agents) - { - var item = _agentManager.Agents.FirstOrDefault(a => a.Item1 == Context.ConnectionId); - _agentManager.Agents.Remove(item); - } + _agentManager.Collection.RemoveItem(Context.ConnectionId); return base.OnDisconnectedAsync(exception); } diff --git a/ControlCenter/Pages/Index.razor b/ControlCenter/Pages/Index.razor index 20509de..1af485c 100644 --- a/ControlCenter/Pages/Index.razor +++ b/ControlCenter/Pages/Index.razor @@ -1,6 +1,10 @@ @page "/" @using Contracts +@using ControlCenter.Hubs +@using Microsoft.AspNetCore.SignalR + @inject AgentManager Manager +@inject IHubContext HubContext Agents @@ -15,8 +19,9 @@ - @foreach (var (id, agent) in Manager.Agents) + @foreach (var id in agents) { + var agent = AgentProxy.Create(HubContext.Clients.Single(id)); @id @@ -44,10 +49,20 @@ @code { Dictionary _lastTemperature = new(); + string[] agents = Array.Empty(); - protected override void OnInitialized() + protected override async Task OnInitializedAsync() { - Manager.Agents.CollectionChanged += (_, _) => InvokeAsync(() => StateHasChanged()); + _ = await Manager.Collection.SubscribeAsync(async () => + { + agents = await Manager.Collection.GetItems(); + + await InvokeAsync(() => StateHasChanged()); + }); + + agents = await Manager.Collection.GetItems(); + + await base.OnInitializedAsync(); } async Task CheckTemperature(string id, IAgent agent) diff --git a/ControlCenter/Program.cs b/ControlCenter/Program.cs index 4889b17..47da409 100644 --- a/ControlCenter/Program.cs +++ b/ControlCenter/Program.cs @@ -1,11 +1,16 @@ using ControlCenter.Hubs; +using Orleans; +using Orleans.Collections; +using Orleans.Hosting; var builder = WebApplication.CreateBuilder(args); +builder.Host.UseOrleans(builder => builder.UseLocalhostClustering()); + // Add services to the container. builder.Services.AddRazorPages(); builder.Services.AddServerSideBlazor(); -builder.Services.AddSignalR(o => o.MaximumParallelInvocationsPerClient = 2); +builder.Services.AddSingleton(); builder.Services.AddSingleton(); var app = builder.Build();