Skip to content

Commit

Permalink
fix the issue that may occur when the UT runs concurrently (#38)
Browse files Browse the repository at this point in the history
  • Loading branch information
eventhorizon-cli authored Mar 9, 2024
1 parent 492a973 commit 8bdc420
Show file tree
Hide file tree
Showing 5 changed files with 25 additions and 36 deletions.
2 changes: 1 addition & 1 deletion src/Mocha.Core/Buffer/Memory/MemoryBuferQueue.cs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ public MemoryBufferQueue(string topicName, int partitionNumber)
_partitions = new MemoryBufferPartition<T>[partitionNumber];
for (var i = 0; i < partitionNumber; i++)
{
_partitions[i] = new MemoryBufferPartition<T>();
_partitions[i] = new MemoryBufferPartition<T>(i);
}

_producer = new MemoryBufferProducer<T>(topicName, _partitions);
Expand Down
15 changes: 7 additions & 8 deletions src/Mocha.Core/Buffer/Memory/MemoryBufferPartition.cs
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,8 @@ namespace Mocha.Core.Buffer.Memory;
[DebuggerTypeProxy(typeof(MemoryBufferPartition<>.DebugView))]
internal sealed class MemoryBufferPartition<T>
{
// internal for testing
internal static int SegmentLength = 1024;

private static int _idIncreasement;
// internal for test
internal readonly int _segmentLength;

private volatile MemoryBufferSegment<T> _head;
private volatile MemoryBufferSegment<T> _tail;
Expand All @@ -25,10 +23,11 @@ internal sealed class MemoryBufferPartition<T>

private readonly object _createSegmentLock;

public MemoryBufferPartition()
public MemoryBufferPartition(int id, int segmentLength = 1024)
{
PartitionId = _idIncreasement++;
_head = _tail = new MemoryBufferSegment<T>(SegmentLength, default);
_segmentLength = segmentLength;
PartitionId = id;
_head = _tail = new MemoryBufferSegment<T>(_segmentLength, default);
_consumerReaders = new ConcurrentDictionary<string, Reader>();
_consumers = new HashSet<MemoryBufferConsumer<T>>();

Expand Down Expand Up @@ -77,7 +76,7 @@ public void Enqueue(T item)
var newSegmentStartOffset = tail.EndOffset + 1;
var newSegment = TryRecycleSegment(newSegmentStartOffset, out var recycledSegment)
? recycledSegment
: new MemoryBufferSegment<T>(SegmentLength, newSegmentStartOffset);
: new MemoryBufferSegment<T>(_segmentLength, newSegmentStartOffset);
tail.NextSegment = newSegment;
_tail = newSegment;
}
Expand Down
7 changes: 7 additions & 0 deletions src/Mocha.Core/Buffer/Memory/MemoryBufferSegment.cs
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,13 @@ public bool TryGet(MemoryBufferPartitionOffset offset, int count, [NotNullWhen(t

var writePosition = Math.Min(_writePosition, _slots.Length - 1);
var actualCount = Math.Min(count, writePosition - readPosition + 1);
var wholeSegment = readPosition == 0 && actualCount == _slots.Length;
if (wholeSegment)
{
items = _slots;
return true;
}

items = _slots[readPosition..(readPosition + actualCount)];
return true;
}
Expand Down
20 changes: 5 additions & 15 deletions tests/Mocha.Core.Tests/Buffer/Memory/MemoryBufferPartitionTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,7 @@ public class MemoryBufferPartitionTests
[Fact]
public void Enqueue_And_TryPull()
{
MemoryBufferPartition<int>.SegmentLength = 2;

var partition = new MemoryBufferPartition<int>();
var partition = new MemoryBufferPartition<int>(0, 2);

for (var i = 0; i < 12; i++)
{
Expand Down Expand Up @@ -47,9 +45,7 @@ public void Enqueue_And_TryPull()
[Fact]
public void Repeatable_Pull_If_Not_Commit()
{
MemoryBufferPartition<int>.SegmentLength = 2;

var partition = new MemoryBufferPartition<int>();
var partition = new MemoryBufferPartition<int>(0, 2);

for (var i = 0; i < 11; i++)
{
Expand Down Expand Up @@ -86,9 +82,7 @@ public void Repeatable_Pull_If_Not_Commit()
[Fact]
public void Segment_Will_Be_Recycled_If_All_Consumers_Consumed_Single_Group()
{
MemoryBufferPartition<int>.SegmentLength = 3;

var partition = new MemoryBufferPartition<int>();
var partition = new MemoryBufferPartition<int>(0, 3);

for (var i = 0; i < 9; i++)
{
Expand Down Expand Up @@ -122,9 +116,7 @@ public void Segment_Will_Be_Recycled_If_All_Consumers_Consumed_Single_Group()
[Fact]
public void Segment_Will_Be_Recycled_If_All_Consumers_Consumed_MultipleGroup()
{
MemoryBufferPartition<int>.SegmentLength = 3;

var partition = new MemoryBufferPartition<int>();
var partition = new MemoryBufferPartition<int>(0, 3);

for (var i = 0; i < 9; i++)
{
Expand Down Expand Up @@ -156,9 +148,7 @@ public void Segment_Will_Be_Recycled_If_All_Consumers_Consumed_MultipleGroup()
[Fact]
public void Segment_Will_Not_Be_Recycled_If_Not_All_Consumers_Consumed_MultipleGroup()
{
MemoryBufferPartition<int>.SegmentLength = 3;

var partition = new MemoryBufferPartition<int>();
var partition = new MemoryBufferPartition<int>(0, 3);

for (var i = 0; i < 6; i++)
{
Expand Down
17 changes: 5 additions & 12 deletions tests/Mocha.Core.Tests/Buffer/Memory/MemoryBufferQueueTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -9,14 +9,7 @@ namespace Mocha.Core.Tests.Buffer.Memory;

public class MemoryBufferQueueTests
{
public MemoryBufferQueueTests()
{
// Avoid deadlock when testing,
// xunit may set the SynchronizationContext to a single-threaded context
SynchronizationContext.SetSynchronizationContext(null);
// Avoid the impact of the default segment length on the test
MemoryBufferPartition<int>.SegmentLength = 1024;
}
private static int MemoryBufferPartitionSegmentLength => new MemoryBufferPartition<int>(0)._segmentLength;

[Fact]
public async Task Produce_And_Consume()
Expand Down Expand Up @@ -331,7 +324,7 @@ public void Equal_Distribution_Load_Balancing_Strategy_For_Consumers()
[Fact]
public void Concurrent_Producer_Single_Partition()
{
var messageSize = MemoryBufferPartition<int>.SegmentLength * 4;
var messageSize = MemoryBufferPartitionSegmentLength * 4;

var queue = new MemoryBufferQueue<int>("test", 1);

Expand Down Expand Up @@ -378,7 +371,7 @@ public void Concurrent_Producer_Single_Partition()
[Fact]
public void Concurrent_Producer_Multiple_Partition()
{
var messageSize = MemoryBufferPartition<int>.SegmentLength * 4;
var messageSize = MemoryBufferPartitionSegmentLength * 4;

var queue = new MemoryBufferQueue<int>("test", Environment.ProcessorCount);

Expand Down Expand Up @@ -440,7 +433,7 @@ public void Concurrent_Producer_Multiple_Partition()
[InlineData(3, 10000)]
public void Concurrent_Consumer_Multiple_Groups(int groupNumber, int batchSize)
{
var messageSize = MemoryBufferPartition<int>.SegmentLength * 4;
var messageSize = MemoryBufferPartitionSegmentLength * 4;
var partitionNumber = Environment.ProcessorCount * 2;
var consumerNumberPerGroup = Environment.ProcessorCount;

Expand Down Expand Up @@ -500,7 +493,7 @@ public void Concurrent_Consumer_Multiple_Groups(int groupNumber, int batchSize)
[InlineData(3)]
public void Concurrent_Producer_And_Concurrent_Consumer_Multiple_Groups(int groupNumber)
{
var messageSize = MemoryBufferPartition<int>.SegmentLength * 4;
var messageSize = MemoryBufferPartitionSegmentLength * 4;
var partitionNumber = Environment.ProcessorCount * 2;
var consumerNumberPerGroup = Environment.ProcessorCount;

Expand Down

0 comments on commit 8bdc420

Please sign in to comment.