add OnlineSyncedPairCacheService

This commit is contained in:
rootdarkarchon
2023-11-03 11:10:49 +01:00
parent 648361f605
commit 3ad4bd8f5e
5 changed files with 203 additions and 7 deletions

View File

@@ -261,17 +261,25 @@ public partial class MareHub
+ string.Join(Environment.NewLine, invalidFileSwapPaths.Select(p => "Invalid FileSwap Path: " + p)));
}
var recipientUids = dto.Recipients.Select(r => r.UID).ToList();
bool allCached = await _onlineSyncedPairCacheService.AreAllPlayersCached(UserUID,
recipientUids, Context.ConnectionAborted).ConfigureAwait(false);
if (!allCached)
{
var allPairedUsers = await GetAllPairedUnpausedUsers().ConfigureAwait(false);
var idents = await GetOnlineUsers(allPairedUsers).ConfigureAwait(false);
var recipients = allPairedUsers.Where(f => dto.Recipients.Select(r => r.UID).Contains(f, StringComparer.Ordinal)).ToList();
recipientUids = allPairedUsers.Where(f => recipientUids.Contains(f, StringComparer.Ordinal)).ToList();
_logger.LogCallInfo(MareHubLogger.Args(idents.Count, recipients.Count()));
await _onlineSyncedPairCacheService.CachePlayers(UserUID, recipientUids, Context.ConnectionAborted).ConfigureAwait(false);
}
await Clients.Users(recipients).Client_UserReceiveCharacterData(new OnlineUserCharaDataDto(new UserData(UserUID), dto.CharaData)).ConfigureAwait(false);
_logger.LogCallInfo(MareHubLogger.Args(recipientUids.Count));
await Clients.Users(recipientUids).Client_UserReceiveCharacterData(new OnlineUserCharaDataDto(new UserData(UserUID), dto.CharaData)).ConfigureAwait(false);
_mareMetrics.IncCounter(MetricsAPI.CounterUserPushData);
_mareMetrics.IncCounter(MetricsAPI.CounterUserPushDataTo, recipients.Count());
_mareMetrics.IncCounter(MetricsAPI.CounterUserPushDataTo, recipientUids.Count);
}
[Authorize(Policy = "Identified")]

View File

@@ -30,13 +30,14 @@ public partial class MareHub : Hub<IMareHub>, IMareHub
private readonly int _maxJoinedGroupsByUser;
private readonly int _maxGroupUserCount;
private readonly IRedisDatabase _redis;
private readonly OnlineSyncedPairCacheService _onlineSyncedPairCacheService;
private readonly Uri _fileServerAddress;
private readonly Version _expectedClientVersion;
public MareHub(MareMetrics mareMetrics,
MareDbContext mareDbContext, ILogger<MareHub> logger, SystemInfoService systemInfoService,
IConfigurationService<ServerConfiguration> configuration, IHttpContextAccessor contextAccessor,
IRedisDatabase redisDb)
IRedisDatabase redisDb, OnlineSyncedPairCacheService onlineSyncedPairCacheService)
{
_mareMetrics = mareMetrics;
_systemInfoService = systemInfoService;
@@ -48,6 +49,7 @@ public partial class MareHub : Hub<IMareHub>, IMareHub
_expectedClientVersion = configuration.GetValueOrDefault(nameof(ServerConfiguration.ExpectedClientVersion), new Version(0, 0, 0));
_contextAccessor = contextAccessor;
_redis = redisDb;
_onlineSyncedPairCacheService = onlineSyncedPairCacheService;
_logger = new MareHubLogger(this, logger);
_dbContext = mareDbContext;
}

View File

