Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use array pool input message buffer #41

Merged
merged 10 commits into from
Mar 26, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
135 changes: 135 additions & 0 deletions benchmarks/Backdash.Benchmarks/Cases/SessionBenchmark.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,135 @@
using System.Net;
using Backdash.Core;
using Backdash.Data;

#pragma warning disable CS0649, AsyncFixer01, AsyncFixer02
// ReSharper disable AccessToDisposedClosure
namespace Backdash.Benchmarks.Cases;

[Flags]
public enum GameInput
{
None = 0,
Up = 1 << 0,
Down = 1 << 1,
Left = 1 << 2,
Right = 1 << 3,
}

public record struct GameState;

[InProcess]
[RPlotExporter]
[MemoryDiagnoser, ExceptionDiagnoser, ThreadingDiagnoser]
[RankColumn, IterationsColumn]
public class SessionBenchmark
{
[Params(10_000)]
public int N;

IRollbackSession<GameInput, GameState> peer1 = null!;
IRollbackSession<GameInput, GameState> peer2 = null!;
CancellationTokenSource cts = null!;

[GlobalSetup]
public void Setup()
{
cts = new();

peer1 = RollbackNetcode.CreateSession<GameInput, GameState>(9000,
new() { Log = new(LogLevel.None) }, new()
{
});

peer2 = RollbackNetcode.CreateSession<GameInput, GameState>(9001,
new() { Log = new(LogLevel.None) }, new()
{
});

peer1.AddPlayer(new LocalPlayer(1));
peer1.AddPlayer(new RemotePlayer(2, IPAddress.Loopback, 9001));

peer2.AddPlayer(new RemotePlayer(1, IPAddress.Loopback, 9000));
peer2.AddPlayer(new LocalPlayer(2));

peer1.SetHandler(new Handler(peer1));
peer2.SetHandler(new Handler(peer2));

peer1.Start(cts.Token);
peer2.Start(cts.Token);
}

[GlobalCleanup]
public void CleanUp()
{
cts.Cancel();
cts.Dispose();
}

[Benchmark]
public async Task Match2Players()
{
var p1 = peer1.GetPlayers().Single(x => x.IsLocal());
var p2 = peer2.GetPlayers().Single(x => x.IsLocal());

var input = GameInput.Up | GameInput.Right;

await Task.WhenAll(
Task.Run(() =>
{
while (peer1.CurrentFrame.Number < N)
{
peer1.BeginFrame();
if (peer1.AddLocalInput(p1, input) is ResultCode.Ok &&
peer1.SynchronizeInputs() is ResultCode.Ok)
peer1.AdvanceFrame();
}
}),
Task.Run(() =>
{
while (peer2.CurrentFrame.Number < N)
{
peer2.BeginFrame();
if (peer2.AddLocalInput(p2, input) is ResultCode.Ok &&
peer2.SynchronizeInputs() is ResultCode.Ok)
peer2.AdvanceFrame();
}
})
);

await cts.CancelAsync();
await Task.WhenAll(peer1.WaitToStop(), peer2.WaitToStop());
}
}

sealed class Handler(IRollbackSession<GameInput> session) : IRollbackHandler<GameState>
{
public void OnSessionStart()
{
}

public void OnSessionClose()
{
}

public void SaveState(in Frame frame, ref GameState state)
{
}

public void LoadState(in Frame frame, in GameState gameState)
{
}

public void AdvanceFrame()
{
session.AdvanceFrame();
}

public void TimeSync(FrameSpan framesAhead)
{
}

public void OnPeerEvent(PlayerHandle player, PeerEventInfo evt)
{
}
}
4 changes: 2 additions & 2 deletions benchmarks/Backdash.Benchmarks/Program.cs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
using BenchmarkDotNet.Running;

