custom file stream implementation

This commit is contained in:
rootdarkarchon
2023-08-26 00:36:30 +02:00
parent a3b33c0e6b
commit 56b27e5ee8
5 changed files with 153 additions and 17 deletions

View File

@@ -4,12 +4,12 @@
}, },
"Logging": { "Logging": {
"LogLevel": { "LogLevel": {
"Default": "Debug", "Default": "Warning",
"Microsoft": "Debug", "Microsoft": "Warning",
"Microsoft.Hosting.Lifetime": "Information", "Microsoft.Hosting.Lifetime": "Information",
"MareSynchronosStaticFilesServer": "Debug", "MareSynchronosStaticFilesServer": "Debug",
"MareSynchronosShared": "Debug", "MareSynchronosShared": "Debug",
"System.IO": "Information" "System.IO": "Information",
}, },
"File": { "File": {
"BasePath": "logs", "BasePath": "logs",

View File

@@ -2,8 +2,6 @@
using MareSynchronosStaticFilesServer.Services; using MareSynchronosStaticFilesServer.Services;
using MareSynchronosStaticFilesServer.Utils; using MareSynchronosStaticFilesServer.Utils;
using Microsoft.AspNetCore.Mvc; using Microsoft.AspNetCore.Mvc;
using System.Globalization;
using System.Text;
namespace MareSynchronosStaticFilesServer.Controllers; namespace MareSynchronosStaticFilesServer.Controllers;
@@ -34,27 +32,22 @@ public class CacheController : ControllerBase
_requestQueue.ActivateRequest(requestId); _requestQueue.ActivateRequest(requestId);
Response.ContentType = "application/octet-stream"; Response.ContentType = "application/octet-stream";
var memoryStream = new MemoryStream();
var streamWriter = new BinaryWriter(memoryStream);
long requestSize = 0; long requestSize = 0;
List<BlockFileDataSubstream> substreams = new();
foreach (var file in request.FileIds) foreach (var file in request.FileIds)
{ {
var fs = await _cachedFileProvider.GetAndDownloadFileStream(file); var fs = await _cachedFileProvider.GetAndDownloadFileStream(file);
if (fs == null) continue; if (fs == null) continue;
streamWriter.Write(Encoding.ASCII.GetBytes("#" + file + ":" + fs.Length.ToString(CultureInfo.InvariantCulture) + "#"));
byte[] buffer = new byte[fs.Length]; substreams.Add(new(fs));
_ = await fs.ReadAsync(buffer, HttpContext.RequestAborted);
streamWriter.Write(buffer);
requestSize += fs.Length; requestSize += fs.Length;
} }
streamWriter.Flush();
memoryStream.Position = 0;
_fileStatisticsService.LogRequest(requestSize); _fileStatisticsService.LogRequest(requestSize);
return _requestFileStreamResultFactory.Create(requestId, memoryStream); return _requestFileStreamResultFactory.Create(requestId, new BlockFileDataStream(substreams.ToArray()));
} }
} }

View File

@@ -0,0 +1,70 @@
namespace MareSynchronosStaticFilesServer.Utils;
public sealed class BlockFileDataStream : Stream
{
private readonly BlockFileDataSubstream[] _substreams;
private int _currentStreamIndex = 0;
public BlockFileDataStream(BlockFileDataSubstream[] substreams)
{
_substreams = substreams;
}
public override bool CanRead => true;
public override bool CanSeek => false;
public override bool CanWrite => false;
public override long Length => throw new NotSupportedException();
public override long Position { get => throw new NotSupportedException(); set => throw new NotSupportedException(); }
public override int Read(byte[] buffer, int offset, int count)
{
int totalRead = 0;
int currentOffset = 0;
int remainingCount = count;
while (totalRead < count && _currentStreamIndex < _substreams.Length)
{
var lastReadBytes = _substreams[_currentStreamIndex].Read(buffer, currentOffset, remainingCount);
if (lastReadBytes < remainingCount)
{
_substreams[_currentStreamIndex].Dispose();
_currentStreamIndex++;
}
totalRead += lastReadBytes;
currentOffset += lastReadBytes;
remainingCount -= lastReadBytes;
}
return totalRead;
}
protected override void Dispose(bool disposing)
{
base.Dispose(disposing);
foreach (var substream in _substreams)
{
// probably unnecessary but better safe than sorry
substream.Dispose();
}
}
public override void Flush()
{
throw new NotSupportedException();
}
public override long Seek(long offset, SeekOrigin origin)
{
throw new NotSupportedException();
}
public override void SetLength(long value)
{
throw new NotSupportedException();
}
public override void Write(byte[] buffer, int offset, int count)
{
throw new NotSupportedException();
}
}

View File

@@ -0,0 +1,73 @@
using System.Globalization;
using System.Text;
namespace MareSynchronosStaticFilesServer.Utils;
public sealed class BlockFileDataSubstream : IDisposable
{
private readonly MemoryStream _headerStream;
private readonly FileStream _dataStream;
private int _headerPosition = 0;
private long _dataPosition = 0;
private bool _disposed = false;
private long RemainingHeaderLength => _headerStream.Length - _headerPosition;
private long RemainingDataLength => _dataStream.Length - _dataPosition;
public BlockFileDataSubstream(FileStream dataStream)
{
_headerStream = new MemoryStream();
var headerStreamWriter = new BinaryWriter(_headerStream);
headerStreamWriter.Write(Encoding.ASCII.GetBytes("#" + new FileInfo(dataStream.Name).Name + ":" + dataStream.Length.ToString(CultureInfo.InvariantCulture) + "#"));
headerStreamWriter.Flush();
_headerStream.Position = 0;
_dataStream = dataStream;
}
public int Read(byte[] inputBuffer, int offset, int count)
{
int currentOffset = offset;
int currentCount = count;
int readHeaderBytes = 0;
if (RemainingHeaderLength > 0)
{
bool readOnlyHeader = currentCount <= RemainingHeaderLength;
byte[] readHeaderBuffer = new byte[Math.Min(currentCount, RemainingHeaderLength)];
readHeaderBytes = _headerStream.Read(readHeaderBuffer, 0, readHeaderBuffer.Length);
_headerPosition += readHeaderBytes;
Buffer.BlockCopy(readHeaderBuffer, 0, inputBuffer, currentOffset, readHeaderBytes);
if (readOnlyHeader)
{
return readHeaderBytes;
}
currentOffset += readHeaderBytes;
currentCount -= readHeaderBytes;
}
if (RemainingDataLength > 0)
{
byte[] readDataBuffer = new byte[currentCount];
var readDataBytes = _dataStream.Read(readDataBuffer, 0, readDataBuffer.Length);
_dataPosition += readDataBytes;
Buffer.BlockCopy(readDataBuffer, 0, inputBuffer, currentOffset, readDataBytes);
return readDataBytes + readHeaderBytes;
}
return 0;
}
public void Dispose()
{
if (_disposed) return;
_headerStream.Dispose();
_dataStream.Dispose();
_disposed = true;
}
}

View File

@@ -17,9 +17,9 @@ public class RequestFileStreamResultFactory
_configurationService = configurationService; _configurationService = configurationService;
} }
public RequestFileStreamResult Create(Guid requestId, MemoryStream ms) public RequestFileStreamResult Create(Guid requestId, Stream stream)
{ {
return new RequestFileStreamResult(requestId, _requestQueueService, return new RequestFileStreamResult(requestId, _requestQueueService,
_metrics, ms, "application/octet-stream"); _metrics, stream, "application/octet-stream");
} }
} }