@@ -0,0 +1,173 @@
using MareSynchronosShared.Metrics;
using System.Collections.Concurrent;
namespace MareSynchronosServer.Services;
public class OnlineSyncedPairCacheService : IHostedService
{
private const int CleanupCount = 1;
private const int CacheCount = 500;
private Task? _cleanUpTask;
private readonly CancellationTokenSource _runnerCts = new();
private readonly ConcurrentDictionary<string, Dictionary<string, DateTime>> _lastSeenCache = new(StringComparer.Ordinal);
private readonly SemaphoreSlim _cleanupSemaphore = new(CleanupCount);
private readonly SemaphoreSlim _cacheSemaphore = new(CacheCount);
private readonly SemaphoreSlim _cacheAdditionSemaphore = new(1);
private readonly ILogger<OnlineSyncedPairCacheService> _logger;
private readonly MareMetrics _mareMetrics;
public OnlineSyncedPairCacheService(ILogger<OnlineSyncedPairCacheService> logger, MareMetrics mareMetrics)
{
_logger = logger;
_mareMetrics = mareMetrics;
}
public Task StartAsync(CancellationToken cancellationToken)
{
_cleanUpTask = CleanUp(_runnerCts.Token);
return Task.CompletedTask;
}
public async Task StopAsync(CancellationToken cancellationToken)
{
_runnerCts.Cancel();
await _cleanUpTask.WaitAsync(cancellationToken).ConfigureAwait(false);
}
public async Task<bool> AreAllPlayersCached(string sender, List<string> uids, CancellationToken ct)
{
while (_cleanupSemaphore.CurrentCount == 0)
await Task.Delay(250, ct).ConfigureAwait(false);
await _cacheSemaphore.WaitAsync(ct).ConfigureAwait(false);
try
{
if (ct.IsCancellationRequested)
return false;
if (!_lastSeenCache.TryGetValue(sender, out var senderCache))
return false;
lock (senderCache)
{
var cachedUIDs = senderCache.Keys.ToList();
var allCached = uids.TrueForAll(u => cachedUIDs.Contains(u, StringComparer.OrdinalIgnoreCase));
_logger.LogDebug("AreAllPlayersCached:{uid}:{count}:{result}", sender, uids.Count, allCached);
if (allCached) _mareMetrics.IncCounter(MetricsAPI.CounterUserPairCacheHit);
else _mareMetrics.IncCounter(MetricsAPI.CounterUserPairCacheMiss);
return allCached;
}
}
finally
{
_cacheSemaphore.Release();
}
}
public async Task CachePlayers(string sender, List<string> uids, CancellationToken ct)
{
while (_cleanupSemaphore.CurrentCount == 0)
await Task.Delay(250, ct).ConfigureAwait(false);
await _cacheSemaphore.WaitAsync(ct).ConfigureAwait(false);
try
{
if (ct.IsCancellationRequested) return;
if (!_lastSeenCache.TryGetValue(sender, out var senderCache))
{
await _cacheAdditionSemaphore.WaitAsync(ct).ConfigureAwait(false);
try
{
if (!_lastSeenCache.ContainsKey(sender))
{
_lastSeenCache[sender] = senderCache = new(StringComparer.Ordinal);
_mareMetrics.IncGauge(MetricsAPI.GaugeUserPairCacheEntries);
}
}
finally
{
_cacheAdditionSemaphore.Release();
}
}
lock (senderCache)
{
var lastSeen = DateTime.UtcNow.AddMinutes(60);
_logger.LogDebug("CacheOnlinePlayers:{uid}:{count}", sender, uids.Count);
var newEntries = uids.Count(u => !senderCache.ContainsKey(u));
_mareMetrics.IncCounter(MetricsAPI.CounterUserPairCacheNewEntries, newEntries);
_mareMetrics.IncCounter(MetricsAPI.CounterUserPairCacheUpdatedEntries, uids.Count - newEntries);
_mareMetrics.IncGauge(MetricsAPI.GaugeUserPairCacheEntries, newEntries);
uids.ForEach(u => senderCache[u] = lastSeen);
}
}
finally
{
_cacheSemaphore.Release();
}
}
private async Task CleanUp(CancellationToken ct)
{
while (!ct.IsCancellationRequested)
{
await Task.Delay(TimeSpan.FromSeconds(10), ct).ConfigureAwait(false);
_logger.LogInformation("Cleaning up stale entries");
try
{
await _cleanupSemaphore.WaitAsync(ct).ConfigureAwait(false);
while (_cacheSemaphore.CurrentCount != CacheCount)
await Task.Delay(25, ct).ConfigureAwait(false);
CleanUpCache(ct);
}
finally
{
_cleanupSemaphore.Release();
}
}
}
private void CleanUpCache(CancellationToken ct)
{
try
{
int entriesRemoved = 0;
int playersRemoved = 0;
foreach (var playerCache in _lastSeenCache.ToDictionary(k => k.Key, k => k.Value, StringComparer.Ordinal))
{
foreach (var cacheEntry in playerCache.Value.ToDictionary(k => k.Key, k => k.Value, StringComparer.Ordinal))
{
if (cacheEntry.Value < DateTime.UtcNow)
{
entriesRemoved++;
playerCache.Value.Remove(cacheEntry.Key);
}
}
ct.ThrowIfCancellationRequested();
if (!playerCache.Value.Any())
{
playersRemoved++;
_lastSeenCache.Remove(playerCache.Key, out _);
}
}
_logger.LogInformation("Cleaning up complete, removed {entries} individual entries and {players} players", entriesRemoved, playersRemoved);
_mareMetrics.SetGaugeTo(MetricsAPI.GaugeUserPairCacheEntries, _lastSeenCache.Values.SelectMany(k => k.Keys).Count());
_mareMetrics.SetGaugeTo(MetricsAPI.GaugeUserPairCacheUsers, _lastSeenCache.Keys.Count());
}
catch (Exception ex)
{
_logger.LogWarning(ex, "Cleanup failed");
}
}
}

