diff --git a/MareSynchronosServer/MareSynchronosServer/Program.cs b/MareSynchronosServer/MareSynchronosServer/Program.cs index 76915e0..b2ce044 100644 --- a/MareSynchronosServer/MareSynchronosServer/Program.cs +++ b/MareSynchronosServer/MareSynchronosServer/Program.cs @@ -15,7 +15,8 @@ public class Program using (var scope = host.Services.CreateScope()) { var services = scope.ServiceProvider; - using var context = services.GetRequiredService(); + var factory = services.GetRequiredService>(); + using var context = factory.CreateDbContext(); var options = services.GetRequiredService>(); var logger = host.Services.GetRequiredService>(); @@ -35,12 +36,11 @@ public class Program logger.LogInformation(options.ToString()); } - var metrics = services.GetRequiredService(); metrics.SetGaugeTo(MetricsAPI.GaugeUsersRegistered, context.Users.AsNoTracking().Count()); metrics.SetGaugeTo(MetricsAPI.GaugePairs, context.ClientPairs.AsNoTracking().Count()); - metrics.SetGaugeTo(MetricsAPI.GaugePairsPaused, context.Permissions.AsNoTracking().Count(p => p.IsPaused)); + metrics.SetGaugeTo(MetricsAPI.GaugePairsPaused, context.Permissions.AsNoTracking().Where(p=>p.IsPaused).Count()); } diff --git a/MareSynchronosServer/MareSynchronosServer/Services/CharaDataCleanupService.cs b/MareSynchronosServer/MareSynchronosServer/Services/CharaDataCleanupService.cs index fb0735b..69efc5a 100644 --- a/MareSynchronosServer/MareSynchronosServer/Services/CharaDataCleanupService.cs +++ b/MareSynchronosServer/MareSynchronosServer/Services/CharaDataCleanupService.cs @@ -3,11 +3,10 @@ using Microsoft.EntityFrameworkCore; namespace MareSynchronosServer.Services; -public class CharaDataCleanupService : IHostedService +public class CharaDataCleanupService : BackgroundService { private readonly ILogger _logger; private readonly IDbContextFactory _dbContextFactory; - private readonly CancellationTokenSource _cleanupCts = new(); public CharaDataCleanupService(ILogger logger, IDbContextFactory dbContextFactory) { @@ -15,13 +14,13 @@ public class CharaDataCleanupService : IHostedService _dbContextFactory = dbContextFactory; } - public Task StartAsync(CancellationToken cancellationToken) + public override async Task StartAsync(CancellationToken cancellationToken) { - _ = Cleanup(cancellationToken); - return Task.CompletedTask; + await base.StartAsync(cancellationToken).ConfigureAwait(false); + _logger.LogInformation("Chara Data Cleanup Service started"); } - private async Task Cleanup(CancellationToken ct) + protected override async Task ExecuteAsync(CancellationToken ct) { _logger.LogInformation("CharaData Cleanup Service started"); while (!ct.IsCancellationRequested) @@ -40,11 +39,4 @@ public class CharaDataCleanupService : IHostedService await Task.Delay(TimeSpan.FromHours(12), ct).ConfigureAwait(false); } } - - public Task StopAsync(CancellationToken cancellationToken) - { - _cleanupCts?.Cancel(); - _cleanupCts?.Dispose(); - return Task.CompletedTask; - } } diff --git a/MareSynchronosServer/MareSynchronosServer/Services/ClientPairPermissionsCleanupService.cs b/MareSynchronosServer/MareSynchronosServer/Services/ClientPairPermissionsCleanupService.cs new file mode 100644 index 0000000..83efd2d --- /dev/null +++ b/MareSynchronosServer/MareSynchronosServer/Services/ClientPairPermissionsCleanupService.cs @@ -0,0 +1,165 @@ + +using MareSynchronosShared.Data; +using MareSynchronosShared.Models; +using MareSynchronosShared.Services; +using MareSynchronosShared.Utils.Configuration; +using Microsoft.EntityFrameworkCore; +using System.Collections.Concurrent; +using System.Diagnostics; + +namespace MareSynchronosServer.Services; + +public class ClientPairPermissionsCleanupService(ILogger _logger, IDbContextFactory _dbContextFactory, + IConfigurationService _configurationService) + : BackgroundService +{ + public override async Task StartAsync(CancellationToken cancellationToken) + { + _logger.LogInformation("Client Pair Permissions Cleanup Service started"); + await base.StartAsync(cancellationToken).ConfigureAwait(false); + } + + private async Task AllUsersPermissionsCleanup(CancellationToken ct) + { + const int MaxParallelism = 8; + const int MaxProcessingPerChunk = 500000; + + long removedEntries = 0; + long priorRemovedEntries = 0; + ConcurrentDictionary> toRemovePermsParallel = []; + int parallelProcessed = 0; + int userNo = 0; + + using var db = await _dbContextFactory.CreateDbContextAsync(ct).ConfigureAwait(false); + _logger.LogInformation("Building All Pairs"); + var allPairs = await GetAllPairs(db, ct).ConfigureAwait(false); + _logger.LogInformation("Found a total distinct of {count} pairs", allPairs.Values.Sum(v => v.Count)); + + _logger.LogInformation("Collecting Users"); + var users = (await db.Users.Select(k => k.UID).AsNoTracking().ToListAsync(ct).ConfigureAwait(false)).Order(StringComparer.Ordinal).ToList(); + + Stopwatch st = Stopwatch.StartNew(); + + while (userNo < users.Count) + { + using CancellationTokenSource loopCts = new(); + using var linkedCts = CancellationTokenSource.CreateLinkedTokenSource(loopCts.Token, ct); + try + { + await Parallel.ForAsync(userNo, users.Count, new ParallelOptions() + { + MaxDegreeOfParallelism = MaxParallelism, + CancellationToken = linkedCts.Token + }, + async (i, token) => + { + var userNoInc = Interlocked.Increment(ref userNo); + using var db2 = await _dbContextFactory.CreateDbContextAsync(token).ConfigureAwait(false); + + var user = users[i]; + if (!allPairs.Remove(user, out var personalPairs)) + personalPairs = []; + + toRemovePermsParallel[i] = await UserPermissionCleanup(i, users.Count, user, db2, personalPairs).ConfigureAwait(false); + var processedAdd = Interlocked.Add(ref parallelProcessed, toRemovePermsParallel[i].Count); + + if (userNoInc % 250 == 0) + { + var elapsed = st.Elapsed; + var completion = userNoInc / (double)users.Count; + var estimatedTimeLeft = (elapsed / completion) - elapsed; + _logger.LogInformation("Progress: {no}/{total} ({pct:P2}), removed so far: {removed}, planned next chunk: {planned}, estimated time left: {time}", + userNoInc, users.Count, completion, removedEntries, processedAdd, estimatedTimeLeft); + } + + if (processedAdd > MaxProcessingPerChunk) + await loopCts.CancelAsync().ConfigureAwait(false); + }).ConfigureAwait(false); + } + catch (OperationCanceledException) + { + // expected + } + + removedEntries += parallelProcessed; + + _logger.LogInformation("Removing {newDeleted} entities and writing to database", removedEntries - priorRemovedEntries); + db.Permissions.RemoveRange(toRemovePermsParallel.Values.SelectMany(v => v).ToList()); + await db.SaveChangesAsync(ct).ConfigureAwait(false); + + _logger.LogInformation("Removed {newDeleted} entities, settling...", removedEntries - priorRemovedEntries); + priorRemovedEntries = removedEntries; + + parallelProcessed = 0; + toRemovePermsParallel.Clear(); + await Task.Delay(TimeSpan.FromSeconds(5), ct).ConfigureAwait(false); + } + + st.Stop(); + _logger.LogInformation("User Permissions Cleanup Finished, removed {total} stale permissions in {time}", removedEntries, st.Elapsed); + } + + private async Task> UserPermissionCleanup(int userNr, int totalUsers, string uid, MareDbContext dbContext, List pairs) + { + var perms = dbContext.Permissions.Where(p => p.UserUID == uid && !p.Sticky && !pairs.Contains(p.OtherUserUID)); + + var permsToRemoveCount = await perms.CountAsync().ConfigureAwait(false); + if (permsToRemoveCount == 0) + return []; + + _logger.LogInformation("[{current}/{totalCount}] User {user}: Planning to remove {removed} permissions", userNr, totalUsers, uid, permsToRemoveCount); + + return await perms.ToListAsync().ConfigureAwait(false); + } + + private async Task>> GetAllPairs(MareDbContext dbContext, CancellationToken ct) + { + var entries = await dbContext.ClientPairs.AsNoTracking().Select(k => new { Self = k.UserUID, Other = k.OtherUserUID }) + .Concat( + dbContext.GroupPairs.AsNoTracking() + .Join(dbContext.GroupPairs.AsNoTracking(), + a => a.GroupGID, + b => b.GroupGID, + (a, b) => new { Self = a.GroupUserUID, Other = b.GroupUserUID }) + .Where(a => a.Self != a.Other)) + .ToListAsync(ct).ConfigureAwait(false); + + return new(entries.GroupBy(k => k.Self, StringComparer.Ordinal) + .ToDictionary(k => k.Key, k => k.Any() ? k.Select(k => k.Other).Distinct(StringComparer.Ordinal).ToList() : [], StringComparer.Ordinal), StringComparer.Ordinal); + } + + protected override async Task ExecuteAsync(CancellationToken ct) + { + if (!_configurationService.GetValueOrDefault(nameof(ServerConfiguration.RunPermissionCleanupOnStartup), defaultValue: true)) + { + await WaitUntilNextCleanup(ct).ConfigureAwait(false); + } + + while (!ct.IsCancellationRequested) + { + try + { + _logger.LogInformation("Starting Permissions Cleanup"); + await AllUsersPermissionsCleanup(ct).ConfigureAwait(false); + } + catch (Exception ex) + { + _logger.LogError(ex, "Unhandled Exception during User Permissions Cleanup"); + } + + await WaitUntilNextCleanup(ct).ConfigureAwait(false); + } + } + + private async Task WaitUntilNextCleanup(CancellationToken token) + { + var now = DateTime.UtcNow; + var nextRun = new DateTime(now.Year, now.Month, now.Day, 12, 0, 0, DateTimeKind.Utc); + if (now > nextRun) nextRun = nextRun.AddDays(1); + + var nextRunSpan = nextRun - now; + _logger.LogInformation("Permissions Cleanup next run in {span}", nextRunSpan); + + await Task.Delay(nextRunSpan, token).ConfigureAwait(false); + } +} diff --git a/MareSynchronosServer/MareSynchronosServer/Services/SystemInfoService.cs b/MareSynchronosServer/MareSynchronosServer/Services/SystemInfoService.cs index dc80392..8dc858d 100644 --- a/MareSynchronosServer/MareSynchronosServer/Services/SystemInfoService.cs +++ b/MareSynchronosServer/MareSynchronosServer/Services/SystemInfoService.cs @@ -11,87 +11,74 @@ using StackExchange.Redis.Extensions.Core.Abstractions; namespace MareSynchronosServer.Services; -public sealed class SystemInfoService : IHostedService, IDisposable +public sealed class SystemInfoService : BackgroundService { private readonly MareMetrics _mareMetrics; private readonly IConfigurationService _config; - private readonly IServiceProvider _services; + private readonly IDbContextFactory _dbContextFactory; private readonly ILogger _logger; private readonly IHubContext _hubContext; private readonly IRedisDatabase _redis; - private Timer _timer; public SystemInfoDto SystemInfoDto { get; private set; } = new(); - public SystemInfoService(MareMetrics mareMetrics, IConfigurationService configurationService, IServiceProvider services, + public SystemInfoService(MareMetrics mareMetrics, IConfigurationService configurationService, IDbContextFactory dbContextFactory, ILogger logger, IHubContext hubContext, IRedisDatabase redisDb) { _mareMetrics = mareMetrics; _config = configurationService; - _services = services; + _dbContextFactory = dbContextFactory; _logger = logger; _hubContext = hubContext; _redis = redisDb; } - public Task StartAsync(CancellationToken cancellationToken) + public override async Task StartAsync(CancellationToken cancellationToken) { + await base.StartAsync(cancellationToken).ConfigureAwait(false); _logger.LogInformation("System Info Service started"); - - var timeOut = _config.IsMain ? 5 : 15; - - _timer = new Timer(PushSystemInfo, null, TimeSpan.Zero, TimeSpan.FromSeconds(timeOut)); - - return Task.CompletedTask; } - private void PushSystemInfo(object state) + protected override async Task ExecuteAsync(CancellationToken ct) { - try + var timeOut = _config.IsMain ? 15 : 30; + + while (!ct.IsCancellationRequested) { - ThreadPool.GetAvailableThreads(out int workerThreads, out int ioThreads); - - _mareMetrics.SetGaugeTo(MetricsAPI.GaugeAvailableWorkerThreads, workerThreads); - _mareMetrics.SetGaugeTo(MetricsAPI.GaugeAvailableIOWorkerThreads, ioThreads); - - var onlineUsers = (_redis.SearchKeysAsync("UID:*").GetAwaiter().GetResult()).Count(); - SystemInfoDto = new SystemInfoDto() + try { - OnlineUsers = onlineUsers, - }; + ThreadPool.GetAvailableThreads(out int workerThreads, out int ioThreads); - if (_config.IsMain) + _mareMetrics.SetGaugeTo(MetricsAPI.GaugeAvailableWorkerThreads, workerThreads); + _mareMetrics.SetGaugeTo(MetricsAPI.GaugeAvailableIOWorkerThreads, ioThreads); + + var onlineUsers = (_redis.SearchKeysAsync("UID:*").GetAwaiter().GetResult()).Count(); + SystemInfoDto = new SystemInfoDto() + { + OnlineUsers = onlineUsers, + }; + + if (_config.IsMain) + { + _logger.LogInformation("Sending System Info, Online Users: {onlineUsers}", onlineUsers); + + await _hubContext.Clients.All.Client_UpdateSystemInfo(SystemInfoDto).ConfigureAwait(false); + + using var db = await _dbContextFactory.CreateDbContextAsync(ct).ConfigureAwait(false); + + _mareMetrics.SetGaugeTo(MetricsAPI.GaugeAuthorizedConnections, onlineUsers); + _mareMetrics.SetGaugeTo(MetricsAPI.GaugePairs, db.ClientPairs.AsNoTracking().Count()); + _mareMetrics.SetGaugeTo(MetricsAPI.GaugePairsPaused, db.Permissions.AsNoTracking().Where(p => p.IsPaused).Count()); + _mareMetrics.SetGaugeTo(MetricsAPI.GaugeGroups, db.Groups.AsNoTracking().Count()); + _mareMetrics.SetGaugeTo(MetricsAPI.GaugeGroupPairs, db.GroupPairs.AsNoTracking().Count()); + _mareMetrics.SetGaugeTo(MetricsAPI.GaugeUsersRegistered, db.Users.AsNoTracking().Count()); + } + + await Task.Delay(TimeSpan.FromSeconds(timeOut), ct).ConfigureAwait(false); + } + catch (Exception ex) { - _logger.LogInformation("Sending System Info, Online Users: {onlineUsers}", onlineUsers); - - _hubContext.Clients.All.Client_UpdateSystemInfo(SystemInfoDto); - - using var scope = _services.CreateScope(); - using var db = scope.ServiceProvider.GetService()!; - - _mareMetrics.SetGaugeTo(MetricsAPI.GaugeAuthorizedConnections, onlineUsers); - _mareMetrics.SetGaugeTo(MetricsAPI.GaugePairs, db.ClientPairs.AsNoTracking().Count()); - _mareMetrics.SetGaugeTo(MetricsAPI.GaugePairsPaused, db.Permissions.AsNoTracking().Count(p => p.IsPaused)); - _mareMetrics.SetGaugeTo(MetricsAPI.GaugeGroups, db.Groups.AsNoTracking().Count()); - _mareMetrics.SetGaugeTo(MetricsAPI.GaugeGroupPairs, db.GroupPairs.AsNoTracking().Count()); - _mareMetrics.SetGaugeTo(MetricsAPI.GaugeUsersRegistered, db.Users.AsNoTracking().Count()); + _logger.LogWarning(ex, "Failed to push system info"); } } - catch (Exception ex) - { - _logger.LogWarning(ex, "Failed to push system info"); - } - } - - - public Task StopAsync(CancellationToken cancellationToken) - { - _timer?.Change(Timeout.Infinite, 0); - - return Task.CompletedTask; - } - - public void Dispose() - { - _timer?.Dispose(); } } \ No newline at end of file diff --git a/MareSynchronosServer/MareSynchronosServer/Services/UserCleanupService.cs b/MareSynchronosServer/MareSynchronosServer/Services/UserCleanupService.cs index c07ce04..0c02209 100644 --- a/MareSynchronosServer/MareSynchronosServer/Services/UserCleanupService.cs +++ b/MareSynchronosServer/MareSynchronosServer/Services/UserCleanupService.cs @@ -100,8 +100,6 @@ public class UserCleanupService : IHostedService await SharedDbFunctions.PurgeUser(_logger, user, dbContext, maxGroupsByUser).ConfigureAwait(false); } } - - _logger.LogInformation("Cleaning up unauthorized users"); } catch (Exception ex) { diff --git a/MareSynchronosServer/MareSynchronosServer/Startup.cs b/MareSynchronosServer/MareSynchronosServer/Startup.cs index c9006d7..1ddb414 100644 --- a/MareSynchronosServer/MareSynchronosServer/Startup.cs +++ b/MareSynchronosServer/MareSynchronosServer/Startup.cs @@ -103,6 +103,7 @@ public class Startup services.AddHostedService(provider => provider.GetService()); services.AddSingleton(); services.AddHostedService(provider => provider.GetService()); + services.AddHostedService(); } services.AddSingleton(); diff --git a/MareSynchronosServer/MareSynchronosShared/Utils/Configuration/ServerConfiguration.cs b/MareSynchronosServer/MareSynchronosShared/Utils/Configuration/ServerConfiguration.cs index b87cfd7..90cff2c 100644 --- a/MareSynchronosServer/MareSynchronosShared/Utils/Configuration/ServerConfiguration.cs +++ b/MareSynchronosServer/MareSynchronosShared/Utils/Configuration/ServerConfiguration.cs @@ -30,6 +30,7 @@ public class ServerConfiguration : MareConfigurationBase [RemoteConfiguration] public int MaxCharaDataByUserVanity { get; set; } = 50; + public bool RunPermissionCleanupOnStartup { get; set; } = true; public override string ToString() { @@ -43,6 +44,7 @@ public class ServerConfiguration : MareConfigurationBase sb.AppendLine($"{nameof(MaxGroupUserCount)} => {MaxGroupUserCount}"); sb.AppendLine($"{nameof(PurgeUnusedAccounts)} => {PurgeUnusedAccounts}"); sb.AppendLine($"{nameof(PurgeUnusedAccountsPeriodInDays)} => {PurgeUnusedAccountsPeriodInDays}"); + sb.AppendLine($"{nameof(RunPermissionCleanupOnStartup)} => {RunPermissionCleanupOnStartup}"); return sb.ToString(); } } \ No newline at end of file