Skip to content

Commit

Permalink
WIP: archiver support for peg files
Browse files Browse the repository at this point in the history
  • Loading branch information
Rast1234 committed Jul 14, 2024
1 parent ea79650 commit 5841155
Show file tree
Hide file tree
Showing 6 changed files with 110 additions and 50 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,10 @@

namespace SyncFaction.Packer.Models;

public sealed class StreamWrapper : Stream
/// <summary>
/// Wrapper which can be disposed without disposing underlying stream
/// </summary>
public sealed class DisposableStreamWrapper : Stream
{
public override bool CanRead => stream.CanRead;

Expand All @@ -21,7 +24,7 @@ public override long Position
[SuppressMessage("Usage", "CA2213:Disposable fields should be disposed", Justification = "Wrapper is not owner of the stream")]
private readonly Stream stream;

public StreamWrapper(Stream stream) => this.stream = stream;
public DisposableStreamWrapper(Stream stream) => this.stream = stream;

public override async ValueTask DisposeAsync() => await base.DisposeAsync();

Expand Down
3 changes: 1 addition & 2 deletions src/SyncFaction.Packer/Models/RfgVpp.cs
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ public void ReadCompactedData(CancellationToken token)
{
token.ThrowIfCancellationRequested();
entryData.OverrideAlignmentSize(alignment);
entryData.OverrideData(new StreamView(new StreamView(ms, 0, decompressedLength), entryData.XDataOffset, entryData.XLenData));
entryData.OverrideData(new StreamView(ms, entryData.XDataOffset, entryData.XLenData));
}
}