View File

@@ -90,7 +90,9 @@ public class Startup
services.AddSingleton<ServerTokenGenerator>();
services.AddSingleton<SystemInfoService>();
services.AddSingleton<OnlineSyncedPairCacheService>();
services.AddHostedService(provider => provider.GetService<SystemInfoService>());
services.AddHostedService(provider => provider.GetService<OnlineSyncedPairCacheService>());
// configure services based on main server status
ConfigureServicesBasedOnShardType(services, mareConfig, isMainServer);
@@ -273,6 +275,10 @@ public class Startup
MetricsAPI.CounterAuthenticationFailures,
MetricsAPI.CounterAuthenticationRequests,
MetricsAPI.CounterAuthenticationSuccesses,
MetricsAPI.CounterUserPairCacheHit,
MetricsAPI.CounterUserPairCacheMiss,
MetricsAPI.CounterUserPairCacheNewEntries,
MetricsAPI.CounterUserPairCacheUpdatedEntries,
}, new List<string>
{
MetricsAPI.GaugeAuthorizedConnections,
@@ -285,6 +291,7 @@ public class Startup
MetricsAPI.GaugeGroupPairs,
MetricsAPI.GaugeUsersRegistered,
MetricsAPI.GaugeAuthenticationCacheEntries,
MetricsAPI.GaugeUserPairCacheEntries,
}));
}

View File

@@ -33,4 +33,10 @@ public class MetricsAPI
public const string GaugeDownloadQueue = "mare_download_queue";
public const string CounterFileRequests = "mare_files_requests";
public const string CounterFileRequestSize = "mare_files_request_size";
public const string CounterUserPairCacheHit = "mare_pairscache_hit";
public const string CounterUserPairCacheMiss = "mare_pairscache_miss";
public const string GaugeUserPairCacheUsers = "mare_pairscache_users";
public const string GaugeUserPairCacheEntries = "mare_pairscache_entries";
public const string CounterUserPairCacheNewEntries = "mare_pairscache_new_entries";
public const string CounterUserPairCacheUpdatedEntries = "mare_pairscache_updated_entries";
}