extract partial downloads

This commit is contained in:
Stanley Dimant
2025-02-21 21:29:54 +01:00
parent f3e9f90f9d
commit d94cb6f2f4
3 changed files with 60 additions and 35 deletions

View File

@@ -57,7 +57,7 @@ public partial class FileDownloadManager : DisposableMediatorSubscriberBase
}
}
public void CancelDownload()
public void ClearDownload()
{
CurrentDownloads.Clear();
_downloadStatus.Clear();
@@ -72,7 +72,7 @@ public partial class FileDownloadManager : DisposableMediatorSubscriberBase
}
catch
{
CancelDownload();
ClearDownload();
}
finally
{
@@ -83,8 +83,8 @@ public partial class FileDownloadManager : DisposableMediatorSubscriberBase
protected override void Dispose(bool disposing)
{
CancelDownload();
foreach (var stream in _activeDownloadStreams)
ClearDownload();
foreach (var stream in _activeDownloadStreams.ToList())
{
try
{
@@ -119,7 +119,11 @@ public partial class FileDownloadManager : DisposableMediatorSubscriberBase
bool readHash = false;
while (true)
{
var readChar = (char)MungeByte(fileBlockStream.ReadByte());
int readByte = fileBlockStream.ReadByte();
if (readByte == -1)
throw new EndOfStreamException();
var readChar = (char)MungeByte(readByte);
if (readChar == ':')
{
readHash = true;
@@ -169,7 +173,7 @@ public partial class FileDownloadManager : DisposableMediatorSubscriberBase
var bytesRead = 0;
var limit = _orchestrator.DownloadLimitPerSlot();
Logger.LogTrace("Starting Download of {id} with a speed limit of {limit}", requestId, limit);
Logger.LogTrace("Starting Download of {id} with a speed limit of {limit} to {tempPath}", requestId, limit, tempPath);
stream = new ThrottledStream(await response.Content.ReadAsStreamAsync(ct).ConfigureAwait(false), limit);
_activeDownloadStreams.Add(stream);
while ((bytesRead = await stream.ReadAsync(buffer, ct).ConfigureAwait(false)) > 0)
@@ -186,9 +190,12 @@ public partial class FileDownloadManager : DisposableMediatorSubscriberBase
Logger.LogDebug("{requestUrl} downloaded to {tempPath}", requestUrl, tempPath);
}
}
catch (OperationCanceledException)
{
throw;
}
catch (Exception ex)
{
Logger.LogWarning(ex, "Error during file download of {requestUrl}", requestUrl);
try
{
if (!tempPath.IsNullOrEmpty())
@@ -271,6 +278,7 @@ public partial class FileDownloadManager : DisposableMediatorSubscriberBase
Logger.LogDebug("GUID {requestId} for {n} files on server {uri}", requestId, fileGroup.Count(), fileGroup.First().DownloadUri);
var blockFile = _fileDbManager.GetCacheFilePath(requestId.ToString("N"), "blk");
FileInfo fi = new(blockFile);
try
{
_downloadStatus[fileGroup.Key].DownloadStatus = DownloadStatus.WaitingForSlot;
@@ -292,26 +300,25 @@ public partial class FileDownloadManager : DisposableMediatorSubscriberBase
}
catch (OperationCanceledException)
{
_orchestrator.ReleaseDownloadSlot();
File.Delete(blockFile);
Logger.LogDebug("Detected cancellation, removing {id}", gameObjectHandler);
CancelDownload();
return;
Logger.LogDebug("{dlName}: Detected cancellation of download, partially extracting files for {id}", fi.Name, gameObjectHandler);
}
catch (Exception ex)
{
_orchestrator.ReleaseDownloadSlot();
File.Delete(blockFile);
Logger.LogError(ex, "Error during download of {id}", requestId);
CancelDownload();
Logger.LogError(ex, "{dlName}: Error during download of {id}", fi.Name, requestId);
ClearDownload();
return;
}
FileStream? fileBlockStream = null;
try
{
_downloadStatus[fileGroup.Key].TransferredFiles = 1;
_downloadStatus[fileGroup.Key].DownloadStatus = DownloadStatus.Decompressing;
if (_downloadStatus.TryGetValue(fileGroup.Key, out var status))
{
status.TransferredFiles = 1;
status.DownloadStatus = DownloadStatus.Decompressing;
}
fileBlockStream = File.OpenRead(blockFile);
while (fileBlockStream.Position < fileBlockStream.Length)
{
@@ -319,28 +326,40 @@ public partial class FileDownloadManager : DisposableMediatorSubscriberBase
try
{
Logger.LogDebug("Found file {file} with length {le}, decompressing download", fileHash, fileLengthBytes);
var fileExtension = fileReplacement.First(f => string.Equals(f.Hash, fileHash, StringComparison.OrdinalIgnoreCase)).GamePaths[0].Split(".")[^1];
var filePath = _fileDbManager.GetCacheFilePath(fileHash, fileExtension);
Logger.LogDebug("{dlName}: Decompressing {file}:{le} => {dest}", fi.Name, fileHash, fileLengthBytes, filePath);
byte[] compressedFileContent = new byte[fileLengthBytes];
_ = await fileBlockStream.ReadAsync(compressedFileContent, token).ConfigureAwait(false);
var readBytes = await fileBlockStream.ReadAsync(compressedFileContent, CancellationToken.None).ConfigureAwait(false);
if (readBytes != fileLengthBytes)
{
throw new EndOfStreamException();
}
MungeBuffer(compressedFileContent);
var decompressedFile = LZ4Wrapper.Unwrap(compressedFileContent);
var filePath = _fileDbManager.GetCacheFilePath(fileHash, fileExtension);
await _fileCompactor.WriteAllBytesAsync(filePath, decompressedFile, token).ConfigureAwait(false);
await _fileCompactor.WriteAllBytesAsync(filePath, decompressedFile, CancellationToken.None).ConfigureAwait(false);
PersistFileToStorage(fileHash, filePath);
}
catch (EndOfStreamException)
{
Logger.LogWarning("{dlName}: Failure to extract file {fileHash}, stream ended prematurely", fi.Name, fileHash);
}
catch (Exception e)
{
Logger.LogWarning(e, "Error during decompression");
Logger.LogWarning(e, "{dlName}: Error during decompression", fi.Name);
}
}
}
catch (EndOfStreamException)
{
Logger.LogDebug("{dlName}: Failure to extract file header data, stream ended", fi.Name);
}
catch (Exception ex)
{
Logger.LogError(ex, "Error during block file read");
Logger.LogError(ex, "{dlName}: Error during block file read", fi.Name);
}
finally
{
@@ -353,7 +372,7 @@ public partial class FileDownloadManager : DisposableMediatorSubscriberBase
Logger.LogDebug("Download end: {id}", gameObjectHandler);
CancelDownload();
ClearDownload();
}
private async Task<List<DownloadFileDto>> FilesGetSizes(List<string> hashes, CancellationToken ct)