BenchmarkSwitcher.FromAssembly(typeof(Program).Assembly).Run(args);
// BenchmarkRunner.Run<GetBitStringBenchmark>();
// BenchmarkRunner.Run<UdpClientBenchmark>();
// await new UdpClientBenchmark().Start(10, false).ConfigureAwait(false);
// BenchmarkRunner.Run<UdpClientBenchmark>();
2 changes: 1 addition & 1 deletion src/Backdash/Network/Client/PeerClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,6 @@ async Task ReceiveLoop(CancellationToken ct)
{
var buffer = Mem.CreatePinnedMemory(maxPacketSize);
SocketAddress address = new(socket.AddressFamily);
T msg = default;
while (!ct.IsCancellationRequested)
{
int receivedSize;
Expand Down Expand Up @@ -114,6 +113,7 @@ async Task ReceiveLoop(CancellationToken ct)

try
{
T msg = new();
serializer.Deserialize(buffer[..receivedSize].Span, ref msg);
await observer.OnPeerMessage(msg, address, receivedSize, ct).ConfigureAwait(false);
}
Expand Down
64 changes: 51 additions & 13 deletions src/Backdash/Network/Messages/InputMessage.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
using System.Buffers;
using System.Runtime.CompilerServices;
using System.Runtime.InteropServices;
using System.Text;
using Backdash.Core;
using Backdash.Data;
Expand All @@ -8,19 +10,37 @@
namespace Backdash.Network.Messages;

[Serializable]
record struct InputMessage : IBinarySerializable, IUtf8SpanFormattable
struct InputMessage : IDisposable, IBinarySerializable, IUtf8SpanFormattable, IEquatable<InputMessage>
{
public PeerStatusBuffer PeerConnectStatus;
public Frame StartFrame;
public bool DisconnectRequested;
public Frame AckFrame;
public ushort NumBits;
public byte InputSize;
public InputMessageBuffer Bits;
public Memory<byte> Bits = Memory<byte>.Empty;

public InputMessage() => InitBits();

public InputMessage(ReadOnlySpan<byte> bits) : this()
{
InitBits();
bits.CopyTo(Bits.Span);
}

public void InitBits()
{
var buffer = ArrayPool<byte>.Shared.Rent(Max.CompressedBytes);
buffer.AsSpan().Clear();
Bits = buffer;
}

public readonly int InputByteSize() => (int)Math.Ceiling(NumBits / (float)ByteSize.ByteToBits);
public readonly Memory<byte> InputBytes() => Bits[..InputByteSize()];

public void Clear()
{
Mem.Clear(Bits);
Mem.Clear(Bits.Span);
PeerConnectStatus[..].Clear();
StartFrame = Frame.Zero;
DisconnectRequested = false;
Expand All @@ -42,7 +62,7 @@ public readonly void Serialize(BinarySpanWriter writer)
writer.Write(in InputSize);
writer.Write(in NumBits);
var bitCount = (int)Math.Ceiling(NumBits / (float)ByteSize.ByteToBits);
writer.Write(Bits[..bitCount]);
writer.Write(Bits.Span[..bitCount]);
}

public void Deserialize(BinarySpanReader reader)
Expand All @@ -56,7 +76,11 @@ public void Deserialize(BinarySpanReader reader)
InputSize = reader.ReadByte();
NumBits = reader.ReadUShort();
var bitCount = (int)Math.Ceiling(NumBits / (float)ByteSize.ByteToBits);
reader.ReadByte(Bits[..bitCount]);

if (Bits.Length is 0)
InitBits();

reader.ReadByte(Bits.Span[..bitCount]);
}

public readonly bool TryFormat(
Expand All @@ -70,6 +94,28 @@ public readonly bool TryFormat(
if (!writer.Write(NumBits)) return false;
return true;
}

public readonly bool Equals(InputMessage other) =>
PeerConnectStatus[..].SequenceEqual(other.PeerConnectStatus) &&
StartFrame.Equals(other.StartFrame) &&
DisconnectRequested == other.DisconnectRequested &&
AckFrame.Equals(other.AckFrame) && NumBits == other.NumBits &&
InputSize == other.InputSize &&
Mem.EqualBytes(InputBytes().Span, other.InputBytes().Span, truncate: true);

public override readonly bool Equals(object? obj) => obj is InputMessage other && Equals(other);

public override readonly int GetHashCode() => HashCode.Combine(
PeerConnectStatus, StartFrame, DisconnectRequested, AckFrame, NumBits, InputSize, Bits);

public readonly void Dispose()
{
if (Bits.Length > 0 && MemoryMarshal.ToEnumerable<byte>(Bits) is byte[] array)
ArrayPool<byte>.Shared.Return(array);
}

public static bool operator ==(InputMessage left, InputMessage right) => left.Equals(right);
public static bool operator !=(InputMessage left, InputMessage right) => !left.Equals(right);
}

[Serializable, InlineArray(Max.NumberOfPlayers)]
Expand Down Expand Up @@ -103,11 +149,3 @@ public override readonly string ToString()
return builder.ToString();
}
}

[Serializable, InlineArray(Max.CompressedBytes)]
struct InputMessageBuffer
{
byte element0;
public InputMessageBuffer(ReadOnlySpan<byte> bits) => bits.CopyTo(this);
public override readonly string ToString() => Mem.GetBitString(this);
}
11 changes: 9 additions & 2 deletions src/Backdash/Network/Messages/ProtocolMessage.cs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,8 @@
namespace Backdash.Network.Messages;

[StructLayout(LayoutKind.Explicit, Pack = 2)]
struct ProtocolMessage(MessageType type) : IBinarySerializable, IEquatable<ProtocolMessage>, IUtf8SpanFormattable
struct ProtocolMessage(MessageType type)
: IDisposable, IBinarySerializable, IEquatable<ProtocolMessage>, IUtf8SpanFormattable
{
[FieldOffset(0)]
public Header Header = new(type);
Expand All @@ -29,7 +30,7 @@ struct ProtocolMessage(MessageType type) : IBinarySerializable, IEquatable<Proto
[FieldOffset(Header.Size)]
public KeepAlive KeepAlive;

[FieldOffset(Header.Size)]
[FieldOffset(Header.Size + 2)]
public InputMessage Input;

public readonly void Serialize(BinarySpanWriter writer)
Expand Down Expand Up @@ -114,6 +115,12 @@ public override readonly string ToString()
return $"Msg({Header.Type}){info}";
}

public readonly void Dispose()
{
if (Header.Type is MessageType.Input)
Input.Dispose();
}

public readonly bool TryFormat(
Span<byte> utf8Destination, out int bytesWritten,
ReadOnlySpan<char> format, IFormatProvider? provider
Expand Down
8 changes: 4 additions & 4 deletions src/Backdash/Network/Protocol/Comm/InputEncoder.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,9 @@ namespace Backdash.Network.Protocol.Comm;

static class InputEncoder
{
public static DeltaXorRle.Encoder GetCompressor(ref InputMessage inputMsg, Span<byte> lastBuffer) =>
new(inputMsg.Bits, lastBuffer);
public static DeltaXorRle.Encoder GetCompressor(in InputMessage inputMsg, Span<byte> lastBuffer) =>
new(inputMsg.Bits.Span, lastBuffer);

public static DeltaXorRle.Decoder GetDecompressor(ref InputMessage inputMsg) =>
new(inputMsg.Bits, inputMsg.NumBits);
public static DeltaXorRle.Decoder GetDecompressor(in InputMessage inputMsg) =>
new(inputMsg.Bits.Span, inputMsg.NumBits);
}
Loading
Loading