Expand All @@ -111,7 +111,6 @@ public void ReadCompressedData(CancellationToken token)
{
token.ThrowIfCancellationRequested();

// TODO maybe get rid of some views here?
var compressedLength = entryData.DataSize;
// NOTE: important to calculate it before all overrides
var totalCompressedLength = entryData.TotalSize;
Expand Down
90 changes: 62 additions & 28 deletions src/SyncFaction.Packer/Models/StreamView.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,14 @@

namespace SyncFaction.Packer.Models;

/// <summary>
/// Limited view of an underlying stream, maintaining its own position
/// </summary>
public sealed class StreamView : Stream
{
public override bool CanRead => stream.CanRead;
public override bool CanRead => Stream.CanRead;

public override bool CanSeek => false;
public override bool CanSeek => Stream.CanSeek;

public override bool CanWrite => false;

Expand All @@ -18,19 +21,51 @@ public sealed class StreamView : Stream
public override long Position { get; set; }

[SuppressMessage("Usage", "CA2213:Disposable fields should be disposed", Justification = "This class does not own stream")]
private readonly Stream stream;
private Stream Stream { get; }

private readonly long viewStart;
private long ViewStart { get; }

public StreamView(Stream stream, long viewStart, long viewLength)
{
this.stream = stream;
Stream = stream;
Length = viewLength;
this.viewStart = viewStart;
ViewStart = viewStart;
Position = 0;
}

public override void Flush() => stream.Flush();
public StreamView ThreadSafeCopy()
{
switch (Stream)
{
case FileStream fs:
var newFileStream = File.OpenRead(fs.Name);
newFileStream.Position = fs.Position;
var result1 = new StreamView(newFileStream, ViewStart, Length);
result1.Position = Position;
return result1;
case MemoryStream ms:
var newMemoryStream = new MemoryStream(ms.ToArray());
newMemoryStream.Position = ms.Position;
var result2 = new StreamView(newMemoryStream, ViewStart, Length);
result2.Position = Position;
return result2;
case InflaterInputStream iis:
var inflated = new MemoryStream();
iis.CopyTo(inflated);
var result3 = new StreamView(inflated, ViewStart, Length);
result3.Position = Position;
return result3;
case StreamView sv:
var innerCopy = sv.ThreadSafeCopy().Stream;
var result4 = new StreamView(innerCopy, ViewStart, Length);
result4.Position = Position;
return result4;
default:
throw new InvalidOperationException();
}
}

public override void Flush() => Stream.Flush();

public override int Read(byte[] buffer, int offset, int count)
{
Expand All @@ -44,15 +79,15 @@ public override int Read(byte[] buffer, int offset, int count)
? (int) (Position + count - Length)
: 0;

if (stream.Position != viewStart + Position)
if (Stream.Position != ViewStart + Position)
{
if (stream is not InflaterInputStream)
if (Stream is not InflaterInputStream)
{
stream.Seek(viewStart + Position, SeekOrigin.Begin);
Stream.Seek(ViewStart + Position, SeekOrigin.Begin);
}
}

var result = stream.Read(buffer, offset, count - extraBytes);
var result = Stream.Read(buffer, offset, count - extraBytes);
if (result > 0)
{
Position += result;
Expand All @@ -63,17 +98,17 @@ public override int Read(byte[] buffer, int offset, int count)

public override int ReadByte()
{
if (Position + 1 >= viewStart + Length)
if (Position + 1 >= ViewStart + Length)
{
return -1;
}

if (stream.Position != viewStart + Position)
if (Stream.Position != ViewStart + Position)
{
stream.Seek(viewStart + Position, SeekOrigin.Begin);
Stream.Seek(ViewStart + Position, SeekOrigin.Begin);
}

var result = stream.ReadByte();
var result = Stream.ReadByte();
if (result > 0)
{
Position += result;
Expand All @@ -87,12 +122,12 @@ public override long Seek(long offset, SeekOrigin origin)
switch (origin)
{
case SeekOrigin.Begin:
if (offset < 0 || offset >= Length)
if (offset < 0 || offset > Length)
{
throw new InvalidOperationException($"Out of bounds: offset is {offset}, origin is {origin}, max length is {Length}");
}

if (stream is InflaterInputStream)
if (Stream is InflaterInputStream)
{
// hack to avoid seeking but still allow fast-forwarding
var delta = (int) (offset - Position);
Expand All @@ -104,44 +139,43 @@ public override long Seek(long offset, SeekOrigin origin)
var pool = ArrayPool<byte>.Shared;
var buf = pool.Rent(delta);
Position = offset;
var read = stream.Read(buf, 0, delta);
var read = Stream.Read(buf, 0, delta);
pool.Return(buf);
return read;
}

Position = offset;
return stream.Seek(viewStart + offset, SeekOrigin.Begin);
return Stream.Seek(ViewStart + offset, SeekOrigin.Begin);
case SeekOrigin.Current:
if (0 < Position + offset || Position + offset >= Length)
if (0 < Position + offset || Position + offset > Length)
{
throw new InvalidOperationException($"Out of bounds: offset is {offset}, position is {Position}, origin is {origin}, max length is {Length}");
}

Position += offset;
return stream.Seek(offset, SeekOrigin.Current);
return Stream.Seek(offset, SeekOrigin.Current);
case SeekOrigin.End:
if (offset < 0 || offset >= Length)
if (offset < 0 || offset > Length)
{
throw new InvalidOperationException($"Out of bounds: offset is {offset}, origin is {origin}, max length is {Length}");
}

Position = Length - offset;
return stream.Seek(viewStart + Length - offset, SeekOrigin.Begin);
return Stream.Seek(ViewStart + Length - offset, SeekOrigin.Begin);
default:
throw new ArgumentOutOfRangeException(nameof(origin), origin, null);
}
}

public override void SetLength(long value) => throw new InvalidOperationException($"{nameof(StreamView)} is read-only");

//viewLength = value;
public override void Write(byte[] buffer, int offset, int count) => throw new InvalidOperationException($"{nameof(StreamView)} is read-only");

public override string ToString()
{
var length = stream is InflaterInputStream
? "unsupported"
: stream.Length.ToString(CultureInfo.InvariantCulture);
return $"stream: len={length} pos={stream.Position}, view: start={viewStart}, len={Length}, pos={Position}";
var length = Stream is InflaterInputStream
? "unsupported (inflater stream)"
: Stream.Length.ToString(CultureInfo.InvariantCulture);
return $"StreamView: start={ViewStart}, len={Length}, pos={Position}, stream len={length}, stream pos={Stream.Position}";
}
}
2 changes: 1 addition & 1 deletion src/SyncFaction.Packer/Services/VppWriter.cs
Original file line number Diff line number Diff line change
Expand Up @@ -228,7 +228,7 @@ await Utils.Write(ms,
Func<Stream, Stream> wrapperFactory = compressOutput switch
{
true => x => new DeflaterOutputStream(x, new Deflater(compressionLevel)) { IsStreamOwner = false },
false => static x => new StreamWrapper(x)
false => static x => new DisposableStreamWrapper(x)
};

uint uncompressedSize = 0;
Expand Down
56 changes: 40 additions & 16 deletions src/SyncFaction.Toolbox/Archiver.cs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
using Microsoft.Extensions.FileSystemGlobbing;
using Microsoft.Extensions.Logging;
using Microsoft.IO;
using PleOps.XdeltaSharp.Vcdiff.Instructions;
using SyncFaction.ModManager;
using SyncFaction.ModManager.Services;
using SyncFaction.Packer.Models;
Expand Down Expand Up @@ -94,7 +95,7 @@ public async Task Unpack(UnpackSettings settings, CancellationToken token)
var batch = unpackArgsQueue.Take(batchSize).ToList();
foreach (var x in batch.Where(x => !runningTasks.ContainsKey(x)))
{
runningTasks.Add(x, UnpackArchive(x, cts.Token));
runningTasks.Add(x, Unpack(x, cts.Token));
}
var completed = await Task.WhenAny(runningTasks.Values);
var result = await completed;
Expand Down Expand Up @@ -127,27 +128,27 @@ public async Task Unpack(UnpackSettings settings, CancellationToken token)
log.LogInformation("Completed in {time}", sw.Elapsed);
}

private async Task<UnpackResult> UnpackArchive(UnpackArgs args, CancellationToken token)
private async Task<UnpackResult> Unpack(UnpackArgs args, CancellationToken token)
{
try
{
return args.Archive switch
{
Stream stream => await UnpackArchiveInternal(args, stream, token),
PegStreams pegStreams => await UnpackTexturesInternal(args, pegStreams, token),
Stream stream => await UnpackVpp(args, stream, token),
PegStreams pegStreams => await UnpackPeg(args, pegStreams, token),
_ => throw new ArgumentOutOfRangeException()
};
}
catch (Exception e)
{
throw new Exception($"Failed {nameof(UnpackArchive)} {args}", e);
throw new Exception($"Failed {nameof(Unpack)} {args}", e);
}
}

/// <summary>
/// NOTE: archive stream is disposed here! any new streams are copies (MemStreams)
/// </summary>
private async Task<UnpackResult> UnpackArchiveInternal(UnpackArgs args, Stream archiveStream, CancellationToken token)
private async Task<UnpackResult> UnpackVpp(UnpackArgs args, Stream archiveStream, CancellationToken token)
{
token.ThrowIfCancellationRequested();
var (_, name, output, matcher, settings, relativePath) = args;
Expand All @@ -163,9 +164,6 @@ private async Task<UnpackResult> UnpackArchiveInternal(UnpackArgs args, Stream a
outputDir.Refresh();
}

outputDir.Create();
outputDir.Refresh();

var hash = await Utils.ComputeHash(archiveStream);
await using var src = archiveStream;
var vpp = await vppArchiver.UnpackVpp(src, name, token);
Expand All @@ -186,7 +184,9 @@ private async Task<UnpackResult> UnpackArchiveInternal(UnpackArgs args, Stream a
}
if (!outputFile.Directory!.Exists)
{
throw new InvalidOperationException($"Directory [{outputFile.Directory.FullName}] doesnt exist, can not unpack. Race condition?");
outputFile.Directory.Create();
outputFile.Directory.Refresh();
outputFile.Refresh();
}

var extension = Path.GetExtension(logicalFile.Name).ToLowerInvariant()[1..]; // exclude "."
Expand All @@ -195,10 +195,24 @@ private async Task<UnpackResult> UnpackArchiveInternal(UnpackArgs args, Stream a
var isPeg = Constatns.KnownPegExtensions.Contains(extension);
var isRegularFile = !isVpp && !isPeg;

var tag = Path.Combine(archiveRelativePath, logicalFile.Name);
var copyStream = streamManager.GetStream(tag, logicalFile.Content.Length);
await logicalFile.Content.CopyToAsync(copyStream, token);
copyStream.Seek(0, SeekOrigin.Begin);
/*
return task with logical file
extraction:
regular file: write if matched
archive: loop, write if not "-s", write if matched
*/

//var tag = Path.Combine(archiveRelativePath, logicalFile.Name);
//var copyStream = streamManager.GetStream(tag, logicalFile.Content.Length);
//await logicalFile.Content.CopyToAsync(copyStream, token);
//copyStream.Seek(0, SeekOrigin.Begin);
if (!logicalFile.Content.CanSeek)
{
logicalFile.Content.Seek(0, SeekOrigin.Begin);
throw new InvalidOperationException($"Not seekable: {logicalFile.Content}");
}
var copyStream = CreateThreadSafeCopy(logicalFile.Content);
bool canDispose = true;

if (isRegularFile || !settings.SkipArchives)
Expand Down Expand Up @@ -246,6 +260,16 @@ private async Task<UnpackResult> UnpackArchiveInternal(UnpackArgs args, Stream a
return new UnpackResult(archiveRelativePath, archiveMetadata, args, result);
}

private Stream CreateThreadSafeCopy(Stream stream)
{
if (stream is not StreamView view)
{
throw new ArgumentException($"Only StreamView is supported, got {stream}");
}

view.Stream
}

public static PegStreams? FindPegEntryPair(string name, IReadOnlyDictionary<string, Stream> cache)
{
var cpu = PegFiles.GetCpuFileName(name);
Expand All @@ -260,7 +284,7 @@ private async Task<UnpackResult> UnpackArchiveInternal(UnpackArgs args, Stream a
return new PegStreams(cpuStream, gpuStream);
}

private async Task<UnpackResult> UnpackTexturesInternal(UnpackArgs args, PegStreams pegStreams, CancellationToken token)
private async Task<UnpackResult> UnpackPeg(UnpackArgs args, PegStreams pegStreams, CancellationToken token)
{
token.ThrowIfCancellationRequested();
var (_, name, output, matcher, settings, relativePath) = args;
Expand Down Expand Up @@ -305,7 +329,7 @@ private async Task<UnpackResult> UnpackTexturesInternal(UnpackArgs args, PegStre
return new UnpackResult(archiveRelativePath, archiveMetadata, args, result);
}

private async Task ExtractFile(MemoryStream content, bool isXml, FileInfo outputFile, UnpackSettings settings, CancellationToken token)
private async Task ExtractFile(Stream content, bool isXml, FileInfo outputFile, UnpackSettings settings, CancellationToken token)
{
await using var fileStream = outputFile.OpenWrite();
if (settings.XmlFormat && isXml)
Expand Down
2 changes: 1 addition & 1 deletion src/tests/SyncFactionTests/VppRam/VppInMemoryWriter.cs
Original file line number Diff line number Diff line change
Expand Up @@ -248,7 +248,7 @@ await Write(ms,
Func<Stream, Stream> wrapperFactory = compressOutput switch
{
true => x => new DeflaterOutputStream(x, new Deflater(compressionLevel)) { IsStreamOwner = false },
false => x => new StreamWrapper(x)
false => x => new DisposableStreamWrapper(x)
};

uint uncompressedSize = 0;
Expand Down

0 comments on commit 5841155

Please sign in to comment.