diff --git a/MareSynchronosServer/MareSynchronosServer/Hubs/MareHub.User.cs b/MareSynchronosServer/MareSynchronosServer/Hubs/MareHub.User.cs index 51098a5..1f2c8db 100644 --- a/MareSynchronosServer/MareSynchronosServer/Hubs/MareHub.User.cs +++ b/MareSynchronosServer/MareSynchronosServer/Hubs/MareHub.User.cs @@ -261,17 +261,25 @@ public partial class MareHub + string.Join(Environment.NewLine, invalidFileSwapPaths.Select(p => "Invalid FileSwap Path: " + p))); } - var allPairedUsers = await GetAllPairedUnpausedUsers().ConfigureAwait(false); - var idents = await GetOnlineUsers(allPairedUsers).ConfigureAwait(false); + var recipientUids = dto.Recipients.Select(r => r.UID).ToList(); + bool allCached = await _onlineSyncedPairCacheService.AreAllPlayersCached(UserUID, + recipientUids, Context.ConnectionAborted).ConfigureAwait(false); - var recipients = allPairedUsers.Where(f => dto.Recipients.Select(r => r.UID).Contains(f, StringComparer.Ordinal)).ToList(); + if (!allCached) + { + var allPairedUsers = await GetAllPairedUnpausedUsers().ConfigureAwait(false); - _logger.LogCallInfo(MareHubLogger.Args(idents.Count, recipients.Count())); + recipientUids = allPairedUsers.Where(f => recipientUids.Contains(f, StringComparer.Ordinal)).ToList(); - await Clients.Users(recipients).Client_UserReceiveCharacterData(new OnlineUserCharaDataDto(new UserData(UserUID), dto.CharaData)).ConfigureAwait(false); + await _onlineSyncedPairCacheService.CachePlayers(UserUID, recipientUids, Context.ConnectionAborted).ConfigureAwait(false); + } + + _logger.LogCallInfo(MareHubLogger.Args(recipientUids.Count)); + + await Clients.Users(recipientUids).Client_UserReceiveCharacterData(new OnlineUserCharaDataDto(new UserData(UserUID), dto.CharaData)).ConfigureAwait(false); _mareMetrics.IncCounter(MetricsAPI.CounterUserPushData); - _mareMetrics.IncCounter(MetricsAPI.CounterUserPushDataTo, recipients.Count()); + _mareMetrics.IncCounter(MetricsAPI.CounterUserPushDataTo, recipientUids.Count); } [Authorize(Policy = "Identified")] diff --git a/MareSynchronosServer/MareSynchronosServer/Hubs/MareHub.cs b/MareSynchronosServer/MareSynchronosServer/Hubs/MareHub.cs index f91950f..94ac8c4 100644 --- a/MareSynchronosServer/MareSynchronosServer/Hubs/MareHub.cs +++ b/MareSynchronosServer/MareSynchronosServer/Hubs/MareHub.cs @@ -30,13 +30,14 @@ public partial class MareHub : Hub, IMareHub private readonly int _maxJoinedGroupsByUser; private readonly int _maxGroupUserCount; private readonly IRedisDatabase _redis; + private readonly OnlineSyncedPairCacheService _onlineSyncedPairCacheService; private readonly Uri _fileServerAddress; private readonly Version _expectedClientVersion; public MareHub(MareMetrics mareMetrics, MareDbContext mareDbContext, ILogger logger, SystemInfoService systemInfoService, IConfigurationService configuration, IHttpContextAccessor contextAccessor, - IRedisDatabase redisDb) + IRedisDatabase redisDb, OnlineSyncedPairCacheService onlineSyncedPairCacheService) { _mareMetrics = mareMetrics; _systemInfoService = systemInfoService; @@ -48,6 +49,7 @@ public partial class MareHub : Hub, IMareHub _expectedClientVersion = configuration.GetValueOrDefault(nameof(ServerConfiguration.ExpectedClientVersion), new Version(0, 0, 0)); _contextAccessor = contextAccessor; _redis = redisDb; + _onlineSyncedPairCacheService = onlineSyncedPairCacheService; _logger = new MareHubLogger(this, logger); _dbContext = mareDbContext; } diff --git a/MareSynchronosServer/MareSynchronosServer/Services/OnlineSyncedPairCacheService.cs b/MareSynchronosServer/MareSynchronosServer/Services/OnlineSyncedPairCacheService.cs new file mode 100644 index 0000000..f8730cd --- /dev/null +++ b/MareSynchronosServer/MareSynchronosServer/Services/OnlineSyncedPairCacheService.cs @@ -0,0 +1,173 @@ +using MareSynchronosShared.Metrics; +using System.Collections.Concurrent; + +namespace MareSynchronosServer.Services; + +public class OnlineSyncedPairCacheService : IHostedService +{ + private const int CleanupCount = 1; + private const int CacheCount = 500; + private Task? _cleanUpTask; + private readonly CancellationTokenSource _runnerCts = new(); + private readonly ConcurrentDictionary> _lastSeenCache = new(StringComparer.Ordinal); + private readonly SemaphoreSlim _cleanupSemaphore = new(CleanupCount); + private readonly SemaphoreSlim _cacheSemaphore = new(CacheCount); + private readonly SemaphoreSlim _cacheAdditionSemaphore = new(1); + private readonly ILogger _logger; + private readonly MareMetrics _mareMetrics; + + public OnlineSyncedPairCacheService(ILogger logger, MareMetrics mareMetrics) + { + _logger = logger; + _mareMetrics = mareMetrics; + } + + public Task StartAsync(CancellationToken cancellationToken) + { + _cleanUpTask = CleanUp(_runnerCts.Token); + return Task.CompletedTask; + } + + public async Task StopAsync(CancellationToken cancellationToken) + { + _runnerCts.Cancel(); + await _cleanUpTask.WaitAsync(cancellationToken).ConfigureAwait(false); + } + + public async Task AreAllPlayersCached(string sender, List uids, CancellationToken ct) + { + while (_cleanupSemaphore.CurrentCount == 0) + await Task.Delay(250, ct).ConfigureAwait(false); + + await _cacheSemaphore.WaitAsync(ct).ConfigureAwait(false); + + try + { + if (ct.IsCancellationRequested) + return false; + if (!_lastSeenCache.TryGetValue(sender, out var senderCache)) + return false; + + lock (senderCache) + { + var cachedUIDs = senderCache.Keys.ToList(); + var allCached = uids.TrueForAll(u => cachedUIDs.Contains(u, StringComparer.OrdinalIgnoreCase)); + + _logger.LogDebug("AreAllPlayersCached:{uid}:{count}:{result}", sender, uids.Count, allCached); + + if (allCached) _mareMetrics.IncCounter(MetricsAPI.CounterUserPairCacheHit); + else _mareMetrics.IncCounter(MetricsAPI.CounterUserPairCacheMiss); + + return allCached; + } + } + finally + { + _cacheSemaphore.Release(); + } + } + + public async Task CachePlayers(string sender, List uids, CancellationToken ct) + { + while (_cleanupSemaphore.CurrentCount == 0) + await Task.Delay(250, ct).ConfigureAwait(false); + + await _cacheSemaphore.WaitAsync(ct).ConfigureAwait(false); + + try + { + if (ct.IsCancellationRequested) return; + if (!_lastSeenCache.TryGetValue(sender, out var senderCache)) + { + await _cacheAdditionSemaphore.WaitAsync(ct).ConfigureAwait(false); + try + { + if (!_lastSeenCache.ContainsKey(sender)) + { + _lastSeenCache[sender] = senderCache = new(StringComparer.Ordinal); + _mareMetrics.IncGauge(MetricsAPI.GaugeUserPairCacheEntries); + } + } + finally + { + _cacheAdditionSemaphore.Release(); + } + } + + lock (senderCache) + { + var lastSeen = DateTime.UtcNow.AddMinutes(60); + _logger.LogDebug("CacheOnlinePlayers:{uid}:{count}", sender, uids.Count); + var newEntries = uids.Count(u => !senderCache.ContainsKey(u)); + + _mareMetrics.IncCounter(MetricsAPI.CounterUserPairCacheNewEntries, newEntries); + _mareMetrics.IncCounter(MetricsAPI.CounterUserPairCacheUpdatedEntries, uids.Count - newEntries); + + _mareMetrics.IncGauge(MetricsAPI.GaugeUserPairCacheEntries, newEntries); + uids.ForEach(u => senderCache[u] = lastSeen); + } + } + finally + { + _cacheSemaphore.Release(); + } + } + + private async Task CleanUp(CancellationToken ct) + { + while (!ct.IsCancellationRequested) + { + await Task.Delay(TimeSpan.FromSeconds(10), ct).ConfigureAwait(false); + + _logger.LogInformation("Cleaning up stale entries"); + + try + { + await _cleanupSemaphore.WaitAsync(ct).ConfigureAwait(false); + while (_cacheSemaphore.CurrentCount != CacheCount) + await Task.Delay(25, ct).ConfigureAwait(false); + CleanUpCache(ct); + } + finally + { + _cleanupSemaphore.Release(); + } + } + } + + private void CleanUpCache(CancellationToken ct) + { + try + { + int entriesRemoved = 0; + int playersRemoved = 0; + foreach (var playerCache in _lastSeenCache.ToDictionary(k => k.Key, k => k.Value, StringComparer.Ordinal)) + { + foreach (var cacheEntry in playerCache.Value.ToDictionary(k => k.Key, k => k.Value, StringComparer.Ordinal)) + { + if (cacheEntry.Value < DateTime.UtcNow) + { + entriesRemoved++; + playerCache.Value.Remove(cacheEntry.Key); + } + } + + ct.ThrowIfCancellationRequested(); + + if (!playerCache.Value.Any()) + { + playersRemoved++; + _lastSeenCache.Remove(playerCache.Key, out _); + } + } + + _logger.LogInformation("Cleaning up complete, removed {entries} individual entries and {players} players", entriesRemoved, playersRemoved); + _mareMetrics.SetGaugeTo(MetricsAPI.GaugeUserPairCacheEntries, _lastSeenCache.Values.SelectMany(k => k.Keys).Count()); + _mareMetrics.SetGaugeTo(MetricsAPI.GaugeUserPairCacheUsers, _lastSeenCache.Keys.Count()); + } + catch (Exception ex) + { + _logger.LogWarning(ex, "Cleanup failed"); + } + } +} diff --git a/MareSynchronosServer/MareSynchronosServer/Startup.cs b/MareSynchronosServer/MareSynchronosServer/Startup.cs index c7add52..7ba7b75 100644 --- a/MareSynchronosServer/MareSynchronosServer/Startup.cs +++ b/MareSynchronosServer/MareSynchronosServer/Startup.cs @@ -90,7 +90,9 @@ public class Startup services.AddSingleton(); services.AddSingleton(); + services.AddSingleton(); services.AddHostedService(provider => provider.GetService()); + services.AddHostedService(provider => provider.GetService()); // configure services based on main server status ConfigureServicesBasedOnShardType(services, mareConfig, isMainServer); @@ -273,6 +275,10 @@ public class Startup MetricsAPI.CounterAuthenticationFailures, MetricsAPI.CounterAuthenticationRequests, MetricsAPI.CounterAuthenticationSuccesses, + MetricsAPI.CounterUserPairCacheHit, + MetricsAPI.CounterUserPairCacheMiss, + MetricsAPI.CounterUserPairCacheNewEntries, + MetricsAPI.CounterUserPairCacheUpdatedEntries, }, new List { MetricsAPI.GaugeAuthorizedConnections, @@ -285,6 +291,7 @@ public class Startup MetricsAPI.GaugeGroupPairs, MetricsAPI.GaugeUsersRegistered, MetricsAPI.GaugeAuthenticationCacheEntries, + MetricsAPI.GaugeUserPairCacheEntries, })); } diff --git a/MareSynchronosServer/MareSynchronosShared/Metrics/MetricsAPI.cs b/MareSynchronosServer/MareSynchronosShared/Metrics/MetricsAPI.cs index 1e99676..87838d0 100644 --- a/MareSynchronosServer/MareSynchronosShared/Metrics/MetricsAPI.cs +++ b/MareSynchronosServer/MareSynchronosShared/Metrics/MetricsAPI.cs @@ -33,4 +33,10 @@ public class MetricsAPI public const string GaugeDownloadQueue = "mare_download_queue"; public const string CounterFileRequests = "mare_files_requests"; public const string CounterFileRequestSize = "mare_files_request_size"; + public const string CounterUserPairCacheHit = "mare_pairscache_hit"; + public const string CounterUserPairCacheMiss = "mare_pairscache_miss"; + public const string GaugeUserPairCacheUsers = "mare_pairscache_users"; + public const string GaugeUserPairCacheEntries = "mare_pairscache_entries"; + public const string CounterUserPairCacheNewEntries = "mare_pairscache_new_entries"; + public const string CounterUserPairCacheUpdatedEntries = "mare_pairscache_updated_entries"; } \ No newline at end of file