From c80848fa2e81926fc7e8f6aae9c17e2d1326c693 Mon Sep 17 00:00:00 2001 From: Stanley Dimant Date: Mon, 4 Nov 2024 11:51:13 +0100 Subject: [PATCH] reimplment ReadAsync for BlockFileDataStream/SubStream --- .../Controllers/CacheController.cs | 4 +- .../Controllers/DistributionController.cs | 8 +- .../Services/CachedFileProvider.cs | 8 +- .../Utils/BlockFileDataStream.cs | 80 +++++++++---- .../Utils/BlockFileDataSubstream.cs | 105 ++++++++++-------- 5 files changed, 128 insertions(+), 77 deletions(-) diff --git a/MareSynchronosServer/MareSynchronosStaticFilesServer/Controllers/CacheController.cs b/MareSynchronosServer/MareSynchronosStaticFilesServer/Controllers/CacheController.cs index 106a83b..4e0b4c7 100644 --- a/MareSynchronosServer/MareSynchronosStaticFilesServer/Controllers/CacheController.cs +++ b/MareSynchronosServer/MareSynchronosStaticFilesServer/Controllers/CacheController.cs @@ -36,9 +36,9 @@ public class CacheController : ControllerBase long requestSize = 0; List substreams = new(); - foreach (var file in request.FileIds) + foreach (var fileHash in request.FileIds) { - var fs = await _cachedFileProvider.GetAndDownloadFileStream(file); + var fs = await _cachedFileProvider.DownloadAndGetLocalFileInfo(fileHash).ConfigureAwait(false); if (fs == null) continue; substreams.Add(new(fs)); diff --git a/MareSynchronosServer/MareSynchronosStaticFilesServer/Controllers/DistributionController.cs b/MareSynchronosServer/MareSynchronosStaticFilesServer/Controllers/DistributionController.cs index 6df7cc3..692f48e 100644 --- a/MareSynchronosServer/MareSynchronosStaticFilesServer/Controllers/DistributionController.cs +++ b/MareSynchronosServer/MareSynchronosStaticFilesServer/Controllers/DistributionController.cs @@ -17,13 +17,13 @@ public class DistributionController : ControllerBase [HttpGet(MareFiles.Distribution_Get)] [Authorize(Policy = "Internal")] - public async Task GetFile(string file) + public async Task GetFile(string fileHash) { - _logger.LogInformation($"GetFile:{MareUser}:{file}"); + _logger.LogInformation($"GetFile:{MareUser}:{fileHash}"); - var fs = await _cachedFileProvider.GetAndDownloadFileStream(file); + var fs = await _cachedFileProvider.DownloadAndGetLocalFileInfo(fileHash); if (fs == null) return NotFound(); - return File(fs, "application/octet-stream"); + return PhysicalFile(fs.FullName, "application/octet-stream"); } } diff --git a/MareSynchronosServer/MareSynchronosStaticFilesServer/Services/CachedFileProvider.cs b/MareSynchronosServer/MareSynchronosStaticFilesServer/Services/CachedFileProvider.cs index f687dcd..d5d9242 100644 --- a/MareSynchronosServer/MareSynchronosStaticFilesServer/Services/CachedFileProvider.cs +++ b/MareSynchronosServer/MareSynchronosStaticFilesServer/Services/CachedFileProvider.cs @@ -167,7 +167,7 @@ public sealed class CachedFileProvider : IDisposable _downloadSemaphore.Release(); } - public FileStream? GetLocalFileStream(string hash) + public FileInfo? GetLocalFilePath(string hash) { var fi = FilePathUtil.GetFileInfoForHash(_hotStoragePath, hash); if (fi == null) return null; @@ -176,10 +176,10 @@ public sealed class CachedFileProvider : IDisposable _fileStatisticsService.LogFile(hash, fi.Length); - return new FileStream(fi.FullName, FileMode.Open, FileAccess.Read, FileShare.Inheritable | FileShare.Read); + return new FileInfo(fi.FullName); } - public async Task GetAndDownloadFileStream(string hash) + public async Task DownloadAndGetLocalFileInfo(string hash) { await DownloadFileWhenRequired(hash).ConfigureAwait(false); @@ -203,7 +203,7 @@ public sealed class CachedFileProvider : IDisposable } } - return GetLocalFileStream(hash); + return GetLocalFilePath(hash); } public bool AnyFilesDownloading(List hashes) diff --git a/MareSynchronosServer/MareSynchronosStaticFilesServer/Utils/BlockFileDataStream.cs b/MareSynchronosServer/MareSynchronosStaticFilesServer/Utils/BlockFileDataStream.cs index cefb47e..ade2228 100644 --- a/MareSynchronosServer/MareSynchronosStaticFilesServer/Utils/BlockFileDataStream.cs +++ b/MareSynchronosServer/MareSynchronosStaticFilesServer/Utils/BlockFileDataStream.cs @@ -1,11 +1,12 @@ -namespace MareSynchronosStaticFilesServer.Utils; + +namespace MareSynchronosStaticFilesServer.Utils; public sealed class BlockFileDataStream : Stream { - private readonly List _substreams; + private readonly IEnumerable _substreams; private int _currentStreamIndex = 0; - public BlockFileDataStream(List substreams) + public BlockFileDataStream(IEnumerable substreams) { _substreams = substreams; } @@ -16,17 +17,17 @@ public sealed class BlockFileDataStream : Stream public override long Length => throw new NotSupportedException(); public override long Position { get => throw new NotSupportedException(); set => throw new NotSupportedException(); } - public override int Read(byte[] buffer, int offset, int count) + public override async Task ReadAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken) { int totalRead = 0; int currentOffset = 0; int remainingCount = count; - while (totalRead < count && _currentStreamIndex < _substreams.Count) + while (totalRead < count && _currentStreamIndex < _substreams.Count()) { - var lastReadBytes = _substreams[_currentStreamIndex].Read(buffer, currentOffset, remainingCount); + var lastReadBytes = await _substreams.ElementAt(_currentStreamIndex).ReadAsync(buffer, currentOffset, remainingCount, cancellationToken).ConfigureAwait(false); if (lastReadBytes < remainingCount) { - _substreams[_currentStreamIndex].Dispose(); + _substreams.ElementAt(_currentStreamIndex).Dispose(); _currentStreamIndex++; } @@ -38,6 +39,51 @@ public sealed class BlockFileDataStream : Stream return totalRead; } + public override int Read(byte[] buffer, int offset, int count) + { + int totalRead = 0; + int currentOffset = 0; + int remainingCount = count; + while (totalRead < count && _currentStreamIndex < _substreams.Count()) + { + var lastReadBytes = _substreams.ElementAt(_currentStreamIndex).Read(buffer, currentOffset, remainingCount); + if (lastReadBytes < remainingCount) + { + _substreams.ElementAt(_currentStreamIndex).Dispose(); + _currentStreamIndex++; + } + + totalRead += lastReadBytes; + currentOffset += lastReadBytes; + remainingCount -= lastReadBytes; + } + + return totalRead; + } + + public override async ValueTask ReadAsync(Memory buffer, CancellationToken cancellationToken = default) + { + int totalRead = 0; + + while (totalRead < buffer.Length && _currentStreamIndex < _substreams.Count()) + { + var substream = _substreams.ElementAt(_currentStreamIndex); + int bytesRead = await substream.ReadAsync(buffer.Slice(totalRead), cancellationToken).ConfigureAwait(false); + + if (bytesRead == 0) + { + substream.Dispose(); + _currentStreamIndex++; + } + else + { + totalRead += bytesRead; + } + } + + return totalRead; + } + protected override void Dispose(bool disposing) { if (disposing) @@ -52,23 +98,11 @@ public sealed class BlockFileDataStream : Stream base.Dispose(disposing); } - public override void Flush() - { - throw new NotSupportedException(); - } + public override void Flush() => throw new NotSupportedException(); - public override long Seek(long offset, SeekOrigin origin) - { - throw new NotSupportedException(); - } + public override long Seek(long offset, SeekOrigin origin) => throw new NotSupportedException(); - public override void SetLength(long value) - { - throw new NotSupportedException(); - } + public override void SetLength(long value) => throw new NotSupportedException(); - public override void Write(byte[] buffer, int offset, int count) - { - throw new NotSupportedException(); - } + public override void Write(byte[] buffer, int offset, int count) => throw new NotSupportedException(); } diff --git a/MareSynchronosServer/MareSynchronosStaticFilesServer/Utils/BlockFileDataSubstream.cs b/MareSynchronosServer/MareSynchronosStaticFilesServer/Utils/BlockFileDataSubstream.cs index 8d7a7bf..6f0169a 100644 --- a/MareSynchronosServer/MareSynchronosStaticFilesServer/Utils/BlockFileDataSubstream.cs +++ b/MareSynchronosServer/MareSynchronosStaticFilesServer/Utils/BlockFileDataSubstream.cs @@ -6,70 +6,87 @@ namespace MareSynchronosStaticFilesServer.Utils; public sealed class BlockFileDataSubstream : IDisposable { private readonly MemoryStream _headerStream; - private readonly BinaryWriter _headerStreamWriter; - private readonly FileStream _dataStream; - private int _headerPosition = 0; - private long _dataPosition = 0; private bool _disposed = false; + private readonly Lazy _dataStreamLazy; + private FileStream DataStream => _dataStreamLazy.Value; - private long RemainingHeaderLength => _headerStream.Length - _headerPosition; - private long RemainingDataLength => _dataStream.Length - _dataPosition; - - public BlockFileDataSubstream(FileStream dataStream) + public BlockFileDataSubstream(FileInfo file) { - _headerStream = new MemoryStream(); - _headerStreamWriter = new BinaryWriter(_headerStream); - _headerStreamWriter.Write(Encoding.ASCII.GetBytes("#" + new FileInfo(dataStream.Name).Name + ":" + dataStream.Length.ToString(CultureInfo.InvariantCulture) + "#")); - _headerStreamWriter.Flush(); - _headerStream.Position = 0; - _dataStream = dataStream; + _dataStreamLazy = new(() => File.Open(file.FullName, FileMode.Open, FileAccess.Read, FileShare.Read | FileShare.Inheritable)); + _headerStream = new MemoryStream(Encoding.ASCII.GetBytes("#" + file.Name + ":" + file.Length.ToString(CultureInfo.InvariantCulture) + "#")); } public int Read(byte[] inputBuffer, int offset, int count) { - int currentOffset = offset; - int currentCount = count; - int readHeaderBytes = 0; + int bytesRead = 0; - if (RemainingHeaderLength > 0) + // Read from header stream if it has remaining data + if (_headerStream.Position < _headerStream.Length) { - bool readOnlyHeader = currentCount <= RemainingHeaderLength; - byte[] readHeaderBuffer = new byte[Math.Min(currentCount, RemainingHeaderLength)]; - - readHeaderBytes = _headerStream.Read(readHeaderBuffer, 0, readHeaderBuffer.Length); - _headerPosition += readHeaderBytes; - - Buffer.BlockCopy(readHeaderBuffer, 0, inputBuffer, currentOffset, readHeaderBytes); - - if (readOnlyHeader) - { - return readHeaderBytes; - } - - currentOffset += readHeaderBytes; - currentCount -= readHeaderBytes; + int headerBytesToRead = (int)Math.Min(count, _headerStream.Length - _headerStream.Position); + bytesRead += _headerStream.Read(inputBuffer, offset, headerBytesToRead); + offset += bytesRead; + count -= bytesRead; } - if (RemainingDataLength > 0) + // Read from data stream if there is still space in buffer + if (count > 0 && DataStream.Position < DataStream.Length) { - byte[] readDataBuffer = new byte[currentCount]; - var readDataBytes = _dataStream.Read(readDataBuffer, 0, readDataBuffer.Length); - _dataPosition += readDataBytes; - - Buffer.BlockCopy(readDataBuffer, 0, inputBuffer, currentOffset, readDataBytes); - - return readDataBytes + readHeaderBytes; + bytesRead += DataStream.Read(inputBuffer, offset, count); } - return 0; + return bytesRead; + } + + public async Task ReadAsync(byte[] inputBuffer, int offset, int count, CancellationToken cancellationToken = default) + { + int bytesRead = 0; + + // Async read from header stream + if (_headerStream.Position < _headerStream.Length) + { + int headerBytesToRead = (int)Math.Min(count, _headerStream.Length - _headerStream.Position); + bytesRead += await _headerStream.ReadAsync(inputBuffer.AsMemory(offset, headerBytesToRead), cancellationToken).ConfigureAwait(false); + offset += bytesRead; + count -= bytesRead; + } + + // Async read from data stream + if (count > 0 && DataStream.Position < DataStream.Length) + { + bytesRead += await DataStream.ReadAsync(inputBuffer.AsMemory(offset, count), cancellationToken).ConfigureAwait(false); + } + + return bytesRead; + } + + public async ValueTask ReadAsync(Memory buffer, CancellationToken cancellationToken = default) + { + int bytesRead = 0; + + // Async read from header stream + if (_headerStream.Position < _headerStream.Length) + { + int headerBytesToRead = (int)Math.Min(buffer.Length, _headerStream.Length - _headerStream.Position); + bytesRead += await _headerStream.ReadAsync(buffer.Slice(0, headerBytesToRead), cancellationToken).ConfigureAwait(false); + buffer = buffer.Slice(headerBytesToRead); + } + + // Async read from data stream + if (buffer.Length > 0 && DataStream.Position < DataStream.Length) + { + bytesRead += await DataStream.ReadAsync(buffer, cancellationToken).ConfigureAwait(false); + } + + return bytesRead; } public void Dispose() { if (_disposed) return; _headerStream.Dispose(); - _headerStreamWriter.Dispose(); - _dataStream.Dispose(); + if (_dataStreamLazy.IsValueCreated) + DataStream.Dispose(); _disposed = true; } } \ No newline at end of file