add hub invocation concurrency filter
This commit is contained in:
@@ -0,0 +1,77 @@
|
|||||||
|
using MareSynchronosShared.Metrics;
|
||||||
|
using MareSynchronosShared.Services;
|
||||||
|
using MareSynchronosShared.Utils.Configuration;
|
||||||
|
using Microsoft.AspNetCore.SignalR;
|
||||||
|
|
||||||
|
namespace MareSynchronosServer.Hubs;
|
||||||
|
|
||||||
|
public sealed class ConcurrencyFilter : IHubFilter, IDisposable
|
||||||
|
{
|
||||||
|
private SemaphoreSlim _limiter;
|
||||||
|
private int _setLimit = 0;
|
||||||
|
private readonly IConfigurationService<ServerConfiguration> _config;
|
||||||
|
private readonly CancellationTokenSource _cancellationToken;
|
||||||
|
|
||||||
|
private bool _disposed;
|
||||||
|
|
||||||
|
public ConcurrencyFilter(IConfigurationService<ServerConfiguration> config, MareMetrics mareMetrics)
|
||||||
|
{
|
||||||
|
_config = config;
|
||||||
|
_config.ConfigChangedEvent += OnConfigChange;
|
||||||
|
|
||||||
|
RecreateSemaphore();
|
||||||
|
|
||||||
|
_ = Task.Run(async () =>
|
||||||
|
{
|
||||||
|
var token = _cancellationToken.Token;
|
||||||
|
while (!token.IsCancellationRequested)
|
||||||
|
{
|
||||||
|
mareMetrics.SetGaugeTo(MetricsAPI.GaugeHubConcurrency, _limiter.CurrentCount);
|
||||||
|
await Task.Delay(TimeSpan.FromSeconds(1)).ConfigureAwait(false);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
private void OnConfigChange(object sender, EventArgs e)
|
||||||
|
{
|
||||||
|
RecreateSemaphore();
|
||||||
|
}
|
||||||
|
|
||||||
|
private void RecreateSemaphore()
|
||||||
|
{
|
||||||
|
var newLimit = _config.GetValueOrDefault(nameof(ServerConfiguration.HubExecutionConcurrencyFilter), 50);
|
||||||
|
if (newLimit != _setLimit)
|
||||||
|
{
|
||||||
|
_setLimit = newLimit;
|
||||||
|
_limiter?.Dispose();
|
||||||
|
_limiter = new(initialCount: _setLimit, maxCount: _setLimit);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public async ValueTask<object> InvokeMethodAsync(
|
||||||
|
HubInvocationContext invocationContext, Func<HubInvocationContext, ValueTask<object>> next)
|
||||||
|
{
|
||||||
|
await _limiter.WaitAsync(invocationContext.Context.ConnectionAborted).ConfigureAwait(false);
|
||||||
|
|
||||||
|
try
|
||||||
|
{
|
||||||
|
return await next(invocationContext).ConfigureAwait(false);
|
||||||
|
}
|
||||||
|
finally
|
||||||
|
{
|
||||||
|
_limiter.Release();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public void Dispose()
|
||||||
|
{
|
||||||
|
if (_disposed)
|
||||||
|
{
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
_disposed = true;
|
||||||
|
_cancellationToken.Cancel();
|
||||||
|
_config.ConfigChangedEvent -= OnConfigChange;
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -113,6 +113,7 @@ public class Startup
|
|||||||
private static void ConfigureSignalR(IServiceCollection services, IConfigurationSection mareConfig)
|
private static void ConfigureSignalR(IServiceCollection services, IConfigurationSection mareConfig)
|
||||||
{
|
{
|
||||||
services.AddSingleton<IUserIdProvider, IdBasedUserIdProvider>();
|
services.AddSingleton<IUserIdProvider, IdBasedUserIdProvider>();
|
||||||
|
services.AddSingleton<ConcurrencyFilter>();
|
||||||
|
|
||||||
var signalRServiceBuilder = services.AddSignalR(hubOptions =>
|
var signalRServiceBuilder = services.AddSignalR(hubOptions =>
|
||||||
{
|
{
|
||||||
@@ -122,6 +123,7 @@ public class Startup
|
|||||||
hubOptions.StreamBufferCapacity = 200;
|
hubOptions.StreamBufferCapacity = 200;
|
||||||
|
|
||||||
hubOptions.AddFilter<SignalRLimitFilter>();
|
hubOptions.AddFilter<SignalRLimitFilter>();
|
||||||
|
hubOptions.AddFilter<ConcurrencyFilter>();
|
||||||
}).AddMessagePackProtocol(opt =>
|
}).AddMessagePackProtocol(opt =>
|
||||||
{
|
{
|
||||||
var resolver = CompositeResolver.Create(StandardResolverAllowPrivate.Instance,
|
var resolver = CompositeResolver.Create(StandardResolverAllowPrivate.Instance,
|
||||||
|
|||||||
@@ -48,4 +48,5 @@ public class MetricsAPI
|
|||||||
public const string CounterUserPairCacheUpdatedEntries = "mare_pairscache_updated_entries";
|
public const string CounterUserPairCacheUpdatedEntries = "mare_pairscache_updated_entries";
|
||||||
public const string GaugeGposeLobbies = "mare_gpose_lobbies";
|
public const string GaugeGposeLobbies = "mare_gpose_lobbies";
|
||||||
public const string GaugeGposeLobbyUsers = "mare_gpose_lobby_users";
|
public const string GaugeGposeLobbyUsers = "mare_gpose_lobby_users";
|
||||||
|
public const string GaugeHubConcurrency = "mare_free_concurrent_hub_calls";
|
||||||
}
|
}
|
||||||
@@ -31,6 +31,7 @@ public class ServerConfiguration : MareConfigurationBase
|
|||||||
[RemoteConfiguration]
|
[RemoteConfiguration]
|
||||||
public int MaxCharaDataByUserVanity { get; set; } = 50;
|
public int MaxCharaDataByUserVanity { get; set; } = 50;
|
||||||
public bool RunPermissionCleanupOnStartup { get; set; } = true;
|
public bool RunPermissionCleanupOnStartup { get; set; } = true;
|
||||||
|
public int HubExecutionConcurrencyFilter { get; set; } = 50;
|
||||||
|
|
||||||
public override string ToString()
|
public override string ToString()
|
||||||
{
|
{
|
||||||
@@ -45,6 +46,7 @@ public class ServerConfiguration : MareConfigurationBase
|
|||||||
sb.AppendLine($"{nameof(PurgeUnusedAccounts)} => {PurgeUnusedAccounts}");
|
sb.AppendLine($"{nameof(PurgeUnusedAccounts)} => {PurgeUnusedAccounts}");
|
||||||
sb.AppendLine($"{nameof(PurgeUnusedAccountsPeriodInDays)} => {PurgeUnusedAccountsPeriodInDays}");
|
sb.AppendLine($"{nameof(PurgeUnusedAccountsPeriodInDays)} => {PurgeUnusedAccountsPeriodInDays}");
|
||||||
sb.AppendLine($"{nameof(RunPermissionCleanupOnStartup)} => {RunPermissionCleanupOnStartup}");
|
sb.AppendLine($"{nameof(RunPermissionCleanupOnStartup)} => {RunPermissionCleanupOnStartup}");
|
||||||
|
sb.AppendLine($"{nameof(HubExecutionConcurrencyFilter)} => {HubExecutionConcurrencyFilter}");
|
||||||
return sb.ToString();
|
return sb.ToString();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
Reference in New Issue
Block a user