From 8ac0c26de958170f29c5939842720e8faf4a31c6 Mon Sep 17 00:00:00 2001 From: rootdarkarchon Date: Mon, 28 Apr 2025 10:48:17 +0200 Subject: [PATCH] add hub invocation concurrency filter --- .../Hubs/ConcurrencyFilter.cs | 77 +++++++++++++++++++ .../MareSynchronosServer/Startup.cs | 2 + .../Metrics/MetricsAPI.cs | 1 + .../Configuration/ServerConfiguration.cs | 2 + 4 files changed, 82 insertions(+) create mode 100644 MareSynchronosServer/MareSynchronosServer/Hubs/ConcurrencyFilter.cs diff --git a/MareSynchronosServer/MareSynchronosServer/Hubs/ConcurrencyFilter.cs b/MareSynchronosServer/MareSynchronosServer/Hubs/ConcurrencyFilter.cs new file mode 100644 index 0000000..9f43f95 --- /dev/null +++ b/MareSynchronosServer/MareSynchronosServer/Hubs/ConcurrencyFilter.cs @@ -0,0 +1,77 @@ +using MareSynchronosShared.Metrics; +using MareSynchronosShared.Services; +using MareSynchronosShared.Utils.Configuration; +using Microsoft.AspNetCore.SignalR; + +namespace MareSynchronosServer.Hubs; + +public sealed class ConcurrencyFilter : IHubFilter, IDisposable +{ + private SemaphoreSlim _limiter; + private int _setLimit = 0; + private readonly IConfigurationService _config; + private readonly CancellationTokenSource _cancellationToken; + + private bool _disposed; + + public ConcurrencyFilter(IConfigurationService config, MareMetrics mareMetrics) + { + _config = config; + _config.ConfigChangedEvent += OnConfigChange; + + RecreateSemaphore(); + + _ = Task.Run(async () => + { + var token = _cancellationToken.Token; + while (!token.IsCancellationRequested) + { + mareMetrics.SetGaugeTo(MetricsAPI.GaugeHubConcurrency, _limiter.CurrentCount); + await Task.Delay(TimeSpan.FromSeconds(1)).ConfigureAwait(false); + } + }); + } + + private void OnConfigChange(object sender, EventArgs e) + { + RecreateSemaphore(); + } + + private void RecreateSemaphore() + { + var newLimit = _config.GetValueOrDefault(nameof(ServerConfiguration.HubExecutionConcurrencyFilter), 50); + if (newLimit != _setLimit) + { + _setLimit = newLimit; + _limiter?.Dispose(); + _limiter = new(initialCount: _setLimit, maxCount: _setLimit); + } + } + + public async ValueTask InvokeMethodAsync( + HubInvocationContext invocationContext, Func> next) + { + await _limiter.WaitAsync(invocationContext.Context.ConnectionAborted).ConfigureAwait(false); + + try + { + return await next(invocationContext).ConfigureAwait(false); + } + finally + { + _limiter.Release(); + } + } + + public void Dispose() + { + if (_disposed) + { + return; + } + + _disposed = true; + _cancellationToken.Cancel(); + _config.ConfigChangedEvent -= OnConfigChange; + } +} diff --git a/MareSynchronosServer/MareSynchronosServer/Startup.cs b/MareSynchronosServer/MareSynchronosServer/Startup.cs index 1ddb414..b63b382 100644 --- a/MareSynchronosServer/MareSynchronosServer/Startup.cs +++ b/MareSynchronosServer/MareSynchronosServer/Startup.cs @@ -113,6 +113,7 @@ public class Startup private static void ConfigureSignalR(IServiceCollection services, IConfigurationSection mareConfig) { services.AddSingleton(); + services.AddSingleton(); var signalRServiceBuilder = services.AddSignalR(hubOptions => { @@ -122,6 +123,7 @@ public class Startup hubOptions.StreamBufferCapacity = 200; hubOptions.AddFilter(); + hubOptions.AddFilter(); }).AddMessagePackProtocol(opt => { var resolver = CompositeResolver.Create(StandardResolverAllowPrivate.Instance, diff --git a/MareSynchronosServer/MareSynchronosShared/Metrics/MetricsAPI.cs b/MareSynchronosServer/MareSynchronosShared/Metrics/MetricsAPI.cs index 342238e..0e267ba 100644 --- a/MareSynchronosServer/MareSynchronosShared/Metrics/MetricsAPI.cs +++ b/MareSynchronosServer/MareSynchronosShared/Metrics/MetricsAPI.cs @@ -48,4 +48,5 @@ public class MetricsAPI public const string CounterUserPairCacheUpdatedEntries = "mare_pairscache_updated_entries"; public const string GaugeGposeLobbies = "mare_gpose_lobbies"; public const string GaugeGposeLobbyUsers = "mare_gpose_lobby_users"; + public const string GaugeHubConcurrency = "mare_free_concurrent_hub_calls"; } \ No newline at end of file diff --git a/MareSynchronosServer/MareSynchronosShared/Utils/Configuration/ServerConfiguration.cs b/MareSynchronosServer/MareSynchronosShared/Utils/Configuration/ServerConfiguration.cs index 90cff2c..f43e0da 100644 --- a/MareSynchronosServer/MareSynchronosShared/Utils/Configuration/ServerConfiguration.cs +++ b/MareSynchronosServer/MareSynchronosShared/Utils/Configuration/ServerConfiguration.cs @@ -31,6 +31,7 @@ public class ServerConfiguration : MareConfigurationBase [RemoteConfiguration] public int MaxCharaDataByUserVanity { get; set; } = 50; public bool RunPermissionCleanupOnStartup { get; set; } = true; + public int HubExecutionConcurrencyFilter { get; set; } = 50; public override string ToString() { @@ -45,6 +46,7 @@ public class ServerConfiguration : MareConfigurationBase sb.AppendLine($"{nameof(PurgeUnusedAccounts)} => {PurgeUnusedAccounts}"); sb.AppendLine($"{nameof(PurgeUnusedAccountsPeriodInDays)} => {PurgeUnusedAccountsPeriodInDays}"); sb.AppendLine($"{nameof(RunPermissionCleanupOnStartup)} => {RunPermissionCleanupOnStartup}"); + sb.AppendLine($"{nameof(HubExecutionConcurrencyFilter)} => {HubExecutionConcurrencyFilter}"); return sb.ToString(); } } \ No newline at end of file