diff --git a/MareSynchronosServer/MareSynchronosServer/Hubs/ConcurrencyFilter.cs b/MareSynchronosServer/MareSynchronosServer/Hubs/ConcurrencyFilter.cs index 35d7d41..85dd67b 100644 --- a/MareSynchronosServer/MareSynchronosServer/Hubs/ConcurrencyFilter.cs +++ b/MareSynchronosServer/MareSynchronosServer/Hubs/ConcurrencyFilter.cs @@ -2,12 +2,13 @@ using MareSynchronosShared.Services; using MareSynchronosShared.Utils.Configuration; using Microsoft.AspNetCore.SignalR; +using System.Threading.RateLimiting; namespace MareSynchronosServer.Hubs; public sealed class ConcurrencyFilter : IHubFilter, IDisposable { - private SemaphoreSlim _limiter; + private ConcurrencyLimiter _limiter; private int _setLimit = 0; private readonly IConfigurationService _config; private readonly CancellationTokenSource _cts = new(); @@ -19,14 +20,19 @@ public sealed class ConcurrencyFilter : IHubFilter, IDisposable _config = config; _config.ConfigChangedEvent += OnConfigChange; - RecreateSemaphore(); + RecreateLimiter(); _ = Task.Run(async () => { var token = _cts.Token; while (!token.IsCancellationRequested) { - mareMetrics.SetGaugeTo(MetricsAPI.GaugeHubConcurrency, _limiter?.CurrentCount ?? 0); + var stats = _limiter?.GetStatistics(); + if (stats != null) + { + mareMetrics.SetGaugeTo(MetricsAPI.GaugeHubConcurrency, stats.CurrentAvailablePermits); + mareMetrics.SetGaugeTo(MetricsAPI.GaugeHubQueuedConcurrency, stats.CurrentQueuedCount); + } await Task.Delay(TimeSpan.FromSeconds(1)).ConfigureAwait(false); } }); @@ -34,18 +40,26 @@ public sealed class ConcurrencyFilter : IHubFilter, IDisposable private void OnConfigChange(object sender, EventArgs e) { - RecreateSemaphore(); + RecreateLimiter(); } - private void RecreateSemaphore() + private void RecreateLimiter() { var newLimit = _config.GetValueOrDefault(nameof(ServerConfiguration.HubExecutionConcurrencyFilter), 50); - if (newLimit != _setLimit) + + if (newLimit == _setLimit && _limiter is not null) { - _setLimit = newLimit; - _limiter?.Dispose(); - _limiter = new(initialCount: _setLimit, maxCount: _setLimit); + return; } + + _setLimit = newLimit; + _limiter?.Dispose(); + _limiter = new(new ConcurrencyLimiterOptions() + { + PermitLimit = newLimit, + QueueProcessingOrder = QueueProcessingOrder.OldestFirst, + QueueLimit = newLimit * 100, + }); } public async ValueTask InvokeMethodAsync( @@ -56,15 +70,25 @@ public sealed class ConcurrencyFilter : IHubFilter, IDisposable return await next(invocationContext).ConfigureAwait(false); } - await _limiter.WaitAsync(invocationContext.Context.ConnectionAborted).ConfigureAwait(false); - + var ct = invocationContext.Context.ConnectionAborted; + RateLimitLease lease; try { - return await next(invocationContext).ConfigureAwait(false); + lease = await _limiter.AcquireAsync(1, ct).ConfigureAwait(false); } - finally + catch (OperationCanceledException) when (ct.IsCancellationRequested) { - _limiter.Release(); + throw; + } + + if (!lease.IsAcquired) + { + throw new HubException("Concurrency limit exceeded. Try again later."); + } + + using (lease) + { + return await next(invocationContext).ConfigureAwait(false); } } @@ -77,6 +101,8 @@ public sealed class ConcurrencyFilter : IHubFilter, IDisposable _disposed = true; _cts.Cancel(); + _limiter?.Dispose(); _config.ConfigChangedEvent -= OnConfigChange; + _cts.Dispose(); } } diff --git a/MareSynchronosServer/MareSynchronosServer/Hubs/SignalRLimitFilter.cs b/MareSynchronosServer/MareSynchronosServer/Hubs/SignalRLimitFilter.cs index a2e8017..c5462c4 100644 --- a/MareSynchronosServer/MareSynchronosServer/Hubs/SignalRLimitFilter.cs +++ b/MareSynchronosServer/MareSynchronosServer/Hubs/SignalRLimitFilter.cs @@ -48,7 +48,7 @@ public class SignalRLimitFilter : IHubFilter } // Optional method - public async Task OnConnectedAsync(HubLifetimeContext context, Func next) + /* public async Task OnConnectedAsync(HubLifetimeContext context, Func next) { await ConnectionLimiterSemaphore.WaitAsync().ConfigureAwait(false); try @@ -108,5 +108,5 @@ public class SignalRLimitFilter : IHubFilter { DisconnectLimiterSemaphore.Release(); } - } + } */ } diff --git a/MareSynchronosServer/MareSynchronosServer/Startup.cs b/MareSynchronosServer/MareSynchronosServer/Startup.cs index d7cd560..5023ad7 100644 --- a/MareSynchronosServer/MareSynchronosServer/Startup.cs +++ b/MareSynchronosServer/MareSynchronosServer/Startup.cs @@ -122,7 +122,7 @@ public class Startup hubOptions.MaximumParallelInvocationsPerClient = 10; hubOptions.StreamBufferCapacity = 200; - hubOptions.AddFilter(); + //hubOptions.AddFilter(); hubOptions.AddFilter(); }).AddMessagePackProtocol(opt => { diff --git a/MareSynchronosServer/MareSynchronosShared/Metrics/MetricsAPI.cs b/MareSynchronosServer/MareSynchronosShared/Metrics/MetricsAPI.cs index 0e267ba..d5be998 100644 --- a/MareSynchronosServer/MareSynchronosShared/Metrics/MetricsAPI.cs +++ b/MareSynchronosServer/MareSynchronosShared/Metrics/MetricsAPI.cs @@ -49,4 +49,5 @@ public class MetricsAPI public const string GaugeGposeLobbies = "mare_gpose_lobbies"; public const string GaugeGposeLobbyUsers = "mare_gpose_lobby_users"; public const string GaugeHubConcurrency = "mare_free_concurrent_hub_calls"; + public const string GaugeHubQueuedConcurrency = "mare_free_concurrent_queued_hub_calls"; } \ No newline at end of file