Skip to content

Commit

Permalink
Shared memory content body optimization
Browse files Browse the repository at this point in the history
  • Loading branch information
sanych-sun committed Sep 9, 2023
1 parent 249a64e commit f3e50fa
Show file tree
Hide file tree
Showing 8 changed files with 120 additions and 106 deletions.
17 changes: 9 additions & 8 deletions src/RabbitMQ.Next.Consumer/DeliverMessageHandler.cs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ internal sealed class DeliverMessageHandler : IMessageHandler<DeliverMethod>
private readonly ISerializer serializer;
private readonly IAcknowledgement acknowledgement;
private readonly PoisonMessageMode onPoisonMessage;
private readonly Channel<(DeliveredMessage message, ulong deliveryTag)> deliverChannel;
private readonly Channel<(DeliverMethod method, IPayload payload)> deliverChannel;

public DeliverMessageHandler(
Func<IDeliveredMessage, ValueTask> messageHandler,
Expand All @@ -28,11 +28,11 @@ public DeliverMessageHandler(
this.serializer = serializer;
this.onPoisonMessage = onPoisonMessage;

this.deliverChannel = Channel.CreateUnbounded<(DeliveredMessage message, ulong deliveryTag)>(new UnboundedChannelOptions
this.deliverChannel = Channel.CreateUnbounded<(DeliverMethod method, IPayload payload)>(new UnboundedChannelOptions
{
SingleWriter = true,
SingleReader = concurrencyLevel == 1,
AllowSynchronousContinuations = false
AllowSynchronousContinuations = false,
});


Expand All @@ -43,7 +43,7 @@ public DeliverMessageHandler(
}

public void Handle(DeliverMethod method, IPayload payload)
=> this.deliverChannel.Writer.TryWrite((new DeliveredMessage(this.serializer, method, payload), method.DeliveryTag));
=> this.deliverChannel.Writer.TryWrite((method, payload));


public void Release(Exception ex = null)
Expand All @@ -58,18 +58,19 @@ private async Task ProcessDeliveredMessagesAsync()
{
while (reader.TryRead(out var delivered))
{
var message = new DeliveredMessage(this.serializer, delivered.method, delivered.payload);
try
{
await this.messageHandler(delivered.message);
await this.acknowledgement.AckAsync(delivered.deliveryTag);
await this.messageHandler(message);
await this.acknowledgement.AckAsync(delivered.method.DeliveryTag);
}
catch (Exception)
{
await this.acknowledgement.NackAsync(delivered.deliveryTag, this.onPoisonMessage == PoisonMessageMode.Requeue);
await this.acknowledgement.NackAsync(delivered.method.DeliveryTag, this.onPoisonMessage == PoisonMessageMode.Requeue);
}
finally
{
delivered.message.Dispose();
message.Dispose();
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,11 +45,11 @@ public string Parse(ReadOnlySequence<byte> bytes)
return string.Empty;
}

if (bytes.IsSingleSegment)
{
return Encoding.UTF8.GetString(bytes.FirstSpan);
}
return bytes.IsSingleSegment ? Encoding.UTF8.GetString(bytes.FirstSpan) : ParseChunked(bytes);
}

private static string ParseChunked(ReadOnlySequence<byte> bytes)
{
var decoder = Encoding.UTF8.GetDecoder();
var chunks = new List<ArraySegment<char>>();
var totalChars = 0;
Expand Down
67 changes: 26 additions & 41 deletions src/RabbitMQ.Next/Buffers/SharedMemory.cs
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,14 @@ internal sealed class SharedMemory : IDisposable
private int referencesCount = 1;
private byte[] memory;

public SharedMemory(ObjectPool<byte[]> memoryPool, byte[] memory)
public SharedMemory(ObjectPool<byte[]> memoryPool, byte[] memory, int size)
{
ArgumentNullException.ThrowIfNull(memoryPool);
ArgumentNullException.ThrowIfNull(memory);

this.memoryPool = memoryPool;
this.memory = memory;
this.Size = size;
}

public void Dispose()
Expand All @@ -33,9 +34,17 @@ public void Dispose()
this.memory = null;
}

public int Size { get; }

public MemoryAccessor Slice(int offset)
=> new(this, offset, this.Size - offset);

public MemoryAccessor Slice(int offset, int size)
=> new(this, offset, size);

public static implicit operator MemoryAccessor(SharedMemory memory)
=> new (memory, 0, memory.Size);

private void DisposeCheck()
{
if (this.memory == null)
Expand All @@ -49,7 +58,7 @@ public readonly ref struct MemoryAccessor
private readonly SharedMemory owner;
private readonly int offset;

public MemoryAccessor(SharedMemory owner, int offset, int length)
public MemoryAccessor(SharedMemory owner, int offset, int size)
{
ArgumentNullException.ThrowIfNull(owner);
owner.DisposeCheck();
Expand All @@ -59,60 +68,34 @@ public MemoryAccessor(SharedMemory owner, int offset, int length)
throw new ArgumentOutOfRangeException(nameof(offset));
}

if (offset > owner.memory.Length)
if (offset > owner.Size)
{
throw new ArgumentOutOfRangeException(nameof(offset));
}

if(offset + length > owner.memory.Length)
if(offset + size > owner.Size)
{
throw new ArgumentOutOfRangeException(nameof(length));
throw new ArgumentOutOfRangeException(nameof(size));
}

this.owner = owner;
this.offset = offset;
this.Length = length;
this.Size = size;
this.Span = new (this.owner.memory, offset, size);
}

public int Length { get; }

public ReadOnlySpan<byte> Span
{
get
{
this.owner.DisposeCheck();
return new (this.owner.memory, this.offset, this.Length);
}
}
public int Size { get; }

public ReadOnlySpan<byte> Span { get; }

public MemoryAccessor Slice(int offset)
=> this.Slice(offset, this.Length - offset);
=> new(this.owner, this.offset + offset, this.Size - offset);

public MemoryAccessor Slice(int offset, int length)
{
if (offset < 0)
{
throw new ArgumentOutOfRangeException(nameof(offset));
}

if (length < 0)
{
throw new ArgumentOutOfRangeException(nameof(length));
}

if (offset + length > this.Length)
{
throw new ArgumentOutOfRangeException(nameof(length));
}

return new(this.owner, this.offset + offset, length);
}
public MemoryAccessor Slice(int offset, int size)
=> new(this.owner, this.offset + offset, size);

public IMemoryAccessor AsRef()
{
this.owner.DisposeCheck();
return new SharedMemoryAccessor(this.owner, this.offset, this.Length);
}
public IMemoryAccessor AsRef()
=> new SharedMemoryAccessor(this.owner, this.offset, this.Size);
}

private sealed class SharedMemoryAccessor : IMemoryAccessor
Expand All @@ -122,6 +105,8 @@ private sealed class SharedMemoryAccessor : IMemoryAccessor

public SharedMemoryAccessor(SharedMemory owner, int offset, int size)
{
owner.DisposeCheck();

this.owner = owner;
this.offset = offset;
this.Size = size;
Expand Down
5 changes: 5 additions & 0 deletions src/RabbitMQ.Next/Channels/Channel.cs
Original file line number Diff line number Diff line change
Expand Up @@ -200,6 +200,11 @@ private void ValidateState()

public void PushFrame(FrameType type, SharedMemory.MemoryAccessor payload)
{
if (type == FrameType.Heartbeat)
{
return;
}

if (this.currentFrameHandler == null)
{
if (type != FrameType.Method)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ private FrameType ParseContentBodyFrame(SharedMemory.MemoryAccessor payload)
this.contentBodyTail = this.contentBodyTail.Append(payload.AsRef());
}

this.pendingContentSize -= payload.Length;
this.pendingContentSize -= payload.Size;
return (this.pendingContentSize > 0) ? FrameType.ContentBody : FrameType.None;
}
}
123 changes: 73 additions & 50 deletions src/RabbitMQ.Next/Connection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -151,71 +151,68 @@ private void ReceiveLoop(CancellationToken cancellationToken)
{
try
{
IMemoryAccessor previousChunk = null;
var expectedBytes = ProtocolConstants.FrameHeaderSize;
// 1. Receive next chunk of data.
SharedMemory receivedMemory = null;

while (!cancellationToken.IsCancellationRequested)
{
// 1. Obtain next buffer
var buffer = this.memoryPool.Get();
var bufferOffset = 0;

// 2. Copy bytes leftover from previous chunk if any
if (previousChunk != null)
SharedMemory.MemoryAccessor currentAccessor = default;
if (receivedMemory != null)
{
previousChunk.CopyTo(buffer);
bufferOffset = previousChunk.Size;
previousChunk.Dispose();
previousChunk = null;
currentAccessor = receivedMemory;
}

// 3. Read data from the socket at least of frame header size
var received = this.socket.Receive(buffer, bufferOffset, expectedBytes);
var receivedMemory = new SharedMemory(this.memoryPool, buffer);
var bufferSize = bufferOffset + received;

// 4. Parse received frames
var receivedSlice = receivedMemory.Slice(0, bufferSize);
while (receivedSlice.Length >= ProtocolConstants.FrameHeaderSize)

// 2. Parse received frames
while (currentAccessor.Size >= ProtocolConstants.FrameHeaderSize)
{
// 4.1. Read frame header
receivedSlice.Span.ReadFrameHeader(out var frameType, out var channel, out var payloadSize);
var totalFrameSize = ProtocolConstants.FrameHeaderSize + (int)payloadSize + ProtocolConstants.FrameEndSize;

// 4.2. Ensure entire frame was loaded
if (totalFrameSize > receivedSlice.Length)
// 2.1. Read frame header
currentAccessor.Span.ReadFrameHeader(out var frameType, out var channel, out var payloadSize);
currentAccessor = currentAccessor.Slice(ProtocolConstants.FrameHeaderSize);

// 2.2. Lookup for the target channel to push the frame
var targetChannel = (channel == ProtocolConstants.ConnectionChannel) ? this.connectionChannel : this.channelPool.Get(channel);

// 2.3. Slice frame bytes
SharedMemory.MemoryAccessor frameBytes;
if (currentAccessor.Size >= payloadSize + ProtocolConstants.FrameEndSize)
{
expectedBytes = totalFrameSize - receivedSlice.Length;
break;
frameBytes = currentAccessor.Slice(0, (int)payloadSize);
}

// 4.3. Ensure frame end if present
if (receivedSlice.Span[totalFrameSize - 1] != ProtocolConstants.FrameEndByte)
else
{
// TODO: throw connection exception here
throw new InvalidOperationException();
}
var missedBytes = (int)payloadSize - currentAccessor.Size;

// 4.4. Can safely ignore Heartbeat frame
if (frameType != FrameType.Heartbeat)
{
// 4.5. Slice frame payload
var framePayload = receivedSlice.Slice(ProtocolConstants.FrameHeaderSize, (int)payloadSize);
if (frameType == FrameType.ContentBody)
{
// ContentBody frame could be easily processed chunked
targetChannel.PushFrame(frameType, currentAccessor);
currentAccessor = default;
}

// 4.6. Write frame to appropriate channel
var targetChannel = (channel == ProtocolConstants.ConnectionChannel) ? this.connectionChannel : this.channelPool.Get(channel);
targetChannel.PushFrame(frameType, framePayload);
var previousMemory = receivedMemory;
receivedMemory = ReceiveNext(missedBytes + ProtocolConstants.FrameEndSize, currentAccessor);
previousMemory.Dispose();

currentAccessor = receivedMemory;
frameBytes = currentAccessor.Slice(0, frameType == FrameType.ContentBody ? missedBytes : (int)payloadSize);
}

receivedSlice = receivedSlice.Slice(totalFrameSize);
expectedBytes = ProtocolConstants.FrameHeaderSize;
}
// 2.4. Ensure frame end present just after the current frame bytes
if (currentAccessor.Span[frameBytes.Size] != ProtocolConstants.FrameEndByte)
{
// TODO: throw connection exception here
throw new InvalidOperationException();
}

if (receivedSlice.Length > 0)
{
previousChunk = receivedSlice.AsRef();
// 2.5. Push frame to the target channel
targetChannel.PushFrame(frameType, frameBytes);
currentAccessor = currentAccessor.Slice(frameBytes.Size + ProtocolConstants.FrameEndSize);
}

receivedMemory.Dispose();
// 3. Receive next chunk with preserving leftovers of the currently not yet parsed data
var nextChunk = ReceiveNext(ProtocolConstants.FrameHeaderSize, currentAccessor);
receivedMemory?.Dispose();
receivedMemory = nextChunk;
}
}
catch (SocketException ex)
Expand All @@ -224,6 +221,32 @@ private void ReceiveLoop(CancellationToken cancellationToken)

this.ConnectionClose(ex);
}
catch (Exception ex)
{
this.ConnectionClose(ex);
}

return;

SharedMemory ReceiveNext(int expectedBytes, SharedMemory.MemoryAccessor previousChunk = default)
{
cancellationToken.ThrowIfCancellationRequested();

// Obtain next buffer
var buffer = this.memoryPool.Get();
var bufferOffset = 0;

// Copy bytes leftover from previous chunk if any
if (previousChunk.Size > 0)
{
previousChunk.Span.CopyTo(buffer);
bufferOffset = previousChunk.Size;
}

// Read data from the socket at least of requested size
var received = this.socket.Receive(buffer, bufferOffset, expectedBytes);
return new SharedMemory(this.memoryPool, buffer, bufferOffset + received);
}
}

private void ConnectionClose(Exception ex)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ namespace RabbitMQ.Next.Benchmarks.Consumer;

public class ConsumerBenchmarks
{
private readonly int messagesCount = 10000;
private readonly int messagesCount = 10_000;
private readonly string queueName = "test-queue";
private IConnection connection;
private RabbitMQ.Client.IConnection theirConnection;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ TestCaseParameters GenerateTestCase(int payloadLen, int count, string name)
yield return GenerateTestCase(1024, 10_000, "1024 (1 kB)");
yield return GenerateTestCase(10240, 10_000, "10240 (10 kB)");
yield return GenerateTestCase(102400, 10_000, "102400 (100 kB)");
yield return GenerateTestCase(204800, 10_000, "204800 (200 kB)");
//yield return GenerateTestCase(204800, 10_000, "204800 (200 kB)");
}

public class TestCaseParameters
Expand Down

0 comments on commit f3e50fa

Please sign in to comment.