diff --git a/src/SyncFaction.Packer/Models/StreamWrapper.cs b/src/SyncFaction.Packer/Models/DisposableStreamWrapper.cs similarity index 82% rename from src/SyncFaction.Packer/Models/StreamWrapper.cs rename to src/SyncFaction.Packer/Models/DisposableStreamWrapper.cs index 5d40bad..cc7d6a6 100644 --- a/src/SyncFaction.Packer/Models/StreamWrapper.cs +++ b/src/SyncFaction.Packer/Models/DisposableStreamWrapper.cs @@ -2,7 +2,10 @@ namespace SyncFaction.Packer.Models; -public sealed class StreamWrapper : Stream +/// +/// Wrapper which can be disposed without disposing underlying stream +/// +public sealed class DisposableStreamWrapper : Stream { public override bool CanRead => stream.CanRead; @@ -21,7 +24,7 @@ public override long Position [SuppressMessage("Usage", "CA2213:Disposable fields should be disposed", Justification = "Wrapper is not owner of the stream")] private readonly Stream stream; - public StreamWrapper(Stream stream) => this.stream = stream; + public DisposableStreamWrapper(Stream stream) => this.stream = stream; public override async ValueTask DisposeAsync() => await base.DisposeAsync(); diff --git a/src/SyncFaction.Packer/Models/RfgVpp.cs b/src/SyncFaction.Packer/Models/RfgVpp.cs index 03fa2af..7b1d991 100644 --- a/src/SyncFaction.Packer/Models/RfgVpp.cs +++ b/src/SyncFaction.Packer/Models/RfgVpp.cs @@ -97,7 +97,7 @@ public void ReadCompactedData(CancellationToken token) { token.ThrowIfCancellationRequested(); entryData.OverrideAlignmentSize(alignment); - entryData.OverrideData(new StreamView(new StreamView(ms, 0, decompressedLength), entryData.XDataOffset, entryData.XLenData)); + entryData.OverrideData(new StreamView(ms, entryData.XDataOffset, entryData.XLenData)); } } @@ -111,7 +111,6 @@ public void ReadCompressedData(CancellationToken token) { token.ThrowIfCancellationRequested(); - // TODO maybe get rid of some views here? var compressedLength = entryData.DataSize; // NOTE: important to calculate it before all overrides var totalCompressedLength = entryData.TotalSize; diff --git a/src/SyncFaction.Packer/Models/StreamView.cs b/src/SyncFaction.Packer/Models/StreamView.cs index abe0b4c..7aa1c3c 100644 --- a/src/SyncFaction.Packer/Models/StreamView.cs +++ b/src/SyncFaction.Packer/Models/StreamView.cs @@ -5,11 +5,14 @@ namespace SyncFaction.Packer.Models; +/// +/// Limited view of an underlying stream, maintaining its own position +/// public sealed class StreamView : Stream { - public override bool CanRead => stream.CanRead; + public override bool CanRead => Stream.CanRead; - public override bool CanSeek => false; + public override bool CanSeek => Stream.CanSeek; public override bool CanWrite => false; @@ -18,19 +21,51 @@ public sealed class StreamView : Stream public override long Position { get; set; } [SuppressMessage("Usage", "CA2213:Disposable fields should be disposed", Justification = "This class does not own stream")] - private readonly Stream stream; + private Stream Stream { get; } - private readonly long viewStart; + private long ViewStart { get; } public StreamView(Stream stream, long viewStart, long viewLength) { - this.stream = stream; + Stream = stream; Length = viewLength; - this.viewStart = viewStart; + ViewStart = viewStart; Position = 0; } - public override void Flush() => stream.Flush(); + public StreamView ThreadSafeCopy() + { + switch (Stream) + { + case FileStream fs: + var newFileStream = File.OpenRead(fs.Name); + newFileStream.Position = fs.Position; + var result1 = new StreamView(newFileStream, ViewStart, Length); + result1.Position = Position; + return result1; + case MemoryStream ms: + var newMemoryStream = new MemoryStream(ms.ToArray()); + newMemoryStream.Position = ms.Position; + var result2 = new StreamView(newMemoryStream, ViewStart, Length); + result2.Position = Position; + return result2; + case InflaterInputStream iis: + var inflated = new MemoryStream(); + iis.CopyTo(inflated); + var result3 = new StreamView(inflated, ViewStart, Length); + result3.Position = Position; + return result3; + case StreamView sv: + var innerCopy = sv.ThreadSafeCopy().Stream; + var result4 = new StreamView(innerCopy, ViewStart, Length); + result4.Position = Position; + return result4; + default: + throw new InvalidOperationException(); + } + } + + public override void Flush() => Stream.Flush(); public override int Read(byte[] buffer, int offset, int count) { @@ -44,15 +79,15 @@ public override int Read(byte[] buffer, int offset, int count) ? (int) (Position + count - Length) : 0; - if (stream.Position != viewStart + Position) + if (Stream.Position != ViewStart + Position) { - if (stream is not InflaterInputStream) + if (Stream is not InflaterInputStream) { - stream.Seek(viewStart + Position, SeekOrigin.Begin); + Stream.Seek(ViewStart + Position, SeekOrigin.Begin); } } - var result = stream.Read(buffer, offset, count - extraBytes); + var result = Stream.Read(buffer, offset, count - extraBytes); if (result > 0) { Position += result; @@ -63,17 +98,17 @@ public override int Read(byte[] buffer, int offset, int count) public override int ReadByte() { - if (Position + 1 >= viewStart + Length) + if (Position + 1 >= ViewStart + Length) { return -1; } - if (stream.Position != viewStart + Position) + if (Stream.Position != ViewStart + Position) { - stream.Seek(viewStart + Position, SeekOrigin.Begin); + Stream.Seek(ViewStart + Position, SeekOrigin.Begin); } - var result = stream.ReadByte(); + var result = Stream.ReadByte(); if (result > 0) { Position += result; @@ -87,12 +122,12 @@ public override long Seek(long offset, SeekOrigin origin) switch (origin) { case SeekOrigin.Begin: - if (offset < 0 || offset >= Length) + if (offset < 0 || offset > Length) { throw new InvalidOperationException($"Out of bounds: offset is {offset}, origin is {origin}, max length is {Length}"); } - if (stream is InflaterInputStream) + if (Stream is InflaterInputStream) { // hack to avoid seeking but still allow fast-forwarding var delta = (int) (offset - Position); @@ -104,29 +139,29 @@ public override long Seek(long offset, SeekOrigin origin) var pool = ArrayPool.Shared; var buf = pool.Rent(delta); Position = offset; - var read = stream.Read(buf, 0, delta); + var read = Stream.Read(buf, 0, delta); pool.Return(buf); return read; } Position = offset; - return stream.Seek(viewStart + offset, SeekOrigin.Begin); + return Stream.Seek(ViewStart + offset, SeekOrigin.Begin); case SeekOrigin.Current: - if (0 < Position + offset || Position + offset >= Length) + if (0 < Position + offset || Position + offset > Length) { throw new InvalidOperationException($"Out of bounds: offset is {offset}, position is {Position}, origin is {origin}, max length is {Length}"); } Position += offset; - return stream.Seek(offset, SeekOrigin.Current); + return Stream.Seek(offset, SeekOrigin.Current); case SeekOrigin.End: - if (offset < 0 || offset >= Length) + if (offset < 0 || offset > Length) { throw new InvalidOperationException($"Out of bounds: offset is {offset}, origin is {origin}, max length is {Length}"); } Position = Length - offset; - return stream.Seek(viewStart + Length - offset, SeekOrigin.Begin); + return Stream.Seek(ViewStart + Length - offset, SeekOrigin.Begin); default: throw new ArgumentOutOfRangeException(nameof(origin), origin, null); } @@ -134,14 +169,13 @@ public override long Seek(long offset, SeekOrigin origin) public override void SetLength(long value) => throw new InvalidOperationException($"{nameof(StreamView)} is read-only"); - //viewLength = value; public override void Write(byte[] buffer, int offset, int count) => throw new InvalidOperationException($"{nameof(StreamView)} is read-only"); public override string ToString() { - var length = stream is InflaterInputStream - ? "unsupported" - : stream.Length.ToString(CultureInfo.InvariantCulture); - return $"stream: len={length} pos={stream.Position}, view: start={viewStart}, len={Length}, pos={Position}"; + var length = Stream is InflaterInputStream + ? "unsupported (inflater stream)" + : Stream.Length.ToString(CultureInfo.InvariantCulture); + return $"StreamView: start={ViewStart}, len={Length}, pos={Position}, stream len={length}, stream pos={Stream.Position}"; } } diff --git a/src/SyncFaction.Packer/Services/VppWriter.cs b/src/SyncFaction.Packer/Services/VppWriter.cs index f3b7d3a..a81d617 100644 --- a/src/SyncFaction.Packer/Services/VppWriter.cs +++ b/src/SyncFaction.Packer/Services/VppWriter.cs @@ -228,7 +228,7 @@ await Utils.Write(ms, Func wrapperFactory = compressOutput switch { true => x => new DeflaterOutputStream(x, new Deflater(compressionLevel)) { IsStreamOwner = false }, - false => static x => new StreamWrapper(x) + false => static x => new DisposableStreamWrapper(x) }; uint uncompressedSize = 0; diff --git a/src/SyncFaction.Toolbox/Archiver.cs b/src/SyncFaction.Toolbox/Archiver.cs index d0c77d3..b8e7aba 100644 --- a/src/SyncFaction.Toolbox/Archiver.cs +++ b/src/SyncFaction.Toolbox/Archiver.cs @@ -7,6 +7,7 @@ using Microsoft.Extensions.FileSystemGlobbing; using Microsoft.Extensions.Logging; using Microsoft.IO; +using PleOps.XdeltaSharp.Vcdiff.Instructions; using SyncFaction.ModManager; using SyncFaction.ModManager.Services; using SyncFaction.Packer.Models; @@ -94,7 +95,7 @@ public async Task Unpack(UnpackSettings settings, CancellationToken token) var batch = unpackArgsQueue.Take(batchSize).ToList(); foreach (var x in batch.Where(x => !runningTasks.ContainsKey(x))) { - runningTasks.Add(x, UnpackArchive(x, cts.Token)); + runningTasks.Add(x, Unpack(x, cts.Token)); } var completed = await Task.WhenAny(runningTasks.Values); var result = await completed; @@ -127,27 +128,27 @@ public async Task Unpack(UnpackSettings settings, CancellationToken token) log.LogInformation("Completed in {time}", sw.Elapsed); } - private async Task UnpackArchive(UnpackArgs args, CancellationToken token) + private async Task Unpack(UnpackArgs args, CancellationToken token) { try { return args.Archive switch { - Stream stream => await UnpackArchiveInternal(args, stream, token), - PegStreams pegStreams => await UnpackTexturesInternal(args, pegStreams, token), + Stream stream => await UnpackVpp(args, stream, token), + PegStreams pegStreams => await UnpackPeg(args, pegStreams, token), _ => throw new ArgumentOutOfRangeException() }; } catch (Exception e) { - throw new Exception($"Failed {nameof(UnpackArchive)} {args}", e); + throw new Exception($"Failed {nameof(Unpack)} {args}", e); } } /// /// NOTE: archive stream is disposed here! any new streams are copies (MemStreams) /// - private async Task UnpackArchiveInternal(UnpackArgs args, Stream archiveStream, CancellationToken token) + private async Task UnpackVpp(UnpackArgs args, Stream archiveStream, CancellationToken token) { token.ThrowIfCancellationRequested(); var (_, name, output, matcher, settings, relativePath) = args; @@ -163,9 +164,6 @@ private async Task UnpackArchiveInternal(UnpackArgs args, Stream a outputDir.Refresh(); } - outputDir.Create(); - outputDir.Refresh(); - var hash = await Utils.ComputeHash(archiveStream); await using var src = archiveStream; var vpp = await vppArchiver.UnpackVpp(src, name, token); @@ -186,7 +184,9 @@ private async Task UnpackArchiveInternal(UnpackArgs args, Stream a } if (!outputFile.Directory!.Exists) { - throw new InvalidOperationException($"Directory [{outputFile.Directory.FullName}] doesnt exist, can not unpack. Race condition?"); + outputFile.Directory.Create(); + outputFile.Directory.Refresh(); + outputFile.Refresh(); } var extension = Path.GetExtension(logicalFile.Name).ToLowerInvariant()[1..]; // exclude "." @@ -195,10 +195,24 @@ private async Task UnpackArchiveInternal(UnpackArgs args, Stream a var isPeg = Constatns.KnownPegExtensions.Contains(extension); var isRegularFile = !isVpp && !isPeg; - var tag = Path.Combine(archiveRelativePath, logicalFile.Name); - var copyStream = streamManager.GetStream(tag, logicalFile.Content.Length); - await logicalFile.Content.CopyToAsync(copyStream, token); - copyStream.Seek(0, SeekOrigin.Begin); + /* + return task with logical file + extraction: + regular file: write if matched + archive: loop, write if not "-s", write if matched + + */ + + //var tag = Path.Combine(archiveRelativePath, logicalFile.Name); + //var copyStream = streamManager.GetStream(tag, logicalFile.Content.Length); + //await logicalFile.Content.CopyToAsync(copyStream, token); + //copyStream.Seek(0, SeekOrigin.Begin); + if (!logicalFile.Content.CanSeek) + { + logicalFile.Content.Seek(0, SeekOrigin.Begin); + throw new InvalidOperationException($"Not seekable: {logicalFile.Content}"); + } + var copyStream = CreateThreadSafeCopy(logicalFile.Content); bool canDispose = true; if (isRegularFile || !settings.SkipArchives) @@ -246,6 +260,16 @@ private async Task UnpackArchiveInternal(UnpackArgs args, Stream a return new UnpackResult(archiveRelativePath, archiveMetadata, args, result); } + private Stream CreateThreadSafeCopy(Stream stream) + { + if (stream is not StreamView view) + { + throw new ArgumentException($"Only StreamView is supported, got {stream}"); + } + + view.Stream + } + public static PegStreams? FindPegEntryPair(string name, IReadOnlyDictionary cache) { var cpu = PegFiles.GetCpuFileName(name); @@ -260,7 +284,7 @@ private async Task UnpackArchiveInternal(UnpackArgs args, Stream a return new PegStreams(cpuStream, gpuStream); } - private async Task UnpackTexturesInternal(UnpackArgs args, PegStreams pegStreams, CancellationToken token) + private async Task UnpackPeg(UnpackArgs args, PegStreams pegStreams, CancellationToken token) { token.ThrowIfCancellationRequested(); var (_, name, output, matcher, settings, relativePath) = args; @@ -305,7 +329,7 @@ private async Task UnpackTexturesInternal(UnpackArgs args, PegStre return new UnpackResult(archiveRelativePath, archiveMetadata, args, result); } - private async Task ExtractFile(MemoryStream content, bool isXml, FileInfo outputFile, UnpackSettings settings, CancellationToken token) + private async Task ExtractFile(Stream content, bool isXml, FileInfo outputFile, UnpackSettings settings, CancellationToken token) { await using var fileStream = outputFile.OpenWrite(); if (settings.XmlFormat && isXml) diff --git a/src/tests/SyncFactionTests/VppRam/VppInMemoryWriter.cs b/src/tests/SyncFactionTests/VppRam/VppInMemoryWriter.cs index de32b72..1d88183 100644 --- a/src/tests/SyncFactionTests/VppRam/VppInMemoryWriter.cs +++ b/src/tests/SyncFactionTests/VppRam/VppInMemoryWriter.cs @@ -248,7 +248,7 @@ await Write(ms, Func wrapperFactory = compressOutput switch { true => x => new DeflaterOutputStream(x, new Deflater(compressionLevel)) { IsStreamOwner = false }, - false => x => new StreamWrapper(x) + false => x => new DisposableStreamWrapper(x) }; uint uncompressedSize = 0;