some file server fixes I guess

This commit is contained in:
rootdarkarchon
2024-01-13 12:08:49 +01:00
parent 810c6dbd45
commit 2c7ff6f73a
5 changed files with 53 additions and 51 deletions

View File

@@ -31,6 +31,9 @@ public class MetricsAPI
public const string GaugeQueueActive = "mare_download_queue_active"; public const string GaugeQueueActive = "mare_download_queue_active";
public const string GaugeQueueInactive = "mare_download_queue_inactive"; public const string GaugeQueueInactive = "mare_download_queue_inactive";
public const string GaugeDownloadQueue = "mare_download_queue"; public const string GaugeDownloadQueue = "mare_download_queue";
public const string GaugeDownloadQueueCancelled = "mare_download_queue_cancelled";
public const string GaugeDownloadPriorityQueue = "mare_download_priority_queue";
public const string GaugeDownloadPriorityQueueCancelled = "mare_download_priority_queue_cancelled";
public const string CounterFileRequests = "mare_files_requests"; public const string CounterFileRequests = "mare_files_requests";
public const string CounterFileRequestSize = "mare_files_request_size"; public const string CounterFileRequestSize = "mare_files_request_size";
public const string CounterUserPairCacheHit = "mare_pairscache_hit"; public const string CounterUserPairCacheHit = "mare_pairscache_hit";

View File

