rework concurrencyfilter

This commit is contained in:
rootdarkarchon
2025-08-20 10:14:58 +02:00
parent 282fb4f83a
commit c664ecbe26
4 changed files with 44 additions and 17 deletions

View File

@@ -2,12 +2,13 @@
using MareSynchronosShared.Services; using MareSynchronosShared.Services;
using MareSynchronosShared.Utils.Configuration; using MareSynchronosShared.Utils.Configuration;
using Microsoft.AspNetCore.SignalR; using Microsoft.AspNetCore.SignalR;
using System.Threading.RateLimiting;
namespace MareSynchronosServer.Hubs; namespace MareSynchronosServer.Hubs;
public sealed class ConcurrencyFilter : IHubFilter, IDisposable public sealed class ConcurrencyFilter : IHubFilter, IDisposable
{ {
private SemaphoreSlim _limiter; private ConcurrencyLimiter _limiter;
private int _setLimit = 0; private int _setLimit = 0;
private readonly IConfigurationService<ServerConfiguration> _config; private readonly IConfigurationService<ServerConfiguration> _config;
private readonly CancellationTokenSource _cts = new(); private readonly CancellationTokenSource _cts = new();
@@ -19,14 +20,19 @@ public sealed class ConcurrencyFilter : IHubFilter, IDisposable
_config = config; _config = config;
_config.ConfigChangedEvent += OnConfigChange; _config.ConfigChangedEvent += OnConfigChange;
RecreateSemaphore(); RecreateLimiter();
_ = Task.Run(async () => _ = Task.Run(async () =>
{ {
var token = _cts.Token; var token = _cts.Token;
while (!token.IsCancellationRequested) while (!token.IsCancellationRequested)
{ {
mareMetrics.SetGaugeTo(MetricsAPI.GaugeHubConcurrency, _limiter?.CurrentCount ?? 0); var stats = _limiter?.GetStatistics();
if (stats != null)
{
mareMetrics.SetGaugeTo(MetricsAPI.GaugeHubConcurrency, stats.CurrentAvailablePermits);
mareMetrics.SetGaugeTo(MetricsAPI.GaugeHubQueuedConcurrency, stats.CurrentQueuedCount);
}
await Task.Delay(TimeSpan.FromSeconds(1)).ConfigureAwait(false); await Task.Delay(TimeSpan.FromSeconds(1)).ConfigureAwait(false);
} }
}); });
@@ -34,18 +40,26 @@ public sealed class ConcurrencyFilter : IHubFilter, IDisposable
private void OnConfigChange(object sender, EventArgs e) private void OnConfigChange(object sender, EventArgs e)
{ {
RecreateSemaphore(); RecreateLimiter();
} }
private void RecreateSemaphore() private void RecreateLimiter()
{ {
var newLimit = _config.GetValueOrDefault(nameof(ServerConfiguration.HubExecutionConcurrencyFilter), 50); var newLimit = _config.GetValueOrDefault(nameof(ServerConfiguration.HubExecutionConcurrencyFilter), 50);
if (newLimit != _setLimit)
if (newLimit == _setLimit && _limiter is not null)
{ {
return;
}
_setLimit = newLimit; _setLimit = newLimit;
_limiter?.Dispose(); _limiter?.Dispose();
_limiter = new(initialCount: _setLimit, maxCount: _setLimit); _limiter = new(new ConcurrencyLimiterOptions()
} {
PermitLimit = newLimit,
QueueProcessingOrder = QueueProcessingOrder.OldestFirst,
QueueLimit = newLimit * 100,
});
} }
public async ValueTask<object> InvokeMethodAsync( public async ValueTask<object> InvokeMethodAsync(
@@ -56,15 +70,25 @@ public sealed class ConcurrencyFilter : IHubFilter, IDisposable
return await next(invocationContext).ConfigureAwait(false); return await next(invocationContext).ConfigureAwait(false);
} }
await _limiter.WaitAsync(invocationContext.Context.ConnectionAborted).ConfigureAwait(false); var ct = invocationContext.Context.ConnectionAborted;
RateLimitLease lease;
try try
{ {
return await next(invocationContext).ConfigureAwait(false); lease = await _limiter.AcquireAsync(1, ct).ConfigureAwait(false);
} }
finally catch (OperationCanceledException) when (ct.IsCancellationRequested)
{ {
_limiter.Release(); throw;
}
if (!lease.IsAcquired)
{
throw new HubException("Concurrency limit exceeded. Try again later.");
}
using (lease)
{
return await next(invocationContext).ConfigureAwait(false);
} }
} }
@@ -77,6 +101,8 @@ public sealed class ConcurrencyFilter : IHubFilter, IDisposable
_disposed = true; _disposed = true;
_cts.Cancel(); _cts.Cancel();
_limiter?.Dispose();
_config.ConfigChangedEvent -= OnConfigChange; _config.ConfigChangedEvent -= OnConfigChange;
_cts.Dispose();
} }
} }

View File

@@ -48,7 +48,7 @@ public class SignalRLimitFilter : IHubFilter
} }
// Optional method // Optional method
public async Task OnConnectedAsync(HubLifetimeContext context, Func<HubLifetimeContext, Task> next) /* public async Task OnConnectedAsync(HubLifetimeContext context, Func<HubLifetimeContext, Task> next)
{ {
await ConnectionLimiterSemaphore.WaitAsync().ConfigureAwait(false); await ConnectionLimiterSemaphore.WaitAsync().ConfigureAwait(false);
try try
@@ -108,5 +108,5 @@ public class SignalRLimitFilter : IHubFilter
{ {
DisconnectLimiterSemaphore.Release(); DisconnectLimiterSemaphore.Release();
} }
} } */
} }

View File

@@ -122,7 +122,7 @@ public class Startup
hubOptions.MaximumParallelInvocationsPerClient = 10; hubOptions.MaximumParallelInvocationsPerClient = 10;
hubOptions.StreamBufferCapacity = 200; hubOptions.StreamBufferCapacity = 200;
hubOptions.AddFilter<SignalRLimitFilter>(); //hubOptions.AddFilter<SignalRLimitFilter>();
hubOptions.AddFilter<ConcurrencyFilter>(); hubOptions.AddFilter<ConcurrencyFilter>();
}).AddMessagePackProtocol(opt => }).AddMessagePackProtocol(opt =>
{ {

View File

@@ -49,4 +49,5 @@ public class MetricsAPI
public const string GaugeGposeLobbies = "mare_gpose_lobbies"; public const string GaugeGposeLobbies = "mare_gpose_lobbies";
public const string GaugeGposeLobbyUsers = "mare_gpose_lobby_users"; public const string GaugeGposeLobbyUsers = "mare_gpose_lobby_users";
public const string GaugeHubConcurrency = "mare_free_concurrent_hub_calls"; public const string GaugeHubConcurrency = "mare_free_concurrent_hub_calls";
public const string GaugeHubQueuedConcurrency = "mare_free_concurrent_queued_hub_calls";
} }