let file shards register against main or so

This commit is contained in:
Stanley Dimant
2024-11-16 23:02:04 +01:00
parent 532c123927
commit 3a9f93b157
12 changed files with 457 additions and 48 deletions

View File

@@ -1,4 +1,5 @@
using MareSynchronos.API.Routes;
using MareSynchronosShared.Utils.Configuration;
using MareSynchronosStaticFilesServer.Services;
using Microsoft.AspNetCore.Authorization;
using Microsoft.AspNetCore.Mvc;
@@ -6,20 +7,60 @@ using Microsoft.AspNetCore.Mvc;
namespace MareSynchronosStaticFilesServer.Controllers;
[Route(MareFiles.Main)]
[Authorize(Policy = "Internal")]
public class MainController : ControllerBase
{
private readonly IClientReadyMessageService _messageService;
private readonly MainServerShardRegistrationService _shardRegistrationService;
public MainController(ILogger<MainController> logger, IClientReadyMessageService mareHub) : base(logger)
public MainController(ILogger<MainController> logger, IClientReadyMessageService mareHub,
MainServerShardRegistrationService shardRegistrationService) : base(logger)
{
_messageService = mareHub;
_shardRegistrationService = shardRegistrationService;
}
[HttpGet(MareFiles.Main_SendReady)]
[Authorize(Policy = "Internal")]
public async Task<IActionResult> SendReadyToClients(string uid, Guid requestId)
{
await _messageService.SendDownloadReady(uid, requestId).ConfigureAwait(false);
return Ok();
}
[HttpPost("shardRegister")]
public IActionResult RegisterShard([FromBody] ShardConfiguration shardConfiguration)
{
try
{
_shardRegistrationService.RegisterShard(MareUser, shardConfiguration);
return Ok();
}
catch (Exception ex)
{
_logger.LogWarning(ex, "Shard could not be registered {shard}", MareUser);
return BadRequest();
}
}
[HttpPost("shardUnregister")]
public IActionResult UnregisterShard()
{
_shardRegistrationService.UnregisterShard(MareUser);
return Ok();
}
[HttpPost("shardHeartbeat")]
public IActionResult ShardHeartbeat()
{
try
{
_shardRegistrationService.ShardHeartbeat(MareUser);
return Ok();
}
catch (Exception ex)
{
_logger.LogWarning(ex, "Shard not registered: {shard}", MareUser);
return BadRequest();
}
}
}

View File

