add some more logging and such
This commit is contained in:
@@ -177,31 +177,13 @@ public class ServerFilesController : ControllerBase
|
|||||||
{
|
{
|
||||||
using var dbContext = await _mareDbContext.CreateDbContextAsync();
|
using var dbContext = await _mareDbContext.CreateDbContextAsync();
|
||||||
|
|
||||||
_logger.LogInformation("{user} uploading file {file}", MareUser, hash);
|
_logger.LogInformation("{user}|{file}: Uploading", MareUser, hash);
|
||||||
|
|
||||||
hash = hash.ToUpperInvariant();
|
hash = hash.ToUpperInvariant();
|
||||||
var existingFile = await dbContext.Files.SingleOrDefaultAsync(f => f.Hash == hash);
|
var existingFile = await dbContext.Files.SingleOrDefaultAsync(f => f.Hash == hash);
|
||||||
if (existingFile != null) return Ok();
|
if (existingFile != null) return Ok();
|
||||||
|
|
||||||
SemaphoreSlim? fileLock = null;
|
SemaphoreSlim fileLock = await CreateFileLock(hash, requestAborted).ConfigureAwait(false);
|
||||||
bool successfullyWaited = false;
|
|
||||||
while (!successfullyWaited && !requestAborted.IsCancellationRequested)
|
|
||||||
{
|
|
||||||
lock (_fileUploadLocks)
|
|
||||||
{
|
|
||||||
if (!_fileUploadLocks.TryGetValue(hash, out fileLock))
|
|
||||||
_fileUploadLocks[hash] = fileLock = new SemaphoreSlim(1);
|
|
||||||
}
|
|
||||||
|
|
||||||
try
|
|
||||||
{
|
|
||||||
await fileLock.WaitAsync(requestAborted).ConfigureAwait(false);
|
|
||||||
successfullyWaited = true;
|
|
||||||
}
|
|
||||||
catch (ObjectDisposedException)
|
|
||||||
{
|
|
||||||
_logger.LogWarning("Semaphore disposed for {hash}, recreating", hash);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
try
|
try
|
||||||
{
|
{
|
||||||
@@ -212,58 +194,26 @@ public class ServerFilesController : ControllerBase
|
|||||||
}
|
}
|
||||||
|
|
||||||
// copy the request body to memory
|
// copy the request body to memory
|
||||||
using var compressedFileStream = new MemoryStream();
|
using var memoryStream = new MemoryStream();
|
||||||
await Request.Body.CopyToAsync(compressedFileStream, requestAborted).ConfigureAwait(false);
|
await Request.Body.CopyToAsync(memoryStream, requestAborted).ConfigureAwait(false);
|
||||||
|
|
||||||
// decompress and copy the decompressed stream to memory
|
_logger.LogDebug("{user}|{file}: Finished uploading", MareUser, hash);
|
||||||
var data = LZ4Wrapper.Unwrap(compressedFileStream.ToArray());
|
|
||||||
|
|
||||||
// reset streams
|
await StoreData(hash, dbContext, memoryStream).ConfigureAwait(false);
|
||||||
compressedFileStream.Seek(0, SeekOrigin.Begin);
|
|
||||||
|
|
||||||
// compute hash to verify
|
|
||||||
var hashString = BitConverter.ToString(SHA1.HashData(data))
|
|
||||||
.Replace("-", "", StringComparison.Ordinal).ToUpperInvariant();
|
|
||||||
if (!string.Equals(hashString, hash, StringComparison.Ordinal))
|
|
||||||
throw new InvalidOperationException($"Hash does not match file, computed: {hashString}, expected: {hash}");
|
|
||||||
|
|
||||||
// save file
|
|
||||||
var path = FilePathUtil.GetFilePath(_basePath, hash);
|
|
||||||
using var fileStream = new FileStream(path, FileMode.Create);
|
|
||||||
await compressedFileStream.CopyToAsync(fileStream).ConfigureAwait(false);
|
|
||||||
|
|
||||||
// update on db
|
|
||||||
await dbContext.Files.AddAsync(new FileCache()
|
|
||||||
{
|
|
||||||
Hash = hash,
|
|
||||||
UploadDate = DateTime.UtcNow,
|
|
||||||
UploaderUID = MareUser,
|
|
||||||
Size = compressedFileStream.Length,
|
|
||||||
Uploaded = true,
|
|
||||||
RawSize = data.LongLength
|
|
||||||
}).ConfigureAwait(false);
|
|
||||||
await dbContext.SaveChangesAsync().ConfigureAwait(false);
|
|
||||||
|
|
||||||
bool isColdStorage = _configuration.GetValueOrDefault(nameof(StaticFilesServerConfiguration.UseColdStorage), false);
|
|
||||||
|
|
||||||
_metricsClient.IncGauge(isColdStorage ? MetricsAPI.GaugeFilesTotalColdStorage : MetricsAPI.GaugeFilesTotal, 1);
|
|
||||||
_metricsClient.IncGauge(isColdStorage ? MetricsAPI.GaugeFilesTotalSizeColdStorage : MetricsAPI.GaugeFilesTotalSize, compressedFileStream.Length);
|
|
||||||
|
|
||||||
_fileUploadLocks.TryRemove(hash, out _);
|
|
||||||
|
|
||||||
return Ok();
|
return Ok();
|
||||||
}
|
}
|
||||||
catch (Exception e)
|
catch (Exception e)
|
||||||
{
|
{
|
||||||
_logger.LogError(e, "Error during file upload");
|
_logger.LogError(e, "{user}|{file}: Error during file upload", MareUser, hash);
|
||||||
return BadRequest();
|
return BadRequest();
|
||||||
}
|
}
|
||||||
finally
|
finally
|
||||||
{
|
{
|
||||||
try
|
try
|
||||||
{
|
{
|
||||||
fileLock?.Release();
|
fileLock.Release();
|
||||||
fileLock?.Dispose();
|
fileLock.Dispose();
|
||||||
}
|
}
|
||||||
catch (ObjectDisposedException)
|
catch (ObjectDisposedException)
|
||||||
{
|
{
|
||||||
@@ -282,31 +232,12 @@ public class ServerFilesController : ControllerBase
|
|||||||
{
|
{
|
||||||
using var dbContext = await _mareDbContext.CreateDbContextAsync();
|
using var dbContext = await _mareDbContext.CreateDbContextAsync();
|
||||||
|
|
||||||
_logger.LogInformation("{user} uploading munged file {file}", MareUser, hash);
|
_logger.LogInformation("{user}|{file}: Uploading munged", MareUser, hash);
|
||||||
hash = hash.ToUpperInvariant();
|
hash = hash.ToUpperInvariant();
|
||||||
var existingFile = await dbContext.Files.SingleOrDefaultAsync(f => f.Hash == hash);
|
var existingFile = await dbContext.Files.SingleOrDefaultAsync(f => f.Hash == hash);
|
||||||
if (existingFile != null) return Ok();
|
if (existingFile != null) return Ok();
|
||||||
|
|
||||||
SemaphoreSlim? fileLock = null;
|
SemaphoreSlim fileLock = await CreateFileLock(hash, requestAborted).ConfigureAwait(false);
|
||||||
bool successfullyWaited = false;
|
|
||||||
while (!successfullyWaited && !requestAborted.IsCancellationRequested)
|
|
||||||
{
|
|
||||||
lock (_fileUploadLocks)
|
|
||||||
{
|
|
||||||
if (!_fileUploadLocks.TryGetValue(hash, out fileLock))
|
|
||||||
_fileUploadLocks[hash] = fileLock = new SemaphoreSlim(1);
|
|
||||||
}
|
|
||||||
|
|
||||||
try
|
|
||||||
{
|
|
||||||
await fileLock.WaitAsync(requestAborted).ConfigureAwait(false);
|
|
||||||
successfullyWaited = true;
|
|
||||||
}
|
|
||||||
catch (ObjectDisposedException)
|
|
||||||
{
|
|
||||||
_logger.LogWarning("Semaphore disposed for {hash}, recreating", hash);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
try
|
try
|
||||||
{
|
{
|
||||||
@@ -321,51 +252,25 @@ public class ServerFilesController : ControllerBase
|
|||||||
await Request.Body.CopyToAsync(compressedMungedStream, requestAborted).ConfigureAwait(false);
|
await Request.Body.CopyToAsync(compressedMungedStream, requestAborted).ConfigureAwait(false);
|
||||||
var unmungedFile = compressedMungedStream.ToArray();
|
var unmungedFile = compressedMungedStream.ToArray();
|
||||||
MungeBuffer(unmungedFile.AsSpan());
|
MungeBuffer(unmungedFile.AsSpan());
|
||||||
|
await using MemoryStream unmungedMs = new(unmungedFile);
|
||||||
|
|
||||||
// decompress and copy the decompressed stream to memory
|
_logger.LogDebug("{user}|{file}: Finished uploading, unmunged stream", MareUser, hash);
|
||||||
var data = LZ4Wrapper.Unwrap(unmungedFile);
|
|
||||||
|
|
||||||
// compute hash to verify
|
await StoreData(hash, dbContext, unmungedMs);
|
||||||
var hashString = BitConverter.ToString(SHA1.HashData(data))
|
|
||||||
.Replace("-", "", StringComparison.Ordinal).ToUpperInvariant();
|
|
||||||
if (!string.Equals(hashString, hash, StringComparison.Ordinal))
|
|
||||||
throw new InvalidOperationException($"Hash does not match file, computed: {hashString}, expected: {hash}");
|
|
||||||
|
|
||||||
// save file
|
|
||||||
var path = FilePathUtil.GetFilePath(_basePath, hash);
|
|
||||||
using var fileStream = new FileStream(path, FileMode.Create);
|
|
||||||
await fileStream.WriteAsync(unmungedFile.AsMemory()).ConfigureAwait(false);
|
|
||||||
|
|
||||||
// update on db
|
|
||||||
await dbContext.Files.AddAsync(new FileCache()
|
|
||||||
{
|
|
||||||
Hash = hash,
|
|
||||||
UploadDate = DateTime.UtcNow,
|
|
||||||
UploaderUID = MareUser,
|
|
||||||
Size = compressedMungedStream.Length,
|
|
||||||
Uploaded = true,
|
|
||||||
RawSize = data.LongLength
|
|
||||||
}).ConfigureAwait(false);
|
|
||||||
await dbContext.SaveChangesAsync().ConfigureAwait(false);
|
|
||||||
|
|
||||||
bool isColdStorage = _configuration.GetValueOrDefault(nameof(StaticFilesServerConfiguration.UseColdStorage), false);
|
|
||||||
|
|
||||||
_metricsClient.IncGauge(isColdStorage ? MetricsAPI.GaugeFilesTotalColdStorage : MetricsAPI.GaugeFilesTotal, 1);
|
|
||||||
_metricsClient.IncGauge(isColdStorage ? MetricsAPI.GaugeFilesTotalSizeColdStorage : MetricsAPI.GaugeFilesTotalSize, compressedMungedStream.Length);
|
|
||||||
|
|
||||||
return Ok();
|
return Ok();
|
||||||
}
|
}
|
||||||
catch (Exception e)
|
catch (Exception e)
|
||||||
{
|
{
|
||||||
_logger.LogError(e, "Error during file upload");
|
_logger.LogError(e, "{user}|{file}: Error during file upload", MareUser, hash);
|
||||||
return BadRequest();
|
return BadRequest();
|
||||||
}
|
}
|
||||||
finally
|
finally
|
||||||
{
|
{
|
||||||
try
|
try
|
||||||
{
|
{
|
||||||
fileLock?.Release();
|
fileLock.Release();
|
||||||
fileLock?.Dispose();
|
fileLock.Dispose();
|
||||||
}
|
}
|
||||||
catch (ObjectDisposedException)
|
catch (ObjectDisposedException)
|
||||||
{
|
{
|
||||||
@@ -378,6 +283,76 @@ public class ServerFilesController : ControllerBase
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private async Task StoreData(string hash, MareDbContext dbContext, MemoryStream compressedFileStream)
|
||||||
|
{
|
||||||
|
var decompressedData = LZ4Wrapper.Unwrap(compressedFileStream.ToArray());
|
||||||
|
// reset streams
|
||||||
|
compressedFileStream.Seek(0, SeekOrigin.Begin);
|
||||||
|
|
||||||
|
// compute hash to verify
|
||||||
|
var hashString = BitConverter.ToString(SHA1.HashData(decompressedData))
|
||||||
|
.Replace("-", "", StringComparison.Ordinal).ToUpperInvariant();
|
||||||
|
if (!string.Equals(hashString, hash, StringComparison.Ordinal))
|
||||||
|
throw new InvalidOperationException($"{MareUser}|{hash}: Hash does not match file, computed: {hashString}, expected: {hash}");
|
||||||
|
|
||||||
|
// save file
|
||||||
|
var path = FilePathUtil.GetFilePath(_basePath, hash);
|
||||||
|
using var fileStream = new FileStream(path, FileMode.Create);
|
||||||
|
await compressedFileStream.CopyToAsync(fileStream).ConfigureAwait(false);
|
||||||
|
_logger.LogDebug("{user}|{file}: Uploaded file saved to {path}", MareUser, hash, path);
|
||||||
|
|
||||||
|
// update on db
|
||||||
|
await dbContext.Files.AddAsync(new FileCache()
|
||||||
|
{
|
||||||
|
Hash = hash,
|
||||||
|
UploadDate = DateTime.UtcNow,
|
||||||
|
UploaderUID = MareUser,
|
||||||
|
Size = compressedFileStream.Length,
|
||||||
|
Uploaded = true,
|
||||||
|
RawSize = decompressedData.LongLength
|
||||||
|
}).ConfigureAwait(false);
|
||||||
|
|
||||||
|
await dbContext.SaveChangesAsync().ConfigureAwait(false);
|
||||||
|
|
||||||
|
_logger.LogDebug("{user}|{file}: Uploaded file saved to DB", MareUser, hash);
|
||||||
|
|
||||||
|
bool isColdStorage = _configuration.GetValueOrDefault(nameof(StaticFilesServerConfiguration.UseColdStorage), false);
|
||||||
|
|
||||||
|
_metricsClient.IncGauge(isColdStorage ? MetricsAPI.GaugeFilesTotalColdStorage : MetricsAPI.GaugeFilesTotal, 1);
|
||||||
|
_metricsClient.IncGauge(isColdStorage ? MetricsAPI.GaugeFilesTotalSizeColdStorage : MetricsAPI.GaugeFilesTotalSize, compressedFileStream.Length);
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
private async Task<SemaphoreSlim> CreateFileLock(string hash, CancellationToken requestAborted)
|
||||||
|
{
|
||||||
|
SemaphoreSlim? fileLock = null;
|
||||||
|
bool successfullyWaited = false;
|
||||||
|
while (!successfullyWaited && !requestAborted.IsCancellationRequested)
|
||||||
|
{
|
||||||
|
lock (_fileUploadLocks)
|
||||||
|
{
|
||||||
|
if (!_fileUploadLocks.TryGetValue(hash, out fileLock))
|
||||||
|
{
|
||||||
|
_logger.LogDebug("{user}|{file}: Creating filelock", MareUser, hash);
|
||||||
|
_fileUploadLocks[hash] = fileLock = new SemaphoreSlim(1);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
try
|
||||||
|
{
|
||||||
|
_logger.LogDebug("{user}|{file}: Waiting for filelock", MareUser, hash);
|
||||||
|
await fileLock.WaitAsync(requestAborted).ConfigureAwait(false);
|
||||||
|
successfullyWaited = true;
|
||||||
|
}
|
||||||
|
catch (ObjectDisposedException)
|
||||||
|
{
|
||||||
|
_logger.LogWarning("{user}|{file}: Semaphore disposed, recreating", MareUser, hash);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return fileLock;
|
||||||
|
}
|
||||||
|
|
||||||
private static void MungeBuffer(Span<byte> buffer)
|
private static void MungeBuffer(Span<byte> buffer)
|
||||||
{
|
{
|
||||||
for (int i = 0; i < buffer.Length; ++i)
|
for (int i = 0; i < buffer.Length; ++i)
|
||||||
@@ -385,106 +360,4 @@ public class ServerFilesController : ControllerBase
|
|||||||
buffer[i] ^= 42;
|
buffer[i] ^= 42;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
[HttpPost(MareFiles.ServerFiles_UploadRaw + "/{hash}")]
|
|
||||||
[RequestSizeLimit(200 * 1024 * 1024)]
|
|
||||||
public async Task<IActionResult> UploadFileRaw(string hash, CancellationToken requestAborted)
|
|
||||||
{
|
|
||||||
using var dbContext = await _mareDbContext.CreateDbContextAsync();
|
|
||||||
|
|
||||||
_logger.LogInformation("{user} uploading raw file {file}", MareUser, hash);
|
|
||||||
hash = hash.ToUpperInvariant();
|
|
||||||
var existingFile = await dbContext.Files.SingleOrDefaultAsync(f => f.Hash == hash);
|
|
||||||
if (existingFile != null) return Ok();
|
|
||||||
|
|
||||||
SemaphoreSlim? fileLock = null;
|
|
||||||
bool successfullyWaited = false;
|
|
||||||
while (!successfullyWaited && !requestAborted.IsCancellationRequested)
|
|
||||||
{
|
|
||||||
lock (_fileUploadLocks)
|
|
||||||
{
|
|
||||||
if (!_fileUploadLocks.TryGetValue(hash, out fileLock))
|
|
||||||
_fileUploadLocks[hash] = fileLock = new SemaphoreSlim(1);
|
|
||||||
}
|
|
||||||
|
|
||||||
try
|
|
||||||
{
|
|
||||||
await fileLock.WaitAsync(requestAborted).ConfigureAwait(false);
|
|
||||||
successfullyWaited = true;
|
|
||||||
}
|
|
||||||
catch (ObjectDisposedException)
|
|
||||||
{
|
|
||||||
_logger.LogWarning("Semaphore disposed for {hash}, recreating", hash);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
try
|
|
||||||
{
|
|
||||||
var existingFileCheck2 = await dbContext.Files.SingleOrDefaultAsync(f => f.Hash == hash);
|
|
||||||
if (existingFileCheck2 != null)
|
|
||||||
{
|
|
||||||
return Ok();
|
|
||||||
}
|
|
||||||
|
|
||||||
// copy the request body to memory
|
|
||||||
using var rawFileStream = new MemoryStream();
|
|
||||||
await Request.Body.CopyToAsync(rawFileStream, requestAborted).ConfigureAwait(false);
|
|
||||||
|
|
||||||
// reset streams
|
|
||||||
rawFileStream.Seek(0, SeekOrigin.Begin);
|
|
||||||
|
|
||||||
// compute hash to verify
|
|
||||||
var hashString = BitConverter.ToString(SHA1.HashData(rawFileStream.ToArray()))
|
|
||||||
.Replace("-", "", StringComparison.Ordinal).ToUpperInvariant();
|
|
||||||
if (!string.Equals(hashString, hash, StringComparison.Ordinal))
|
|
||||||
throw new InvalidOperationException($"Hash does not match file, computed: {hashString}, expected: {hash}");
|
|
||||||
|
|
||||||
// save file
|
|
||||||
var path = FilePathUtil.GetFilePath(_basePath, hash);
|
|
||||||
using var fileStream = new FileStream(path, FileMode.Create);
|
|
||||||
var lz4 = LZ4Wrapper.WrapHC(rawFileStream.ToArray(), 0, (int)rawFileStream.Length);
|
|
||||||
using var compressedStream = new MemoryStream(lz4);
|
|
||||||
await compressedStream.CopyToAsync(fileStream).ConfigureAwait(false);
|
|
||||||
|
|
||||||
// update on db
|
|
||||||
await dbContext.Files.AddAsync(new FileCache()
|
|
||||||
{
|
|
||||||
Hash = hash,
|
|
||||||
UploadDate = DateTime.UtcNow,
|
|
||||||
UploaderUID = MareUser,
|
|
||||||
Size = compressedStream.Length,
|
|
||||||
Uploaded = true,
|
|
||||||
RawSize = rawFileStream.Length
|
|
||||||
}).ConfigureAwait(false);
|
|
||||||
await dbContext.SaveChangesAsync().ConfigureAwait(false);
|
|
||||||
|
|
||||||
bool isColdStorage = _configuration.GetValueOrDefault(nameof(StaticFilesServerConfiguration.UseColdStorage), false);
|
|
||||||
|
|
||||||
_metricsClient.IncGauge(isColdStorage ? MetricsAPI.GaugeFilesTotalColdStorage : MetricsAPI.GaugeFilesTotal, 1);
|
|
||||||
_metricsClient.IncGauge(isColdStorage ? MetricsAPI.GaugeFilesTotalSizeColdStorage : MetricsAPI.GaugeFilesTotalSize, rawFileStream.Length);
|
|
||||||
|
|
||||||
return Ok();
|
|
||||||
}
|
|
||||||
catch (Exception e)
|
|
||||||
{
|
|
||||||
_logger.LogError(e, "Error during file upload");
|
|
||||||
return BadRequest();
|
|
||||||
}
|
|
||||||
finally
|
|
||||||
{
|
|
||||||
try
|
|
||||||
{
|
|
||||||
fileLock?.Release();
|
|
||||||
fileLock?.Dispose();
|
|
||||||
}
|
|
||||||
catch (ObjectDisposedException)
|
|
||||||
{
|
|
||||||
// it's disposed whatever
|
|
||||||
}
|
|
||||||
finally
|
|
||||||
{
|
|
||||||
_fileUploadLocks.TryRemove(hash, out _);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
Reference in New Issue
Block a user