@@ -24,9 +24,10 @@ public class RequestController : ControllerBase
[Route(MareFiles.Request_Cancel)] [Route(MareFiles.Request_Cancel)]
public async Task<IActionResult> CancelQueueRequest(Guid requestId) public async Task<IActionResult> CancelQueueRequest(Guid requestId)
{ {
await _parallelRequestSemaphore.WaitAsync(HttpContext.RequestAborted);
try try
{ {
await _parallelRequestSemaphore.WaitAsync(HttpContext.RequestAborted);
_requestQueue.RemoveFromQueue(requestId, MareUser); _requestQueue.RemoveFromQueue(requestId, MareUser);
return Ok(); return Ok();
} }
@@ -41,9 +42,10 @@ public class RequestController : ControllerBase
[Route(MareFiles.Request_Enqueue)] [Route(MareFiles.Request_Enqueue)]
public async Task<IActionResult> PreRequestFilesAsync([FromBody] IEnumerable<string> files) public async Task<IActionResult> PreRequestFilesAsync([FromBody] IEnumerable<string> files)
{ {
await _parallelRequestSemaphore.WaitAsync(HttpContext.RequestAborted);
try try
{ {
await _parallelRequestSemaphore.WaitAsync(HttpContext.RequestAborted);
foreach (var file in files) foreach (var file in files)
{ {
_logger.LogDebug("Prerequested file: " + file); _logger.LogDebug("Prerequested file: " + file);
@@ -66,6 +68,8 @@ public class RequestController : ControllerBase
[Route(MareFiles.Request_Check)] [Route(MareFiles.Request_Check)]
public async Task<IActionResult> CheckQueueAsync(Guid requestId, [FromBody] IEnumerable<string> files) public async Task<IActionResult> CheckQueueAsync(Guid requestId, [FromBody] IEnumerable<string> files)
{ {
await _parallelRequestSemaphore.WaitAsync(HttpContext.RequestAborted);
try try
{ {
if (!await _requestQueue.StillEnqueued(requestId, MareUser, _mareDbContext)) if (!await _requestQueue.StillEnqueued(requestId, MareUser, _mareDbContext))
@@ -73,5 +77,9 @@ public class RequestController : ControllerBase
return Ok(); return Ok();
} }
catch (OperationCanceledException) { return BadRequest(); } catch (OperationCanceledException) { return BadRequest(); }
finally
{
_parallelRequestSemaphore.Release();
}
} }
} }

View File

@@ -22,7 +22,6 @@ public class RequestQueueService : IHostedService
private readonly ConcurrentQueue<UserRequest> _priorityQueue = new(); private readonly ConcurrentQueue<UserRequest> _priorityQueue = new();
private readonly int _queueExpirationSeconds; private readonly int _queueExpirationSeconds;
private readonly SemaphoreSlim _queueProcessingSemaphore = new(1); private readonly SemaphoreSlim _queueProcessingSemaphore = new(1);
private readonly ConcurrentDictionary<Guid, string> _queueRemoval = new();
private readonly SemaphoreSlim _queueSemaphore = new(1); private readonly SemaphoreSlim _queueSemaphore = new(1);
private readonly UserQueueEntry[] _userQueueRequests; private readonly UserQueueEntry[] _userQueueRequests;
private readonly ConcurrentDictionary<string, PriorityEntry> _priorityCache = new(StringComparer.Ordinal); private readonly ConcurrentDictionary<string, PriorityEntry> _priorityCache = new(StringComparer.Ordinal);
@@ -118,7 +117,9 @@ public class RequestQueueService : IHostedService
public void RemoveFromQueue(Guid requestId, string user) public void RemoveFromQueue(Guid requestId, string user)
{ {
if (!_queue.Any(f => f.RequestId == requestId && string.Equals(f.User, user, StringComparison.Ordinal))) var existingRequest = _priorityQueue.FirstOrDefault(f => f.RequestId == requestId && string.Equals(f.User, user, StringComparison.Ordinal))
?? _queue.FirstOrDefault(f => f.RequestId == requestId && string.Equals(f.User, user, StringComparison.Ordinal));
if (existingRequest == null)
{ {
var activeSlot = _userQueueRequests.FirstOrDefault(r => r != null && string.Equals(r.UserRequest.User, user, StringComparison.Ordinal) && r.UserRequest.RequestId == requestId); var activeSlot = _userQueueRequests.FirstOrDefault(r => r != null && string.Equals(r.UserRequest.User, user, StringComparison.Ordinal) && r.UserRequest.RequestId == requestId);
if (activeSlot != null) if (activeSlot != null)
@@ -129,10 +130,11 @@ public class RequestQueueService : IHostedService
_userQueueRequests[idx] = null; _userQueueRequests[idx] = null;
} }
} }
return;
} }
_queueRemoval[requestId] = user; else
{
existingRequest.IsCancelled = true;
}
} }
public Task StartAsync(CancellationToken cancellationToken) public Task StartAsync(CancellationToken cancellationToken)
@@ -175,7 +177,7 @@ public class RequestQueueService : IHostedService
try try
{ {
if (_queue.Count > _queueLimitForReset) if (_queue.Count(c => !c.IsCancelled) > _queueLimitForReset)
{ {
_queue.Clear(); _queue.Clear();
return; return;
@@ -189,56 +191,36 @@ public class RequestQueueService : IHostedService
{ {
try try
{ {
if (_userQueueRequests[i] != null && ((!_userQueueRequests[i].IsActive && _userQueueRequests[i].ExpirationDate < DateTime.UtcNow))) if (_userQueueRequests[i] != null
&& (((!_userQueueRequests[i].IsActive && _userQueueRequests[i].ExpirationDate < DateTime.UtcNow))
|| (_userQueueRequests[i].IsActive && _userQueueRequests[i].ActivationDate < DateTime.UtcNow.Subtract(TimeSpan.FromSeconds(_queueReleaseSeconds))))
)
{ {
_logger.LogDebug("Expiring inactive request {guid} slot {slot}", _userQueueRequests[i].UserRequest.RequestId, i); _logger.LogDebug("Expiring request {guid} slot {slot}", _userQueueRequests[i].UserRequest.RequestId, i);
_userQueueRequests[i] = null; _userQueueRequests[i] = null;
} }
if (_userQueueRequests[i] != null && (_userQueueRequests[i].IsActive && _userQueueRequests[i].ActivationDate < DateTime.UtcNow.Subtract(TimeSpan.FromSeconds(_queueReleaseSeconds)))) if ((!_queue.Any() && !_priorityQueue.Any()) || _userQueueRequests[i] != null) return;
{
_logger.LogDebug("Expiring active request {guid} slot {slot}", _userQueueRequests[i].UserRequest.RequestId, i);
_userQueueRequests[i] = null;
}
if (!_queue.Any() && !_priorityQueue.Any()) return; while (true)
if (_userQueueRequests[i] == null)
{ {
bool enqueued = false; if (_priorityQueue.TryDequeue(out var prioRequest))
while (!enqueued)
{ {
if (_priorityQueue.TryDequeue(out var prioRequest)) if (prioRequest.IsCancelled) continue;
{
if (_queueRemoval.TryGetValue(prioRequest.RequestId, out string user) && string.Equals(user, prioRequest.User, StringComparison.Ordinal))
{
_logger.LogDebug("Request cancelled: {requestId} by {user}", prioRequest.RequestId, user);
_queueRemoval.Remove(prioRequest.RequestId, out _);
continue;
}
await DequeueIntoSlotAsync(prioRequest, i).ConfigureAwait(false); await DequeueIntoSlotAsync(prioRequest, i).ConfigureAwait(false);
enqueued = true; break;
break;
}
if (_queue.TryDequeue(out var request))
{
if (_queueRemoval.TryGetValue(request.RequestId, out string user) && string.Equals(user, request.User, StringComparison.Ordinal))
{
_logger.LogDebug("Request cancelled: {requestId} by {user}", request.RequestId, user);
_queueRemoval.Remove(request.RequestId, out _);
continue;
}
await DequeueIntoSlotAsync(request, i).ConfigureAwait(false);
enqueued = true;
}
else
{
enqueued = true;
}
} }
if (_queue.TryDequeue(out var request))
{
if (request.IsCancelled) continue;
await DequeueIntoSlotAsync(request, i).ConfigureAwait(false);
break;
}
break;
} }
} }
catch (Exception ex) catch (Exception ex)
@@ -259,6 +241,9 @@ public class RequestQueueService : IHostedService
_metrics.SetGaugeTo(MetricsAPI.GaugeQueueFree, _userQueueRequests.Count(c => c == null)); _metrics.SetGaugeTo(MetricsAPI.GaugeQueueFree, _userQueueRequests.Count(c => c == null));
_metrics.SetGaugeTo(MetricsAPI.GaugeQueueActive, _userQueueRequests.Count(c => c != null && c.IsActive)); _metrics.SetGaugeTo(MetricsAPI.GaugeQueueActive, _userQueueRequests.Count(c => c != null && c.IsActive));
_metrics.SetGaugeTo(MetricsAPI.GaugeQueueInactive, _userQueueRequests.Count(c => c != null && !c.IsActive)); _metrics.SetGaugeTo(MetricsAPI.GaugeQueueInactive, _userQueueRequests.Count(c => c != null && !c.IsActive));
_metrics.SetGaugeTo(MetricsAPI.GaugeDownloadQueue, _queue.Count); _metrics.SetGaugeTo(MetricsAPI.GaugeDownloadQueue, _queue.Count(q => !q.IsCancelled));
_metrics.SetGaugeTo(MetricsAPI.GaugeDownloadQueueCancelled, _queue.Count(q => q.IsCancelled));
_metrics.SetGaugeTo(MetricsAPI.GaugeDownloadPriorityQueue, _priorityQueue.Count(q => !q.IsCancelled));
_metrics.SetGaugeTo(MetricsAPI.GaugeDownloadPriorityQueueCancelled, _priorityQueue.Count(q => q.IsCancelled));
} }
} }

View File

@@ -65,6 +65,9 @@ public class Startup
MetricsAPI.GaugeFilesUniquePastHourSize, MetricsAPI.GaugeFilesUniquePastHourSize,
MetricsAPI.GaugeCurrentDownloads, MetricsAPI.GaugeCurrentDownloads,
MetricsAPI.GaugeDownloadQueue, MetricsAPI.GaugeDownloadQueue,
MetricsAPI.GaugeDownloadQueueCancelled,
MetricsAPI.GaugeDownloadPriorityQueue,
MetricsAPI.GaugeDownloadPriorityQueueCancelled,
MetricsAPI.GaugeQueueFree, MetricsAPI.GaugeQueueFree,
MetricsAPI.GaugeQueueInactive, MetricsAPI.GaugeQueueInactive,
MetricsAPI.GaugeQueueActive, MetricsAPI.GaugeQueueActive,

View File

@@ -1,3 +1,6 @@
namespace MareSynchronosStaticFilesServer.Utils; namespace MareSynchronosStaticFilesServer.Utils;
public record UserRequest(Guid RequestId, string User, List<string> FileIds); public record UserRequest(Guid RequestId, string User, List<string> FileIds)
{
public bool IsCancelled { get; set; } = false;
}