@@ -31,11 +31,13 @@ public class ServerFilesController : ControllerBase
private readonly IHubContext<MareHub> _hubContext;
private readonly IDbContextFactory<MareDbContext> _mareDbContext;
private readonly MareMetrics _metricsClient;
private readonly MainServerShardRegistrationService _shardRegistrationService;
public ServerFilesController(ILogger<ServerFilesController> logger, CachedFileProvider cachedFileProvider,
IConfigurationService<StaticFilesServerConfiguration> configuration,
IHubContext<MareHub> hubContext,
IDbContextFactory<MareDbContext> mareDbContext, MareMetrics metricsClient) : base(logger)
IDbContextFactory<MareDbContext> mareDbContext, MareMetrics metricsClient,
MainServerShardRegistrationService shardRegistrationService) : base(logger)
{
_basePath = configuration.GetValueOrDefault(nameof(StaticFilesServerConfiguration.UseColdStorage), false)
? configuration.GetValue<string>(nameof(StaticFilesServerConfiguration.ColdStorageDirectory))
@@ -45,6 +47,7 @@ public class ServerFilesController : ControllerBase
_hubContext = hubContext;
_mareDbContext = mareDbContext;
_metricsClient = metricsClient;
_shardRegistrationService = shardRegistrationService;
}
[HttpPost(MareFiles.ServerFiles_DeleteAll)]
@@ -85,7 +88,7 @@ public class ServerFilesController : ControllerBase
.Select(k => new { k.Hash, k.Size, k.RawSize })
.ToListAsync().ConfigureAwait(false);
var allFileShards = new List<CdnShardConfiguration>(_configuration.GetValueOrDefault(nameof(StaticFilesServerConfiguration.CdnShardConfiguration), new List<CdnShardConfiguration>()));
var allFileShards = _shardRegistrationService.GetConfigurationsByContinent(Continent);
foreach (var file in cacheFile)
{
@@ -94,25 +97,12 @@ public class ServerFilesController : ControllerBase
if (forbiddenFile == null)
{
List<CdnShardConfiguration> selectedShards = new();
var matchingShards = allFileShards.Where(f => new Regex(f.FileMatch).IsMatch(file.Hash)).ToList();
if (string.Equals(Continent, "*", StringComparison.Ordinal))
{
selectedShards = matchingShards;
}
else
{
selectedShards = matchingShards.Where(c => c.Continents.Contains(Continent, StringComparer.OrdinalIgnoreCase)).ToList();
if (!selectedShards.Any()) selectedShards = matchingShards;
}
var shard = matchingShards.SelectMany(g => g.RegionUris)
.OrderBy(g => Guid.NewGuid()).FirstOrDefault();
var shard = selectedShards
.OrderBy(s => !s.Continents.Any() ? 0 : 1)
.ThenBy(s => s.Continents.Contains("*", StringComparer.Ordinal) ? 0 : 1)
.ThenBy(g => Guid.NewGuid()).FirstOrDefault();
baseUrl = shard?.CdnFullUrl ?? _configuration.GetValue<Uri>(nameof(StaticFilesServerConfiguration.CdnFullUrl));
baseUrl = shard.Value ?? _configuration.GetValue<Uri>(nameof(StaticFilesServerConfiguration.CdnFullUrl));
}
response.Add(new DownloadFileDto
@@ -133,15 +123,8 @@ public class ServerFilesController : ControllerBase
[HttpGet(MareFiles.ServerFiles_DownloadServers)]
public async Task<IActionResult> GetDownloadServers()
{
var allFileShards = new List<CdnShardConfiguration>(_configuration.GetValueOrDefault(nameof(StaticFilesServerConfiguration.CdnShardConfiguration), new List<CdnShardConfiguration>()))
.DistinctBy(f => f.CdnFullUrl).ToList();
if (!allFileShards.Any())
{
return Ok(JsonSerializer.Serialize(new List<string> { _configuration.GetValue<Uri>(nameof(StaticFilesServerConfiguration.CdnFullUrl)).ToString() }));
}
var selectedShards = allFileShards.Where(c => c.Continents.Contains(Continent, StringComparer.OrdinalIgnoreCase)).ToList();
if (!selectedShards.Any()) selectedShards = allFileShards.Where(c => c.Continents.Contains("*", StringComparer.Ordinal)).ToList();
return Ok(JsonSerializer.Serialize(selectedShards.Select(t => t.CdnFullUrl.ToString())));
var allFileShards = _shardRegistrationService.GetConfigurationsByContinent(Continent);
return Ok(JsonSerializer.Serialize(allFileShards.SelectMany(t => t.RegionUris.Select(v => v.Value.ToString()))));
}
[HttpPost(MareFiles.ServerFiles_FilesSend)]

View File

@@ -0,0 +1,96 @@
using MareSynchronosShared.Services;
using MareSynchronosShared.Utils.Configuration;
using System.Collections.Concurrent;
using System.Collections.Frozen;
namespace MareSynchronosStaticFilesServer.Services;
public class MainServerShardRegistrationService : IHostedService
{
private readonly ILogger<MainServerShardRegistrationService> _logger;
private readonly IConfigurationService<StaticFilesServerConfiguration> _configurationService;
private readonly ConcurrentDictionary<string, ShardConfiguration> _shardConfigs = new(StringComparer.Ordinal);
private readonly ConcurrentDictionary<string, DateTime> _shardHeartbeats = new(StringComparer.Ordinal);
private readonly CancellationTokenSource _periodicCheckCts = new();
public MainServerShardRegistrationService(ILogger<MainServerShardRegistrationService> logger,
IConfigurationService<StaticFilesServerConfiguration> configurationService)
{
_logger = logger;
_configurationService = configurationService;
}
public void RegisterShard(string shardName, ShardConfiguration shardConfiguration)
{
if (shardConfiguration == null || shardConfiguration == default)
throw new InvalidOperationException("Empty configuration provided");
if (_shardConfigs.ContainsKey(shardName))
_logger.LogInformation("Re-Registering Shard {name}", shardName);
else
_logger.LogInformation("Registering Shard {name}", shardName);
_shardHeartbeats[shardName] = DateTime.UtcNow;
_shardConfigs[shardName] = shardConfiguration;
}
public void UnregisterShard(string shardName)
{
_logger.LogInformation("Unregistering Shard {name}", shardName);
_shardHeartbeats.TryRemove(shardName, out _);
_shardConfigs.TryRemove(shardName, out _);
}
public List<ShardConfiguration> GetConfigurationsByContinent(string continent)
{
var shardConfigs = _shardConfigs.Values.Where(v => v.Continents.Contains(continent, StringComparer.OrdinalIgnoreCase)).ToList();
if (shardConfigs.Any()) return shardConfigs;
shardConfigs = _shardConfigs.Values.Where(v => v.Continents.Contains("*", StringComparer.OrdinalIgnoreCase)).ToList();
if (shardConfigs.Any()) return shardConfigs;
return [new ShardConfiguration() {
Continents = ["*"],
FileMatch = ".*",
RegionUris = new(StringComparer.Ordinal) {
{ "Central", _configurationService.GetValue<Uri>(nameof(StaticFilesServerConfiguration.CdnFullUrl)) }
} }];
}
public void ShardHeartbeat(string shardName)
{
if (!_shardConfigs.ContainsKey(shardName))
throw new InvalidOperationException("Shard not registered");
_logger.LogInformation("Heartbeat from {name}", shardName);
_shardHeartbeats[shardName] = DateTime.UtcNow;
}
public Task StartAsync(CancellationToken cancellationToken)
{
_ = Task.Run(() => PeriodicHeartbeatCleanup(_periodicCheckCts.Token), cancellationToken).ConfigureAwait(false);
return Task.CompletedTask;
}
public async Task StopAsync(CancellationToken cancellationToken)
{
await _periodicCheckCts.CancelAsync().ConfigureAwait(false);
_periodicCheckCts.Dispose();
}
private async Task PeriodicHeartbeatCleanup(CancellationToken ct)
{
while (!ct.IsCancellationRequested)
{
foreach (var kvp in _shardHeartbeats.ToFrozenDictionary())
{
if (DateTime.UtcNow.Subtract(kvp.Value) > TimeSpan.FromMinutes(1))
{
_shardHeartbeats.TryRemove(kvp.Key, out _);
_shardConfigs.TryRemove(kvp.Key, out _);
}
}
await Task.Delay(5000, ct).ConfigureAwait(false);
}
}
}

View File

@@ -0,0 +1,107 @@
using MareSynchronos.API.Routes;
using MareSynchronosShared.Services;
using MareSynchronosShared.Utils;
using MareSynchronosShared.Utils.Configuration;
namespace MareSynchronosStaticFilesServer.Services;
public class ShardRegistrationService : IHostedService
{
private readonly ILogger<ShardRegistrationService> _logger;
private readonly IConfigurationService<StaticFilesServerConfiguration> _configurationService;
private readonly HttpClient _httpClient = new();
private readonly CancellationTokenSource _heartBeatCts = new();
private bool _isRegistered = false;
public ShardRegistrationService(ILogger<ShardRegistrationService> logger,
IConfigurationService<StaticFilesServerConfiguration> configurationService,
ServerTokenGenerator serverTokenGenerator)
{
_logger = logger;
_configurationService = configurationService;
_httpClient.DefaultRequestHeaders.Authorization = new System.Net.Http.Headers.AuthenticationHeaderValue("Bearer", serverTokenGenerator.Token);
}
private void OnConfigChanged(object sender, EventArgs e)
{
_isRegistered = false;
}
public Task StartAsync(CancellationToken cancellationToken)
{
_logger.LogInformation("Starting");
_configurationService.ConfigChangedEvent += OnConfigChanged;
_ = Task.Run(() => HeartbeatLoop(_heartBeatCts.Token));
return Task.CompletedTask;
}
public async Task StopAsync(CancellationToken cancellationToken)
{
_logger.LogInformation("Stopping");
_configurationService.ConfigChangedEvent -= OnConfigChanged;
_heartBeatCts.Cancel();
_heartBeatCts.Dispose();
// call unregister
await UnregisterShard().ConfigureAwait(false);
_httpClient.Dispose();
}
private async Task HeartbeatLoop(CancellationToken ct)
{
while (!_heartBeatCts.IsCancellationRequested)
{
try
{
await ProcessHeartbeat(ct).ConfigureAwait(false);
}
catch (Exception ex)
{
_logger.LogWarning(ex, "Issue during Heartbeat");
_isRegistered = false;
}
await Task.Delay(TimeSpan.FromSeconds(30), ct).ConfigureAwait(false);
}
}
private async Task ProcessHeartbeat(CancellationToken ct)
{
if (!_isRegistered)
{
await TryRegisterShard(ct).ConfigureAwait(false);
}
await ShardHeartbeat(ct).ConfigureAwait(false);
}
private async Task ShardHeartbeat(CancellationToken ct)
{
Uri mainServer = _configurationService.GetValue<Uri>(nameof(StaticFilesServerConfiguration.MainFileServerAddress));
_logger.LogInformation("Running heartbeat against Main {server}", mainServer);
using var heartBeat = await _httpClient.PostAsync(new Uri(mainServer, MareFiles.Main + "/shardHeartbeat"), null, ct).ConfigureAwait(false);
heartBeat.EnsureSuccessStatusCode();
}
private async Task TryRegisterShard(CancellationToken ct)
{
Uri mainServer = _configurationService.GetValue<Uri>(nameof(StaticFilesServerConfiguration.MainFileServerAddress));
_logger.LogInformation("Registering Shard with Main {server}", mainServer);
var config = _configurationService.GetValue<ShardConfiguration>(nameof(StaticFilesServerConfiguration.ShardConfiguration));
_logger.LogInformation("Config Value {varName}: {value}", nameof(ShardConfiguration.Continents), string.Join(", ", config.Continents));
_logger.LogInformation("Config Value {varName}: {value}", nameof(ShardConfiguration.FileMatch), config.FileMatch);
_logger.LogInformation("Config Value {varName}: {value}", nameof(ShardConfiguration.RegionUris), string.Join("; ", config.RegionUris.Select(k => k.Key + ":" + k.Value)));
using var register = await _httpClient.PostAsJsonAsync(new Uri(mainServer, MareFiles.Main + "/shardRegister"), config, ct).ConfigureAwait(false);
register.EnsureSuccessStatusCode();
_isRegistered = true;
}
private async Task UnregisterShard()
{
Uri mainServer = _configurationService.GetValue<Uri>(nameof(StaticFilesServerConfiguration.MainFileServerAddress));
_logger.LogInformation("Unregistering Shard with Main {server}", mainServer);
using var heartBeat = await _httpClient.PostAsync(new Uri(mainServer, MareFiles.Main + "/shardUnregister"), null).ConfigureAwait(false);
}
}

View File

@@ -98,6 +98,8 @@ public class Startup
services.AddSingleton<IClientReadyMessageService, MainClientReadyMessageService>();
services.AddHostedService<MainFileCleanupService>();
services.AddSingleton<IConfigurationService<StaticFilesServerConfiguration>, MareConfigurationServiceServer<StaticFilesServerConfiguration>>();
services.AddSingleton<MainServerShardRegistrationService>();
services.AddHostedService(s => s.GetRequiredService<MainServerShardRegistrationService>());
services.AddDbContextPool<MareDbContext>(options =>
{
options.UseNpgsql(Configuration.GetConnectionString("DefaultConnection"), builder =>
@@ -180,6 +182,8 @@ public class Startup
}
else
{
services.AddSingleton<ShardRegistrationService>();
services.AddHostedService(s => s.GetRequiredService<ShardRegistrationService>());
services.AddSingleton<IClientReadyMessageService, ShardClientReadyMessageService>();
services.AddHostedService<ShardFileCleanupService>();
services.AddSingleton<IConfigurationService<StaticFilesServerConfiguration>, MareConfigurationServiceClient<StaticFilesServerConfiguration>>();